use super::WasccHost;
use crate::Result;
use data_encoding::HEXUPPER;
use ring::digest::{Context, Digest, SHA256};
use crate::bus;
use crate::bus::MessageBus;
use crate::BindingsList;
use crate::{authz, errors, Actor, NativeCapability};
use errors::ErrorKind;
use std::{
collections::HashMap,
io::Read,
sync::{Arc, RwLock},
};
use uuid::Uuid;
use wapc::prelude::*;
use wascap::{jwt::Claims, prelude::KeyPair};
use wascc_codec::{
capabilities::{CapabilityDescriptor, OP_GET_CAPABILITY_DESCRIPTOR},
core::{CapabilityConfiguration, OP_PERFORM_LIVE_UPDATE, OP_REMOVE_ACTOR},
deserialize, serialize, SYSTEM_ACTOR,
};
/// Unsubscribes from the bus every actor that is bound to the capability `capid`.
///
/// The keys of the shared bindings list are `(actor, capability id, binding name)`
/// tuples. For each key whose capability id equals `capid`, the corresponding
/// provider subject is unsubscribed. The bindings list itself is not modified.
///
/// # Panics
///
/// Panics if the bindings lock is poisoned.
pub(crate) fn unsub_all_bindings(
    bindings: Arc<RwLock<BindingsList>>,
    bus: Arc<MessageBus>,
    capid: &str,
) {
    let table = bindings.read().unwrap();
    for (actor, cap, bind) in table.keys() {
        if cap == capid {
            // Best effort: whatever `unsubscribe` returns is deliberately discarded.
            let _ = bus.unsubscribe(&bus::provider_subject_bound_actor(cap, bind, actor));
        }
    }
}
impl WasccHost {
    /// Stores `config` in the host's bindings list under the key
    /// `(actor, capid, binding)`, replacing any configuration already stored
    /// for that key.
    ///
    /// Always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if the bindings lock is poisoned.
    pub(crate) fn record_binding(
        &self,
        actor: &str,
        capid: &str,
        binding: &str,
        config: &CapabilityConfiguration,
    ) -> Result<()> {
        let key = (actor.to_string(), capid.to_string(), binding.to_string());
        let mut table = self.bindings.write().unwrap();
        table.insert(key, config.clone());
        trace!(
            "Actor {} successfully bound to {},{}",
            actor,
            binding,
            capid
        );
        Ok(())
    }

