wascc_host/
lib.rs

1#![doc(html_logo_url = "https://avatars0.githubusercontent.com/u/52050279?s=200&v=4")]
2//! # waSCC Host
3//!
4//! The WebAssembly Secure Capabilities Connector (waSCC) host runtime manages actors
5//! written in WebAssembly (aka _nanoprocesses_) and capability providers written in
6//! WebAssembly (via WASI) or as OS-native plugin libraries. waSCC securely manages
7//! communications between actors and the capabilities they need.
8//!
9//! To start a runtime, simply add actors and capabilities to the host. For more information,
10//! take a look at the documentation and tutorials at [wascc.dev](https://wascc.dev).
11//!
12//! # Example
13//! ```
14//! use std::collections::HashMap;
15//! use wascc_host::{Host, Actor, NativeCapability};
16//!
17//! fn main() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
18//!    env_logger::init();
19//!    let host = Host::new();
20//!    host.add_actor(Actor::from_file("./examples/.assets/echo.wasm")?)?;
21//!    host.add_actor(Actor::from_file("./examples/.assets/echo2.wasm")?)?;
22//!    host.add_native_capability(NativeCapability::from_file(
23//!        "./examples/.assets/libwascc_httpsrv.so", None
24//!    )?)?;
25//!
26//!    host.set_binding(
27//!        "MDFD7XZ5KBOPLPHQKHJEMPR54XIW6RAG5D7NNKN22NP7NSEWNTJZP7JN",
28//!        "wascc:http_server",
29//!        None,
30//!        generate_port_config(8085),
31//!    )?;
32//!
33//!    host.set_binding(
34//!        "MB4OLDIC3TCZ4Q4TGGOVAZC43VXFE2JQVRAXQMQFXUCREOOFEKOKZTY2",
35//!        "wascc:http_server",
36//!        None,
37//!        generate_port_config(8084),
38//!    )?;
39//!
40//!    assert_eq!(2, host.actors().len());
41//!    if let Some(ref claims) = host.claims_for_actor("MB4OLDIC3TCZ4Q4TGGOVAZC43VXFE2JQVRAXQMQFXUCREOOFEKOKZTY2") {
42//!        let md = claims.metadata.as_ref().unwrap();
43//!        assert!(md.caps.as_ref().unwrap().contains(&"wascc:http_server".to_string()));   
44//!    }
45//!    
46//!
47//! # std::thread::sleep(::std::time::Duration::from_millis(10));
48//!    // Need to keep the main thread from terminating immediately
49//!    // std::thread::park();
50//!
51//!    Ok(())
52//! }
53//!
54//! fn generate_port_config(port: u16) -> HashMap<String, String> {
55//!    let mut hm = HashMap::new();
56//!    hm.insert("PORT".to_string(), port.to_string());
57//!
58//!    hm
59//! }
60//!
61//! ```
62//!
63
64#[macro_use]
65extern crate log;
66
67#[macro_use]
68extern crate crossbeam;
69
70mod actor;
71mod authz;
72mod bus;
73mod capability;
74mod dispatch;
75pub mod errors;
76mod extras;
77mod inthost;
78#[cfg(feature = "manifest")]
79mod manifest;
80pub mod middleware;
81mod plugins;
82mod spawns;
83
/// Version of this crate, taken from the `CARGO_PKG_VERSION` environment
/// variable at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Revision number of the host.
// NOTE(review): presumably bumped on incompatible host changes - confirm what
// consumers compare this value against.
pub const REVISION: u32 = 2;

