use super::router::Router;
use super::Result;
use crate::actor::Actor;
use crate::authz;
use crate::capability::Capability;
use crate::dispatch::WasccNativeDispatcher;
use crate::errors;
use crate::middleware;
use crate::middleware::Middleware;
use crate::router::InvokerPair;
use crossbeam::{Receiver, Sender};
use crossbeam_channel as channel;
use crossbeam_utils::sync::WaitGroup;
use prost::Message;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use std::thread;
use wapc::prelude::*;
use wascap::jwt::Claims;
use wascc_codec::core::CapabilityConfiguration;
use wascc_codec::core::OP_CONFIGURE;
pub use authz::set_auth_hook;
lazy_static! {
pub static ref ROUTER: Arc<RwLock<Router>> = {
wapc::set_host_callback(host_callback);
Arc::new(RwLock::new(Router::default()))
};
}
pub fn add_middleware(mid: impl Middleware) {
middleware::MIDDLEWARES.write().unwrap().push(Box::new(mid));
}
pub fn add_capability(actor: Actor, wasi: WasiParams) -> Result<()> {
let wg = WaitGroup::new();
let router = ROUTER.clone();
listen_for_invocations(
wg.clone(),
actor.token.claims,
router,
actor.bytes.clone(),
Some(wasi),
false,
)?;
wg.wait();
Ok(())
}
pub fn add_actor(actor: Actor) -> Result<()> {
let wg = WaitGroup::new();
info!("Adding actor {} to host", actor.public_key());
let router = ROUTER.clone();
listen_for_invocations(
wg.clone(),
actor.token.claims,
router,
actor.bytes.clone(),
None,
true,
)?;
wg.wait();
Ok(())
}
pub fn add_native_capability(capability: Capability) -> Result<()> {
let capid = capability.capid.clone();
crate::plugins::PLUGMAN
.write()
.unwrap()
.add_plugin(capability)?;
let wg = WaitGroup::new();
let router = ROUTER.clone();
if router.read().unwrap().get_pair(&capid).is_some() {
return Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
"Attempt to register the same capability provider multiple times: {}",
capid
))));
}
listen_for_native_invocations(wg.clone(), router, &capid)?;
wg.wait();
Ok(())
}
pub fn configure(module: &str, capid: &str, config: HashMap<String, String>) -> Result<()> {
if !authz::can_invoke(module, capid) {
return Err(errors::new(errors::ErrorKind::Authorization(format!(
"Actor {} is not authorized to use capability {}, configuration rejected",
module, capid
))));
}
info!(
"Attempting to configure actor {} for capability {}",
module, capid
);
let capid = capid.to_string();
let module = module.to_string();
let router = ROUTER.clone();
let pair = router.read().unwrap().get_pair(&capid);
match pair {
Some(pair) => {
trace!("Sending configuration to {}", capid);
let res = invoke(
&pair,
"system".to_string(),
&format!("{}!{}", capid, OP_CONFIGURE),
&gen_config_proto(&module, config),
)?;
if let Some(e) = res.error {
Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
"Failed to configure {} - {}",
capid, e
))))
} else {
Ok(())
}
}
None => Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
"No such capability provider: {}",
capid
)))),
}
}
fn listen_for_native_invocations(
wg: WaitGroup,
router: Arc<RwLock<Router>>,
capid: &str,
) -> Result<()> {
let capid = capid.to_string();
thread::spawn(move || {
let (inv_s, inv_r): (Sender<Invocation>, Receiver<Invocation>) = channel::unbounded();
let (resp_s, resp_r): (Sender<InvocationResponse>, Receiver<InvocationResponse>) =
channel::unbounded();
let dispatcher = WasccNativeDispatcher::new(resp_r.clone(), inv_s.clone(), &capid);
crate::plugins::PLUGMAN
.write()
.unwrap()
.register_dispatcher(&capid, dispatcher)
.unwrap();
{
let mut lock = router.write().unwrap();
lock.add_route(capid.to_string(), inv_s, resp_r);
}
info!("Native capability provider '{}' ready", capid);
drop(wg);
loop {
if let Ok(inv) = inv_r.recv() {
let v: Vec<_> = inv.operation.split('!').collect();
let target = v[0];
info!(
"Capability {} received invocation for target {}",
capid, target
);
let inv_r = if target == capid {
middleware::invoke_capability(inv).unwrap()
} else {
if !authz::can_invoke(target, &capid) {
InvocationResponse::error(&format!(
"Dispatch between actor and unauthorized capability: {} <-> {}",
target, capid
))
} else {
let pair = router.read().unwrap().get_pair(target);
match pair {
Some(ref p) => {
invoke(p, capid.clone(), &inv.operation, &inv.msg).unwrap()
}
None => InvocationResponse::error("Dispatch to unknown actor"),
}
}
};
resp_s.send(inv_r).unwrap();
}
}
});
Ok(())
}
fn listen_for_invocations(
wg: WaitGroup,
claims: Claims,
router: Arc<RwLock<Router>>,
buf: Vec<u8>,
wasi: Option<WasiParams>,
actor: bool,
) -> Result<()> {
thread::spawn(move || {
info!(
"Loading {} module...",
if actor { "actor" } else { "capability" }
);
let mut guest = WapcHost::new(&buf, wasi).unwrap();
authz::map_claims(guest.id(), &claims.subject);
let (inv_s, inv_r): (Sender<Invocation>, Receiver<Invocation>) = channel::unbounded();
let (resp_s, resp_r): (Sender<InvocationResponse>, Receiver<InvocationResponse>) =
channel::unbounded();
{
let route_key = if actor {
claims.subject
} else {
claims.caps.unwrap()[0].to_string()
};
let mut lock = router.write().unwrap();
lock.add_route(route_key.clone(), inv_s, resp_r);
info!(
"Actor {} ready for communications, capability: {}",
route_key, !actor
);
}
drop(wg);
loop {
if let Ok(inv) = inv_r.recv() {
let v: Vec<_> = inv.operation.split('!').collect();
let inv = Invocation::new(inv.origin, v[1], inv.msg);
let inv_r = middleware::invoke_actor(inv, &mut guest).unwrap();
resp_s.send(inv_r).unwrap();
}
}
});
Ok(())
}
fn host_callback(
id: u64,
op: &str,
payload: &[u8],
) -> std::result::Result<Vec<u8>, Box<dyn std::error::Error>> {
info!("Guest {} invoking {}", id, op);
let v: Vec<_> = op.split('!').collect();
let capability_id = v[0];
if !authz::can_id_invoke(id, capability_id) {
return Err(Box::new(errors::new(errors::ErrorKind::Authorization(
format!(
"Actor {} does not have permission to use capability {}",
id, capability_id
),
))));
}
let pair = ROUTER.read().unwrap().get_pair(capability_id);
match pair {
Some((inv_s, resp_r)) => {
info!("invoking");
inv_s.send(Invocation::new(authz::pk_for_id(id), op, payload.to_vec()))?;
match resp_r.recv() {
Ok(ir) => Ok(ir.msg),
Err(e) => Err(Box::new(errors::new(errors::ErrorKind::HostCallFailure(
e.into(),
)))),
}
}
None => Err(Box::new(errors::new(errors::ErrorKind::HostCallFailure(
"dispatch to non-existent capability".into(),
)))),
}
}
fn invoke(
pair: &InvokerPair,
origin: String,
op: &str,
payload: &[u8],
) -> Result<InvocationResponse> {
trace!("invoking: {} from {}", op, origin);
let (inv_s, resp_r) = pair;
inv_s
.send(Invocation::new(origin, op, payload.to_vec()))
.unwrap();
Ok(resp_r.recv().unwrap())
}
fn gen_config_proto(module: &str, values: HashMap<String, String>) -> Vec<u8> {
let mut buf = Vec::new();
let cfgvals = CapabilityConfiguration {
module: module.to_string(),
values,
};
cfgvals.encode(&mut buf).unwrap();
buf
}
#[derive(Debug, Clone)]
pub struct Invocation {
pub origin: String,
pub operation: String,
pub msg: Vec<u8>,
}
impl Invocation {
pub fn new(origin: String, op: &str, msg: Vec<u8>) -> Invocation {
Invocation {
origin,
operation: op.to_string(),
msg,
}
}
}
#[derive(Debug, Clone)]
pub struct InvocationResponse {
pub msg: Vec<u8>,
pub error: Option<String>,
}
impl InvocationResponse {
pub fn success(msg: Vec<u8>) -> InvocationResponse {
InvocationResponse { msg, error: None }
}
pub fn error(err: &str) -> InvocationResponse {
InvocationResponse {
msg: Vec::new(),
error: Some(err.to_string()),
}
}
}