1#![doc(html_logo_url = "https://avatars0.githubusercontent.com/u/52050279?s=200&v=4")]
2#[macro_use]
65extern crate log;
66
67#[macro_use]
68extern crate crossbeam;
69
70mod actor;
71mod authz;
72mod bus;
73mod capability;
74mod dispatch;
75pub mod errors;
76mod extras;
77mod inthost;
78#[cfg(feature = "manifest")]
79mod manifest;
80pub mod middleware;
81mod plugins;
82mod spawns;
83
/// Version of this crate, captured from the `CARGO_PKG_VERSION` environment
/// variable at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Revision number published alongside [`VERSION`].
// NOTE(review): nothing in this file reads `REVISION`; confirm who consumes it
// before changing the value.
pub const REVISION: u32 = 2;

/// Crate-wide result alias whose error type is always [`errors::Error`].
pub type Result<T> = std::result::Result<T, errors::Error>;
88
89pub use actor::Actor;
90pub use capability::NativeCapability;
91pub use inthost::{Invocation, InvocationResponse, WasccEntity};
92
93#[cfg(feature = "manifest")]
94pub use manifest::{BindingEntry, HostManifest};
95
96#[cfg(feature = "prometheus_middleware")]
97pub use middleware::prometheus;
98
99#[cfg(feature = "lattice")]
100use latticeclient::BusEvent;
101
102#[cfg(feature = "lattice")]
103use bus::lattice::ControlCommand;
104
105pub use authz::Authorizer;
106pub use middleware::Middleware;
107pub use wapc::WasiParams;
108
/// A string paired with a set of actor claims; the element type returned by
/// `Host::actors`.
// NOTE(review): the name suggests the string is the actor's subject (public
// key) — confirm against `authz::get_all_claims`.
pub type SubjectClaimsPair = (String, Claims<wascap::jwt::Actor>);
110
111use crate::inthost::fetch_oci_bytes;
112use bus::{get_namespace_prefix, MessageBus};
113use crossbeam::Sender;
114#[cfg(feature = "lattice")]
115use crossbeam_channel as channel;
116use crossbeam_channel::Receiver;
117#[cfg(any(feature = "lattice", feature = "manifest"))]
118use inthost::RESTRICTED_LABELS;
119use plugins::PluginManager;
120use std::path::Path;
121use std::str::FromStr;
122use std::{
123 collections::HashMap,
124 sync::{Arc, RwLock},
125};
126use wascap::jwt::Claims;
127use wascap::prelude::KeyPair;
128use wascc_codec::{
129 capabilities::CapabilityDescriptor,
130 core::{CapabilityConfiguration, OP_BIND_ACTOR},
131 serialize, SYSTEM_ACTOR,
132};
133
/// Map from a `BindingTuple` key to a `CapabilityConfiguration`; this is the
/// type stored behind `Host::bindings`.
type BindingsList = HashMap<BindingTuple, CapabilityConfiguration>;
/// Three-string key used by `BindingsList`.
// NOTE(review): presumably (actor, capability ID, binding name) — confirm
// against the code that records bindings.
type BindingTuple = (String, String, String);

/// Key made of a binding name and a capability ID.
///
/// `Host` uses it to index the capability descriptors it has registered.
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Clone)]
pub(crate) struct RouteKey {
    pub binding_name: String,
    pub capid: String,
}

