#![doc(html_logo_url = "https://avatars0.githubusercontent.com/u/52050279?s=200&v=4")]
#[macro_use]
extern crate log;
#[macro_use]
extern crate crossbeam;
mod actor;
mod authz;
mod bus;
mod capability;
mod dispatch;
pub mod errors;
mod extras;
mod inthost;
#[cfg(feature = "manifest")]
mod manifest;
pub mod middleware;
mod plugins;
mod spawns;
/// Version of this crate, taken from `CARGO_PKG_VERSION` at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Revision number of the host.
/// NOTE(review): what exactly this revision tracks is not visible in this file — confirm.
pub const REVISION: u32 = 2;
/// Crate-wide result alias whose error type is [`errors::Error`].
pub type Result<T> = std::result::Result<T, errors::Error>;
pub use actor::Actor;
pub use capability::NativeCapability;
pub use inthost::{Invocation, InvocationResponse, WasccEntity};
#[cfg(feature = "manifest")]
pub use manifest::{BindingEntry, HostManifest};
#[cfg(feature = "prometheus_middleware")]
pub use middleware::prometheus;
#[cfg(feature = "lattice")]
use latticeclient::BusEvent;
#[cfg(feature = "lattice")]
use bus::lattice::ControlCommand;
pub use authz::Authorizer;
pub use middleware::Middleware;
pub use wapc::WasiParams;
/// A string paired with a set of actor claims; this is the element type returned by [`Host::actors`].
/// NOTE(review): the string is presumably the claims subject (the actor's public key) — confirm.
pub type SubjectClaimsPair = (String, Claims<wascap::jwt::Actor>);
use crate::inthost::fetch_oci_bytes;
use bus::{get_namespace_prefix, MessageBus};
use crossbeam::Sender;
#[cfg(feature = "lattice")]
use crossbeam_channel as channel;
use crossbeam_channel::Receiver;
#[cfg(any(feature = "lattice", feature = "manifest"))]
use inthost::RESTRICTED_LABELS;
use plugins::PluginManager;
use std::path::Path;
use std::str::FromStr;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use wascap::jwt::Claims;
use wascap::prelude::KeyPair;
use wascc_codec::{
capabilities::CapabilityDescriptor,
core::{CapabilityConfiguration, OP_BIND_ACTOR},
serialize, SYSTEM_ACTOR,
};
/// Map from a binding tuple to the capability configuration recorded for that binding.
type BindingsList = HashMap<BindingTuple, CapabilityConfiguration>;
/// Key of a recorded binding.
/// NOTE(review): presumably (actor, capability ID, binding name) — confirm against `record_binding`.
type BindingTuple = (String, String, String);
/// Identifies a capability provider route by its binding name and capability ID.
///
/// Used as the key of the host's capability descriptor map.
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Clone)]
pub(crate) struct RouteKey {
    /// Name of the binding the provider was loaded under.
    pub binding_name: String,
    /// Capability ID of the provider.
    pub capid: String,
}

