use super::router::Router;
use super::Result;
use crate::actor::Actor;
use crate::authz;
use crate::capability::Capability;
use crate::dispatch::WasccNativeDispatcher;
use crate::errors;
use crate::middleware;
use crate::middleware::Middleware;
use crate::router::InvokerPair;
use crossbeam::{Receiver, Sender};
use crossbeam_channel as channel;
use crossbeam_utils::sync::WaitGroup;
use prost::Message;
use std::collections::HashMap;
use std::sync::RwLock;
use std::thread;
use wapc::prelude::*;
use wascap::jwt::Claims;
use wascc_codec::core::CapabilityConfiguration;
use wascc_codec::core::OP_CONFIGURE;
use wascc_codec::core::OP_REMOVE_ACTOR;
pub use authz::set_auth_hook;
lazy_static! {
    /// Process-wide routing table, guarded by a read/write lock and initialised to
    /// `Router::default()` on first use.
    pub static ref ROUTER: RwLock<Router> = RwLock::new(Router::default());

    /// Termination channels, keyed by the same string a listener thread registers its
    /// route under. Sending a value on one of these asks that listener thread to stop.
    pub static ref TERMINATORS: RwLock<HashMap<String, Sender<bool>>> =
        RwLock::new(HashMap::new());
}
/// Registers a middleware with the host by boxing it and appending it to the global
/// `middleware::MIDDLEWARES` list.
///
/// # Panics
///
/// Panics if the `MIDDLEWARES` lock is poisoned.
pub fn add_middleware(mid: impl Middleware) {
    let mut registered = middleware::MIDDLEWARES.write().unwrap();
    registered.push(Box::new(mid));
}
/// Loads a WebAssembly module as a (portable) capability provider.
///
/// A listener thread is spawned for the module and this call blocks until that thread
/// releases its handle on the wait group, which it does once its route and terminator
/// have been registered.
///
/// # Errors
///
/// Returns whatever error `listen_for_invocations` reports.
///
/// NOTE(review): the wait group handle is also released if the listener thread dies
/// during start-up (for example when the module fails to load), so an `Ok(())` here
/// does not by itself prove that the capability is routable.
pub fn add_capability(actor: Actor, wasi: WasiParams) -> Result<()> {
    let wg = WaitGroup::new();
    listen_for_invocations(
        wg.clone(),
        actor.token.claims,
        // `actor` is owned and not used afterwards, so hand the module bytes over
        // instead of deep-copying them.
        actor.bytes,
        Some(wasi),
        false,
    )?;
    wg.wait();
    Ok(())
}
/// Asks the listener thread registered under `cap_id` to shut down.
///
/// The request is delivered through the entry in `TERMINATORS`; the listener thread
/// removes its own terminator and route when it processes the signal, so the removal
/// is asynchronous with respect to this call.
///
/// # Errors
///
/// * `MiscHost("No such capability")` when nothing is registered under `cap_id`.
/// * `MiscHost` when the terminator exists but its listener is no longer receiving
///   (previously this case panicked the caller).
pub fn remove_capability(cap_id: &str) -> Result<()> {
    match TERMINATORS.read().unwrap().get(cap_id) {
        Some(term_s) => term_s.send(true).map_err(|e| {
            errors::new(errors::ErrorKind::MiscHost(
                format!(
                    "Failed to signal termination of capability {}: {}",
                    cap_id, e
                )
                .into(),
            ))
        }),
        None => Err(errors::new(errors::ErrorKind::MiscHost(
            "No such capability".into(),
        ))),
    }
}
/// Loads a WebAssembly module as an actor.
///
/// A listener thread is spawned for the actor and this call blocks until that thread
/// releases its handle on the wait group, which it does once its route and terminator
/// have been registered.
///
/// # Errors
///
/// Returns whatever error `listen_for_invocations` reports.
///
/// NOTE(review): the wait group handle is also released if the listener thread dies
/// during start-up (for example when the module fails to load), so an `Ok(())` here
/// does not by itself prove that the actor is routable.
pub fn add_actor(actor: Actor) -> Result<()> {
    let wg = WaitGroup::new();
    info!("Adding actor {} to host", actor.public_key());
    listen_for_invocations(
        wg.clone(),
        actor.token.claims,
        // `actor` is owned and not used afterwards, so hand the module bytes over
        // instead of deep-copying them.
        actor.bytes,
        None,
        true,
    )?;
    wg.wait();
    Ok(())
}
/// Asks the listener thread of the actor registered under `pk` to shut down.
///
/// The request is delivered through the entry in `TERMINATORS`; the listener thread
/// removes its own terminator and route when it processes the signal, so the removal
/// is asynchronous with respect to this call.
///
/// # Errors
///
/// * `MiscHost("No such actor")` when nothing is registered under `pk`.
/// * `MiscHost` when the terminator exists but its listener is no longer receiving
///   (previously this case panicked the caller).
pub fn remove_actor(pk: &str) -> Result<()> {
    match TERMINATORS.read().unwrap().get(pk) {
        Some(term_s) => term_s.send(true).map_err(|e| {
            errors::new(errors::ErrorKind::MiscHost(
                format!("Failed to signal termination of actor {}: {}", pk, e).into(),
            ))
        }),
        None => Err(errors::new(errors::ErrorKind::MiscHost(
            "No such actor".into(),
        ))),
    }
}
/// Registers a native capability provider and starts its listener thread.
///
/// The duplicate check runs BEFORE the plugin is handed to the plugin manager, so a
/// rejected duplicate registration leaves the plugin manager untouched.
///
/// Blocks until the listener thread releases its wait group handle, which it does once
/// the dispatcher, route and terminator have been registered.
///
/// # Errors
///
/// * `CapabilityProvider` when a route already exists for the capability ID.
/// * Any error returned by the plugin manager's `add_plugin`.
/// * Any error returned by `listen_for_native_invocations`.
pub fn add_native_capability(capability: Capability) -> Result<()> {
    let capid = capability.capid.clone();

    if ROUTER.read().unwrap().get_pair(&capid).is_some() {
        return Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
            "Attempt to register the same capability provider multiple times: {}",
            capid
        ))));
    }

    crate::plugins::PLUGMAN
        .write()
        .unwrap()
        .add_plugin(capability)?;

    let wg = WaitGroup::new();
    listen_for_native_invocations(wg.clone(), &capid)?;
    wg.wait();
    Ok(())
}
/// Removes the native capability provider registered under `capid` from the plugin
/// manager.
///
/// Only the plugin manager is touched here; this function does not signal the
/// provider's terminator. The misspelling in the function name is part of the public
/// interface and is kept as is.
///
/// # Errors
///
/// Propagates any error returned by the plugin manager's `remove_plugin`.
pub fn remove_native_capabiltiy(capid: &str) -> Result<()> {
    let mut plugins = crate::plugins::PLUGMAN.write().unwrap();
    plugins.remove_plugin(capid)?;
    Ok(())
}
/// Sends a configuration for actor `module` to the capability provider `capid`.
///
/// The configuration is encoded as a `CapabilityConfiguration` message and delivered
/// with origin `"system"` and operation `"<capid>!<OP_CONFIGURE>"`.
///
/// # Errors
///
/// * `Authorization` when `authz::can_invoke` denies the actor access to the capability.
/// * `CapabilityProvider` when no route exists for `capid`, or when the provider's
///   response carries an error.
/// * Any error returned by `invoke`.
pub fn configure(module: &str, capid: &str, config: HashMap<String, String>) -> Result<()> {
    if !authz::can_invoke(module, capid) {
        return Err(errors::new(errors::ErrorKind::Authorization(format!(
            "Actor {} is not authorized to use capability {}, configuration rejected",
            module, capid
        ))));
    }
    info!(
        "Attempting to configure actor {} for capability {}",
        module, capid
    );

    // The read guard is released at the end of this statement, before the provider
    // is invoked.
    let lookup = ROUTER.read().unwrap().get_pair(capid);
    let provider = match lookup {
        Some(found) => found,
        None => {
            return Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
                "No such capability provider: {}",
                capid
            ))))
        }
    };

    trace!("Sending configuration to {}", capid);
    let operation = format!("{}!{}", capid, OP_CONFIGURE);
    let payload = gen_config_proto(module, config);
    let response = invoke(&provider, "system".to_string(), &operation, &payload)?;

    match response.error {
        Some(e) => Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
            "Failed to configure {} - {}",
            capid, e
        )))),
        None => Ok(()),
    }
}
fn listen_for_native_invocations(wg: WaitGroup, capid: &str) -> Result<()> {
let capid = capid.to_string();
thread::spawn(move || {
let (inv_s, inv_r): (Sender<Invocation>, Receiver<Invocation>) = channel::unbounded();
let (resp_s, resp_r): (Sender<InvocationResponse>, Receiver<InvocationResponse>) =
channel::unbounded();
let (term_s, term_r): (Sender<bool>, Receiver<bool>) = channel::unbounded();
let dispatcher = WasccNativeDispatcher::new(resp_r.clone(), inv_s.clone(), &capid);
crate::plugins::PLUGMAN
.write()
.unwrap()
.register_dispatcher(&capid, dispatcher)
.unwrap();
ROUTER
.write()
.unwrap()
.add_route(capid.to_string(), inv_s, resp_r);
TERMINATORS.write().unwrap().insert(capid.clone(), term_s);
info!("Native capability provider '{}' ready", capid);
drop(wg);
loop {
select! {
recv(inv_r) -> inv => {
if let Ok(inv) = inv {
let v: Vec<_> = inv.operation.split('!').collect();
let target = v[0];
info!(
"Capability {} received invocation for target {}",
capid, target
);
let inv_r = if target == capid {
middleware::invoke_capability(inv).unwrap()
} else {
if !authz::can_invoke(target, &capid) {
InvocationResponse::error(&format!(
"Dispatch between actor and unauthorized capability: {} <-> {}",
target, capid
))
} else {
let pair = ROUTER.read().unwrap().get_pair(target);
match pair {
Some(ref p) => {
invoke(p, capid.clone(), &inv.operation, &inv.msg).unwrap()
}
None => InvocationResponse::error("Dispatch to unknown actor"),
}
}
};
resp_s.send(inv_r).unwrap();
}
},
recv(term_r) -> _term => {
info!("Terminating native capability provider {}", capid);
TERMINATORS.write().unwrap().remove(&capid);
ROUTER.write().unwrap().remove_route(&capid).unwrap();
break;
}
}
}
});
Ok(())
}
fn listen_for_invocations(
wg: WaitGroup,
claims: Claims,
buf: Vec<u8>,
wasi: Option<WasiParams>,
actor: bool,
) -> Result<()> {
thread::spawn(move || {
info!(
"Loading {} module...",
if actor { "actor" } else { "capability" }
);
let mut guest = WapcHost::new(host_callback, &buf, wasi).unwrap();
authz::map_claims(guest.id(), &claims.subject);
let (inv_s, inv_r): (Sender<Invocation>, Receiver<Invocation>) = channel::unbounded();
let (resp_s, resp_r): (Sender<InvocationResponse>, Receiver<InvocationResponse>) =
channel::unbounded();
let (term_s, term_r): (Sender<bool>, Receiver<bool>) = channel::unbounded();
let route_key = {
let route_key = if actor {
claims.subject
} else {
claims.caps.unwrap()[0].to_string()
};
ROUTER
.write()
.unwrap()
.add_route(route_key.clone(), inv_s, resp_r);
TERMINATORS
.write()
.unwrap()
.insert(route_key.clone(), term_s);
info!(
"Actor {} ready for communications, capability: {}",
route_key, !actor
);
route_key
};
drop(wg);
loop {
select! {
recv(inv_r) -> inv => {
if let Ok(inv) = inv {
let v: Vec<_> = inv.operation.split('!').collect();
let inv = Invocation::new(inv.origin, v[1], inv.msg);
let inv_r = middleware::invoke_actor(inv, &mut guest).unwrap();
resp_s.send(inv_r).unwrap();
}
},
recv(term_r) -> _term => {
info!("Terminating {} {}", if actor { "actor" } else { "capability" }, route_key);
if actor {
deconfigure_actor(&route_key);
}
TERMINATORS.write().unwrap().remove(&route_key);
ROUTER.write().unwrap().remove_route(&route_key).unwrap();
break;
}
}
}
});
Ok(())
}
fn deconfigure_actor(key: &str) {
let cfg = CapabilityConfiguration {
module: key.to_string(),
values: HashMap::new(),
};
let mut buf = Vec::new();
cfg.encode(&mut buf).unwrap();
ROUTER
.read()
.unwrap()
.all_capabilities()
.iter()
.for_each(|(capid, (sender, receiver))| {
let inv = Invocation {
origin: "system".to_string(),
msg: buf.clone(),
operation: format!("{}!{}", capid, OP_REMOVE_ACTOR),
};
sender.send(inv).unwrap();
let _res = receiver.recv().unwrap();
});
}
/// Host-call handler passed to `WapcHost::new`; it runs when guest `id` invokes `op`.
///
/// The text before the first `!` in `op` is taken as the capability ID. The call is
/// rejected unless `authz::can_id_invoke` allows the guest to use that capability.
/// Otherwise the invocation is forwarded, with the guest's public key as origin, over
/// the capability's route and the handler blocks until a response arrives.
///
/// # Errors
///
/// * `Authorization` when the guest may not use the capability.
/// * `HostCallFailure` when no route exists for the capability, when the response
///   channel is disconnected, or when the capability's response carries an error
///   (previously such a response was returned to the guest as a success with whatever
///   payload it held).
/// * The channel's send error when the invocation cannot be delivered.
fn host_callback(
    id: u64,
    op: &str,
    payload: &[u8],
) -> std::result::Result<Vec<u8>, Box<dyn std::error::Error>> {
    info!("Guest {} invoking {}", id, op);
    // `split` always yields at least one item, so the fallback is never taken.
    let capability_id = op.split('!').next().unwrap_or(op);

    if !authz::can_id_invoke(id, capability_id) {
        return Err(Box::new(errors::new(errors::ErrorKind::Authorization(
            format!(
                "Actor {} does not have permission to use capability {}",
                id, capability_id
            ),
        ))));
    }

    // The read guard is released at the end of this statement, before blocking on the
    // capability.
    let pair = ROUTER.read().unwrap().get_pair(capability_id);
    match pair {
        Some((inv_s, resp_r)) => {
            inv_s.send(Invocation::new(authz::pk_for_id(id), op, payload.to_vec()))?;
            match resp_r.recv() {
                Ok(ir) => match ir.error {
                    Some(e) => Err(Box::new(errors::new(errors::ErrorKind::HostCallFailure(
                        e.into(),
                    )))),
                    None => Ok(ir.msg),
                },
                Err(e) => Err(Box::new(errors::new(errors::ErrorKind::HostCallFailure(
                    e.into(),
                )))),
            }
        }
        None => Err(Box::new(errors::new(errors::ErrorKind::HostCallFailure(
            "Attempt to make host call into non-existent target".into(),
        )))),
    }
}
fn invoke(
pair: &InvokerPair,
origin: String,
op: &str,
payload: &[u8],
) -> Result<InvocationResponse> {
trace!("invoking: {} from {}", op, origin);
let (inv_s, resp_r) = pair;
inv_s
.send(Invocation::new(origin, op, payload.to_vec()))
.unwrap();
Ok(resp_r.recv().unwrap())
}
/// Encodes a `CapabilityConfiguration` for `module` carrying `values` and returns the
/// encoded bytes.
///
/// # Panics
///
/// Panics if encoding the message fails.
fn gen_config_proto(module: &str, values: HashMap<String, String>) -> Vec<u8> {
    let configuration = CapabilityConfiguration {
        module: module.to_owned(),
        values,
    };
    let mut encoded = Vec::new();
    configuration.encode(&mut encoded).unwrap();
    encoded
}
/// A single request travelling over a route's invocation channel.
#[derive(Debug, Clone)]
pub struct Invocation {
    /// Identifier of whoever issued the request.
    pub origin: String,
    /// Operation string of the request.
    pub operation: String,
    /// Raw request payload.
    pub msg: Vec<u8>,
}

impl Invocation {
    /// Builds an invocation, copying `op` into an owned operation string and taking
    /// ownership of `origin` and `msg`.
    pub fn new(origin: String, op: &str, msg: Vec<u8>) -> Invocation {
        let operation = String::from(op);
        Self {
            origin,
            operation,
            msg,
        }
    }
}
/// The reply to an [`Invocation`], travelling over a route's response channel.
#[derive(Debug, Clone)]
pub struct InvocationResponse {
    /// Raw response payload; empty for responses built with [`InvocationResponse::error`].
    pub msg: Vec<u8>,
    /// Error description; `None` for responses built with [`InvocationResponse::success`].
    pub error: Option<String>,
}

impl InvocationResponse {
    /// Builds a response carrying `msg` and no error.
    pub fn success(msg: Vec<u8>) -> InvocationResponse {
        Self { error: None, msg }
    }

    /// Builds a response with an empty payload and `err` as its error description.
    pub fn error(err: &str) -> InvocationResponse {
        let description = err.to_owned();
        Self {
            msg: Vec::new(),
            error: Some(description),
        }
    }
}