impl RouteKey {
    /// Builds a key holding owned copies of `binding_name` and `capid`.
    pub fn new(binding_name: &str, capid: &str) -> Self {
        let binding_name = binding_name.to_owned();
        let capid = capid.to_owned();
        Self {
            binding_name,
            capid,
        }
    }
}
153
/// Collects the settings that are handed to `Host::generate` when `build` is
/// called.
pub struct HostBuilder {
    /// Host labels; `HostBuilder::new` seeds them from
    /// `inthost::detect_core_host_labels()`.
    labels: HashMap<String, String>,
    /// Optional namespace; `HostBuilder::new` seeds it from
    /// `get_namespace_prefix()`.
    ns: Option<String>,
    /// Authorizer given to the host; `HostBuilder::new` installs
    /// `authz::DefaultAuthorizer`.
    authorizer: Box<dyn Authorizer + 'static>,
}
160
161impl HostBuilder {
162 pub fn new() -> HostBuilder {
167 let b = HostBuilder {
168 labels: inthost::detect_core_host_labels(),
169 ns: get_namespace_prefix(),
170 authorizer: Box::new(authz::DefaultAuthorizer::new()),
171 };
172
173 b
174 }
175
176 #[cfg(feature = "lattice")]
180 pub fn with_lattice_namespace(self, ns: &str) -> HostBuilder {
181 if !ns.chars().all(char::is_alphanumeric) {
182 panic!("Cannot use a non-alphanumeric lattice namespace name");
183 }
184 HostBuilder {
185 ns: Some(ns.to_lowercase().to_string()),
186 ..self
187 }
188 }
189
190 pub fn with_authorizer(self, authorizer: impl Authorizer + 'static) -> HostBuilder {
195 HostBuilder {
196 authorizer: Box::new(authorizer),
197 ..self
198 }
199 }
200
201 pub fn with_label(self, key: &str, value: &str) -> HostBuilder {
205 let mut hm = self.labels.clone();
206 if !hm.contains_key(key) {
207 hm.insert(key.to_string(), value.to_string());
208 }
209 HostBuilder { labels: hm, ..self }
210 }
211
212 pub fn build(self) -> Host {
214 #[cfg(not(feature = "lattice"))]
215 let h = Host::generate(self.authorizer, self.labels, self.ns.clone());
216 #[cfg(feature = "lattice")]
217 let h = Host::generate(self.authorizer, self.labels, self.ns.clone());
218 h
219 }
220}
221
/// Runtime host for actors and capability providers.
///
/// Most fields are `Arc`-wrapped, so a clone of a `Host` shares that state
/// with the original rather than copying it.
#[derive(Clone)]
pub struct Host {
    /// Message bus used to invoke targets and, with the `lattice` feature, to
    /// publish events.
    bus: Arc<MessageBus>,
    /// Claims of the actors known to this host, keyed by the claims' subject.
    claims: Arc<RwLock<HashMap<String, Claims<wascap::jwt::Actor>>>>,
    /// Plugin manager handed to `spawns::spawn_native_capability`.
    plugins: Arc<RwLock<PluginManager>>,
    /// Binding table handed to the spawn functions.
    bindings: Arc<RwLock<BindingsList>>,
    /// Descriptors of registered capability providers, keyed by binding name
    /// and capability ID.
    caps: Arc<RwLock<HashMap<RouteKey, CapabilityDescriptor>>>,
    /// Middleware list; `add_middleware` appends to it.
    middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
    /// Termination channels keyed by bus subject; `remove_actor` and
    /// `remove_native_capability` send `true` on them.
    terminators: Arc<RwLock<HashMap<String, Sender<bool>>>>,
    /// Public key of the host's key pair; returned by `id`.
    pk: String,
    /// Seed of the host's key pair; used to rebuild the `KeyPair` on demand.
    sk: String,
    /// Authorizer consulted in `set_binding` and passed to spawned actors.
    authorizer: Arc<RwLock<Box<dyn Authorizer>>>,
    /// Host labels.
    labels: Arc<RwLock<HashMap<String, String>>>,
    /// Map from image reference to subject;
    /// `add_native_capability_from_registry` inserts into it.
    image_map: Arc<RwLock<HashMap<String, String>>>,
    /// Optional namespace passed to the bus subject helpers.
    ns: Option<String>,
}
241
242impl Host {
243 pub fn new() -> Self {
246 let h = Self::generate(
247 Box::new(authz::DefaultAuthorizer::new()),
248 inthost::detect_core_host_labels(),
249 get_namespace_prefix(),
250 );
251 h
252 }
253
    /// Builds a fully wired host from an authorizer, a label map, and an
    /// optional namespace.
    ///
    /// # Panics
    /// Panics if the new key pair has no seed or if `ensure_extras` fails.
    pub(crate) fn generate(
        authz: Box<dyn Authorizer + 'static>,
        labels: HashMap<String, String>,
        ns: Option<String>,
    ) -> Self {
        // Fresh server key pair; its public key is the host ID.
        let key = KeyPair::new_server();
        // Shared state handed both to the bus (lattice builds) and to the host.
        let claims = Arc::new(RwLock::new(HashMap::new()));
        let caps = Arc::new(RwLock::new(HashMap::new()));
        let bindings = Arc::new(RwLock::new(HashMap::new()));
        let labels = Arc::new(RwLock::new(labels));
        let terminators = Arc::new(RwLock::new(HashMap::new()));
        let authz = Arc::new(RwLock::new(authz));
        let image_map = Arc::new(RwLock::new(HashMap::new()));

        // Unbounded channel for control commands: the sender goes to the bus,
        // the receiver to the control plane spawned below.
        #[cfg(feature = "lattice")]
        let (com_s, com_r): (Sender<ControlCommand>, Receiver<ControlCommand>) =
            channel::unbounded();

        #[cfg(feature = "lattice")]
        let bus = Arc::new(bus::new(
            key.public_key(),
            claims.clone(),
            caps.clone(),
            bindings.clone(),
            labels.clone(),
            terminators.clone(),
            ns.clone(),
            com_s,
            authz.clone(),
            image_map.clone(),
        ));

        #[cfg(not(feature = "lattice"))]
        let bus = Arc::new(bus::new());

        // Best-effort notification; a publish failure is ignored.
        #[cfg(feature = "lattice")]
        let _ = bus.publish_event(BusEvent::HostStarted(key.public_key()));

        let host = Host {
            terminators: terminators.clone(),
            bus: bus.clone(),
            claims: claims.clone(),
            plugins: Arc::new(RwLock::new(PluginManager::default())),
            bindings,
            caps,
            middlewares: Arc::new(RwLock::new(vec![])),
            pk: key.public_key(),
            sk: key.seed().unwrap(),
            authorizer: authz,
            labels,
            ns,
            image_map,
        };

        info!("Host ID is {} (v{})", key.public_key(), VERSION);

        // Runs before the control plane is started.
        host.ensure_extras().unwrap();

        // The control plane gets the receiving end of the command channel; the
        // returned value is discarded.
        #[cfg(feature = "lattice")]
        let _ = bus::lattice::spawn_controlplane(&host, com_r);

        host
    }
317
318 fn add_actor_imgref(&self, actor: Actor, imgref: Option<String>) -> Result<()> {
319 if self
320 .claims
321 .read()
322 .unwrap()
323 .contains_key(&actor.public_key())
324 {
325 return Err(errors::new(errors::ErrorKind::MiscHost(
326 format!("Actor {} is already in this host. Cannot host multiple instances of the same actor in the same host", actor.public_key())
327 )));
328 }
329 authz::enforce_validation(&actor.token.jwt)?; if !self.check_auth(&actor.token) {
331 return Err(errors::new(errors::ErrorKind::Authorization(
333 "Authorization hook denied access to module".into(),
334 )));
335 }
336
337 let c = self.claims.clone();
338
339 c.write().unwrap().insert(
340 actor.token.claims.subject.to_string(),
341 actor.token.claims.clone(),
342 );
343
344 let key = KeyPair::from_seed(&self.sk).unwrap();
345 let wg = crossbeam_utils::sync::WaitGroup::new();
346 spawns::spawn_actor(
348 wg.clone(),
349 actor.token.claims.clone(),
350 actor.bytes.clone(),
351 None,
352 true,
353 None,
354 self.bus.clone(),
355 self.middlewares.clone(),
356 self.caps.clone(),
357 self.bindings.clone(),
358 c.clone(),
359 self.terminators.clone(),
360 key,
361 self.authorizer.clone(),
362 self.image_map.clone(),
363 imgref,
364 )?;
365 wg.wait();
366 if actor.capabilities().contains(&extras::CAPABILITY_ID.into()) {
367 self.set_binding(
370 &actor.public_key(),
371 extras::CAPABILITY_ID,
372 None,
373 HashMap::new(),
374 )?;
375 }
376
377 Ok(())
378 }
379
    /// Adds `actor` to this host with no associated image reference.
    ///
    /// Delegates to `add_actor_imgref` with `imgref = None` and returns its
    /// result unchanged.
    pub fn add_actor(&self, actor: Actor) -> Result<()> {
        self.add_actor_imgref(actor, None)
    }
386
387 pub fn add_actor_from_registry(&self, image: &str) -> Result<()> {
392 let bytes = inthost::fetch_oci_bytes(image)?;
393
394 self.add_actor_imgref(Actor::from_slice(&bytes)?, Some(image.to_string()))?;
395 Ok(())
396 }
397
398 pub fn add_capability(
403 &self,
404 actor: Actor,
405 binding: Option<&str>,
406 wasi: WasiParams,
407 ) -> Result<()> {
408 let binding = binding.unwrap_or("default");
409
410 let wg = crossbeam_utils::sync::WaitGroup::new();
411 let key = KeyPair::from_seed(&self.sk).unwrap();
412 spawns::spawn_actor(
414 wg.clone(),
415 actor.token.claims,
416 actor.bytes.clone(),
417 Some(wasi),
418 false,
419 Some(binding.to_string()),
420 self.bus.clone(),
421 self.middlewares.clone(),
422 self.caps.clone(),
423 self.bindings.clone(),
424 self.claims.clone(),
425 self.terminators.clone(),
426 key,
427 self.authorizer.clone(),
428 self.image_map.clone(),
429 None,
430 )?;
431 wg.wait();
432 Ok(())
433 }
434
435 pub fn remove_actor(&self, pk: &str) -> Result<()> {
440 self.terminators.read().unwrap()
441 [&bus::actor_subject(self.ns.as_ref().map(String::as_str), pk)]
442 .send(true)
443 .unwrap();
444 Ok(())
445 }
446
    /// Replaces an actor with `new_actor` by delegating to
    /// `inthost::replace_actor`, passing this host's key pair and bus.
    ///
    /// # Panics
    /// Panics if the key pair cannot be rebuilt from the stored seed.
    pub fn replace_actor(&self, new_actor: Actor) -> Result<()> {
        let key = KeyPair::from_seed(&self.sk).unwrap();
        crate::inthost::replace_actor(&key, self.bus.clone(), new_actor)
    }
456
    /// Appends `mid` to the end of the host's middleware list.
    ///
    /// # Panics
    /// Panics if the middleware lock is poisoned.
    pub fn add_middleware(&self, mid: impl Middleware) {
        self.middlewares.write().unwrap().push(Box::new(mid));
    }
461
462 pub fn add_native_capability(&self, capability: NativeCapability) -> Result<()> {
468 let capid = capability.id();
469 if self
470 .caps
471 .read()
472 .unwrap()
473 .contains_key(&RouteKey::new(&capability.binding_name, &capability.id()))
474 {
475 return Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
476 "Capability provider {} cannot be bound to the same name ({}) twice, loading failed.", capid, capability.binding_name
477 ))));
478 }
479 self.caps.write().unwrap().insert(
480 RouteKey::new(&capability.binding_name, &capability.descriptor.id),
481 capability.descriptor().clone(),
482 );
483 let wg = crossbeam_utils::sync::WaitGroup::new();
484 let key = KeyPair::from_seed(&self.sk).unwrap();
485 spawns::spawn_native_capability(
486 capability,
487 self.bus.clone(),
488 self.middlewares.clone(),
489 self.bindings.clone(),
490 self.terminators.clone(),
491 self.plugins.clone(),
492 wg.clone(),
493 Arc::new(key),
494 )?;
495 wg.wait();
496 Ok(())
497 }
498
499 pub fn add_native_capability_from_registry(
503 &self,
504 image_ref: &str,
505 binding_name: Option<String>,
506 ) -> Result<()> {
507 let b = binding_name.unwrap_or("default".to_string());
508 match crate::inthost::fetch_provider(image_ref, &b, self.labels.clone()) {
509 Ok((prov, claims)) => {
510 self.add_native_capability(prov)?;
511 self.image_map
513 .write()
514 .unwrap()
515 .insert(image_ref.to_string(), claims.subject.to_string());
516 Ok(())
517 }
518 Err(e) => Err(e),
519 }
520 }
521
522 pub fn remove_native_capability(
524 &self,
525 capability_id: &str,
526 binding_name: Option<String>,
527 ) -> Result<()> {
528 let b = binding_name.unwrap_or("default".to_string());
529 let subject =
530 bus::provider_subject(self.ns.as_ref().map(String::as_str), capability_id, &b);
531 if let Some(terminator) = self.terminators.read().unwrap().get(&subject) {
532 terminator.send(true).unwrap();
533 Ok(())
534 } else {
535 Err(errors::new(errors::ErrorKind::MiscHost(
536 "No such capability".into(),
537 )))
538 }
539 }
540
541 pub fn remove_binding(
546 &self,
547 actor: &str,
548 capid: &str,
549 binding_name: Option<String>,
550 ) -> Result<()> {
551 let cfg = CapabilityConfiguration {
552 module: actor.to_string(),
553 values: HashMap::new(),
554 };
555 let buf = serialize(&cfg).unwrap();
556 let binding = binding_name.unwrap_or("default".to_string());
557 let key = KeyPair::from_seed(&self.sk).unwrap();
558 let inv_r = self.bus.invoke(
559 &self.bus.provider_subject(&capid, &binding), crate::inthost::gen_remove_actor(&key, buf.clone(), &binding, &capid),
561 )?;
562 if let Some(s) = inv_r.error {
563 Err(format!("Failed to remove binding: {}", s).into())
564 } else {
565 Ok(())
566 }
567 }
568
569 pub fn set_binding(
577 &self,
578 actor: &str,
579 capid: &str,
580 binding_name: Option<String>,
581 config: HashMap<String, String>,
582 ) -> Result<()> {
583 #[cfg(feature = "lattice")]
584 let claims = self.bus.discover_claims(actor);
585 #[cfg(not(feature = "lattice"))]
586 let claims = self.claims.read().unwrap().get(actor).cloned();
587
588 let key = KeyPair::from_seed(&self.sk).unwrap();
589
590 if claims.is_none() {
591 return Err(errors::new(errors::ErrorKind::MiscHost(
592 "Attempted to bind non-existent actor".to_string(),
593 )));
594 }
595 let c = claims.unwrap().clone();
596 let binding = binding_name.unwrap_or("default".to_string());
597 if !authz::can_invoke(&c, capid, OP_BIND_ACTOR) {
598 return Err(errors::new(errors::ErrorKind::Authorization(format!(
599 "Unauthorized binding: actor {} is not authorized to use capability {}.",
600 actor, capid
601 ))));
602 } else {
603 if !self.authorizer.read().unwrap().can_invoke(
604 &c,
605 &WasccEntity::Capability {
606 capid: capid.to_string(),
607 binding: binding.to_string(),
608 },
609 OP_BIND_ACTOR,
610 ) {
611 return Err(errors::new(errors::ErrorKind::Authorization(format!(
612 "Unauthorized binding: actor {} is not authorized to use capability {}.",
613 actor, capid
614 ))));
615 }
616 }
617
618 info!(
619 "Attempting to bind actor {} to {},{}",
620 actor, &binding, capid
621 );
622
623 let tgt_subject = if (actor == capid || actor == SYSTEM_ACTOR) && capid.starts_with("M") {
624 bus::actor_subject(self.ns.as_ref().map(String::as_str), actor)
626 } else {
627 bus::provider_subject(self.ns.as_ref().map(String::as_str), capid, &binding)
628 };
629 trace!("Binding subject: {}", tgt_subject);
630 let inv = inthost::gen_config_invocation(
631 &key,
632 actor,
633 capid,
634 c.clone(),
635 binding.clone(),
636 config.clone(),
637 );
638 match self.bus.invoke(&tgt_subject, inv) {
639 Ok(inv_r) => {
640 if let Some(e) = inv_r.error {
641 Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
642 "Failed to configure {},{} - {}",
643 binding, capid, e
644 ))))
645 } else {
646 self.record_binding(
647 actor,
648 capid,
649 &binding,
650 &CapabilityConfiguration {
651 module: actor.to_string(),
652 values: config,
653 },
654 )?;
655 #[cfg(feature = "lattice")]
656 let _ = self.bus.publish_event(BusEvent::ActorBindingCreated {
657 actor: actor.to_string(),
658 capid: capid.to_string(),
659 instance_name: binding.to_string(),
660 host: self.id(),
661 });
662 Ok(())
663 }
664 }
665 Err(e) => Err(errors::new(errors::ErrorKind::CapabilityProvider(format!(
666 "Failed to configure {},{} - {}",
667 binding, capid, e
668 )))),
669 }
670 }
671
672 pub fn call_actor(&self, actor: &str, operation: &str, msg: &[u8]) -> Result<Vec<u8>> {
678 let key = KeyPair::from_seed(&self.sk).unwrap();
679 if !self.claims.read().unwrap().contains_key(actor) {
680 return Err(errors::new(errors::ErrorKind::MiscHost(
681 "No such actor".into(),
682 )));
683 }
684 let inv = Invocation::new(
685 &key,
686 WasccEntity::Actor(SYSTEM_ACTOR.to_string()),
687 WasccEntity::Actor(actor.to_string()),
688 operation,
689 msg.to_vec(),
690 );
691 let tgt_subject = bus::actor_subject(self.ns.as_ref().map(String::as_str), actor);
692 match self.bus.invoke(&tgt_subject, inv) {
693 Ok(resp) => match resp.error {
694 Some(e) => Err(format!("Invocation failure: {}", e).into()),
695 None => Ok(resp.msg),
696 },
697 Err(e) => Err(e),
698 }
699 }
700
701 pub fn claims_for_actor(&self, pk: &str) -> Option<Claims<wascap::jwt::Actor>> {
704 let c = self.claims.read().unwrap().get(pk).cloned();
705
706 c
707 }
708
709 #[cfg(feature = "manifest")]
712 pub fn apply_manifest(&self, manifest: HostManifest) -> Result<()> {
713 {
714 let mut labels = self.labels.write().unwrap();
715 for (label, label_value) in manifest.labels {
716 if !RESTRICTED_LABELS.contains(&label.as_ref()) {
717 labels.insert(label.to_string(), label_value.to_string());
718 }
719 }
720 }
721 for actor in manifest.actors {
722 self.add_actor_file_first(&actor)?; }
724 for cap in manifest.capabilities {
725 if Path::new(&cap.path).exists() {
727 self.add_native_capability(NativeCapability::from_file(
728 cap.path,
729 cap.binding_name,
730 )?)?;
731 } else {
732 self.add_native_capability_from_registry(&cap.path, cap.binding_name)?;
733 }
734 }
735 for config in manifest.bindings {
736 self.set_binding(
737 &config.actor,
738 &config.capability,
739 config.binding,
740 config.values.unwrap_or(HashMap::new()),
741 )?;
742 }
743 Ok(())
744 }
745
746 fn add_actor_file_first(&self, actor: &str) -> Result<()> {
747 if std::path::Path::new(actor).exists() {
748 self.add_actor(Actor::from_file(&actor)?)
749 } else {
750 self.add_actor_from_registry(actor)
751 }
752 }
753
    /// Returns the result of `authz::get_all_claims` applied to this host's
    /// claims map.
    // NOTE(review): presumably one (subject, claims) pair per stored actor —
    // confirm against `authz::get_all_claims`.
    pub fn actors(&self) -> Vec<SubjectClaimsPair> {
        authz::get_all_claims(self.claims.clone())
    }