impl RouteKey {
    /// Builds a route key by copying the given binding name and capability ID.
    pub fn new(binding_name: &str, capid: &str) -> RouteKey {
        let binding_name = String::from(binding_name);
        let capid = String::from(capid);
        Self {
            binding_name,
            capid,
        }
    }
}
/// Builder used to configure a [`Host`] before it is created.
///
/// Collects host labels, an optional namespace and the authorizer, and hands
/// them to the host when [`HostBuilder::build`] is called.
pub struct HostBuilder {
// Labels that will be attached to the host.
labels: HashMap<String, String>,
// Optional namespace; `None` means no namespace was supplied or detected.
ns: Option<String>,
// Authorizer the host will consult for access decisions.
authorizer: Box<dyn Authorizer + 'static>,
}
impl HostBuilder {
pub fn new() -> HostBuilder {
let b = HostBuilder {
labels: inthost::detect_core_host_labels(),
ns: get_namespace_prefix(),
authorizer: Box::new(authz::DefaultAuthorizer::new()),
};
b
}
/// Sets the lattice namespace for the host being built.
///
/// The namespace is stored in lower case.
///
/// # Panics
///
/// Panics if `ns` contains any character that is not alphanumeric. An empty
/// string contains no such character and is therefore accepted.
#[cfg(feature = "lattice")]
pub fn with_lattice_namespace(mut self, ns: &str) -> HostBuilder {
    let has_invalid_char = ns.chars().any(|ch| !ch.is_alphanumeric());
    if has_invalid_char {
        panic!("Cannot use a non-alphanumeric lattice namespace name");
    }
    self.ns = Some(ns.to_lowercase());
    self
}
/// Replaces the builder's authorizer with the supplied one and returns the
/// updated builder.
pub fn with_authorizer(mut self, authorizer: impl Authorizer + 'static) -> HostBuilder {
    let boxed: Box<dyn Authorizer + 'static> = Box::new(authorizer);
    self.authorizer = boxed;
    self
}
pub fn with_label(self, key: &str, value: &str) -> HostBuilder {
let mut hm = self.labels.clone();
if !hm.contains_key(key) {
hm.insert(key.to_string(), value.to_string());
}
HostBuilder { labels: hm, ..self }
}
/// Consumes the builder and creates the configured [`Host`].
///
/// The same call is made whether or not the `lattice` feature is enabled, so
/// no feature gate is needed here. The builder is consumed, so its fields are
/// moved into the host instead of being cloned.
pub fn build(self) -> Host {
    Host::generate(self.authorizer, self.labels, self.ns)
}
}
/// A host runtime for actors and capability providers.
///
/// Cloning a `Host` is cheap with respect to its state: every collection is
/// held behind an `Arc`, so clones share the same underlying data.
#[derive(Clone)]
pub struct Host {
// Message bus used to invoke actors and capability providers.
bus: Arc<MessageBus>,
// Claims of the actors added to this host, keyed by the claims subject.
claims: Arc<RwLock<HashMap<String, Claims<wascap::jwt::Actor>>>>,
// Manager for loaded native capability plugins.
plugins: Arc<RwLock<PluginManager>>,
// Bindings recorded between actors and capability providers.
bindings: Arc<RwLock<BindingsList>>,
// Descriptors of the loaded capability providers, keyed by binding name + capability ID.
caps: Arc<RwLock<HashMap<RouteKey, CapabilityDescriptor>>>,
// Middleware registered through `add_middleware`.
middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
// Termination channels, looked up by the bus subject of an actor or provider.
terminators: Arc<RwLock<HashMap<String, Sender<bool>>>>,
// Public key of the host's key pair; also returned as the host ID.
pk: String,
// Seed of the host's key pair, used to re-create the key pair when needed.
sk: String,
// Authorizer consulted for access decisions.
authorizer: Arc<RwLock<Box<dyn Authorizer>>>,
// Labels attached to this host.
labels: Arc<RwLock<HashMap<String, String>>>,
// Maps a registry image reference to the claims subject loaded from it.
image_map: Arc<RwLock<HashMap<String, String>>>,
// Optional namespace used when computing bus subjects.
ns: Option<String>,
}
impl Host {
pub fn new() -> Self {
let h = Self::generate(
Box::new(authz::DefaultAuthorizer::new()),
inthost::detect_core_host_labels(),
get_namespace_prefix(),
);
h
}
/// Creates a host from the given authorizer, labels and optional namespace.
///
/// A fresh server key pair is generated for the host, the shared state maps
/// are created empty (except for `labels`), and the message bus is set up.
/// With the `lattice` feature enabled the bus is given handles to the shared
/// state, a `HostStarted` event is published and the control plane is spawned.
///
/// # Panics
///
/// Panics if the key pair's seed cannot be obtained or if `ensure_extras`
/// fails.
pub(crate) fn generate(
authz: Box<dyn Authorizer + 'static>,
labels: HashMap<String, String>,
ns: Option<String>,
) -> Self {
// Every host instance gets its own newly generated server key pair.
let key = KeyPair::new_server();
let claims = Arc::new(RwLock::new(HashMap::new()));
let caps = Arc::new(RwLock::new(HashMap::new()));
let bindings = Arc::new(RwLock::new(HashMap::new()));
let labels = Arc::new(RwLock::new(labels));
let terminators = Arc::new(RwLock::new(HashMap::new()));
let authz = Arc::new(RwLock::new(authz));
let image_map = Arc::new(RwLock::new(HashMap::new()));
// Channel carrying control commands: the sender goes to the bus, the
// receiver to the control plane spawned below.
#[cfg(feature = "lattice")]
let (com_s, com_r): (Sender<ControlCommand>, Receiver<ControlCommand>) =
channel::unbounded();
// The lattice bus shares the host's state maps.
#[cfg(feature = "lattice")]
let bus = Arc::new(bus::new(
key.public_key(),
claims.clone(),
caps.clone(),
bindings.clone(),
labels.clone(),
terminators.clone(),
ns.clone(),
com_s,
authz.clone(),
image_map.clone(),
));
#[cfg(not(feature = "lattice"))]
let bus = Arc::new(bus::new());
// Best effort: a failure to publish the start event is ignored.
#[cfg(feature = "lattice")]
let _ = bus.publish_event(BusEvent::HostStarted(key.public_key()));
let host = Host {
terminators: terminators.clone(),
bus: bus.clone(),
claims: claims.clone(),
plugins: Arc::new(RwLock::new(PluginManager::default())),
bindings,
caps,
middlewares: Arc::new(RwLock::new(vec![])),
pk: key.public_key(),
sk: key.seed().unwrap(),
authorizer: authz,
labels,
ns,
image_map,
};
info!("Host ID is {} (v{})", key.public_key(), VERSION);
host.ensure_extras().unwrap();
// The result of spawning the control plane is deliberately discarded.
#[cfg(feature = "lattice")]
let _ = bus::lattice::spawn_controlplane(&host, com_r);
host
}
/// Adds an actor to the host, optionally remembering the image reference it
/// was loaded from.
///
/// The actor's token is validated and checked against the authorization
/// hook, its claims are registered, and the actor is spawned. This call
/// waits on the wait group handed to the spawn before continuing. If the
/// actor declares the extras capability, it is bound to it automatically.
///
/// # Errors
///
/// Returns an error if an actor with the same public key is already
/// registered, if token validation fails, if the authorization hook denies
/// the actor, if spawning fails, or if binding to the extras capability
/// fails. In the last case the actor has already been spawned and remains
/// in the host.
///
/// # Panics
///
/// Panics if the claims lock is poisoned or the host key pair cannot be
/// re-created from its seed.
fn add_actor_imgref(&self, actor: Actor, imgref: Option<String>) -> Result<()> {
    if self
        .claims
        .read()
        .unwrap()
        .contains_key(&actor.public_key())
    {
        return Err(errors::new(errors::ErrorKind::MiscHost(
            format!("Actor {} is already in this host. Cannot host multiple instances of the same actor in the same host", actor.public_key())
        )));
    }
    authz::enforce_validation(&actor.token.jwt)?;
    if !self.check_auth(&actor.token) {
        return Err(errors::new(errors::ErrorKind::Authorization(
            "Authorization hook denied access to module".into(),
        )));
    }
    // Claims are registered before the actor is spawned, as before.
    let subject = actor.token.claims.subject.to_string();
    self.claims
        .write()
        .unwrap()
        .insert(subject.clone(), actor.token.claims.clone());
    let key = KeyPair::from_seed(&self.sk).unwrap();
    let wg = crossbeam_utils::sync::WaitGroup::new();
    spawns::spawn_actor(
        wg.clone(),
        actor.token.claims.clone(),
        actor.bytes.clone(),
        None,
        true,
        None,
        self.bus.clone(),
        self.middlewares.clone(),
        self.caps.clone(),
        self.bindings.clone(),
        self.claims.clone(),
        self.terminators.clone(),
        key,
        self.authorizer.clone(),
        self.image_map.clone(),
        imgref,
    )
    .map_err(|e| {
        // Roll back the registration: without this, a failed spawn would
        // leave the claims behind and every later attempt to add the same
        // actor would be rejected as a duplicate.
        self.claims.write().unwrap().remove(&subject);
        e
    })?;
    wg.wait();
    if actor.capabilities().contains(&extras::CAPABILITY_ID.into()) {
        self.set_binding(
            &actor.public_key(),
            extras::CAPABILITY_ID,
            None,
            HashMap::new(),
        )?;
    }
    Ok(())
}
/// Adds an actor to the host without associating it with an image reference.
///
/// # Errors
///
/// Returns whatever error `add_actor_imgref` reports for this actor.
pub fn add_actor(&self, actor: Actor) -> Result<()> {
self.add_actor_imgref(actor, None)
}
/// Fetches an actor module by image reference and adds it to the host,
/// remembering the image reference it came from.
///
/// # Errors
///
/// Returns an error if the bytes cannot be fetched, if they cannot be turned
/// into an actor, or if adding the actor fails.
pub fn add_actor_from_registry(&self, image: &str) -> Result<()> {
    let module = inthost::fetch_oci_bytes(image)?;
    let actor = Actor::from_slice(&module)?;
    self.add_actor_imgref(actor, Some(image.to_string()))
}
/// Adds a capability provider that is packaged as an [`Actor`] module.
///
/// The module is spawned with the given WASI parameters under the given
/// binding name (`"default"` when `binding` is `None`). This call waits on
/// the wait group handed to the spawn before returning.
///
/// # Errors
///
/// Returns an error if spawning the module fails.
///
/// # Panics
///
/// Panics if the host key pair cannot be re-created from its seed.
pub fn add_capability(
    &self,
    actor: Actor,
    binding: Option<&str>,
    wasi: WasiParams,
) -> Result<()> {
    let binding_name = binding.unwrap_or("default").to_string();
    let ready = crossbeam_utils::sync::WaitGroup::new();
    let host_key = KeyPair::from_seed(&self.sk).unwrap();
    // `actor` is owned here, so its parts are moved into the spawn call.
    spawns::spawn_actor(
        ready.clone(),
        actor.token.claims,
        actor.bytes,
        Some(wasi),
        false,
        Some(binding_name),
        self.bus.clone(),
        self.middlewares.clone(),
        self.caps.clone(),
        self.bindings.clone(),
        self.claims.clone(),
        self.terminators.clone(),
        host_key,
        self.authorizer.clone(),
        self.image_map.clone(),
        None,
    )?;
    ready.wait();
    Ok(())
}
pub fn remove_actor(&self, pk: &str) -> Result<()> {
self.terminators.read().unwrap()
[&bus::actor_subject(self.ns.as_ref().map(String::as_str), pk)]
.send(true)
.unwrap();
Ok(())
}
/// Replaces a running actor with the supplied one by delegating to the
/// internal host routine, using the host's key pair and message bus.
///
/// # Panics
///
/// Panics if the host key pair cannot be re-created from its seed.
pub fn replace_actor(&self, new_actor: Actor) -> Result<()> {
    let host_key = KeyPair::from_seed(&self.sk).unwrap();
    let bus = self.bus.clone();
    crate::inthost::replace_actor(&host_key, bus, new_actor)
}
/// Appends a middleware to the host's list of registered middleware.
///
/// # Panics
///
/// Panics if the middleware lock is poisoned.
pub fn add_middleware(&self, mid: impl Middleware) {
    let mut registered = self.middlewares.write().unwrap();
    registered.push(Box::new(mid));
}
/// Adds a native capability provider to the host.
///
/// The provider's descriptor is registered under its binding name and
/// capability ID, and the provider is then spawned. This call waits on the
/// wait group handed to the spawn before returning.
///
/// # Errors
///
/// Returns a `CapabilityProvider` error if a provider with the same
/// capability ID is already registered under the same binding name, or the
/// spawn error if spawning fails.
///
/// # Panics
///
/// Panics if the capabilities lock is poisoned or the host key pair cannot
/// be re-created from its seed.
pub fn add_native_capability(&self, capability: NativeCapability) -> Result<()> {
    let capid = capability.id();
    if self
        .caps
        .read()
        .unwrap()
        .contains_key(&RouteKey::new(&capability.binding_name, &capability.id()))
    {
        return Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
            "Capability provider {} cannot be bound to the same name ({}) twice, loading failed.", capid, capability.binding_name
        ))));
    }
    // Keep a copy of the key so the registration can be undone below.
    let route = RouteKey::new(&capability.binding_name, &capability.descriptor.id);
    self.caps
        .write()
        .unwrap()
        .insert(route.clone(), capability.descriptor().clone());
    let wg = crossbeam_utils::sync::WaitGroup::new();
    let key = KeyPair::from_seed(&self.sk).unwrap();
    spawns::spawn_native_capability(
        capability,
        self.bus.clone(),
        self.middlewares.clone(),
        self.bindings.clone(),
        self.terminators.clone(),
        self.plugins.clone(),
        wg.clone(),
        Arc::new(key),
    )
    .map_err(|e| {
        // Roll back the registration: without this, a failed spawn would
        // leave the descriptor behind and every retry would be rejected as
        // a duplicate binding.
        self.caps.write().unwrap().remove(&route);
        e
    })?;
    wg.wait();
    Ok(())
}
/// Fetches a native capability provider by image reference and adds it to
/// the host under the given binding name (`"default"` when `None`).
///
/// Once the provider has been added, the image reference is recorded
/// against the subject of the provider's claims.
///
/// # Errors
///
/// Returns an error if the provider cannot be fetched or cannot be added.
pub fn add_native_capability_from_registry(
    &self,
    image_ref: &str,
    binding_name: Option<String>,
) -> Result<()> {
    let binding = binding_name.unwrap_or_else(|| "default".to_string());
    let (provider, claims) =
        crate::inthost::fetch_provider(image_ref, &binding, self.labels.clone())?;
    self.add_native_capability(provider)?;
    let mut images = self.image_map.write().unwrap();
    images.insert(image_ref.to_string(), claims.subject.to_string());
    Ok(())
}
/// Signals the capability provider with the given capability ID and binding
/// name (`"default"` when `None`) to terminate.
///
/// # Errors
///
/// Returns a `MiscHost` error if no termination channel is registered for
/// the provider.
///
/// # Panics
///
/// Panics if the terminators lock is poisoned or if the termination signal
/// cannot be sent.
pub fn remove_native_capability(
    &self,
    capability_id: &str,
    binding_name: Option<String>,
) -> Result<()> {
    let binding = match binding_name {
        Some(name) => name,
        None => "default".to_string(),
    };
    let namespace = self.ns.as_ref().map(String::as_str);
    let subject = bus::provider_subject(namespace, capability_id, &binding);
    // The read lock is held while the signal is sent, as before.
    let terminators = self.terminators.read().unwrap();
    let terminator = terminators.get(&subject).ok_or_else(|| {
        errors::new(errors::ErrorKind::MiscHost("No such capability".into()))
    })?;
    terminator.send(true).unwrap();
    Ok(())
}
/// Asks a capability provider to remove its binding to the given actor.
///
/// A capability configuration naming the actor (with no values) is
/// serialized and sent to the provider registered under `capid` and the
/// binding name (`"default"` when `None`).
///
/// # Errors
///
/// Returns an error if the invocation fails or if the provider's response
/// carries an error.
///
/// # Panics
///
/// Panics if the configuration cannot be serialized or the host key pair
/// cannot be re-created from its seed.
pub fn remove_binding(
    &self,
    actor: &str,
    capid: &str,
    binding_name: Option<String>,
) -> Result<()> {
    let payload = serialize(&CapabilityConfiguration {
        module: actor.to_string(),
        values: HashMap::new(),
    })
    .unwrap();
    let binding = binding_name.unwrap_or_else(|| "default".to_string());
    let key = KeyPair::from_seed(&self.sk).unwrap();
    let subject = self.bus.provider_subject(&capid, &binding);
    let invocation = crate::inthost::gen_remove_actor(&key, payload, &binding, &capid);
    let response = self.bus.invoke(&subject, invocation)?;
    match response.error {
        Some(reason) => Err(format!("Failed to remove binding: {}", reason).into()),
        None => Ok(()),
    }
}
/// Binds an actor to a capability provider with the given configuration.
///
/// The actor's claims are looked up (through the bus when the `lattice`
/// feature is enabled, otherwise in this host's claims map). The binding
/// must be permitted both by the claims-based check and by the host's
/// authorizer. A configuration invocation is then sent to the target
/// subject, and on success the binding is recorded; with the `lattice`
/// feature an `ActorBindingCreated` event is published as well.
///
/// `binding_name` defaults to `"default"` when `None`.
///
/// # Errors
///
/// Returns a `MiscHost` error if no claims are found for the actor, an
/// `Authorization` error if either authorization check fails, and a
/// `CapabilityProvider` error if the invocation fails or its response
/// carries an error. Errors from recording the binding are propagated.
///
/// # Panics
///
/// Panics if a lock is poisoned or the host key pair cannot be re-created
/// from its seed.
pub fn set_binding(
    &self,
    actor: &str,
    capid: &str,
    binding_name: Option<String>,
    config: HashMap<String, String>,
) -> Result<()> {
    #[cfg(feature = "lattice")]
    let claims = self.bus.discover_claims(actor);
    #[cfg(not(feature = "lattice"))]
    let claims = self.claims.read().unwrap().get(actor).cloned();
    let key = KeyPair::from_seed(&self.sk).unwrap();
    let c = match claims {
        Some(found) => found,
        None => {
            return Err(errors::new(errors::ErrorKind::MiscHost(
                "Attempted to bind non-existent actor".to_string(),
            )))
        }
    };
    let binding = binding_name.unwrap_or_else(|| "default".to_string());

    // The claims-based check runs first; the host authorizer is consulted
    // only when it passes. Both failures produce the same error.
    let permitted = authz::can_invoke(&c, capid, OP_BIND_ACTOR)
        && self.authorizer.read().unwrap().can_invoke(
            &c,
            &WasccEntity::Capability {
                capid: capid.to_string(),
                binding: binding.to_string(),
            },
            OP_BIND_ACTOR,
        );
    if !permitted {
        return Err(errors::new(errors::ErrorKind::Authorization(format!(
            "Unauthorized binding: actor {} is not authorized to use capability {}.",
            actor, capid
        ))));
    }

    info!(
        "Attempting to bind actor {} to {},{}",
        actor, &binding, capid
    );
    let namespace = self.ns.as_ref().map(String::as_str);
    let targets_actor = (actor == capid || actor == SYSTEM_ACTOR) && capid.starts_with("M");
    let tgt_subject = if targets_actor {
        bus::actor_subject(namespace, actor)
    } else {
        bus::provider_subject(namespace, capid, &binding)
    };
    trace!("Binding subject: {}", tgt_subject);
    let inv = inthost::gen_config_invocation(
        &key,
        actor,
        capid,
        c.clone(),
        binding.clone(),
        config.clone(),
    );

    // Shared error constructor for both the transport failure and the
    // failure reported inside the response.
    let config_failure = |reason: &dyn std::fmt::Display| {
        errors::new(errors::ErrorKind::CapabilityProvider(format!(
            "Failed to configure {},{} - {}",
            binding, capid, reason
        )))
    };
    let inv_r = self
        .bus
        .invoke(&tgt_subject, inv)
        .map_err(|e| config_failure(&e))?;
    if let Some(e) = inv_r.error {
        return Err(config_failure(&e));
    }

    self.record_binding(
        actor,
        capid,
        &binding,
        &CapabilityConfiguration {
            module: actor.to_string(),
            values: config,
        },
    )?;
    // Best effort: a failure to publish the event is ignored.
    #[cfg(feature = "lattice")]
    let _ = self.bus.publish_event(BusEvent::ActorBindingCreated {
        actor: actor.to_string(),
        capid: capid.to_string(),
        instance_name: binding.to_string(),
        host: self.id(),
    });
    Ok(())
}
/// Invokes an operation on an actor in this host and returns the response
/// payload.
///
/// The invocation is issued on behalf of the system actor and signed with
/// the host's key pair.
///
/// # Errors
///
/// Returns a `MiscHost` error if the actor is not registered in this host,
/// propagates errors from the bus, and returns an error if the response
/// itself carries one.
///
/// # Panics
///
/// Panics if the claims lock is poisoned or the host key pair cannot be
/// re-created from its seed.
pub fn call_actor(&self, actor: &str, operation: &str, msg: &[u8]) -> Result<Vec<u8>> {
    let key = KeyPair::from_seed(&self.sk).unwrap();
    let is_known = self.claims.read().unwrap().contains_key(actor);
    if !is_known {
        return Err(errors::new(errors::ErrorKind::MiscHost(
            "No such actor".into(),
        )));
    }
    let origin = WasccEntity::Actor(SYSTEM_ACTOR.to_string());
    let target = WasccEntity::Actor(actor.to_string());
    let invocation = Invocation::new(&key, origin, target, operation, msg.to_vec());
    let subject = bus::actor_subject(self.ns.as_ref().map(String::as_str), actor);
    let response = self.bus.invoke(&subject, invocation)?;
    if let Some(e) = response.error {
        return Err(format!("Invocation failure: {}", e).into());
    }
    Ok(response.msg)
}
/// Returns a copy of the claims registered under the given key, or `None`
/// if this host holds no claims for it.
///
/// # Panics
///
/// Panics if the claims lock is poisoned.
pub fn claims_for_actor(&self, pk: &str) -> Option<Claims<wascap::jwt::Actor>> {
    self.claims.read().unwrap().get(pk).cloned()
}
/// Applies a host manifest: labels first, then actors, then capability
/// providers, then bindings.
///
/// Labels whose name is in the restricted list are skipped. Actors and
/// capability providers are loaded from a local file when the given path
/// exists and from a registry otherwise.
///
/// # Errors
///
/// Stops at the first failing step and returns its error; steps that
/// already succeeded are not undone.
///
/// # Panics
///
/// Panics if the labels lock is poisoned.
#[cfg(feature = "manifest")]
pub fn apply_manifest(&self, manifest: HostManifest) -> Result<()> {
    {
        // Scoped so the write lock is released before anything is loaded.
        let mut host_labels = self.labels.write().unwrap();
        for (name, value) in manifest.labels {
            if RESTRICTED_LABELS.contains(&name.as_ref()) {
                continue;
            }
            host_labels.insert(name.to_string(), value.to_string());
        }
    }

    manifest
        .actors
        .into_iter()
        .try_for_each(|actor| self.add_actor_file_first(&actor))?;

    for cap in manifest.capabilities {
        let is_local_file = Path::new(&cap.path).exists();
        if !is_local_file {
            self.add_native_capability_from_registry(&cap.path, cap.binding_name)?;
            continue;
        }
        let provider = NativeCapability::from_file(cap.path, cap.binding_name)?;
        self.add_native_capability(provider)?;
    }

    for entry in manifest.bindings {
        let values = entry.values.unwrap_or_else(HashMap::new);
        self.set_binding(&entry.actor, &entry.capability, entry.binding, values)?;
    }
    Ok(())
}
/// Adds an actor from a local file when `actor` names an existing path,
/// and from a registry otherwise.
fn add_actor_file_first(&self, actor: &str) -> Result<()> {
    if !std::path::Path::new(actor).exists() {
        return self.add_actor_from_registry(actor);
    }
    let local = Actor::from_file(actor)?;
    self.add_actor(local)
}
/// Returns the claims held by this host as pairs, as produced by
/// `authz::get_all_claims` from the host's claims map.
pub fn actors(&self) -> Vec<SubjectClaimsPair> {
authz::get_all_claims(self.claims.clone())
}
/// Returns a snapshot of the registered capability descriptors, keyed by
/// `(binding name, capability ID)`.
///
/// # Panics
///
/// Panics if the capabilities lock is poisoned.
pub fn capabilities(&self) -> HashMap<(String, String), CapabilityDescriptor> {
    self.caps
        .read()
        .unwrap()
        .iter()
        .map(|(route, descriptor)| {
            let key = (route.binding_name.clone(), route.capid.clone());
            (key, descriptor.clone())
        })
        .collect()
}
/// Returns the keys of all registered actors whose claims carry every one
/// of the given tags.
///
/// Actors whose claims have no metadata or no tag list are never returned,
/// even when `tags` is empty. With an empty `tags` slice, every actor that
/// has a tag list matches.
///
/// # Panics
///
/// Panics if the claims lock is poisoned.
pub fn actors_by_tag(&self, tags: &[&str]) -> Vec<String> {
    self.claims
        .read()
        .unwrap()
        .iter()
        .filter(|(_, claims)| {
            claims
                .metadata
                .as_ref()
                .and_then(|m| m.tags.as_ref())
                .map_or(false, |actor_tags| {
                    tags.iter()
                        .all(|&wanted| actor_tags.iter().any(|tag| tag.as_str() == wanted))
                })
        })
        .map(|(actor, _)| actor.to_string())
        .collect()
}
/// Shuts the host down: signals every registered actor to terminate, then
/// every registered capability provider, and finally disconnects the bus.
///
/// The actor subjects are copied out of the claims map first, so the
/// claims read lock is released before any termination signal is sent.
/// Holding the lock across those sends would stall any thread that needs
/// write access to the claims map while the signals are delivered.
///
/// # Errors
///
/// Stops at the first actor or capability that cannot be removed and
/// returns its error; the bus is not disconnected in that case.
///
/// # Panics
///
/// Panics if the claims lock is poisoned.
pub fn shutdown(&self) -> Result<()> {
    let subjects: Vec<String> = self
        .claims
        .read()
        .unwrap()
        .values()
        .map(|claims| claims.subject.to_string())
        .collect();
    for subject in &subjects {
        self.remove_actor(subject)?;
    }
    let caps = self.capabilities();
    for (binding_name, capid) in caps.keys() {
        self.remove_native_capability(capid, Some(binding_name.to_string()))?;
    }
    self.bus.disconnect();
    Ok(())
}
/// Returns the host's ID, which is the public key of its key pair.
pub fn id(&self) -> String {
    self.pk.clone()
}
}