#![doc(html_logo_url = "https://avatars0.githubusercontent.com/u/52050279?s=200&v=4")]
#[macro_use]
extern crate log;
#[macro_use]
extern crate crossbeam;
/// Crate-wide result alias: a `std::result::Result` whose error type is fixed
/// to [`errors::Error`].
pub type Result<T> = std::result::Result<T, errors::Error>;
pub use actor::Actor;
pub use capability::{CapabilitySummary, NativeCapability};
pub use inthost::{Invocation, InvocationResponse, InvocationTarget};
#[cfg(feature = "manifest")]
pub use manifest::{BindingEntry, HostManifest};
pub use middleware::Middleware;
pub use wapc::prelude::WasiParams;
mod actor;
mod authz;
mod capability;
mod dispatch;
pub mod errors;
mod extras;
mod inthost;
#[cfg(feature = "manifest")]
mod manifest;
mod middleware;
mod plugins;
mod router;
/// A string paired with a set of actor claims, as returned by
/// [`WasccHost::actors`].
///
/// NOTE(review): the string is presumably the claims' subject (the actor's
/// public key) — confirm against `authz::get_all_claims`.
pub type SubjectClaimsPair = (String, Claims<wascap::jwt::Actor>);
use authz::AuthHook;
use inthost::ACTOR_BINDING;
use plugins::PluginManager;
use router::Router;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use wascap::jwt::{Claims, Token};
use wascc_codec::{
core::{CapabilityConfiguration, OP_BIND_ACTOR},
serialize,
};
/// List of string triples stored in the host's `bindings` field.
///
/// NOTE(review): presumably `(actor, capability id, binding name)`, matching
/// the argument order passed to `record_binding` — confirm where it is filled.
type BindingsList = Vec<(String, String, String)>;
/// Shared host state.
///
/// Every field is an `Arc<RwLock<..>>`, so a cloned `WasccHost` refers to the
/// same underlying state as the value it was cloned from.
#[derive(Clone)]
pub struct WasccHost {
    /// Actor claims keyed by string; looked up by actor key in `bind_actor`
    /// and `claims_for_actor`.
    claims: Arc<RwLock<HashMap<String, Claims<wascap::jwt::Actor>>>>,
    /// Route table; queried by `(binding, id)` pairs throughout this impl.
    router: Arc<RwLock<Router>>,
    /// Plugin manager, created with `PluginManager::default()` in `new`.
    plugins: Arc<RwLock<PluginManager>>,
    /// Optional authorization hook installed through `set_auth_hook`.
    auth_hook: Arc<RwLock<Option<Box<AuthHook>>>>,
    /// Recorded bindings (see `BindingsList`); starts out empty.
    bindings: Arc<RwLock<BindingsList>>,
    /// Capability summaries returned by `capabilities`.
    caps: Arc<RwLock<Vec<CapabilitySummary>>>,
    /// Middleware instances appended by `add_middleware`.
    middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
    /// Optional gantry client set by `configure_gantry`; only present when
    /// the `gantry` feature is enabled.
    #[cfg(feature = "gantry")]
    gantry_client: Arc<RwLock<Option<gantryclient::Client>>>,
}
impl WasccHost {
/// Builds a host whose claims map, binding list, capability list and
/// middleware list are empty, whose router and plugin manager are
/// default-constructed, and which has no auth hook (and, with the `gantry`
/// feature, no gantry client). `ensure_extras` is then run on the new host.
///
/// # Panics
/// Panics if `ensure_extras` returns an error.
pub fn new() -> Self {
    // Wraps a value in the `Arc<RwLock<_>>` shell every field uses.
    fn shared<T>(value: T) -> Arc<RwLock<T>> {
        Arc::new(RwLock::new(value))
    }

    let host = WasccHost {
        claims: shared(HashMap::new()),
        router: shared(Router::default()),
        plugins: shared(PluginManager::default()),
        auth_hook: shared(None),
        bindings: shared(Vec::new()),
        caps: shared(Vec::new()),
        middlewares: shared(Vec::new()),
        // Only this field differs between feature sets, so it alone is gated.
        #[cfg(feature = "gantry")]
        gantry_client: shared(None),
    };
    host.ensure_extras().unwrap();
    host
}
/// Validates `actor` and starts it in this host, returning once the wait
/// group handed to the spawn routine has been released.
///
/// # Errors
/// - `ErrorKind::MiscHost` if a route already exists for the actor's public
///   key under the actor binding.
/// - Any error from `authz::enforce_validation` on the actor's JWT.
/// - `ErrorKind::Authorization` if `check_auth` rejects the actor's token.
/// - Any error from `spawn_actor_and_listen`.
pub fn add_actor(&self, actor: Actor) -> Result<()> {
    let already_running = self
        .router
        .read()
        .unwrap()
        .route_exists(ACTOR_BINDING, &actor.public_key());
    if already_running {
        return Err(errors::new(errors::ErrorKind::MiscHost(format!(
            "Actor {} is already running in this host, failed to add.",
            actor.public_key()
        ))));
    }

    authz::enforce_validation(&actor.token.jwt)?;
    if !self.check_auth(&actor.token) {
        return Err(errors::new(errors::ErrorKind::Authorization(
            "Authorization hook denied access to module".into(),
        )));
    }

    info!("Adding actor {} to host", actor.public_key());

    // Created only after every check has passed, so rejected actors do not
    // allocate a wait group that is never waited on.
    let wg = crossbeam_utils::sync::WaitGroup::new();
    self.spawn_actor_and_listen(
        wg.clone(),
        actor.token.claims,
        // `actor` is owned and not used again, so the module bytes are moved
        // instead of being copied.
        actor.bytes,
        None,
        true,
        ACTOR_BINDING.to_string(),
    )?;
    // Blocks until the clone handed to the spawn routine has been dropped.
    wg.wait();
    Ok(())
}
/// Downloads an actor module through the configured gantry client and adds
/// it to this host via `add_actor`.
///
/// Each chunk passed to the download callback is appended to a shared
/// buffer. The callback signals completion when a chunk's `sequence_no`
/// equals its `total_chunks`; the buffered bytes are then parsed with
/// `Actor::from_bytes`.
///
/// # Errors
/// - `ErrorKind::MiscHost` if no gantry client has been configured.
/// - `ErrorKind::MiscHost` if the completion channel is closed before the
///   final chunk is signalled.
/// - Any error from `Actor::from_bytes` or `add_actor`.
#[cfg(feature = "gantry")]
pub fn add_actor_from_gantry(&self, actor: &str) -> Result<()> {
    use crossbeam_channel::unbounded;

    let (s, r) = unbounded();
    let bytevec = Arc::new(RwLock::new(Vec::new()));
    let b = bytevec.clone();

    // One lock acquisition both checks for and borrows the client, so there
    // is no gap between "is it configured?" and "use it".
    let guard = self.gantry_client.read().unwrap();
    let client = guard.as_ref().ok_or_else(|| {
        errors::new(errors::ErrorKind::MiscHost(
            "No gantry client configured".to_string(),
        ))
    })?;
    // NOTE(review): the value returned by `download_actor` is still ignored;
    // if it can report a failed request, that failure is only noticed
    // indirectly through the channel below — confirm and propagate.
    let _ack = client.download_actor(actor, move |chunk| {
        bytevec
            .write()
            .unwrap()
            .extend_from_slice(&chunk.chunk_bytes);
        if chunk.sequence_no == chunk.total_chunks {
            s.send(true).unwrap();
        }
        Ok(())
    });
    // Release the client lock before blocking on the download.
    drop(guard);

    // A closed channel means the callback (and its sender) was dropped
    // before signalling completion; report it instead of panicking.
    r.recv().map_err(|e| {
        errors::new(errors::ErrorKind::MiscHost(format!(
            "Download of actor {} from gantry did not complete: {}",
            actor, e
        )))
    })?;

    // Take the accumulated bytes out of the shared buffer rather than
    // copying the whole module.
    let bytes = std::mem::replace(&mut *b.write().unwrap(), Vec::new());
    self.add_actor(Actor::from_bytes(bytes)?)
}
/// Adds a portable (WebAssembly) capability provider to this host.
///
/// The capability ID is the first entry of the `caps` list in the claims
/// extracted from the module bytes. `binding` defaults to `"default"`.
/// A `CapabilitySummary` with `portable: true` is recorded, the module is
/// spawned with the supplied WASI parameters, and the call returns once the
/// wait group handed to the spawn routine has been released.
///
/// # Errors
/// - Any error from `authz::extract_claims`.
/// - `ErrorKind::CapabilityProvider` if the claims carry no metadata, no
///   capability list, or an empty capability list.
/// - `ErrorKind::CapabilityProvider` if a route already exists for this
///   binding name and capability ID.
/// - Any error from `spawn_actor_and_listen`.
pub fn add_capability(
    &self,
    actor: Actor,
    binding: Option<&str>,
    wasi: WasiParams,
) -> Result<()> {
    let token = authz::extract_claims(&actor.bytes)?;
    // A module without capability claims is bad input, not a broken
    // invariant: return an error instead of panicking on unwrap/index.
    let capid = token
        .claims
        .metadata
        .and_then(|md| md.caps)
        .and_then(|caps| caps.into_iter().next())
        .ok_or_else(|| {
            errors::new(errors::ErrorKind::CapabilityProvider(
                "Module does not declare a capability ID in its claims, loading failed."
                    .to_string(),
            ))
        })?;
    let binding = binding.unwrap_or("default");
    if self.router.read().unwrap().route_exists(&binding, &capid) {
        return Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
            "Capability provider {} cannot be bound to the same name ({}) twice, loading failed.",
            capid, binding
        ))));
    }

    // Only the name is needed from the actor's own claims, so borrow it
    // rather than cloning the whole claims structure. Falls back to the
    // capability ID when no name is present.
    let name = actor
        .token
        .claims
        .metadata
        .as_ref()
        .and_then(|md| md.name.clone())
        .unwrap_or_else(|| capid.clone());
    let summary = CapabilitySummary {
        id: capid.clone(),
        name,
        binding: binding.to_string(),
        portable: true,
    };
    // NOTE(review): the summary is recorded before the spawn below is known
    // to have succeeded; a failed spawn leaves it in the list.
    self.caps.write().unwrap().push(summary);
    info!("Adding portable capability to host: {},{}", binding, capid);

    let wg = crossbeam_utils::sync::WaitGroup::new();
    self.spawn_actor_and_listen(
        wg.clone(),
        actor.token.claims,
        // `actor` is owned and not used again; move the bytes, don't copy.
        actor.bytes,
        Some(wasi),
        false,
        binding.to_string(),
    )?;
    wg.wait();
    Ok(())
}
pub fn remove_actor(&self, pk: &str) -> Result<()> {
self.router
.write()
.unwrap()
.terminate_route(crate::inthost::ACTOR_BINDING, pk)
}
pub fn replace_actor(&self, new_actor: Actor) -> Result<()> {
crate::inthost::replace_actor(self.router.clone(), new_actor)
}
/// Boxes `mid` and appends it to the end of this host's middleware list.
pub fn add_middleware(&self, mid: impl Middleware) {
    let mut chain = self.middlewares.write().unwrap();
    chain.push(Box::new(mid));
}
/// Installs `hook` as this host's authorization hook, replacing any hook
/// that was set before.
///
/// The hook receives an actor token and returns a `bool`.
/// NOTE(review): presumably consulted by `check_auth` when actors are added
/// — confirm in its definition.
pub fn set_auth_hook<F>(&self, hook: F)
where
    F: Fn(&Token<wascap::jwt::Actor>) -> bool + Sync + Send + 'static,
{
    let mut slot = self.auth_hook.write().unwrap();
    *slot = Some(Box::new(hook));
}
/// Adds a native capability provider to this host.
///
/// Fails with `ErrorKind::CapabilityProvider` when a route already exists
/// for the provider's binding name and `capability.id()`. Otherwise builds a
/// `CapabilitySummary` with `portable: false`, spawns the provider, and
/// returns once the wait group handed to the spawn routine has been released.
pub fn add_native_capability(&self, capability: NativeCapability) -> Result<()> {
    let capid = capability.capid.clone();

    let binding_taken = self
        .router
        .read()
        .unwrap()
        .route_exists(&capability.binding_name, &capability.id());
    if binding_taken {
        let reason = format!(
            "Capability provider {} cannot be bound to the same name ({}) twice, loading failed.",
            capid, capability.binding_name
        );
        return Err(errors::new(errors::ErrorKind::CapabilityProvider(reason)));
    }

    let summary = CapabilitySummary {
        id: capid,
        name: capability.name(),
        binding: capability.binding_name.to_string(),
        portable: false,
    };

    let ready = crossbeam_utils::sync::WaitGroup::new();
    self.spawn_capability_provider_and_listen(capability, summary, ready.clone())?;
    ready.wait();
    Ok(())
}
/// Terminates the route registered for `capability_id` under `binding_name`
/// (or `"default"` when no name is given).
///
/// Returns `ErrorKind::MiscHost` when the router has no such route.
pub fn remove_native_capability(
    &self,
    capability_id: &str,
    binding_name: Option<String>,
) -> Result<()> {
    let binding = binding_name.unwrap_or_else(|| "default".to_string());
    match self.router.read().unwrap().get_route(&binding, capability_id) {
        Some(route) => {
            route.terminate();
            Ok(())
        }
        None => Err(errors::new(errors::ErrorKind::MiscHost(
            "No such capability".into(),
        ))),
    }
}
/// Binds `actor` to the capability `capid` under `binding_name` (or
/// `"default"`), passing `config` along as the binding's configuration.
///
/// When a route exists for `(binding, capid)`, a configuration invocation is
/// sent to it and, on success, the binding is recorded. When no route exists
/// and `actor == capid`, the configuration is serialized and sent to the
/// actor itself as an `OP_BIND_ACTOR` call.
///
/// # Errors
/// - `ErrorKind::MiscHost` if no claims are stored for `actor`.
/// - `ErrorKind::Authorization` if `authz::can_invoke` rejects the pairing.
/// - `ErrorKind::CapabilityProvider` if the provider's response carries an
///   error, or if no route exists and `actor != capid`.
/// - Any error from the invocation, `record_binding`, or `call_actor`.
///
/// # Panics
/// Panics if serializing the configuration for a self-binding fails.
pub fn bind_actor(
    &self,
    actor: &str,
    capid: &str,
    binding_name: Option<String>,
    config: HashMap<String, String>,
) -> Result<()> {
    let claims = self
        .claims
        .read()
        .unwrap()
        .get(actor)
        .cloned()
        .ok_or_else(|| {
            errors::new(errors::ErrorKind::MiscHost(
                "Attempted to bind non-existent actor".to_string(),
            ))
        })?;
    if !authz::can_invoke(&claims, capid) {
        return Err(errors::new(errors::ErrorKind::Authorization(format!(
            "Unauthorized binding: actor {} is not authorized to use capability {}.",
            actor, capid
        ))));
    }

    let binding = binding_name.unwrap_or_else(|| "default".to_string());
    info!(
        "Attempting to bind actor {} to {},{}",
        actor, &binding, capid
    );

    // Look the route up in its own statement so the router read guard is
    // released before matching. The `None` arm calls `call_actor`, which
    // takes the router read lock again; re-acquiring a std `RwLock` read
    // lock already held by this thread may panic or deadlock.
    // NOTE(review): relies on `get_route` returning an owned entry rather
    // than a borrow of the guard — confirm in `router`.
    let route = self.router.read().unwrap().get_route(&binding, capid);
    match route {
        Some(entry) => {
            let res = entry.invoke(inthost::gen_config_invocation(
                actor,
                capid,
                binding.clone(),
                config,
            ))?;
            if let Some(e) = res.error {
                Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
                    "Failed to configure {},{} - {}",
                    binding, capid, e
                ))))
            } else {
                self.record_binding(actor, capid, &binding)?;
                Ok(())
            }
        }
        None => {
            if actor == capid {
                // Self-binding: deliver the configuration to the actor.
                let cfgvals = CapabilityConfiguration {
                    module: actor.to_string(),
                    values: config,
                };
                // NOTE(review): panics if serialization fails.
                let payload = serialize(&cfgvals).unwrap();
                self.call_actor(actor, OP_BIND_ACTOR, &payload).map(|_| ())
            } else {
                Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
                    "No such capability provider: {},{}",
                    binding, capid
                ))))
            }
        }
    }
}
/// Creates a gantry client from the given NATS URLs, JWT and seed and stores
/// it on this host, replacing any client configured earlier.
///
/// Always returns `Ok(())`.
#[cfg(feature = "gantry")]
pub fn configure_gantry(&self, nats_urls: Vec<String>, jwt: &str, seed: &str) -> Result<()> {
    let client = gantryclient::Client::new(nats_urls, jwt, seed);
    *self.gantry_client.write().unwrap() = Some(client);
    Ok(())
}
/// Invokes `operation` on `actor` with `msg` as payload, using `"system"` as
/// the invocation origin, and returns the `msg` bytes of the response.
///
/// Returns `ErrorKind::MiscHost` when no route exists for `actor` under the
/// actor binding; errors from the invocation itself are passed through.
pub fn call_actor(&self, actor: &str, operation: &str, msg: &[u8]) -> Result<Vec<u8>> {
    // The read guard stays alive for the duration of the invocation.
    let router = self.router.read().unwrap();
    let entry = router
        .get_route(ACTOR_BINDING, actor)
        .ok_or_else(|| errors::new(errors::ErrorKind::MiscHost("No such actor".into())))?;

    let invocation = Invocation::new(
        "system".to_string(),
        InvocationTarget::Actor(actor.to_string()),
        operation,
        msg.to_vec(),
    );
    entry.invoke(invocation).map(|resp| resp.msg)
}
/// Returns a clone of the claims stored under `pk`, or `None` when the
/// claims map has no entry for that key.
pub fn claims_for_actor(&self, pk: &str) -> Option<Claims<wascap::jwt::Actor>> {
    let registry = self.claims.read().unwrap();
    registry.get(pk).cloned()
}
/// Applies a host manifest in three passes: adds every listed actor, then
/// every native capability loaded from its file, then every binding.
///
/// With the `gantry` feature, actors go through `add_actor_gantry_first`;
/// without it they are loaded with `Actor::from_file`. Processing stops at
/// the first error, which is returned as-is; earlier steps are not undone.
#[cfg(feature = "manifest")]
pub fn apply_manifest(&self, manifest: HostManifest) -> Result<()> {
    let HostManifest {
        actors,
        capabilities,
        bindings,
        ..
    } = manifest;

    for actor_ref in actors {
        #[cfg(feature = "gantry")]
        self.add_actor_gantry_first(&actor_ref)?;
        #[cfg(not(feature = "gantry"))]
        self.add_actor(Actor::from_file(&actor_ref)?)?;
    }

    for cap in capabilities {
        let provider = NativeCapability::from_file(cap.path, cap.binding_name)?;
        self.add_native_capability(provider)?;
    }

    for entry in bindings {
        self.bind_actor(
            &entry.actor,
            &entry.capability,
            entry.binding,
            entry.values,
        )?;
    }

    Ok(())
}
/// Adds an actor from gantry when `actor` is exactly 56 bytes long and
/// starts with `'M'`; otherwise treats `actor` as a file to load with
/// `Actor::from_file`.
///
/// NOTE(review): the 56-byte / `'M'` test presumably recognises a module
/// public key — confirm against the key format in use.
#[cfg(feature = "gantry")]
fn add_actor_gantry_first(&self, actor: &str) -> Result<()> {
    let looks_like_key = actor.len() == 56 && actor.starts_with('M');
    if !looks_like_key {
        return self.add_actor(Actor::from_file(&actor)?);
    }
    self.add_actor_from_gantry(actor)
}
pub fn actors(&self) -> Vec<SubjectClaimsPair> {
authz::get_all_claims(self.claims.clone())
}
/// Returns a copy of every capability summary currently stored on this host.
pub fn capabilities(&self) -> Vec<CapabilitySummary> {
    self.caps.read().unwrap().to_vec()
}
}