/// Convenience alias for results whose error type is this crate's `errors::Error`.
pub type Result<T> = std::result::Result<T, errors::Error>;
88
89pub use actor::Actor;
90pub use capability::NativeCapability;
91pub use inthost::{Invocation, InvocationResponse, WasccEntity};
92
93#[cfg(feature = "manifest")]
94pub use manifest::{BindingEntry, HostManifest};
95
96#[cfg(feature = "prometheus_middleware")]
97pub use middleware::prometheus;
98
99#[cfg(feature = "lattice")]
100use latticeclient::BusEvent;
101
102#[cfg(feature = "lattice")]
103use bus::lattice::ControlCommand;
104
105pub use authz::Authorizer;
106pub use middleware::Middleware;
107pub use wapc::WasiParams;
108
/// A string paired with a set of actor JWT claims; this is the element type of the
/// list returned by `Host::actors`.
// NOTE(review): per the alias name the string is the claims subject - confirm
// against `authz::get_all_claims`.
pub type SubjectClaimsPair = (String, Claims<wascap::jwt::Actor>);
110
111use crate::inthost::fetch_oci_bytes;
112use bus::{get_namespace_prefix, MessageBus};
113use crossbeam::Sender;
114#[cfg(feature = "lattice")]
115use crossbeam_channel as channel;
116use crossbeam_channel::Receiver;
117#[cfg(any(feature = "lattice", feature = "manifest"))]
118use inthost::RESTRICTED_LABELS;
119use plugins::PluginManager;
120use std::path::Path;
121use std::str::FromStr;
122use std::{
123    collections::HashMap,
124    sync::{Arc, RwLock},
125};
126use wascap::jwt::Claims;
127use wascap::prelude::KeyPair;
128use wascc_codec::{
129    capabilities::CapabilityDescriptor,
130    core::{CapabilityConfiguration, OP_BIND_ACTOR},
131    serialize, SYSTEM_ACTOR,
132};
133
/// Map from a binding tuple to the capability configuration stored for that binding.
type BindingsList = HashMap<BindingTuple, CapabilityConfiguration>;
/// Key identifying a single actor-to-provider binding.
type BindingTuple = (String, String, String); // (from-actor, to-capid, to-binding-name)
136
/// Identifies a capability provider route: a capability ID together with the
/// binding name used for that capability. It acts as the unique (primary) key
/// for a capid+binding combination.
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Clone)]
pub(crate) struct RouteKey {
    /// Name of the binding half of the key
    pub binding_name: String,
    /// Capability ID half of the key
    pub capid: String,
}