    /// Registers a default-constructed `ExtrasCapabilityProvider` with this
    /// host as a native capability.
    ///
    /// # Errors
    ///
    /// Propagates any error from wrapping the provider instance or from
    /// adding it to the host.
    pub(crate) fn ensure_extras(&self) -> Result<()> {
        let provider = crate::extras::ExtrasCapabilityProvider::default();
        let extras = NativeCapability::from_instance(provider, None)?;
        self.add_native_capability(extras)?;
        Ok(())
    }
}
/// Asks the module loaded in `host` for its capability descriptor.
///
/// Invokes `OP_GET_CAPABILITY_DESCRIPTOR` with a serialized placeholder
/// `HealthRequest` as the payload and deserializes the reply into a
/// `CapabilityDescriptor`.
///
/// # Errors
///
/// Returns an error if serializing the request, the guest call, or
/// deserializing the reply fails.
pub(crate) fn get_descriptor(host: &mut WapcHost) -> Result<CapabilityDescriptor> {
    let request = wascc_codec::core::HealthRequest { placeholder: false };
    let payload = serialize(&request)?;
    let raw = host.call(OP_GET_CAPABILITY_DESCRIPTOR, &payload)?;
    match deserialize(&raw) {
        Ok(descriptor) => Ok(descriptor),
        Err(e) => Err(e.into()),
    }
}
/// Removes the descriptor stored under the route key `(binding, capid)` from
/// the shared capability table. Does nothing if no such entry exists.
///
/// # Panics
///
/// Panics if the capability table lock is poisoned.
pub(crate) fn remove_cap(
    caps: Arc<RwLock<HashMap<crate::RouteKey, CapabilityDescriptor>>>,
    capid: &str,
    binding: &str,
) {
    // Note the key order: binding name first, capability id second.
    let route = (binding.to_string(), capid.to_string());
    let mut table = caps.write().unwrap();
    table.remove(&route);
}
/// Sends a live-update invocation carrying the bytes of `new_actor` to the
/// bus subject of the actor with the same public key (the claims subject of
/// `new_actor`'s token).
///
/// Logs an informational message once the invocation has succeeded.
///
/// # Errors
///
/// Returns the error produced by the bus if the invocation fails.
pub(crate) fn replace_actor(
    hostkey: &KeyPair,
    bus: Arc<MessageBus>,
    new_actor: Actor,
) -> Result<()> {
    let actor_id = new_actor.token.claims.subject;
    let subject = crate::bus::actor_subject(&actor_id);
    let update = gen_liveupdate_invocation(hostkey, &actor_id, new_actor.bytes);

    bus.invoke(&subject, update)?;
    info!("Actor {} replaced", actor_id);
    Ok(())
}
pub(crate) fn live_update(guest: &mut WapcHost, inv: &Invocation) -> InvocationResponse {
match guest.replace_module(&inv.msg) {
Ok(_) => InvocationResponse::success(inv, vec![]),
Err(e) => {
error!("Failed to perform hot swap, ignoring message: {}", e);
InvocationResponse::error(inv, "Failed to perform hot swap")
}
}
}
/// Builds the `OP_PERFORM_LIVE_UPDATE` invocation sent from the system actor
/// to the actor `target`, with `bytes` as the message payload.
fn gen_liveupdate_invocation(hostkey: &KeyPair, target: &str, bytes: Vec<u8>) -> Invocation {
    let origin = WasccEntity::Actor(SYSTEM_ACTOR.to_string());
    let destination = WasccEntity::Actor(target.to_string());
    Invocation::new(hostkey, origin, destination, OP_PERFORM_LIVE_UPDATE, bytes)
}
/// Unbinds the actor `key` from every capability it is currently bound to.
///
/// For each binding whose actor equals `key`, a remove-actor invocation is
/// sent to the provider subject for that binding and the binding is then
/// deleted from the shared bindings list. The result of each invocation is
/// ignored.
///
/// # Panics
///
/// Panics if the bindings lock is poisoned or if the removal payload cannot
/// be serialized.
pub(crate) fn deconfigure_actor(
    hostkey: KeyPair,
    bus: Arc<MessageBus>,
    bindings: Arc<RwLock<BindingsList>>,
    key: &str,
) {
    let removal = CapabilityConfiguration {
        module: key.to_string(),
        values: HashMap::new(),
    };
    let payload = serialize(&removal).unwrap();

    // Snapshot the matching keys inside a short-lived read lock so that
    // `remove_binding` can take the write lock in the loop below.
    let mut owned = Vec::new();
    {
        let table = bindings.read().unwrap();
        for entry in table.keys() {
            if entry.0 == key {
                owned.push(entry.clone());
            }
        }
    }

    for (actor, capid, binding) in owned {
        info!("Unbinding actor {} from {},{}", actor, binding, capid);
        let subject = bus::provider_subject_bound_actor(&capid, &binding, &actor);
        let request = gen_remove_actor(&hostkey, payload.clone(), &binding, &capid);
        // The binding is removed locally whether or not the provider replied.
        let _response = bus.invoke(&subject, request);
        remove_binding(bindings.clone(), key, &binding, &capid);
    }
}
/// Removes every binding that targets the capability provider identified by
/// `capid` and `binding`, regardless of which actor owns it.
///
/// Keys of the bindings list are `(actor, capability id, binding name)`
/// tuples. An entry is dropped only when both its capability id and its
/// binding name match; all other entries are kept.
///
/// # Panics
///
/// Panics if the bindings lock is poisoned.
pub(crate) fn unbind_all_from_cap(bindings: Arc<RwLock<BindingsList>>, capid: &str, binding: &str) {
    let mut lock = bindings.write().unwrap();
    // The negation must cover the whole conjunction. Negating only the capid
    // comparison would keep just the entries with a different capid and the
    // same binding name, and discard unrelated bindings.
    lock.retain(|k, _| !(k.1 == capid && k.2 == binding));
}
/// Deletes the single binding stored under `(actor, capid, binding)` from the
/// shared bindings list. Does nothing if no such binding exists.
///
/// # Panics
///
/// Panics if the bindings lock is poisoned.
pub(crate) fn remove_binding(
    bindings: Arc<RwLock<BindingsList>>,
    actor: &str,
    binding: &str,
    capid: &str,
) {
    // The key order is (actor, capid, binding), which differs from the
    // parameter order of this function.
    let entry = (actor.to_string(), capid.to_string(), binding.to_string());
    bindings.write().unwrap().remove(&entry);
}
/// Builds the `OP_REMOVE_ACTOR` invocation sent from the system actor to the
/// capability provider identified by `capid` and `binding`, with `msg` as the
/// payload.
pub(crate) fn gen_remove_actor(
    hostkey: &KeyPair,
    msg: Vec<u8>,
    binding: &str,
    capid: &str,
) -> Invocation {
    let origin = WasccEntity::Actor(SYSTEM_ACTOR.to_string());
    let provider = WasccEntity::Capability {
        capid: capid.to_string(),
        binding: binding.to_string(),
    };
    Invocation::new(hostkey, origin, provider, OP_REMOVE_ACTOR, msg)
}
/// A request passed over the message bus from one entity to another.
///
/// `Invocation::new` fills in `id`, `encoded_claims` and `host_id` from the
/// host key it is given; `validate_antiforgery` later checks that these
/// fields still agree with the rest of the invocation.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "lattice", derive(serde::Serialize, serde::Deserialize))]
pub struct Invocation {
/// Entity that issued the invocation.
pub origin: WasccEntity,
/// Entity the invocation is addressed to.
pub target: WasccEntity,
/// Name of the operation to perform on the target.
pub operation: String,
/// Raw payload bytes of the operation.
pub msg: Vec<u8>,
/// Identifier of this invocation; `Invocation::new` uses a freshly generated
/// v4 UUID, which is also the subject of the claims token.
pub id: String,
/// Encoded claims token produced with the host key in `Invocation::new`.
pub encoded_claims: String,
/// Public key of the host key pair that created the invocation (the issuer
/// of the claims token).
pub host_id: String,
}
/// An endpoint that can originate or receive an `Invocation`.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "lattice", derive(serde::Serialize, serde::Deserialize))]
pub enum WasccEntity {
/// An actor, identified by a string (callers in this file pass an actor's
/// public key or `SYSTEM_ACTOR`).
Actor(String),
/// A capability provider, identified by its capability id and binding name.
Capability { capid: String, binding: String },
}
impl WasccEntity {
    /// Renders this entity as a URL using the bus URL scheme.
    ///
    /// * An actor becomes `<scheme>://<id>`.
    /// * A capability becomes `<scheme>://<capid>/<binding>`. In the capability
    ///   id `:` is replaced by `/`; in both parts spaces are replaced by `_`
    ///   and the text is lower-cased.
    pub fn url(&self) -> String {
        match self {
            WasccEntity::Actor(pk) => format!("{}://{}", bus::URL_SCHEME, pk),
            WasccEntity::Capability { capid, binding } => {
                let cap_path = capid.replace(":", "/").replace(" ", "_").to_lowercase();
                let binding_path = binding.replace(" ", "_").to_lowercase();
                format!("{}://{}/{}", bus::URL_SCHEME, cap_path, binding_path)
            }
        }
    }
}
impl Invocation {
pub fn new(
hostkey: &KeyPair,
origin: WasccEntity,
target: WasccEntity,
op: &str,
msg: Vec<u8>,
) -> Invocation {
let subject = format!("{}", Uuid::new_v4());
let issuer = hostkey.public_key();
let target_url = format!("{}/{}", target.url(), op);
let claims = Claims::<wascap::prelude::Invocation>::new(
issuer.to_string(),
subject.to_string(),
&target_url,
&origin.url(),
&invocation_hash(&target_url, &origin.url(), &msg),
);
Invocation {
origin,
target,
operation: op.to_string(),
msg,
id: subject,
encoded_claims: claims.encode(&hostkey).unwrap(),
host_id: issuer.to_string(),
}
}
pub fn origin_url(&self) -> String {
self.origin.url()
}
pub fn target_url(&self) -> String {
format!("{}/{}", self.target.url(), self.operation)
}
pub fn hash(&self) -> String {
invocation_hash(&self.target_url(), &self.origin_url(), &self.msg)
}
pub fn validate_antiforgery(&self) -> Result<()> {
let vr = wascap::jwt::validate_token::<wascap::prelude::Invocation>(&self.encoded_claims)?;
let claims = Claims::<wascap::prelude::Invocation>::decode(&self.encoded_claims)?;
if vr.expired {
return Err(errors::new(ErrorKind::Authorization(
"Invocation claims token expired".into(),
)));
}
if !vr.signature_valid {
return Err(errors::new(ErrorKind::Authorization(
"Invocation claims signature invalid".into(),
)));
}
if vr.cannot_use_yet {
return Err(errors::new(ErrorKind::Authorization(
"Attempt to use invocation before claims token allows".into(),
)));
}
let inv_claims = claims.metadata.unwrap();
if inv_claims.invocation_hash != self.hash() {
return Err(errors::new(ErrorKind::Authorization(
"Invocation hash does not match signed claims hash".into(),
)));
}
if claims.subject != self.id {
return Err(errors::new(ErrorKind::Authorization(
"Subject of invocation claims token does not match invocation ID".into(),
)));
}
if claims.issuer != self.host_id {
return Err(errors::new(ErrorKind::Authorization(
"Invocation claims issuer does not match invocation host".into(),
)));
}
if inv_claims.target_url != self.target_url() {
return Err(errors::new(ErrorKind::Authorization(
"Invocation claims and invocation target URL do not match".into(),
)));
}
if inv_claims.origin_url != self.origin_url() {
return Err(errors::new(ErrorKind::Authorization(
"Invocation claims and invocation origin URL do not match".into(),
)));
}
Ok(())
}
}
/// The reply to an `Invocation`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "lattice", derive(serde::Serialize, serde::Deserialize))]
pub struct InvocationResponse {
/// Payload bytes of the reply; empty for responses built with `error`.
pub msg: Vec<u8>,
/// `None` for responses built with `success`, otherwise the error message.
pub error: Option<String>,
/// `id` of the invocation this response answers.
pub invocation_id: String,
}
impl InvocationResponse {
    /// Builds a successful response to `inv` carrying `msg` as its payload.
    pub fn success(inv: &Invocation, msg: Vec<u8>) -> InvocationResponse {
        let invocation_id = inv.id.clone();
        InvocationResponse {
            msg,
            error: None,
            invocation_id,
        }
    }