759
760 pub fn capabilities(&self) -> HashMap<(String, String), CapabilityDescriptor> {
762 let lock = self.caps.read().unwrap();
763 let mut res = HashMap::new();
764 for (rk, descriptor) in lock.iter() {
765 res.insert(
766 (rk.binding_name.to_string(), rk.capid.to_string()),
767 descriptor.clone(),
768 );
769 }
770 res
771 }
772
773 pub fn actors_by_tag(&self, tags: &[&str]) -> Vec<String> {
776 let mut actors = vec![];
777
778 for (actor, claims) in self.claims.read().unwrap().iter() {
779 if let Some(actor_tags) = claims.metadata.as_ref().and_then(|m| m.tags.as_ref()) {
780 if tags.iter().all(|&t| actor_tags.contains(&t.to_string())) {
781 actors.push(actor.to_string())
782 }
783 }
784 }
785
786 actors
787 }
788
789 pub fn shutdown(&self) -> Result<()> {
793 {
794 let lock = self.claims.read().unwrap();
795 let actors: Vec<_> = lock.values().collect();
796 for claims in actors {
797 self.remove_actor(&claims.subject)?;
798 }
799 }
800 let caps = self.capabilities();
801 for (binding_name, capid) in caps.keys() {
802 self.remove_native_capability(&capid, Some(binding_name.to_string()))?;
803 }
804 self.bus.disconnect();
805 Ok(())
806 }
807
    /// Returns an owned copy of the host's public key.
    pub fn id(&self) -> String {
        self.pk.to_string()
    }
812}