impl RouteKey {
    /// Builds a routing key from owned copies of the given binding name and
    /// capability ID.
    pub fn new(binding_name: &str, capid: &str) -> RouteKey {
        let binding_name = String::from(binding_name);
        let capid = String::from(capid);
        Self {
            binding_name,
            capid,
        }
    }
}
153
154/// A builder pattern implementation for creating a custom-configured host runtime
155pub struct HostBuilder {
156    labels: HashMap<String, String>,
157    ns: Option<String>,
158    authorizer: Box<dyn Authorizer + 'static>,
159}
160
161impl HostBuilder {
162    /// Creates a new host builder. This builder will initialize itself with some defaults
163    /// obtained from the environment. The labels list will pre-populate with the `hostcore.*`
164    /// labels, the namespace will be gleaned from the `LATTICE_NAMESPACE` environment variable
165    /// (if lattice mode is enabled), and the default authorizer will be set.
166    pub fn new() -> HostBuilder {
167        let b = HostBuilder {
168            labels: inthost::detect_core_host_labels(),
169            ns: get_namespace_prefix(),
170            authorizer: Box::new(authz::DefaultAuthorizer::new()),
171        };
172
173        b
174    }
175
176    /// Sets the lattice namespace for this host. A lattice namespace is a unit of multi-tenant
177    /// isolation on a network. To reduce the risk of conflicts or subscription failures, the
178    /// lattice namespace should not include any non-alphanumeric characters.
179    #[cfg(feature = "lattice")]
180    pub fn with_lattice_namespace(self, ns: &str) -> HostBuilder {
181        if !ns.chars().all(char::is_alphanumeric) {
182            panic!("Cannot use a non-alphanumeric lattice namespace name");
183        }
184        HostBuilder {
185            ns: Some(ns.to_lowercase().to_string()),
186            ..self
187        }
188    }
189
190    /// Sets a custom authorizer to be used for authorizing actors, capability providers,
191    /// and invocation requests. Note that the authorizer cannot be used to implement _less_
192    /// strict measures than the default authorizer, it can only be used to implement
193    /// _more_ strict rules
194    pub fn with_authorizer(self, authorizer: impl Authorizer + 'static) -> HostBuilder {
195        HostBuilder {
196            authorizer: Box::new(authorizer),
197            ..self
198        }
199    }
200
201    /// Adds an arbitrary label->value pair of metadata to the host. Cannot override
202    /// reserved labels such as those that begin with `hostcore.` Calling this twice
203    /// on the same label will have no effect after the first call.
204    pub fn with_label(self, key: &str, value: &str) -> HostBuilder {
205        let mut hm = self.labels.clone();
206        if !hm.contains_key(key) {
207            hm.insert(key.to_string(), value.to_string());
208        }
209        HostBuilder { labels: hm, ..self }
210    }
211
212    /// Converts the transient builder instance into a realized host runtime instance
213    pub fn build(self) -> Host {
214        #[cfg(not(feature = "lattice"))]
215        let h = Host::generate(self.authorizer, self.labels, self.ns.clone());
216        #[cfg(feature = "lattice")]
217        let h = Host::generate(self.authorizer, self.labels, self.ns.clone());
218        h
219    }
220}
221
/// Represents an instance of a waSCC host runtime. Every collection is wrapped in
/// an `Arc`, so a clone of the host refers to the same underlying state.
#[derive(Clone)]
pub struct Host {
    // message bus used to invoke actors/providers and, in lattice mode, publish events
    bus: Arc<MessageBus>,
    // claims of the actors added to this host, keyed by the claims subject
    claims: Arc<RwLock<HashMap<String, Claims<wascap::jwt::Actor>>>>,
    // plugin manager handed to native capability provider spawns
    plugins: Arc<RwLock<PluginManager>>,
    // binding configurations, keyed by (from-actor, to-capid, to-binding-name)
    bindings: Arc<RwLock<BindingsList>>,
    // descriptors of registered capability providers, keyed by capid+binding name
    caps: Arc<RwLock<HashMap<RouteKey, CapabilityDescriptor>>>,
    // middleware processing pipeline
    middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
    // the key to this field is the subscription subject, and not either a pk or a capid
    terminators: Arc<RwLock<HashMap<String, Sender<bool>>>>,
    // public key of the host's key pair (returned by `id`)
    pk: String,
    // seed of the host's key pair; the key pair is re-created from it when needed
    sk: String,
    // authorizer consulted for actors and bindings
    authorizer: Arc<RwLock<Box<dyn Authorizer>>>,
    // label->value metadata of the host
    labels: Arc<RwLock<HashMap<String, String>>>,
    // mapping between OCI registry image references and the associated unique identity (e.g. "Mxxx" and "Vxxx")
    image_map: Arc<RwLock<HashMap<String, String>>>,
    // optional lattice namespace, used when building bus subjects
    ns: Option<String>,
}
241
242impl Host {
243    /// Creates a new runtime host using all of the default values. Use the host builder
244    /// if you want to provide more customization options
245    pub fn new() -> Self {
246        let h = Self::generate(
247            Box::new(authz::DefaultAuthorizer::new()),
248            inthost::detect_core_host_labels(),
249            get_namespace_prefix(),
250        );
251        h
252    }
253
254    pub(crate) fn generate(
255        authz: Box<dyn Authorizer + 'static>,
256        labels: HashMap<String, String>,
257        ns: Option<String>,
258    ) -> Self {
259        let key = KeyPair::new_server();
260        let claims = Arc::new(RwLock::new(HashMap::new()));
261        let caps = Arc::new(RwLock::new(HashMap::new()));
262        let bindings = Arc::new(RwLock::new(HashMap::new()));
263        let labels = Arc::new(RwLock::new(labels));
264        let terminators = Arc::new(RwLock::new(HashMap::new()));
265        let authz = Arc::new(RwLock::new(authz));
266        let image_map = Arc::new(RwLock::new(HashMap::new()));
267
268        #[cfg(feature = "lattice")]
269        let (com_s, com_r): (Sender<ControlCommand>, Receiver<ControlCommand>) =
270            channel::unbounded();
271
272        #[cfg(feature = "lattice")]
273        let bus = Arc::new(bus::new(
274            key.public_key(),
275            claims.clone(),
276            caps.clone(),
277            bindings.clone(),
278            labels.clone(),
279            terminators.clone(),
280            ns.clone(),
281            com_s,
282            authz.clone(),
283            image_map.clone(),
284        ));
285
286        #[cfg(not(feature = "lattice"))]
287        let bus = Arc::new(bus::new());
288
289        #[cfg(feature = "lattice")]
290        let _ = bus.publish_event(BusEvent::HostStarted(key.public_key()));
291
292        let host = Host {
293            terminators: terminators.clone(),
294            bus: bus.clone(),
295            claims: claims.clone(),
296            plugins: Arc::new(RwLock::new(PluginManager::default())),
297            bindings,
298            caps,
299            middlewares: Arc::new(RwLock::new(vec![])),
300            pk: key.public_key(),
301            sk: key.seed().unwrap(),
302            authorizer: authz,
303            labels,
304            ns,
305            image_map,
306        };
307
308        info!("Host ID is {} (v{})", key.public_key(), VERSION);
309
310        host.ensure_extras().unwrap();
311
312        #[cfg(feature = "lattice")]
313        let _ = bus::lattice::spawn_controlplane(&host, com_r);
314
315        host
316    }
317
318    fn add_actor_imgref(&self, actor: Actor, imgref: Option<String>) -> Result<()> {
319        if self
320            .claims
321            .read()
322            .unwrap()
323            .contains_key(&actor.public_key())
324        {
325            return Err(errors::new(errors::ErrorKind::MiscHost(
326                format!("Actor {} is already in this host. Cannot host multiple instances of the same actor in the same host", actor.public_key())
327            )));
328        }
329        authz::enforce_validation(&actor.token.jwt)?; // returns an `Err` if validation fails
330        if !self.check_auth(&actor.token) {
331            // invoke the auth hook, if there is one
332            return Err(errors::new(errors::ErrorKind::Authorization(
333                "Authorization hook denied access to module".into(),
334            )));
335        }
336
337        let c = self.claims.clone();
338
339        c.write().unwrap().insert(
340            actor.token.claims.subject.to_string(),
341            actor.token.claims.clone(),
342        );
343
344        let key = KeyPair::from_seed(&self.sk).unwrap();
345        let wg = crossbeam_utils::sync::WaitGroup::new();
346        // Spin up a new thread that listens to "wasmbus.Mxxxx" calls on the message bus
347        spawns::spawn_actor(
348            wg.clone(),
349            actor.token.claims.clone(),
350            actor.bytes.clone(),
351            None,
352            true,
353            None,
354            self.bus.clone(),
355            self.middlewares.clone(),
356            self.caps.clone(),
357            self.bindings.clone(),
358            c.clone(),
359            self.terminators.clone(),
360            key,
361            self.authorizer.clone(),
362            self.image_map.clone(),
363            imgref,
364        )?;
365        wg.wait();
366        if actor.capabilities().contains(&extras::CAPABILITY_ID.into()) {
367            // force a binding so that there's a private actor subject on the bus for the
368            // actor to communicate with the extras provider
369            self.set_binding(
370                &actor.public_key(),
371                extras::CAPABILITY_ID,
372                None,
373                HashMap::new(),
374            )?;
375        }
376
377        Ok(())
378    }
379
380    /// Adds an actor to the host. This will provision resources (such as a handler thread) for the actor. Actors
381    /// will not be able to make use of capability providers unless bindings are added (or existed prior to the actor
382    /// being added to a host, which is possible in `lattice` mode)
383    pub fn add_actor(&self, actor: Actor) -> Result<()> {
384        self.add_actor_imgref(actor, None)
385    }
386
387    /// Adds an actor to the host by attempting to retrieve it from an OCI
388    /// registry. This function takes an image reference as an argument, e.g.
389    /// myregistry.mycloud.io/actor:v1
390    /// If OCI credentials are supplied in environment variables, those will be used.
391    pub fn add_actor_from_registry(&self, image: &str) -> Result<()> {
392        let bytes = inthost::fetch_oci_bytes(image)?;
393
394        self.add_actor_imgref(Actor::from_slice(&bytes)?, Some(image.to_string()))?;
395        Ok(())
396    }
397
398    /// Adds a portable capability provider (e.g. a WASI actor) to the waSCC host. Portable capability providers adhere
399    /// to the same contract as native capability providers, but they are implemented as "high-privilege WASM" modules
400    /// via WASI. Today, there is very little a WASI-based capability provider can do, but in the near future when
401    /// WASI gets a standardized networking stack, more providers can be written as portable modules.
402    pub fn add_capability(
403        &self,
404        actor: Actor,
405        binding: Option<&str>,
406        wasi: WasiParams,
407    ) -> Result<()> {
408        let binding = binding.unwrap_or("default");
409
410        let wg = crossbeam_utils::sync::WaitGroup::new();
411        let key = KeyPair::from_seed(&self.sk).unwrap();
412        // Spins up a new thread subscribed to the "wasmbus.{capid}.{binding}" subject
413        spawns::spawn_actor(
414            wg.clone(),
415            actor.token.claims,
416            actor.bytes.clone(),
417            Some(wasi),
418            false,
419            Some(binding.to_string()),
420            self.bus.clone(),
421            self.middlewares.clone(),
422            self.caps.clone(),
423            self.bindings.clone(),
424            self.claims.clone(),
425            self.terminators.clone(),
426            key,
427            self.authorizer.clone(),
428            self.image_map.clone(),
429            None,
430        )?;
431        wg.wait();
432        Ok(())
433    }
434
435    /// Removes an actor from the host. Notifies the actor's processing thread to terminate,
436    /// which will in turn attempt to unbind that actor from all previously bound capability providers
437    /// (in lattice mode, this unbinding only takes place if the actor is the last instance of its
438    /// kind in the lattice)
439    pub fn remove_actor(&self, pk: &str) -> Result<()> {
440        self.terminators.read().unwrap()
441            [&bus::actor_subject(self.ns.as_ref().map(String::as_str), pk)]
442            .send(true)
443            .unwrap();
444        Ok(())
445    }
446
447    /// Replaces one running actor with another live actor with no message loss. Note that
448    /// the time it takes to perform this replacement can cause pending messages from capability
449    /// providers (e.g. messages from subscriptions or HTTP requests) to build up in a backlog,
450    /// so make sure the new actor can handle this stream of these delayed messages. Also ensure that
451    /// the underlying WebAssembly driver (chosen via feature flag) supports hot-swapping module bytes.
452    pub fn replace_actor(&self, new_actor: Actor) -> Result<()> {
453        let key = KeyPair::from_seed(&self.sk).unwrap();
454        crate::inthost::replace_actor(&key, self.bus.clone(), new_actor)
455    }
456
457    /// Adds a middleware item to the middleware processing pipeline
458    pub fn add_middleware(&self, mid: impl Middleware) {
459        self.middlewares.write().unwrap().push(Box::new(mid));
460    }
461
462    /// Adds a native capability provider plugin to the host runtime. If running in lattice mode,
463    /// and at least one other instance of this same capability provider is running with previous
464    /// bindings, then the provider being added to this host will automatically reconstitute
465    /// the binding configuration. Note that because these capabilities are native,
466    /// cross-platform support is not always guaranteed.
467    pub fn add_native_capability(&self, capability: NativeCapability) -> Result<()> {
468        let capid = capability.id();
469        if self
470            .caps
471            .read()
472            .unwrap()
473            .contains_key(&RouteKey::new(&capability.binding_name, &capability.id()))
474        {
475            return Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
476                "Capability provider {} cannot be bound to the same name ({}) twice, loading failed.", capid, capability.binding_name
477            ))));
478        }
479        self.caps.write().unwrap().insert(
480            RouteKey::new(&capability.binding_name, &capability.descriptor.id),
481            capability.descriptor().clone(),
482        );
483        let wg = crossbeam_utils::sync::WaitGroup::new();
484        let key = KeyPair::from_seed(&self.sk).unwrap();
485        spawns::spawn_native_capability(
486            capability,
487            self.bus.clone(),
488            self.middlewares.clone(),
489            self.bindings.clone(),
490            self.terminators.clone(),
491            self.plugins.clone(),
492            wg.clone(),
493            Arc::new(key),
494        )?;
495        wg.wait();
496        Ok(())
497    }
498
499    /// Adds a native capability provider plugin to the host runtime by pulling the library from a provider archive
500    /// stored in an OCI-compliant registry. This file will be stored in the operating system's designated temporary
501    /// directory after being downloaded.
502    pub fn add_native_capability_from_registry(
503        &self,
504        image_ref: &str,
505        binding_name: Option<String>,
506    ) -> Result<()> {
507        let b = binding_name.unwrap_or("default".to_string());
508        match crate::inthost::fetch_provider(image_ref, &b, self.labels.clone()) {
509            Ok((prov, claims)) => {
510                self.add_native_capability(prov)?;
511                // Only write to the image map if the above add function succeeds
512                self.image_map
513                    .write()
514                    .unwrap()
515                    .insert(image_ref.to_string(), claims.subject.to_string());
516                Ok(())
517            }
518            Err(e) => Err(e),
519        }
520    }
521
522    /// Removes a native capability provider plugin from the waSCC runtime
523    pub fn remove_native_capability(
524        &self,
525        capability_id: &str,
526        binding_name: Option<String>,
527    ) -> Result<()> {
528        let b = binding_name.unwrap_or("default".to_string());
529        let subject =
530            bus::provider_subject(self.ns.as_ref().map(String::as_str), capability_id, &b);
531        if let Some(terminator) = self.terminators.read().unwrap().get(&subject) {
532            terminator.send(true).unwrap();
533            Ok(())
534        } else {
535            Err(errors::new(errors::ErrorKind::MiscHost(
536                "No such capability".into(),
537            )))
538        }
539    }
540
541    /// Removes a binding between an actor and the indicated capability provider. In lattice mode,
542    /// this operation has a _lattice global_ scope, and so all running instances of the indicated
543    /// capability provider will be asked to dispose of any resources provisioned for the given
544    /// actor.
545    pub fn remove_binding(
546        &self,
547        actor: &str,
548        capid: &str,
549        binding_name: Option<String>,
550    ) -> Result<()> {
551        let cfg = CapabilityConfiguration {
552            module: actor.to_string(),
553            values: HashMap::new(),
554        };
555        let buf = serialize(&cfg).unwrap();
556        let binding = binding_name.unwrap_or("default".to_string());
557        let key = KeyPair::from_seed(&self.sk).unwrap();
558        let inv_r = self.bus.invoke(
559            &self.bus.provider_subject(&capid, &binding), // The OP_REMOVE_ACTOR invocation should go to _all_ instances of the provider being unbound
560            crate::inthost::gen_remove_actor(&key, buf.clone(), &binding, &capid),
561        )?;
562        if let Some(s) = inv_r.error {
563            Err(format!("Failed to remove binding: {}", s).into())
564        } else {
565            Ok(())
566        }
567    }
568
569    /// Binds an actor to a capability provider with a given configuration. If the binding name
570    /// is `None` then the default binding name will be used (`default`). An actor can only have one named
571    /// binding per capability provider. In lattice mode, the call to this function has a _lattice global_
572    /// scope, and so all running instances of the indicated provider will be notified and provision
573    /// resources accordingly. For example, if you create a binding between an actor and an HTTP server
574    /// provider, and there are four instances of that provider running in the lattice, each of those
575    /// four hosts will start an HTTP server on the indicated port.
576    pub fn set_binding(
577        &self,
578        actor: &str,
579        capid: &str,
580        binding_name: Option<String>,
581        config: HashMap<String, String>,
582    ) -> Result<()> {
583        #[cfg(feature = "lattice")]
584        let claims = self.bus.discover_claims(actor);
585        #[cfg(not(feature = "lattice"))]
586        let claims = self.claims.read().unwrap().get(actor).cloned();
587
588        let key = KeyPair::from_seed(&self.sk).unwrap();
589
590        if claims.is_none() {
591            return Err(errors::new(errors::ErrorKind::MiscHost(
592                "Attempted to bind non-existent actor".to_string(),
593            )));
594        }
595        let c = claims.unwrap().clone();
596        let binding = binding_name.unwrap_or("default".to_string());
597        if !authz::can_invoke(&c, capid, OP_BIND_ACTOR) {
598            return Err(errors::new(errors::ErrorKind::Authorization(format!(
599                "Unauthorized binding: actor {} is not authorized to use capability {}.",
600                actor, capid
601            ))));
602        } else {
603            if !self.authorizer.read().unwrap().can_invoke(
604                &c,
605                &WasccEntity::Capability {
606                    capid: capid.to_string(),
607                    binding: binding.to_string(),
608                },
609                OP_BIND_ACTOR,
610            ) {
611                return Err(errors::new(errors::ErrorKind::Authorization(format!(
612                    "Unauthorized binding: actor {} is not authorized to use capability {}.",
613                    actor, capid
614                ))));
615            }
616        }
617
618        info!(
619            "Attempting to bind actor {} to {},{}",
620            actor, &binding, capid
621        );
622
623        let tgt_subject = if (actor == capid || actor == SYSTEM_ACTOR) && capid.starts_with("M") {
624            // manually injected actor configuration
625            bus::actor_subject(self.ns.as_ref().map(String::as_str), actor)
626        } else {
627            bus::provider_subject(self.ns.as_ref().map(String::as_str), capid, &binding)
628        };
629        trace!("Binding subject: {}", tgt_subject);
630        let inv = inthost::gen_config_invocation(
631            &key,
632            actor,
633            capid,
634            c.clone(),
635            binding.clone(),
636            config.clone(),
637        );
638        match self.bus.invoke(&tgt_subject, inv) {
639            Ok(inv_r) => {
640                if let Some(e) = inv_r.error {
641                    Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
642                        "Failed to configure {},{} - {}",
643                        binding, capid, e
644                    ))))
645                } else {
646                    self.record_binding(
647                        actor,
648                        capid,
649                        &binding,
650                        &CapabilityConfiguration {
651                            module: actor.to_string(),
652                            values: config,
653                        },
654                    )?;
655                    #[cfg(feature = "lattice")]
656                    let _ = self.bus.publish_event(BusEvent::ActorBindingCreated {
657                        actor: actor.to_string(),
658                        capid: capid.to_string(),
659                        instance_name: binding.to_string(),
660                        host: self.id(),
661                    });
662                    Ok(())
663                }
664            }
665            Err(e) => Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
666                "Failed to configure {},{} - {}",
667                binding, capid, e
668            )))),
669        }
670    }
671
672    /// Invoke an operation handler on an actor directly. The caller is responsible for
673    /// knowing ahead of time if the given actor supports the specified operation. In lattice
674    /// mode, this call will still only attempt a _local_ invocation on the host and will not
675    /// make a lattice-wide call. If you want to make lattice-wide invocations, please use
676    /// the lattice client library.
677    pub fn call_actor(&self, actor: &str, operation: &str, msg: &[u8]) -> Result<Vec<u8>> {
678        let key = KeyPair::from_seed(&self.sk).unwrap();
679        if !self.claims.read().unwrap().contains_key(actor) {
680            return Err(errors::new(errors::ErrorKind::MiscHost(
681                "No such actor".into(),
682            )));
683        }
684        let inv = Invocation::new(
685            &key,
686            WasccEntity::Actor(SYSTEM_ACTOR.to_string()),
687            WasccEntity::Actor(actor.to_string()),
688            operation,
689            msg.to_vec(),
690        );
691        let tgt_subject = bus::actor_subject(self.ns.as_ref().map(String::as_str), actor);
692        match self.bus.invoke(&tgt_subject, inv) {
693            Ok(resp) => match resp.error {
694                Some(e) => Err(format!("Invocation failure: {}", e).into()),
695                None => Ok(resp.msg),
696            },
697            Err(e) => Err(e),
698        }
699    }
700
701    /// Returns the full set of JWT claims for a given actor, if that actor is running in the host. This
702    /// call will not query other hosts in the lattice if lattice mode is enabled.
703    pub fn claims_for_actor(&self, pk: &str) -> Option<Claims<wascap::jwt::Actor>> {
704        let c = self.claims.read().unwrap().get(pk).cloned();
705
706        c
707    }
708
709    /// Applies a manifest JSON or YAML file to set up a host's actors, capability providers,
710    /// and actor bindings
711    #[cfg(feature = "manifest")]
712    pub fn apply_manifest(&self, manifest: HostManifest) -> Result<()> {
713        {
714            let mut labels = self.labels.write().unwrap();
715            for (label, label_value) in manifest.labels {
716                if !RESTRICTED_LABELS.contains(&label.as_ref()) {
717                    labels.insert(label.to_string(), label_value.to_string());
718                }
719            }
720        }
721        for actor in manifest.actors {
722            self.add_actor_file_first(&actor)?; // If file, add .wasm, otherwise assume it's an OCI ref
723        }
724        for cap in manifest.capabilities {
725            // for now, supports only file paths
726            if Path::new(&cap.path).exists() {
727                self.add_native_capability(NativeCapability::from_file(
728                    cap.path,
729                    cap.binding_name,
730                )?)?;
731            } else {
732                self.add_native_capability_from_registry(&cap.path, cap.binding_name)?;
733            }
734        }
735        for config in manifest.bindings {
736            self.set_binding(
737                &config.actor,
738                &config.capability,
739                config.binding,
740                config.values.unwrap_or(HashMap::new()),
741            )?;
742        }
743        Ok(())
744    }
745
746    fn add_actor_file_first(&self, actor: &str) -> Result<()> {
747        if std::path::Path::new(actor).exists() {
748            self.add_actor(Actor::from_file(&actor)?)
749        } else {
750            self.add_actor_from_registry(actor)
751        }
752    }
753
754    /// Returns the list of actors registered in the host. Even if lattice mode is enabled, this function
755    /// will only return the list of actors in this specific host
756    pub fn actors(&self) -> Vec<SubjectClaimsPair> {
757        authz::get_all_claims(self.claims.clone())
758    }
759
760    /// Returns the list of capability providers registered in the host. The key is a tuple of (binding, capability ID)
761    pub fn capabilities(&self) -> HashMap<(String, String), CapabilityDescriptor> {
762        let lock = self.caps.read().unwrap();
763        let mut res = HashMap::new();
764        for (rk, descriptor) in lock.iter() {
765            res.insert(
766                (rk.binding_name.to_string(), rk.capid.to_string()),
767                descriptor.clone(),
768            );
769        }
770        res
771    }
772
773    /// Returns the list of actors in the host that contain all of the tags in the
774    /// supplied parameter. This function will not make a lattice-wide tag query
775    pub fn actors_by_tag(&self, tags: &[&str]) -> Vec<String> {
776        let mut actors = vec![];
777
778        for (actor, claims) in self.claims.read().unwrap().iter() {
779            if let Some(actor_tags) = claims.metadata.as_ref().and_then(|m| m.tags.as_ref()) {
780                if tags.iter().all(|&t| actor_tags.contains(&t.to_string())) {
781                    actors.push(actor.to_string())
782                }
783            }
784        }
785
786        actors
787    }
788
789    /// Attempts to perform a graceful shutdown of the host by removing all actors in
790    /// the host and then removing all capability providers. This function is not guaranteed to
791    /// block and wait for the shutdown to finish
792    pub fn shutdown(&self) -> Result<()> {
793        {
794            let lock = self.claims.read().unwrap();
795            let actors: Vec<_> = lock.values().collect();
796            for claims in actors {
797                self.remove_actor(&claims.subject)?;
798            }
799        }
800        let caps = self.capabilities();
801        for (binding_name, capid) in caps.keys() {
802            self.remove_native_capability(&capid, Some(binding_name.to_string()))?;
803        }
804        self.bus.disconnect();
805        Ok(())
806    }
807
808    /// Returns the public key of the host
809    pub fn id(&self) -> String {
810        self.pk.to_string()
811    }
812}