    /// Builds a failed response to `inv` with an empty payload and `err` as
    /// the error message.
    pub fn error(inv: &Invocation, err: &str) -> InvocationResponse {
        let invocation_id = inv.id.clone();
        InvocationResponse {
            msg: vec![],
            error: Some(String::from(err)),
            invocation_id,
        }
    }
}
/// Handles a host call made by a guest actor.
///
/// The actor described by `claims` asks to perform `operation` in `namespace`
/// through `binding`, with `payload` as the message. The call is first
/// checked with `authz::can_invoke`. If allowed, an invocation is built and
/// sent over the bus to either the target actor's subject or the provider
/// subject bound to the calling actor.
///
/// # Errors
///
/// * `Authorization` if the actor's claims do not permit invoking `namespace`.
/// * `HostCallFailure` wrapping the bus error if the invocation fails.
pub(crate) fn wapc_host_callback(
    hostkey: KeyPair,
    claims: Claims<wascap::jwt::Actor>,
    bus: Arc<MessageBus>,
    binding: &str,
    namespace: &str,
    operation: &str,
    payload: &[u8],
) -> std::result::Result<Vec<u8>, Box<dyn std::error::Error>> {
    trace!(
        "Guest {} invoking {}:{}",
        claims.subject,
        namespace,
        operation
    );
    let capability_id = namespace;

    // Authorize before constructing the invocation, so no claims token is
    // generated and encoded for a call that is about to be rejected.
    if !authz::can_invoke(&claims, capability_id) {
        return Err(Box::new(errors::new(errors::ErrorKind::Authorization(
            format!(
                "Actor {} attempted to call {} on {},{} - PERMISSION DENIED.",
                claims.subject, operation, capability_id, binding
            ),
        ))));
    }

    let inv = invocation_from_callback(
        &hostkey,
        &claims.subject,
        binding,
        namespace,
        operation,
        payload,
    );
    let invoke_subject = match &inv.target {
        WasccEntity::Actor(subject) => bus::actor_subject(subject),
        WasccEntity::Capability { capid, binding } => {
            bus::provider_subject_bound_actor(capid, binding, &claims.subject)
        }
    };
    match bus.invoke(&invoke_subject, inv) {
        Ok(inv_r) => Ok(inv_r.msg),
        Err(e) => Err(Box::new(errors::new(errors::ErrorKind::HostCallFailure(
            e.into(),
        )))),
    }
}
/// Turns the raw arguments of a guest host call into an `Invocation`
/// originating from the actor `origin`.
///
/// A namespace `ns` that is exactly 56 bytes long and starts with `M` is
/// treated as an actor target. Any other namespace is treated as a capability
/// id, paired with the binding name `bd`, or `"default"` when `bd` is blank.
fn invocation_from_callback(
    hostkey: &KeyPair,
    origin: &str,
    bd: &str,
    ns: &str,
    op: &str,
    payload: &[u8],
) -> Invocation {
    let targets_actor = ns.len() == 56 && ns.starts_with("M");
    let target = if targets_actor {
        WasccEntity::Actor(ns.to_string())
    } else {
        let binding = match bd.trim() {
            "" => "default".to_string(),
            _ => bd.to_string(),
        };
        WasccEntity::Capability {
            binding,
            capid: ns.to_string(),
        }
    };
    let caller = WasccEntity::Actor(origin.to_string());
    Invocation::new(hostkey, caller, target, op, payload.to_vec())
}
/// Builds the `OP_BIND_ACTOR` invocation that configures the capability
/// provider (`capid`, `binding`) for `actor`.
///
/// The payload is a serialized `CapabilityConfiguration` whose module is
/// `actor` and whose values are the supplied `values` extended with entries
/// derived from the actor's `claims`: issuer, capabilities and tags (each
/// comma-joined, empty when absent), name, and expiry (`0` when absent).
///
/// # Panics
///
/// Panics if `claims` has no metadata or if the configuration cannot be
/// serialized.
pub(crate) fn gen_config_invocation(
    hostkey: &KeyPair,
    actor: &str,
    capid: &str,
    claims: Claims<wascap::jwt::Actor>,
    binding: String,
    values: HashMap<String, String>,
) -> Invocation {
    use wascc_codec::core::*;

    // `values` is passed by value, so it is extended in place with no clone.
    let mut values = values;
    let metadata = claims.metadata.as_ref().unwrap();

    values.insert(
        CONFIG_WASCC_CLAIMS_ISSUER.to_string(),
        claims.issuer.to_string(),
    );
    values.insert(
        CONFIG_WASCC_CLAIMS_CAPABILITIES.to_string(),
        metadata
            .caps
            .as_ref()
            .map(|caps| caps.join(","))
            .unwrap_or_default(),
    );
    values.insert(CONFIG_WASCC_CLAIMS_NAME.to_string(), claims.name());
    values.insert(
        CONFIG_WASCC_CLAIMS_EXPIRES.to_string(),
        claims.expires.unwrap_or(0).to_string(),
    );
    values.insert(
        CONFIG_WASCC_CLAIMS_TAGS.to_string(),
        metadata
            .tags
            .as_ref()
            .map(|tags| tags.join(","))
            .unwrap_or_default(),
    );

    let cfgvals = CapabilityConfiguration {
        module: actor.to_string(),
        values,
    };
    let payload = serialize(&cfgvals).unwrap();
    Invocation::new(
        hostkey,
        WasccEntity::Actor(SYSTEM_ACTOR.to_string()),
        WasccEntity::Capability {
            capid: capid.to_string(),
            binding,
        },
        OP_BIND_ACTOR,
        payload,
    )
}
/// Computes the SHA-256 digest of everything `reader` yields, reading it in
/// chunks of up to 1024 bytes until a read returns zero bytes.
///
/// # Errors
///
/// Returns an error if reading from `reader` fails.
fn sha256_digest<R: Read>(mut reader: R) -> Result<Digest> {
    let mut hasher = Context::new(&SHA256);
    let mut chunk = [0; 1024];
    loop {
        match reader.read(&mut chunk)? {
            // A zero-length read ends the input.
            0 => return Ok(hasher.finish()),
            filled => hasher.update(&chunk[..filled]),
        }
    }
}
/// Computes the hash that ties an invocation's payload to its endpoints.
///
/// The result is the upper-case hexadecimal SHA-256 digest of the bytes of
/// `origin_url`, then `target_url`, then `msg`, concatenated in that order.
/// The origin comes first even though it is the second parameter.
pub fn invocation_hash(target_url: &str, origin_url: &str, msg: &[u8]) -> String {
    // `extend_from_slice` always appends the whole slice, unlike
    // `Write::write`, which reports how many bytes it actually wrote.
    let mut cleanbytes: Vec<u8> =
        Vec::with_capacity(origin_url.len() + target_url.len() + msg.len());
    cleanbytes.extend_from_slice(origin_url.as_bytes());
    cleanbytes.extend_from_slice(target_url.as_bytes());
    cleanbytes.extend_from_slice(msg);
    let digest = sha256_digest(cleanbytes.as_slice())
        .expect("reading from an in-memory byte slice does not fail");
    HEXUPPER.encode(digest.as_ref())
}
#[cfg(test)]
mod test {
    use super::Invocation;
    use crate::WasccEntity;
    use wascap::prelude::KeyPair;

    /// A freshly created invocation passes anti-forgery validation, while
    /// copies with a changed target or payload are rejected.
    #[test]
    fn invocation_antiforgery() {
        let host_key = KeyPair::new_server();
        let origin = WasccEntity::Actor("testing".into());
        let target = WasccEntity::Capability {
            capid: "wascc:messaging".into(),
            binding: "default".into(),
        };
        let genuine = Invocation::new(&host_key, origin, target, "OP_TESTING", vec![1, 2, 3, 4]);

        let outcome = genuine.validate_antiforgery();
        println!("{:?}", outcome);
        assert!(genuine.validate_antiforgery().is_ok());

        // Redirecting the invocation to another target must be detected.
        let mut retargeted = genuine.clone();
        retargeted.target = WasccEntity::Actor("BADACTOR-EXFILTRATOR".into());
        assert!(retargeted.validate_antiforgery().is_err());

        // Changing the payload must be detected as well.
        let mut tampered = genuine.clone();
        tampered.msg = vec![5, 4, 3, 2];
        assert!(tampered.validate_antiforgery().is_err());

        assert_eq!(
            genuine.target_url(),
            "wasmbus://wascc/messaging/default/OP_TESTING"
        );
    }
}