#![doc(html_logo_url = "https://avatars0.githubusercontent.com/u/52050279?s=200&v=4")]
#[macro_use]
extern crate log;
#[macro_use]
extern crate crossbeam;
mod actor;
mod authz;
mod bus;
mod capability;
mod dispatch;
pub mod errors;
mod extras;
mod inthost;
#[cfg(feature = "manifest")]
mod manifest;
pub mod middleware;
mod plugins;
mod spawns;
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
pub const REVISION: u32 = 2;
pub type Result<T> = std::result::Result<T, errors::Error>;
pub use actor::Actor;
pub use capability::NativeCapability;
pub use inthost::{Invocation, InvocationResponse, WasccEntity};
#[cfg(feature = "manifest")]
pub use manifest::{BindingEntry, HostManifest};
#[cfg(feature = "prometheus_middleware")]
pub use middleware::prometheus;
pub use middleware::Middleware;
pub use wapc::{prelude::WasiParams, WapcHost};
pub type SubjectClaimsPair = (String, Claims<wascap::jwt::Actor>);
use authz::AuthHook;
use bus::MessageBus;
use crossbeam::Sender;
use plugins::PluginManager;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use wascap::jwt::{Claims, Token};
use wascap::prelude::KeyPair;
use wascc_codec::{
capabilities::CapabilityDescriptor, core::CapabilityConfiguration, SYSTEM_ACTOR,
};
type BindingsList = HashMap<BindingTuple, CapabilityConfiguration>;
type BindingTuple = (String, String, String);
pub(crate) type RouteKey = (String, String);
#[derive(Clone)]
pub struct WasccHost {
bus: Arc<MessageBus>,
claims: Arc<RwLock<HashMap<String, Claims<wascap::jwt::Actor>>>>,
plugins: Arc<RwLock<PluginManager>>,
auth_hook: Arc<RwLock<Option<Box<AuthHook>>>>,
bindings: Arc<RwLock<BindingsList>>,
caps: Arc<RwLock<HashMap<RouteKey, CapabilityDescriptor>>>,
middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
terminators: Arc<RwLock<HashMap<String, Sender<bool>>>>,
#[cfg(feature = "gantry")]
gantry_client: Arc<RwLock<Option<gantryclient::Client>>>,
key: KeyPair,
}
impl WasccHost {
pub fn new() -> Self {
let key = KeyPair::new_server();
let claims = Arc::new(RwLock::new(HashMap::new()));
let caps = Arc::new(RwLock::new(HashMap::new()));
let bindings = Arc::new(RwLock::new(HashMap::new()));
#[cfg(feature = "lattice")]
let bus = bus::new(
key.public_key(),
claims.clone(),
caps.clone(),
bindings.clone(),
);
#[cfg(not(feature = "lattice"))]
let bus = bus::new();
#[cfg(feature = "gantry")]
let host = WasccHost {
terminators: Arc::new(RwLock::new(HashMap::new())),
bus: Arc::new(bus),
claims,
plugins: Arc::new(RwLock::new(PluginManager::default())),
auth_hook: Arc::new(RwLock::new(None)),
bindings,
caps,
middlewares: Arc::new(RwLock::new(vec![])),
gantry_client: Arc::new(RwLock::new(None)),
key: key,
};
#[cfg(not(feature = "gantry"))]
let host = WasccHost {
terminators: Arc::new(RwLock::new(HashMap::new())),
bus: Arc::new(bus),
claims,
plugins: Arc::new(RwLock::new(PluginManager::default())),
auth_hook: Arc::new(RwLock::new(None)),
bindings,
middlewares: Arc::new(RwLock::new(vec![])),
caps,
key: key,
};
info!("Host ID is {} (v{})", host.key.public_key(), VERSION,);
host.ensure_extras().unwrap();
host
}
pub fn add_actor(&self, actor: Actor) -> Result<()> {
if self
.claims
.read()
.unwrap()
.contains_key(&actor.public_key())
{
return Err(errors::new(errors::ErrorKind::MiscHost(
format!("Actor {} is already in this host. Cannot host multiple instances of the same actor in the same host", actor.public_key())
)));
}
authz::enforce_validation(&actor.token.jwt)?;
if !self.check_auth(&actor.token) {
return Err(errors::new(errors::ErrorKind::Authorization(
"Authorization hook denied access to module".into(),
)));
}
authz::register_claims(
self.claims.clone(),
&actor.token.claims.subject,
actor.token.claims.clone(),
);
let wg = crossbeam_utils::sync::WaitGroup::new();
spawns::spawn_actor(
wg.clone(),
actor.token.claims.clone(),
actor.bytes.clone(),
None,
true,
None,
self.bus.clone(),
self.middlewares.clone(),
self.caps.clone(),
self.bindings.clone(),
self.claims.clone(),
self.terminators.clone(),
self.key.clone(),
)?;
wg.wait();
if actor.capabilities().contains(&extras::CAPABILITY_ID.into()) {
self.bind_actor(
&actor.public_key(),
extras::CAPABILITY_ID,
None,
HashMap::new(),
)?;
}
Ok(())
}
#[cfg(feature = "gantry")]
pub fn add_actor_from_gantry(&self, actor: &str) -> Result<()> {
{
let lock = self.gantry_client.read().unwrap();
if lock.as_ref().is_none() {
return Err(errors::new(errors::ErrorKind::MiscHost(
"No gantry client configured".to_string(),
)));
}
}
use crossbeam_channel::unbounded;
let (s, r) = unbounded();
let bytevec = Arc::new(RwLock::new(Vec::new()));
let b = bytevec.clone();
let _ack = self
.gantry_client
.read()
.unwrap()
.as_ref()
.unwrap()
.download_actor(actor, move |chunk| {
bytevec
.write()
.unwrap()
.extend_from_slice(&chunk.chunk_bytes);
if chunk.sequence_no == chunk.total_chunks {
s.send(true).unwrap();
}
Ok(())
});
let _ = r.recv().unwrap();
let vec = b.read().unwrap();
self.add_actor(Actor::from_bytes(vec.clone())?)
}
pub fn add_capability(
&self,
actor: Actor,
binding: Option<&str>,
wasi: WasiParams,
) -> Result<()> {
let binding = binding.unwrap_or("default");
let wg = crossbeam_utils::sync::WaitGroup::new();
spawns::spawn_actor(
wg.clone(),
actor.token.claims,
actor.bytes.clone(),
Some(wasi),
false,
Some(binding.to_string()),
self.bus.clone(),
self.middlewares.clone(),
self.caps.clone(),
self.bindings.clone(),
self.claims.clone(),
self.terminators.clone(),
self.key.clone(),
)?;
wg.wait();
Ok(())
}
pub fn remove_actor(&self, pk: &str) -> Result<()> {
self.terminators.read().unwrap()[&bus::actor_subject(pk)]
.send(true)
.unwrap();
Ok(())
}
pub fn replace_actor(&self, new_actor: Actor) -> Result<()> {
crate::inthost::replace_actor(&self.key, self.bus.clone(), new_actor)
}
pub fn add_middleware(&self, mid: impl Middleware) {
self.middlewares.write().unwrap().push(Box::new(mid));
}
pub fn set_auth_hook<F>(&self, hook: F)
where
F: Fn(&Token<wascap::jwt::Actor>) -> bool + Sync + Send + 'static,
{
*self.auth_hook.write().unwrap() = Some(Box::new(hook));
}
pub fn add_native_capability(&self, capability: NativeCapability) -> Result<()> {
let capid = capability.id();
if self
.caps
.read()
.unwrap()
.contains_key(&(capability.binding_name.to_string(), capability.id()))
{
return Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
"Capability provider {} cannot be bound to the same name ({}) twice, loading failed.", capid, capability.binding_name
))));
}
self.caps.write().unwrap().insert(
(
capability.binding_name.to_string(),
capability.descriptor.id.to_string(),
),
capability.descriptor().clone(),
);
let wg = crossbeam_utils::sync::WaitGroup::new();
spawns::spawn_native_capability(
capability,
self.bus.clone(),
self.middlewares.clone(),
self.bindings.clone(),
self.terminators.clone(),
self.plugins.clone(),
wg.clone(),
Arc::new(self.key.clone()),
)?;
wg.wait();
Ok(())
}
pub fn remove_native_capability(
&self,
capability_id: &str,
binding_name: Option<String>,
) -> Result<()> {
let b = binding_name.unwrap_or("default".to_string());
let subject = bus::provider_subject(capability_id, &b);
if let Some(terminator) = self.terminators.read().unwrap().get(&subject) {
terminator.send(true).unwrap();
Ok(())
} else {
Err(errors::new(errors::ErrorKind::MiscHost(
"No such capability".into(),
)))
}
}
pub fn bind_actor(
&self,
actor: &str,
capid: &str,
binding_name: Option<String>,
config: HashMap<String, String>,
) -> Result<()> {
let claims = self.claims.read().unwrap().get(actor).cloned();
if claims.is_none() {
return Err(errors::new(errors::ErrorKind::MiscHost(
"Attempted to bind non-existent actor".to_string(),
)));
}
let c = claims.unwrap().clone();
if !authz::can_invoke(&c, capid) {
return Err(errors::new(errors::ErrorKind::Authorization(format!(
"Unauthorized binding: actor {} is not authorized to use capability {}.",
actor, capid
))));
}
let binding = binding_name.unwrap_or("default".to_string());
info!(
"Attempting to bind actor {} to {},{}",
actor, &binding, capid
);
let tgt_subject = if (actor == capid || actor == SYSTEM_ACTOR) && capid.starts_with("M") {
bus::actor_subject(actor)
} else {
bus::provider_subject(capid, &binding)
};
info!("Binding subject: {}", tgt_subject);
let inv = inthost::gen_config_invocation(
&self.key,
actor,
capid,
c.clone(),
binding.clone(),
config.clone(),
);
match self.bus.invoke(&tgt_subject, inv) {
Ok(inv_r) => {
if let Some(e) = inv_r.error {
Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
"Failed to configure {},{} - {}",
binding, capid, e
))))
} else {
self.record_binding(
actor,
capid,
&binding,
&CapabilityConfiguration {
module: actor.to_string(),
values: config,
},
)?;
Ok(())
}
}
Err(e) => Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
"Failed to configure {},{} - {}",
binding, capid, e
)))),
}
}
#[cfg(feature = "gantry")]
pub fn configure_gantry(&self, nats_urls: Vec<String>, jwt: &str, seed: &str) -> Result<()> {
*self.gantry_client.write().unwrap() =
Some(gantryclient::Client::new(nats_urls, jwt, seed));
Ok(())
}
pub fn call_actor(&self, actor: &str, operation: &str, msg: &[u8]) -> Result<Vec<u8>> {
if !self.claims.read().unwrap().contains_key(actor) {
return Err(errors::new(errors::ErrorKind::MiscHost(
"No such actor".into(),
)));
}
let inv = Invocation::new(
&self.key,
WasccEntity::Actor(SYSTEM_ACTOR.to_string()),
WasccEntity::Actor(actor.to_string()),
operation,
msg.to_vec(),
);
let tgt_subject = bus::actor_subject(actor);
match self.bus.invoke(&tgt_subject, inv) {
Ok(resp) => Ok(resp.msg),
Err(e) => Err(e),
}
}
pub fn claims_for_actor(&self, pk: &str) -> Option<Claims<wascap::jwt::Actor>> {
self.claims.read().unwrap().get(pk).cloned()
}
#[cfg(feature = "manifest")]
pub fn apply_manifest(&self, manifest: HostManifest) -> Result<()> {
for actor in manifest.actors {
#[cfg(feature = "gantry")]
self.add_actor_gantry_first(&actor)?;
#[cfg(not(feature = "gantry"))]
self.add_actor(Actor::from_file(&actor)?)?;
}
for cap in manifest.capabilities {
self.add_native_capability(NativeCapability::from_file(cap.path, cap.binding_name)?)?;
}
for config in manifest.bindings {
self.bind_actor(
&config.actor,
&config.capability,
config.binding,
config.values.unwrap_or(HashMap::new()),
)?;
}
Ok(())
}
#[cfg(feature = "gantry")]
fn add_actor_gantry_first(&self, actor: &str) -> Result<()> {
if actor.len() == 56 && actor.starts_with('M') {
self.add_actor_from_gantry(actor)
} else {
self.add_actor(Actor::from_file(&actor)?)
}
}
pub fn actors(&self) -> Vec<SubjectClaimsPair> {
authz::get_all_claims(self.claims.clone())
}
pub fn capabilities(&self) -> HashMap<(String, String), CapabilityDescriptor> {
let lock = self.caps.read().unwrap();
lock.clone()
}
pub fn actors_by_tag(&self, tags: &[&str]) -> Vec<String> {
let mut actors = vec![];
for (actor, claims) in self.claims.read().unwrap().iter() {
if let Some(actor_tags) = claims.metadata.as_ref().and_then(|m| m.tags.as_ref()) {
if tags.iter().all(|&t| actor_tags.contains(&t.to_string())) {
actors.push(actor.to_string())
}
}
}
actors
}
}
pub(crate) fn route_key(binding: &str, id: &str) -> RouteKey {
(binding.to_string(), id.to_string())
}