wascc_host/
inthost.rs

1// Implementations of support functions for the `Host` struct
2
3use super::Host;
4use crate::Result;
5use data_encoding::HEXUPPER;
6use ring::digest::{Context, Digest, SHA256};
7
8use crate::bus;
9use crate::bus::MessageBus;
10use crate::BindingsList;
11use crate::{authz, errors, Actor, Authorizer, NativeCapability, RouteKey};
12use errors::ErrorKind;
13use provider_archive::ProviderArchive;
14use std::str::FromStr;
15use std::{
16    collections::HashMap,
17    io::Read,
18    sync::{Arc, RwLock},
19};
20use uuid::Uuid;
21use wapc::WapcHost;
22use wascap::{jwt::Claims, prelude::KeyPair};
23use wascc_codec::{
24    capabilities::{CapabilityDescriptor, OP_GET_CAPABILITY_DESCRIPTOR},
25    core::{CapabilityConfiguration, OP_PERFORM_LIVE_UPDATE, OP_REMOVE_ACTOR},
26    deserialize, serialize, SYSTEM_ACTOR,
27};
28
/// Key of the intrinsic host label that `detect_core_host_labels` fills with
/// `std::env::consts::ARCH`.
pub(crate) const CORELABEL_ARCH: &str = "hostcore.arch";
/// Key of the intrinsic host label that `detect_core_host_labels` fills with
/// `std::env::consts::OS`.
pub(crate) const CORELABEL_OS: &str = "hostcore.os";
/// Key of the intrinsic host label that `detect_core_host_labels` fills with
/// `std::env::consts::FAMILY`.
pub(crate) const CORELABEL_OSFAMILY: &str = "hostcore.osfamily";

/// Environment variable read by `fetch_oci_bytes` for the registry user name.
pub(crate) const OCI_VAR_USER: &str = "OCI_REGISTRY_USER";
/// Environment variable read by `fetch_oci_bytes` for the registry password.
pub(crate) const OCI_VAR_PASSWORD: &str = "OCI_REGISTRY_PASSWORD";

