wasmcloud-host 0.27.1

wasmCloud host library
use core::net::SocketAddr;

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context as _;
use http::header::HOST;
use http::uri::Scheme;
use http::Uri;
use http_body_util::BodyExt as _;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinSet;
use tokio::time::Instant;
use tracing::{info_span, instrument, trace_span, Instrument as _, Span};
use wasmcloud_core::http::{load_settings, ServiceSettings};
use wasmcloud_provider_sdk::{LinkConfig, LinkDeleteInfo};
use wasmcloud_tracing::KeyValue;
use wrpc_interface_http::ServeIncomingHandlerWasmtime as _;

use crate::wasmbus::{Component, InvocationContext};

use super::listen;

pub(crate) struct Provider {
    /// Default address for the provider to try to listen on if no address is provided
    pub(crate) address: SocketAddr,
    /// Map of components that the provider can instantiate, keyed by component ID
    pub(crate) components: Arc<RwLock<HashMap<String, Arc<Component>>>>,
    /// Map of links that the provider has established, component ID -> link name -> listener task
    pub(crate) links: Mutex<HashMap<Arc<str>, HashMap<Box<str>, JoinSet<()>>>>,
    pub(crate) lattice_id: Arc<str>,
    pub(crate) host_id: Arc<str>,
}

impl wasmcloud_provider_sdk::Provider for Provider {
    #[instrument(level = "debug", skip_all)]
    async fn receive_link_config_as_source(
        &self,
        LinkConfig {
            target_id,
            config,
            link_name,
            ..
        }: LinkConfig<'_>,
    ) -> anyhow::Result<()> {
        let ServiceSettings { address, .. } =
            load_settings(Some(self.address), config).context("failed to load settings")?;

        let components = Arc::clone(&self.components);
        let host_id = Arc::clone(&self.host_id);
        let lattice_id = Arc::clone(&self.lattice_id);
        let target_id: Arc<str> = Arc::from(target_id);
        let tasks = listen(address, {
            let target_id = Arc::clone(&target_id);
            move |req: hyper::Request<hyper::body::Incoming>| {
                let components = Arc::clone(&components);
                let host_id = Arc::clone(&host_id);
                let lattice_id = Arc::clone(&lattice_id);
                let target_id = Arc::clone(&target_id);
                async move {
                    let component = {
                        let components = components.read().await;
                        let component = components
                            .get(target_id.as_ref())
                            .context("linked component not found")?;
                        Arc::clone(component)
                    };
                    let (
                        http::request::Parts {
                            method,
                            uri,
                            headers,
                            ..
                        },
                        body,
                    ) = req.into_parts();
                    let http::uri::Parts {
                        scheme,
                        authority,
                        path_and_query,
                        ..
                    } = uri.into_parts();
                    // TODO(#3705): Propagate trace context from headers
                    let mut uri = Uri::builder().scheme(scheme.unwrap_or(Scheme::HTTP));
                    if let Some(authority) = authority {
                        uri = uri.authority(authority);
                    } else if let Some(authority) = headers.get("X-Forwarded-Host") {
                        uri = uri.authority(authority.as_bytes());
                    } else if let Some(authority) = headers.get(HOST) {
                        uri = uri.authority(authority.as_bytes());
                    }
                    if let Some(path_and_query) = path_and_query {
                        uri = uri.path_and_query(path_and_query)
                    };
                    let uri = uri.build().context("invalid URI")?;
                    let mut req = http::Request::builder().method(method);
                    *req.headers_mut().expect("headers missing") = headers;

                    let req = req
                        .uri(uri)
                        .body(
                            body.map_err(wasmtime_wasi_http::hyper_response_error)
                                .boxed(),
                        )
                        .context("invalid request")?;
                    let _permit = component
                        .permits
                        .acquire()
                        .instrument(trace_span!("acquire_permit"))
                        .await
                        .context("failed to acquire execution permit")?;
                    let res = component
                        .instantiate(component.handler.copy_for_new(), component.events.clone())
                        .handle(
                            InvocationContext {
                                span: Span::current(),
                                start_at: Instant::now(),
                                attributes: vec![
                                    KeyValue::new(
                                        "component.ref",
                                        Arc::clone(&component.image_reference),
                                    ),
                                    KeyValue::new("lattice", Arc::clone(&lattice_id)),
                                    KeyValue::new("host", Arc::clone(&host_id)),
                                ],
                            },
                            req,
                        )
                        .await?;
                    let res = res?;
                    anyhow::Ok(res)
                }
                .instrument(info_span!("handle"))
            }
        })
        .await?;

        self.links
            .lock()
            .instrument(trace_span!("insert_link"))
            .await
            .entry(target_id)
            .or_default()
            .insert(link_name.into(), tasks);
        Ok(())
    }

    #[instrument(level = "debug", skip_all)]
    async fn delete_link_as_source(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
        let target_id = info.get_target_id();
        let link_name = info.get_link_name();
        self.links
            .lock()
            .await
            .get_mut(target_id)
            .map(|links| links.remove(link_name));
        Ok(())
    }
}