/// The three intrinsic label keys, grouped in one array.
// NOTE(review): presumably labels that callers are not allowed to set
// themselves; no use is visible in this part of the file (hence the
// `dead_code` allowance) — confirm against the rest of the crate.
#[allow(dead_code)]
pub(crate) const RESTRICTED_LABELS: [&str; 3] = [CORELABEL_OSFAMILY, CORELABEL_ARCH, CORELABEL_OS];
38
39// Unsubscribes all of the private actor-provider comms subjects
40pub(crate) fn unsub_all_bindings(
41    bindings: Arc<RwLock<BindingsList>>,
42    bus: Arc<MessageBus>,
43    capid: &str,
44) {
45    bindings
46        .read()
47        .unwrap()
48        .keys()
49        .filter(|(_a, c, _b)| c == capid)
50        .for_each(|(a, c, b)| {
51            let _ = bus.unsubscribe(&bus.provider_subject_bound_actor(c, b, a));
52        });
53}
54
55impl Host {
56    pub(crate) fn record_binding(
57        &self,
58        actor: &str,
59        capid: &str,
60        binding: &str,
61        config: &CapabilityConfiguration,
62    ) -> Result<()> {
63        let mut lock = self.bindings.write().unwrap();
64        lock.insert(
65            (actor.to_string(), capid.to_string(), binding.to_string()),
66            config.clone(),
67        );
68        trace!(
69            "Actor {} successfully bound to {},{}",
70            actor,
71            binding,
72            capid
73        );
74        Ok(())
75    }
76
77    pub(crate) fn ensure_extras(&self) -> Result<()> {
78        self.add_native_capability(NativeCapability::from_instance(
79            crate::extras::ExtrasCapabilityProvider::default(),
80            None,
81        )?)?;
82        Ok(())
83    }
84}
85
86/// In the case of a portable capability provider, obtain its capability descriptor
87pub(crate) fn get_descriptor(host: &mut WapcHost) -> Result<CapabilityDescriptor> {
88    let msg = wascc_codec::core::HealthRequest { placeholder: false }; // TODO: eventually support sending an empty slice for this
89    let res = host.call(OP_GET_CAPABILITY_DESCRIPTOR, &serialize(&msg)?)?;
90    deserialize(&res).map_err(|e| e.into())
91}
92
93pub(crate) fn remove_cap(
94    caps: Arc<RwLock<HashMap<RouteKey, CapabilityDescriptor>>>,
95    capid: &str,
96    binding: &str,
97) {
98    caps.write().unwrap().remove(&RouteKey::new(binding, capid));
99}
100
101/// Puts a "live update" message into the dispatch queue, which will be handled
102/// as soon as it is pulled off the channel for the target actor
103pub(crate) fn replace_actor(
104    hostkey: &KeyPair,
105    bus: Arc<MessageBus>,
106    new_actor: Actor,
107) -> Result<()> {
108    let public_key = new_actor.token.claims.subject;
109    let tgt_subject = bus.actor_subject(&public_key);
110    let inv = gen_liveupdate_invocation(hostkey, &public_key, new_actor.bytes);
111
112    match bus.invoke(&tgt_subject, inv) {
113        Ok(_) => {
114            info!("Actor {} replaced", public_key);
115            Ok(())
116        }
117        Err(e) => Err(e),
118    }
119}
120
121pub(crate) fn live_update(guest: &mut WapcHost, inv: &Invocation) -> InvocationResponse {
122    match guest.replace_module(&inv.msg) {
123        Ok(_) => InvocationResponse::success(inv, vec![]),
124        Err(e) => {
125            error!("Failed to perform hot swap, ignoring message: {}", e);
126            InvocationResponse::error(inv, "Failed to perform hot swap")
127        }
128    }
129}
130
131fn gen_liveupdate_invocation(hostkey: &KeyPair, target: &str, bytes: Vec<u8>) -> Invocation {
132    Invocation::new(
133        hostkey,
134        WasccEntity::Actor(SYSTEM_ACTOR.to_string()),
135        WasccEntity::Actor(target.to_string()),
136        OP_PERFORM_LIVE_UPDATE,
137        bytes,
138    )
139}
140
141/// Removes all bindings for a given actor by sending the "remove actor" message
142/// to each of the capabilities
143pub(crate) fn deconfigure_actor(
144    hostkey: KeyPair,
145    bus: Arc<MessageBus>,
146    bindings: Arc<RwLock<BindingsList>>,
147    key: &str,
148) {
149    #[cfg(feature = "lattice")]
150    {
151        // Don't remove the bindings for this actor unless it's the last instance in the lattice
152        if let Ok(i) = bus.instance_count(key) {
153            if i > 0 {
154                // This is 0 because the actor being removed has already been taken out of the claims map, so bus queries will not see the local instance
155                info!("Actor instance terminated at scale > 1, bypassing binding removal.");
156                return;
157            }
158        }
159    }
160    let cfg = CapabilityConfiguration {
161        module: key.to_string(),
162        values: HashMap::new(),
163    };
164    let buf = serialize(&cfg).unwrap();
165    let nbindings: Vec<_> = {
166        let lock = bindings.read().unwrap();
167        lock.keys()
168            .filter(|(a, _cap, _bind)| a == key)
169            .cloned()
170            .collect()
171    };
172
173    // (actor, capid, binding)
174    for (actor, capid, binding) in nbindings {
175        info!("Unbinding actor {} from {},{}", actor, binding, capid);
176        let _inv_r = bus.invoke(
177            &bus.provider_subject(&capid, &binding), // The OP_REMOVE_ACTOR invocation should go to _all_ instances of the provider being unbound
178            gen_remove_actor(&hostkey, buf.clone(), &binding, &capid),
179        );
180        remove_binding(bindings.clone(), key, &binding, &capid);
181    }
182}
183
184/// Removes all bindings from a capability without notifying anyone
185pub(crate) fn unbind_all_from_cap(bindings: Arc<RwLock<BindingsList>>, capid: &str, binding: &str) {
186    let mut lock = bindings.write().unwrap();
187    // (actor, capid, binding name)
188    lock.retain(|k, _| !(k.1 == capid) && (k.2 == binding));
189}
190
191pub(crate) fn remove_binding(
192    bindings: Arc<RwLock<BindingsList>>,
193    actor: &str,
194    binding: &str,
195    capid: &str,
196) {
197    // binding: (actor,  capid, binding)
198    let mut lock = bindings.write().unwrap();
199    lock.remove(&(actor.to_string(), capid.to_string(), binding.to_string()));
200}
201
202pub(crate) fn gen_remove_actor(
203    hostkey: &KeyPair,
204    msg: Vec<u8>,
205    binding: &str,
206    capid: &str,
207) -> Invocation {
208    Invocation::new(
209        hostkey,
210        WasccEntity::Actor(SYSTEM_ACTOR.to_string()),
211        WasccEntity::Capability {
212            capid: capid.to_string(),
213            binding: binding.to_string(),
214        },
215        OP_REMOVE_ACTOR,
216        msg,
217    )
218}
219
/// An immutable representation of an invocation within waSCC
///
/// Instances built through `Invocation::new` carry signed claims that tie
/// together the origin, target, operation and payload, which
/// `validate_antiforgery` later checks against the fields below.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "lattice", derive(serde::Serialize, serde::Deserialize))]
pub struct Invocation {
    /// The entity making the call.
    pub origin: WasccEntity,
    /// The entity being called.
    pub target: WasccEntity,
    /// Name of the operation requested on the target.
    pub operation: String,
    /// Raw payload of the call.
    pub msg: Vec<u8>,
    /// Identifier of this invocation; `Invocation::new` uses a fresh UUID,
    /// which is also the subject of the claims.
    pub id: String,
    /// The encoded, signed claims token produced when the invocation was created.
    pub encoded_claims: String,
    /// Public key of the host key that signed the claims (the claims issuer).
    pub host_id: String,
}
232
233/// Represents an invocation target - either an actor or a bound capability provider
234#[derive(Debug, Clone, PartialEq)]
235#[cfg_attr(feature = "lattice", derive(serde::Serialize, serde::Deserialize))]
236pub enum WasccEntity {
237    Actor(String),
238    Capability { capid: String, binding: String },
239}
240
241impl WasccEntity {
242    pub fn url(&self) -> String {
243        match self {
244            WasccEntity::Actor(pk) => format!("{}://{}", bus::URL_SCHEME, pk),
245            WasccEntity::Capability { capid, binding } => format!(
246                "{}://{}/{}",
247                bus::URL_SCHEME,
248                capid.replace(":", "/").replace(" ", "_").to_lowercase(),
249                binding.replace(" ", "_").to_lowercase(),
250            ),
251        }
252    }
253}
254
255impl Invocation {
256    pub fn new(
257        hostkey: &KeyPair,
258        origin: WasccEntity,
259        target: WasccEntity,
260        op: &str,
261        msg: Vec<u8>,
262    ) -> Invocation {
263        let subject = format!("{}", Uuid::new_v4());
264        let issuer = hostkey.public_key();
265        let target_url = format!("{}/{}", target.url(), op);
266        let claims = Claims::<wascap::prelude::Invocation>::new(
267            issuer.to_string(),
268            subject.to_string(),
269            &target_url,
270            &origin.url(),
271            &invocation_hash(&target_url, &origin.url(), &msg),
272        );
273        Invocation {
274            origin,
275            target,
276            operation: op.to_string(),
277            msg,
278            id: subject,
279            encoded_claims: claims.encode(&hostkey).unwrap(),
280            host_id: issuer.to_string(),
281        }
282    }
283
284    pub fn origin_url(&self) -> String {
285        self.origin.url()
286    }
287
288    pub fn target_url(&self) -> String {
289        format!("{}/{}", self.target.url(), self.operation)
290    }
291
292    pub fn hash(&self) -> String {
293        invocation_hash(&self.target_url(), &self.origin_url(), &self.msg)
294    }
295
296    pub fn validate_antiforgery(&self) -> Result<()> {
297        let vr = wascap::jwt::validate_token::<wascap::prelude::Invocation>(&self.encoded_claims)?;
298        let claims = Claims::<wascap::prelude::Invocation>::decode(&self.encoded_claims)?;
299        if vr.expired {
300            return Err(errors::new(ErrorKind::Authorization(
301                "Invocation claims token expired".into(),
302            )));
303        }
304        if !vr.signature_valid {
305            return Err(errors::new(ErrorKind::Authorization(
306                "Invocation claims signature invalid".into(),
307            )));
308        }
309        if vr.cannot_use_yet {
310            return Err(errors::new(ErrorKind::Authorization(
311                "Attempt to use invocation before claims token allows".into(),
312            )));
313        }
314        let inv_claims = claims.metadata.unwrap();
315        if inv_claims.invocation_hash != self.hash() {
316            return Err(errors::new(ErrorKind::Authorization(
317                "Invocation hash does not match signed claims hash".into(),
318            )));
319        }
320        if claims.subject != self.id {
321            return Err(errors::new(ErrorKind::Authorization(
322                "Subject of invocation claims token does not match invocation ID".into(),
323            )));
324        }
325        if claims.issuer != self.host_id {
326            return Err(errors::new(ErrorKind::Authorization(
327                "Invocation claims issuer does not match invocation host".into(),
328            )));
329        }
330        if inv_claims.target_url != self.target_url() {
331            return Err(errors::new(ErrorKind::Authorization(
332                "Invocation claims and invocation target URL do not match".into(),
333            )));
334        }
335        if inv_claims.origin_url != self.origin_url() {
336            return Err(errors::new(ErrorKind::Authorization(
337                "Invocation claims and invocation origin URL do not match".into(),
338            )));
339        }
340
341        Ok(())
342    }
343}
344
345/// The response to an invocation
346#[derive(Debug, Clone)]
347#[cfg_attr(feature = "lattice", derive(serde::Serialize, serde::Deserialize))]
348pub struct InvocationResponse {
349    pub msg: Vec<u8>,
350    pub error: Option<String>,
351    pub invocation_id: String,
352}
353
354impl InvocationResponse {
355    pub fn success(inv: &Invocation, msg: Vec<u8>) -> InvocationResponse {
356        InvocationResponse {
357            msg,
358            error: None,
359            invocation_id: inv.id.to_string(),
360        }
361    }
362
363    pub fn error(inv: &Invocation, err: &str) -> InvocationResponse {
364        InvocationResponse {
365            msg: Vec::new(),
366            error: Some(err.to_string()),
367            invocation_id: inv.id.to_string(),
368        }
369    }
370}
371
372pub(crate) fn wapc_host_callback(
373    hostkey: KeyPair,
374    claims: Claims<wascap::jwt::Actor>,
375    bus: Arc<MessageBus>,
376    binding: &str,
377    namespace: &str,
378    operation: &str,
379    payload: &[u8],
380    authorizer: Arc<RwLock<Box<dyn Authorizer>>>,
381) -> std::result::Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
382    trace!(
383        "Guest {} invoking {}:{}",
384        claims.subject,
385        namespace,
386        operation
387    );
388
389    let capability_id = namespace;
390    let inv = invocation_from_callback(
391        &hostkey,
392        &claims.subject,
393        binding,
394        namespace,
395        operation,
396        payload,
397    );
398
399    if !authz::can_invoke(&claims, capability_id, operation) {
400        return Err(Box::new(errors::new(errors::ErrorKind::Authorization(
401            format!(
402                "{} {} attempted to call {} on {},{} - PERMISSION DENIED.",
403                if claims.metadata.unwrap().provider {
404                    "Provider"
405                } else {
406                    "Actor"
407                },
408                claims.subject,
409                operation,
410                capability_id,
411                binding
412            ),
413        ))));
414    } else {
415        if !authorizer
416            .read()
417            .unwrap()
418            .can_invoke(&claims, &inv.target, operation)
419        {
420            return Err(Box::new(errors::new(errors::ErrorKind::Authorization(
421                format!(
422                    "{} {} attempted to call {:?} - Authorizer denied access",
423                    if claims.metadata.unwrap().provider {
424                        "Provider"
425                    } else {
426                        "Actor"
427                    },
428                    claims.subject,
429                    &inv.target
430                ),
431            ))));
432        }
433    }
434    // Make a request on either `wasmbus.Mxxxxx` for an actor or `wasmbus.{capid}.{binding}.{calling-actor}` for
435    // a bound capability provider
436    let invoke_subject = match &inv.target {
437        WasccEntity::Actor(subject) => bus.actor_subject(subject),
438        WasccEntity::Capability { capid, binding } => {
439            bus.provider_subject_bound_actor(capid, binding, &claims.subject)
440        }
441    };
442    match bus.invoke(&invoke_subject, inv) {
443        Ok(inv_r) => match inv_r.error {
444            Some(e) => Err(format!("Invocation failure: {}", e).into()),
445            None => Ok(inv_r.msg),
446        },
447        Err(e) => Err(Box::new(errors::new(errors::ErrorKind::HostCallFailure(
448            e.into(),
449        )))),
450    }
451}
452
453pub(crate) fn fetch_oci_bytes(img: &str) -> Result<Vec<u8>> {
454    let cfg = oci_distribution::client::ClientConfig::default();
455    let mut c = oci_distribution::Client::new(cfg);
456
457    let img = oci_distribution::Reference::from_str(img).map_err(|e| {
458        crate::errors::new(crate::errors::ErrorKind::MiscHost(format!(
459            "Failed to parse OCI distribution reference: {}",
460            e
461        )))
462    })?;
463    let auth = if let Ok(u) = std::env::var(OCI_VAR_USER) {
464        if let Ok(p) = std::env::var(OCI_VAR_PASSWORD) {
465            oci_distribution::secrets::RegistryAuth::Basic(u, p)
466        } else {
467            oci_distribution::secrets::RegistryAuth::Anonymous
468        }
469    } else {
470        oci_distribution::secrets::RegistryAuth::Anonymous
471    };
472    let imgdata: Result<oci_distribution::client::ImageData> =
473        tokio::runtime::Runtime::new().unwrap().block_on(async {
474            c.pull_image(&img, &auth)
475                .await
476                .map_err(|e| format!("{}", e).into())
477        });
478
479    match imgdata {
480        Ok(imgdata) => Ok(imgdata.content),
481        Err(e) => {
482            error!("Failed to fetch OCI bytes: {}", e);
483            Err(crate::errors::new(crate::errors::ErrorKind::MiscHost(
484                "Failed to fetch OCI bytes".to_string(),
485            )))
486        }
487    }
488}
489
490pub(crate) fn fetch_provider_archive(img: &str) -> Result<ProviderArchive> {
491    let bytes = fetch_oci_bytes(img)?;
492    ProviderArchive::try_load(&bytes)
493        .map_err(|e| format!("Failed to load provider archive: {}", e).into())
494}
495
496fn invocation_from_callback(
497    hostkey: &KeyPair,
498    origin: &str,
499    bd: &str,
500    ns: &str,
501    op: &str,
502    payload: &[u8],
503) -> Invocation {
504    let binding = if bd.trim().is_empty() {
505        // Some actor SDKs may not specify a binding field by default
506        "default".to_string()
507    } else {
508        bd.to_string()
509    };
510    let target = if ns.len() == 56 && ns.starts_with("M") {
511        WasccEntity::Actor(ns.to_string())
512    } else {
513        WasccEntity::Capability {
514            binding,
515            capid: ns.to_string(),
516        }
517    };
518    Invocation::new(
519        hostkey,
520        WasccEntity::Actor(origin.to_string()),
521        target,
522        op,
523        payload.to_vec(),
524    )
525}
526
527pub(crate) fn gen_config_invocation(
528    hostkey: &KeyPair,
529    actor: &str,
530    capid: &str,
531    claims: Claims<wascap::jwt::Actor>,
532    binding: String,
533    values: HashMap<String, String>,
534) -> Invocation {
535    use wascc_codec::core::*;
536    let mut values = values.clone();
537    values.insert(
538        CONFIG_WASCC_CLAIMS_ISSUER.to_string(),
539        claims.issuer.to_string(),
540    );
541    values.insert(
542        CONFIG_WASCC_CLAIMS_CAPABILITIES.to_string(),
543        claims
544            .metadata
545            .as_ref()
546            .unwrap()
547            .caps
548            .as_ref()
549            .unwrap_or(&Vec::new())
550            .join(","),
551    );
552    values.insert(CONFIG_WASCC_CLAIMS_NAME.to_string(), claims.name());
553    values.insert(
554        CONFIG_WASCC_CLAIMS_EXPIRES.to_string(),
555        claims.expires.unwrap_or(0).to_string(),
556    );
557    values.insert(
558        CONFIG_WASCC_CLAIMS_TAGS.to_string(),
559        claims
560            .metadata
561            .as_ref()
562            .unwrap()
563            .tags
564            .as_ref()
565            .unwrap_or(&Vec::new())
566            .join(","),
567    );
568    let cfgvals = CapabilityConfiguration {
569        module: actor.to_string(),
570        values,
571    };
572    let payload = serialize(&cfgvals).unwrap();
573    Invocation::new(
574        hostkey,
575        WasccEntity::Actor(SYSTEM_ACTOR.to_string()),
576        WasccEntity::Capability {
577            capid: capid.to_string(),
578            binding,
579        },
580        OP_BIND_ACTOR,
581        payload,
582    )
583}
584
585fn sha256_digest<R: Read>(mut reader: R) -> Result<Digest> {
586    let mut context = Context::new(&SHA256);
587    let mut buffer = [0; 1024];
588
589    loop {
590        let count = reader.read(&mut buffer)?;
591        if count == 0 {
592            break;
593        }
594        context.update(&buffer[..count]);
595    }
596
597    Ok(context.finish())
598}
599
600pub fn invocation_hash(target_url: &str, origin_url: &str, msg: &[u8]) -> String {
601    use std::io::Write;
602    let mut cleanbytes: Vec<u8> = Vec::new();
603    cleanbytes.write(origin_url.as_bytes()).unwrap();
604    cleanbytes.write(target_url.as_bytes()).unwrap();
605    cleanbytes.write(msg).unwrap();
606    let digest = sha256_digest(cleanbytes.as_slice()).unwrap();
607    HEXUPPER.encode(digest.as_ref())
608}
609
610pub(crate) fn detect_core_host_labels() -> HashMap<String, String> {
611    let mut hm = HashMap::new();
612    hm.insert(
613        CORELABEL_ARCH.to_string(),
614        std::env::consts::ARCH.to_string(),
615    );
616    hm.insert(CORELABEL_OS.to_string(), std::env::consts::OS.to_string());
617    hm.insert(
618        CORELABEL_OSFAMILY.to_string(),
619        std::env::consts::FAMILY.to_string(),
620    );
621    info!("Detected Intrinsic host labels. hostcore.arch = {}, hostcore.os = {}, hostcore.family = {}",
622          std::env::consts::ARCH,
623          std::env::consts::OS,
624          std::env::consts::FAMILY,
625    );
626    hm
627}
628
629pub(crate) fn fetch_actor(actor_id: &str) -> Result<crate::actor::Actor> {
630    let vec = crate::inthost::fetch_oci_bytes(actor_id)?;
631
632    crate::actor::Actor::from_slice(&vec)
633}
634
635pub(crate) fn fetch_provider(
636    provider_ref: &str,
637    binding_name: &str,
638    labels: Arc<RwLock<HashMap<String, String>>>,
639) -> Result<(
640    crate::capability::NativeCapability,
641    Claims<wascap::jwt::CapabilityProvider>,
642)> {
643    use std::fs::File;
644    use std::io::Write;
645
646    let par = crate::inthost::fetch_provider_archive(provider_ref)?;
647    let lock = labels.read().unwrap();
648    let target = format!("{}-{}", lock[CORELABEL_ARCH], lock[CORELABEL_OS]);
649    let v = par.target_bytes(&target);
650    if let Some(v) = v {
651        let path = std::env::temp_dir();
652        let path = path.join(target);
653        {
654            let mut tf = File::create(&path)?;
655            tf.write_all(&v)?;
656        }
657        let nc = NativeCapability::from_file(path, Some(binding_name.to_string()))?;
658        if let Some(c) = par.claims() {
659            Ok((nc, c))
660        } else {
661            Err(format!(
662                "No embedded claims found in provider archive for {}",
663                provider_ref
664            )
665            .into())
666        }
667    } else {
668        Err(format!("No binary found in provider archive for {}", target).into())
669    }
670}
671
#[cfg(test)]
mod test {
    use super::Invocation;
    use crate::WasccEntity;
    use wascap::prelude::KeyPair;

    /// A freshly created invocation validates; changing its target or its
    /// payload afterwards makes validation fail.
    #[test]
    fn invocation_antiforgery() {
        let hostkey = KeyPair::new_server();
        let origin = WasccEntity::Actor("testing".into());
        let target = WasccEntity::Capability {
            capid: "wascc:messaging".into(),
            binding: "default".into(),
        };
        // The claims are baked and signed, hash included, at construction time.
        let inv = Invocation::new(&hostkey, origin, target, "OP_TESTING", vec![1, 2, 3, 4]);
        let res = inv.validate_antiforgery();
        println!("{:?}", res);
        // An untouched invocation must pass the anti-forgery check.
        assert!(inv.validate_antiforgery().is_ok());

        // Redirecting the invocation to another target must be rejected.
        let bad_inv = Invocation {
            target: WasccEntity::Actor("BADACTOR-EXFILTRATOR".into()),
            ..inv.clone()
        };
        assert!(bad_inv.validate_antiforgery().is_err());

        // Swapping the payload must be rejected as well.
        let really_bad_inv = Invocation {
            msg: vec![5, 4, 3, 2],
            ..inv.clone()
        };
        assert!(really_bad_inv.validate_antiforgery().is_err());

        // Finally, pin the routing address of the original invocation.
        assert_eq!(
            inv.target_url(),
            "wasmbus://wascc/messaging/default/OP_TESTING"
        );
    }
}
713}