wasmcloud_runtime/component/
mod.rs

1use core::fmt::{self, Debug};
2use core::future::Future;
3use core::ops::{Bound, Deref};
4use core::pin::Pin;
5use core::time::Duration;
6
7use std::collections::{BTreeMap, HashMap};
8use std::sync::Arc;
9
10use anyhow::{ensure, Context as _};
11use futures::{Stream, TryStreamExt as _};
12use tokio::io::{AsyncRead, AsyncReadExt as _};
13use tokio::sync::mpsc;
14use tracing::{debug, info_span, instrument, warn, Instrument as _, Span};
15use tracing_opentelemetry::OpenTelemetrySpanExt;
16use wascap::jwt;
17use wascap::wasm::extract_claims;
18use wasi_preview1_component_adapter_provider::{
19    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
20};
21use wasmtime::component::{types, Linker, ResourceTable, ResourceTableError, ResourceType};
22use wasmtime_wasi::{IoView, WasiCtx, WasiCtxBuilder, WasiView};
23use wasmtime_wasi_http::WasiHttpCtx;
24use wrpc_runtime_wasmtime::{
25    collect_component_resource_exports, collect_component_resource_imports, link_item, rpc,
26    RemoteResource, ServeExt as _, SharedResourceTable, WrpcView,
27};
28
29use crate::capability::{self, wrpc};
30use crate::experimental::Features;
31use crate::Runtime;
32
33pub use bus::{Bus, Error};
34pub use bus1_0_0::Bus as Bus1_0_0;
35pub use config::Config;
36pub use identity::Identity;
37pub use logging::Logging;
38pub use messaging::v0_2::Messaging as Messaging0_2;
39pub use messaging::v0_3::{
40    Client as MessagingClient0_3, GuestMessage as MessagingGuestMessage0_3,
41    HostMessage as MessagingHostMessage0_3, Messaging as Messaging0_3,
42};
43pub use secrets::Secrets;
44
45pub(crate) mod blobstore;
46mod bus;
47mod bus1_0_0;
48mod config;
49mod http;
50mod identity;
51mod keyvalue;
52mod logging;
53pub(crate) mod messaging;
54mod secrets;
55
56/// Instance target, which is replaced in wRPC
57///
58/// This enum represents the original instance import invoked by the component
59#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
60pub enum ReplacedInstanceTarget {
61    /// `wasi:blobstore/blobstore` instance replacement
62    BlobstoreBlobstore,
63    /// `wasi:blobstore/container` instance replacement
64    BlobstoreContainer,
65    /// `wasi:keyvalue/atomic` instance replacement
66    KeyvalueAtomics,
67    /// `wasi:keyvalue/store` instance replacement
68    KeyvalueStore,
69    /// `wasi:keyvalue/batch` instance replacement
70    KeyvalueBatch,
71    /// `wasi:keyvalue/watch` instance replacment
72    KeyvalueWatch,
73    /// `wasi:http/incoming-handler` instance replacement
74    HttpIncomingHandler,
75    /// `wasi:http/outgoing-handler` instance replacement
76    HttpOutgoingHandler,
77}
78
79fn is_0_2(version: &str, min_patch: u64) -> bool {
80    if let Ok(semver::Version {
81        major,
82        minor,
83        patch,
84        pre,
85        build,
86    }) = version.parse()
87    {
88        major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
89    } else {
90        false
91    }
92}
93
94/// This represents a kind of wRPC invocation error
95pub enum InvocationErrorKind {
96    /// This occurs when the endpoint is not found, for example as would happen when the runtime
97    /// would attempt to call `foo:bar/baz@0.2.0`, but the peer served `foo:bar/baz@0.1.0`.
98    NotFound,
99
100    /// An error kind, which will result in a trap in the component
101    Trap,
102}
103
104/// Implementations of this trait are able to introspect an error returned by wRPC invocations
105pub trait InvocationErrorIntrospect {
106    /// Classify [`InvocationErrorKind`] of an error returned by wRPC
107    fn invocation_error_kind(&self, err: &anyhow::Error) -> InvocationErrorKind;
108}
109
110/// A collection of traits that the host must implement
111pub trait Handler:
112    wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
113    + Bus
114    + Config
115    + Logging
116    + Secrets
117    + Messaging0_2
118    + Messaging0_3
119    + Identity
120    + InvocationErrorIntrospect
121    + Send
122    + Sync
123    + Clone
124    + 'static
125{
126}
127
128impl<
129        T: wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
130            + Bus
131            + Config
132            + Logging
133            + Secrets
134            + Messaging0_2
135            + Messaging0_3
136            + Identity
137            + InvocationErrorIntrospect
138            + Send
139            + Sync
140            + Clone
141            + 'static,
142    > Handler for T
143{
144}
145
146/// Component instance configuration
147#[derive(Clone, Debug, Default)]
148pub struct ComponentConfig {
149    /// Whether components are required to be signed to be executed
150    pub require_signature: bool,
151}
152
153/// Extracts and validates claims contained within a WebAssembly binary, if present
154///
155/// # Arguments
156///
157/// * `wasm` - Bytes that constitute a valid WebAssembly binary
158///
159/// # Errors
160///
161/// Fails if either parsing fails, or claims are not valid
162///
163/// # Returns
164/// The token embedded in the component, including the [`jwt::Claims`] and the raw JWT
165pub fn claims_token(wasm: impl AsRef<[u8]>) -> anyhow::Result<Option<jwt::Token<jwt::Component>>> {
166    let Some(claims) = extract_claims(wasm).context("failed to extract module claims")? else {
167        return Ok(None);
168    };
169    let v = jwt::validate_token::<jwt::Component>(&claims.jwt)
170        .context("failed to validate module token")?;
171    ensure!(!v.expired, "token expired at `{}`", v.expires_human);
172    ensure!(
173        !v.cannot_use_yet,
174        "token cannot be used before `{}`",
175        v.not_before_human
176    );
177    ensure!(v.signature_valid, "signature is not valid");
178    Ok(Some(claims))
179}
180
181/// Pre-compiled component [Component], which is cheapily-[Cloneable](Clone)
182#[derive(Clone)]
183pub struct Component<H>
184where
185    H: Handler,
186{
187    engine: wasmtime::Engine,
188    claims: Option<jwt::Claims<jwt::Component>>,
189    instance_pre: wasmtime::component::InstancePre<Ctx<H>>,
190    host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
191    max_execution_time: Duration,
192    experimental_features: Features,
193}
194
195impl<H> Debug for Component<H>
196where
197    H: Handler,
198{
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        f.debug_struct("Component")
201            .field("claims", &self.claims)
202            .field("runtime", &"wasmtime")
203            .field("max_execution_time", &self.max_execution_time)
204            .finish_non_exhaustive()
205    }
206}
207
208fn new_store<H: Handler>(
209    engine: &wasmtime::Engine,
210    handler: H,
211    max_execution_time: Duration,
212) -> wasmtime::Store<Ctx<H>> {
213    let table = ResourceTable::new();
214    let wasi = WasiCtxBuilder::new()
215        .args(&["main.wasm"]) // TODO: Configure argv[0]
216        .inherit_stderr()
217        .build();
218
219    let mut store = wasmtime::Store::new(
220        engine,
221        Ctx {
222            handler,
223            wasi,
224            http: WasiHttpCtx::new(),
225            table,
226            shared_resources: SharedResourceTable::default(),
227            timeout: max_execution_time,
228            parent_context: None,
229        },
230    );
231    store.set_epoch_deadline(max_execution_time.as_secs());
232    store
233}
234
235/// Events sent by [`Component::serve_wrpc`]
236#[derive(Clone, Debug)]
237pub enum WrpcServeEvent<C> {
238    /// `wasi:http/incoming-handler.handle` return event
239    HttpIncomingHandlerHandleReturned {
240        /// Invocation context
241        context: C,
242        /// Whether the invocation was successfully handled
243        success: bool,
244    },
245    /// `wasmcloud:messaging/handler.handle-message` return event
246    MessagingHandlerHandleMessageReturned {
247        /// Invocation context
248        context: C,
249        /// Whether the invocation was successfully handled
250        success: bool,
251    },
252    /// dynamic export return event
253    DynamicExportReturned {
254        /// Invocation context
255        context: C,
256        /// Whether the invocation was successfully handled
257        success: bool,
258    },
259}
260
261/// This represents a [Stream] of incoming invocations.
262/// Each item represents processing of a single invocation.
263pub type InvocationStream = Pin<
264    Box<
265        dyn Stream<
266                Item = anyhow::Result<
267                    Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
268                >,
269            > + Send
270            + 'static,
271    >,
272>;
273
274impl<H> Component<H>
275where
276    H: Handler,
277{
278    /// Extracts [Claims](jwt::Claims) from WebAssembly component and compiles it using [Runtime].
279    ///
280    /// If `wasm` represents a core Wasm module, then it will first be turned into a component.
281    #[instrument(level = "trace", skip_all)]
282    pub fn new(rt: &Runtime, wasm: &[u8]) -> anyhow::Result<Self> {
283        if wasmparser::Parser::is_core_wasm(wasm) {
284            let wasm = wit_component::ComponentEncoder::default()
285                .module(wasm)
286                .context("failed to set core component module")?
287                .adapter(
288                    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME,
289                    WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
290                )
291                .context("failed to add WASI preview1 adapter")?
292                .encode()
293                .context("failed to encode a component from module")?;
294            return Self::new(rt, &wasm);
295        }
296        let engine = rt.engine.clone();
297        let claims_token = claims_token(wasm)?;
298        let claims = claims_token.map(|c| c.claims);
299        let component = wasmtime::component::Component::new(&engine, wasm)
300            .context("failed to compile component")?;
301
302        let mut linker = Linker::new(&engine);
303
304        wasmtime_wasi::add_to_linker_async(&mut linker)
305            .context("failed to link core WASI interfaces")?;
306        wasmtime_wasi_http::add_only_http_to_linker_async(&mut linker)
307            .context("failed to link `wasi:http`")?;
308
309        capability::blobstore::blobstore::add_to_linker(&mut linker, |ctx| ctx)
310            .context("failed to link `wasi:blobstore/blobstore`")?;
311        capability::blobstore::container::add_to_linker(&mut linker, |ctx| ctx)
312            .context("failed to link `wasi:blobstore/container`")?;
313        capability::blobstore::types::add_to_linker(&mut linker, |ctx| ctx)
314            .context("failed to link `wasi:blobstore/types`")?;
315        capability::config::runtime::add_to_linker(&mut linker, |ctx| ctx)
316            .context("failed to link `wasi:config/runtime`")?;
317        capability::config::store::add_to_linker(&mut linker, |ctx| ctx)
318            .context("failed to link `wasi:config/store`")?;
319        capability::keyvalue::atomics::add_to_linker(&mut linker, |ctx| ctx)
320            .context("failed to link `wasi:keyvalue/atomics`")?;
321        capability::keyvalue::store::add_to_linker(&mut linker, |ctx| ctx)
322            .context("failed to link `wasi:keyvalue/store`")?;
323        capability::keyvalue::batch::add_to_linker(&mut linker, |ctx| ctx)
324            .context("failed to link `wasi:keyvalue/batch`")?;
325        capability::logging::logging::add_to_linker(&mut linker, |ctx| ctx)
326            .context("failed to link `wasi:logging/logging`")?;
327        capability::unversioned_logging::logging::add_to_linker(&mut linker, |ctx| ctx)
328            .context("failed to link unversioned `wasi:logging/logging`")?;
329
330        capability::bus1_0_0::lattice::add_to_linker(&mut linker, |ctx| ctx)
331            .context("failed to link `wasmcloud:bus/lattice@1.0.0`")?;
332        capability::bus2_0_1::lattice::add_to_linker(&mut linker, |ctx| ctx)
333            .context("failed to link `wasmcloud:bus/lattice@2.0.1`")?;
334        capability::bus2_0_1::error::add_to_linker(&mut linker, |ctx| ctx)
335            .context("failed to link `wasmcloud:bus/error@2.0.1`")?;
336        capability::messaging0_2_0::types::add_to_linker(&mut linker, |ctx| ctx)
337            .context("failed to link `wasmcloud:messaging/types@0.2.0`")?;
338        capability::messaging0_2_0::consumer::add_to_linker(&mut linker, |ctx| ctx)
339            .context("failed to link `wasmcloud:messaging/consumer@0.2.0`")?;
340        capability::secrets::reveal::add_to_linker(&mut linker, |ctx| ctx)
341            .context("failed to link `wasmcloud:secrets/reveal`")?;
342        capability::secrets::store::add_to_linker(&mut linker, |ctx| ctx)
343            .context("failed to link `wasmcloud:secrets/store`")?;
344        // Only link wasmcloud:messaging@v3 if the feature is enabled
345        if rt.experimental_features.wasmcloud_messaging_v3 {
346            capability::messaging0_3_0::types::add_to_linker(&mut linker, |ctx| ctx)
347                .context("failed to link `wasmcloud:messaging/types@0.3.0`")?;
348            capability::messaging0_3_0::producer::add_to_linker(&mut linker, |ctx| ctx)
349                .context("failed to link `wasmcloud:messaging/producer@0.3.0`")?;
350            capability::messaging0_3_0::request_reply::add_to_linker(&mut linker, |ctx| ctx)
351                .context("failed to link `wasmcloud:messaging/request-reply@0.3.0`")?;
352        }
353        // Only link wasmcloud:identity if the workload identity feature is enabled
354        if rt.experimental_features.workload_identity_interface {
355            capability::identity::store::add_to_linker(&mut linker, |ctx| ctx)
356                .context("failed to link `wasmcloud:identity/store`")?;
357        }
358
359        // Only link wrpc:rpc if the RPC feature is enabled
360        if rt.experimental_features.rpc_interface {
361            rpc::add_to_linker(&mut linker).context("failed to link `wrpc:rpc`")?;
362        }
363
364        let ty = component.component_type();
365        let mut guest_resources = Vec::new();
366        let mut host_resources = BTreeMap::new();
367        collect_component_resource_exports(&engine, &ty, &mut guest_resources);
368        collect_component_resource_imports(&engine, &ty, &mut host_resources);
369        let io_err_tys = host_resources
370            .range::<str, _>((
371                Bound::Included("wasi:io/error@0.2"),
372                Bound::Excluded("wasi:io/error@0.3"),
373            ))
374            .map(|(name, instance)| {
375                instance
376                    .get("error")
377                    .copied()
378                    .with_context(|| format!("{name} instance import missing `error` resource"))
379            })
380            .collect::<anyhow::Result<Box<[_]>>>()?;
381        let io_pollable_tys = host_resources
382            .range::<str, _>((
383                Bound::Included("wasi:io/poll@0.2"),
384                Bound::Excluded("wasi:io/poll@0.3"),
385            ))
386            .map(|(name, instance)| {
387                instance
388                    .get("pollable")
389                    .copied()
390                    .with_context(|| format!("{name} instance import missing `pollable` resource"))
391            })
392            .collect::<anyhow::Result<Box<[_]>>>()?;
393        let io_input_stream_tys = host_resources
394            .range::<str, _>((
395                Bound::Included("wasi:io/streams@0.2"),
396                Bound::Excluded("wasi:io/streams@0.3"),
397            ))
398            .map(|(name, instance)| {
399                instance.get("input-stream").copied().with_context(|| {
400                    format!("{name} instance import missing `input-stream` resource")
401                })
402            })
403            .collect::<anyhow::Result<Box<[_]>>>()?;
404        let io_output_stream_tys = host_resources
405            .range::<str, _>((
406                Bound::Included("wasi:io/streams@0.2"),
407                Bound::Excluded("wasi:io/streams@0.3"),
408            ))
409            .map(|(name, instance)| {
410                instance.get("output-stream").copied().with_context(|| {
411                    format!("{name} instance import missing `output-stream` resource")
412                })
413            })
414            .collect::<anyhow::Result<Box<[_]>>>()?;
415        let rpc_err_ty = host_resources
416            .get("wrpc:rpc/error@0.1.0")
417            .map(|instance| {
418                instance
419                    .get("error")
420                    .copied()
421                    .context("`wrpc:rpc/error@0.1.0` instance import missing `error` resource")
422            })
423            .transpose()?;
424        // TODO: This should include `wasi:http` resources
425        let host_resources = host_resources
426            .into_iter()
427            .map(|(name, instance)| {
428                let instance = instance
429                    .into_iter()
430                    .map(|(name, ty)| {
431                        let host_ty = match ty {
432                            ty if Some(ty) == rpc_err_ty => ResourceType::host::<rpc::Error>(),
433                            ty if io_err_tys.contains(&ty) => {
434                                ResourceType::host::<wasmtime_wasi::bindings::io::error::Error>()
435                            }
436                            ty if io_input_stream_tys.contains(&ty) => ResourceType::host::<
437                                wasmtime_wasi::bindings::io::streams::InputStream,
438                            >(
439                            ),
440                            ty if io_output_stream_tys.contains(&ty) => ResourceType::host::<
441                                wasmtime_wasi::bindings::io::streams::OutputStream,
442                            >(
443                            ),
444                            ty if io_pollable_tys.contains(&ty) => {
445                                ResourceType::host::<wasmtime_wasi::bindings::io::poll::Pollable>()
446                            }
447                            _ => ResourceType::host::<RemoteResource>(),
448                        };
449                        (name, (ty, host_ty))
450                    })
451                    .collect::<HashMap<_, _>>();
452                (name, instance)
453            })
454            .collect::<HashMap<_, _>>();
455        let host_resources = Arc::from(host_resources);
456        if !guest_resources.is_empty() {
457            warn!("exported component resources are not supported in wasmCloud runtime and will be ignored, use a provider instead to enable this functionality");
458        }
459        for (name, ty) in ty.imports(&engine) {
460            // Don't link builtin instances or feature-gated instances if the feature is disabled
461            match name.split_once('/').map(|(pkg, suffix)| {
462                suffix
463                    .split_once('@')
464                    .map_or((pkg, suffix, None), |(iface, version)| {
465                        (pkg, iface, Some(version))
466                    })
467            }) {
468                Some(
469                    ("wasi:blobstore", "blobstore" | "container" | "types", Some("0.2.0-draft"))
470                    | ("wasi:config", "runtime" | "store", Some("0.2.0-draft"))
471                    | ("wasi:keyvalue", "atomics" | "batch" | "store", Some("0.2.0-draft"))
472                    | ("wasi:logging", "logging", None | Some("0.1.0-draft"))
473                    | ("wasmcloud:bus", "lattice", Some("1.0.0" | "2.0.0"))
474                    | ("wasmcloud:messaging", "consumer" | "types", Some("0.2.0"))
475                    | ("wasmcloud:secrets", "reveal" | "store", Some("0.1.0-draft")),
476                ) => {}
477                Some((
478                    "wasi:cli",
479                    "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
480                    | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
481                    Some(version),
482                )) if is_0_2(version, 0) => {}
483                Some(("wasi:clocks", "monotonic-clock" | "wall-clock", Some(version)))
484                    if is_0_2(version, 0) => {}
485                Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
486                Some(("wasi:filesystem", "preopens" | "types", Some(version)))
487                    if is_0_2(version, 0) => {}
488                Some((
489                    "wasi:http",
490                    "incoming-handler" | "outgoing-handler" | "types",
491                    Some(version),
492                )) if is_0_2(version, 0) => {}
493                Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
494                    if is_0_2(version, 0) => {}
495                Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
496                    if is_0_2(version, 0) => {}
497                Some((
498                    "wasi:sockets",
499                    "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
500                    | "udp-create-socket" | "udp",
501                    Some(version),
502                )) if is_0_2(version, 0) => {}
503                _ if rt.skip_feature_gated_instance(name) => {}
504                _ => link_item(
505                    &engine,
506                    &mut linker.root(),
507                    [],
508                    Arc::clone(&host_resources),
509                    ty,
510                    "",
511                    name,
512                )
513                .context("failed to link item")?,
514            }
515        }
516        let instance_pre = linker.instantiate_pre(&component)?;
517        Ok(Self {
518            engine,
519            claims,
520            instance_pre,
521            host_resources,
522            max_execution_time: rt.max_execution_time,
523            experimental_features: rt.experimental_features,
524        })
525    }
526
527    /// Sets maximum execution time for functionality exported by this component.
528    /// Values below 1 second will be interpreted as 1 second.
529    #[instrument(level = "trace", skip_all)]
530    pub fn set_max_execution_time(&mut self, max_execution_time: Duration) -> &mut Self {
531        self.max_execution_time = max_execution_time.max(Duration::from_secs(1));
532        self
533    }
534
535    /// Reads the WebAssembly binary asynchronously and calls [Component::new].
536    ///
537    /// # Errors
538    ///
539    /// Fails if either reading `wasm` fails or [Self::new] fails
540    #[instrument(level = "trace", skip_all)]
541    pub async fn read(rt: &Runtime, mut wasm: impl AsyncRead + Unpin) -> anyhow::Result<Self> {
542        let mut buf = Vec::new();
543        wasm.read_to_end(&mut buf)
544            .await
545            .context("failed to read Wasm")?;
546        Self::new(rt, &buf)
547    }
548
549    /// Reads the WebAssembly binary synchronously and calls [Component::new].
550    ///
551    /// # Errors
552    ///
553    /// Fails if either reading `wasm` fails or [Self::new] fails
554    #[instrument(level = "trace", skip_all)]
555    pub fn read_sync(rt: &Runtime, mut wasm: impl std::io::Read) -> anyhow::Result<Self> {
556        let mut buf = Vec::new();
557        wasm.read_to_end(&mut buf).context("failed to read Wasm")?;
558        Self::new(rt, &buf)
559    }
560
561    /// [Claims](jwt::Claims) associated with this [Component].
562    #[instrument(level = "trace")]
563    pub fn claims(&self) -> Option<&jwt::Claims<jwt::Component>> {
564        self.claims.as_ref()
565    }
566
567    /// Instantiates the component given a handler and event channel
568    pub fn instantiate<C>(
569        &self,
570        handler: H,
571        events: mpsc::Sender<WrpcServeEvent<C>>,
572    ) -> Instance<H, C> {
573        Instance {
574            engine: self.engine.clone(),
575            pre: self.instance_pre.clone(),
576            handler,
577            max_execution_time: self.max_execution_time,
578            events,
579            experimental_features: self.experimental_features,
580        }
581    }
582
583    /// Serve all exports of this [Component] using supplied [`wrpc_transport::Serve`]
584    ///
585    /// The returned [Vec] contains an [InvocationStream] per each function exported by the component.
586    /// A [`WrpcServeEvent`] containing the incoming [`wrpc_transport::Serve::Context`] will be sent
587    /// on completion of each invocation.
588    /// The supplied [`Handler`] will be used to satisfy imports.
589    #[instrument(level = "debug", skip_all)]
590    pub async fn serve_wrpc<S>(
591        &self,
592        srv: &S,
593        handler: H,
594        events: mpsc::Sender<WrpcServeEvent<S::Context>>,
595    ) -> anyhow::Result<Vec<InvocationStream>>
596    where
597        S: wrpc_transport::Serve,
598        S::Context: Deref<Target = tracing::Span>,
599    {
600        let max_execution_time = self.max_execution_time;
601        let mut invocations = vec![];
602        let instance = self.instantiate(handler.clone(), events.clone());
603        for (name, ty) in self
604            .instance_pre
605            .component()
606            .component_type()
607            .exports(&self.engine)
608        {
609            match (name, ty) {
610                (_, types::ComponentItem::ComponentInstance(..))
611                    if name.starts_with("wasi:http/incoming-handler@0.2") =>
612                {
613                    let instance = instance.clone();
614                    let [(_, _, handle)] = wrpc_interface_http::bindings::exports::wrpc::http::incoming_handler::serve_interface(
615                        srv,
616                        wrpc_interface_http::ServeWasmtime(instance),
617                    )
618                    .await
619                    .context("failed to serve `wrpc:http/incoming-handler`")?;
620                    invocations.push(handle);
621                }
622                (
623                    "wasmcloud:messaging/handler@0.2.0"
624                    | "wasmcloud:messaging/incoming-handler@0.3.0",
625                    types::ComponentItem::ComponentInstance(..),
626                ) => {
627                    let instance = instance.clone();
628                    let [(_, _, handle_message)] =
629                        wrpc::exports::wasmcloud::messaging0_2_0::handler::serve_interface(
630                            srv, instance,
631                        )
632                        .await
633                        .context("failed to serve `wasmcloud:messaging/handler`")?;
634                    invocations.push(handle_message);
635                }
636                (
637                    "wasi:keyvalue/watcher@0.2.0-draft",
638                    types::ComponentItem::ComponentInstance(..),
639                ) => {
640                    let instance = instance.clone();
641                    let [(_, _, on_set), (_, _, on_delete)] =
642                        wrpc::exports::wrpc::keyvalue::watcher::serve_interface(srv, instance)
643                            .await
644                            .context("failed to serve `wrpc:keyvalue/watcher`")?;
645                    invocations.push(on_set);
646                    invocations.push(on_delete);
647                }
648                (name, types::ComponentItem::ComponentFunc(ty)) => {
649                    let engine = self.engine.clone();
650                    let handler = handler.clone();
651                    let pre = self.instance_pre.clone();
652                    debug!(?name, "serving root function");
653                    let func = srv
654                        .serve_function(
655                            move || {
656                                let span = info_span!("call_instance_function");
657                                let mut store =
658                                    new_store(&engine, handler.clone(), max_execution_time);
659                                store.data_mut().parent_context = Some(span.context());
660                                store
661                            },
662                            pre,
663                            Arc::clone(&self.host_resources),
664                            ty,
665                            "",
666                            name,
667                        )
668                        .await
669                        .context("failed to serve root function")?;
670                    let events = events.clone();
671                    invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
672                        let events = events.clone();
673                        let span = cx.deref().clone();
674                        Box::pin(
675                            async move {
676                                let res =
677                                    res.instrument(info_span!("handle_instance_function")).await;
678                                let success = res.is_ok();
679                                if let Err(err) =
680                                    events.try_send(WrpcServeEvent::DynamicExportReturned {
681                                        context: cx,
682                                        success,
683                                    })
684                                {
685                                    warn!(
686                                        ?err,
687                                        success, "failed to send dynamic root export return event"
688                                    );
689                                }
690                                res
691                            }
692                            .instrument(span),
693                        )
694                            as Pin<Box<dyn Future<Output = _> + Send + 'static>>
695                    })));
696                }
697                (_, types::ComponentItem::CoreFunc(_)) => {
698                    warn!(name, "serving root core function exports not supported yet");
699                }
700                (_, types::ComponentItem::Module(_)) => {
701                    warn!(name, "serving root module exports not supported yet");
702                }
703                (_, types::ComponentItem::Component(_)) => {
704                    warn!(name, "serving root component exports not supported yet");
705                }
706                (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
707                    for (name, ty) in ty.exports(&self.engine) {
708                        match ty {
709                            types::ComponentItem::ComponentFunc(ty) => {
710                                let engine = self.engine.clone();
711                                let handler = handler.clone();
712                                let pre = self.instance_pre.clone();
713                                debug!(?instance_name, ?name, "serving instance function");
714                                let func = srv
715                                    .serve_function(
716                                        move || {
717                                            let span = info_span!("call_instance_function");
718                                            let mut store = new_store(
719                                                &engine,
720                                                handler.clone(),
721                                                max_execution_time,
722                                            );
723                                            store.data_mut().parent_context = Some(span.context());
724                                            store
725                                        },
726                                        pre,
727                                        Arc::clone(&self.host_resources),
728                                        ty,
729                                        instance_name,
730                                        name,
731                                    )
732                                    .await
733                                    .context("failed to serve instance function")?;
734                                let events = events.clone();
735                                invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
736                                    let events = events.clone();
737                                    let span = cx.deref().clone();
738                                    Box::pin(
739                                        async move {
740                                            let res = res.await;
741                                            let success = res.is_ok();
742                                            if let Err(err) = events.try_send(
743                                                WrpcServeEvent::DynamicExportReturned {
744                                                    context: cx,
745                                                    success,
746                                                },
747                                            ) {
748                                                warn!(
749                                                    ?err,
750                                                    success,
751                                                    "failed to send dynamic instance export return event"
752                                                );
753                                            }
754                                            res
755                                        }
756                                        .instrument(span),
757                                    )
758                                        as Pin<Box<dyn Future<Output = _> + Send + 'static>>
759                                })));
760                            }
761                            types::ComponentItem::CoreFunc(_) => {
762                                warn!(
763                                    instance_name,
764                                    name,
765                                    "serving instance core function exports not supported yet"
766                                );
767                            }
768                            types::ComponentItem::Module(_) => {
769                                warn!(
770                                    instance_name,
771                                    name, "serving instance module exports not supported yet"
772                                );
773                            }
774                            types::ComponentItem::Component(_) => {
775                                warn!(
776                                    instance_name,
777                                    name, "serving instance component exports not supported yet"
778                                );
779                            }
780                            types::ComponentItem::ComponentInstance(_) => {
781                                warn!(
782                                    instance_name,
783                                    name, "serving nested instance exports not supported yet"
784                                );
785                            }
786                            types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
787                        }
788                    }
789                }
790                (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
791            }
792        }
793        Ok(invocations)
794    }
795}
796
797impl<H> From<Component<H>> for Option<jwt::Claims<jwt::Component>>
798where
799    H: Handler,
800{
801    fn from(Component { claims, .. }: Component<H>) -> Self {
802        claims
803    }
804}
805
806/// Instantiated component
807pub struct Instance<H, C>
808where
809    H: Handler,
810{
811    engine: wasmtime::Engine,
812    pre: wasmtime::component::InstancePre<Ctx<H>>,
813    handler: H,
814    max_execution_time: Duration,
815    events: mpsc::Sender<WrpcServeEvent<C>>,
816    experimental_features: Features,
817}
818
819impl<H, C> Clone for Instance<H, C>
820where
821    H: Handler,
822{
823    fn clone(&self) -> Self {
824        Self {
825            engine: self.engine.clone(),
826            pre: self.pre.clone(),
827            handler: self.handler.clone(),
828            max_execution_time: self.max_execution_time,
829            events: self.events.clone(),
830            experimental_features: self.experimental_features,
831        }
832    }
833}
834
835type TableResult<T> = Result<T, ResourceTableError>;
836
837pub(crate) struct Ctx<H>
838where
839    H: Handler,
840{
841    handler: H,
842    wasi: WasiCtx,
843    http: WasiHttpCtx,
844    table: ResourceTable,
845    shared_resources: SharedResourceTable,
846    timeout: Duration,
847    parent_context: Option<opentelemetry::Context>,
848}
849
850impl<H: Handler> IoView for Ctx<H> {
851    fn table(&mut self) -> &mut ResourceTable {
852        &mut self.table
853    }
854}
855
856impl<H: Handler> WasiView for Ctx<H> {
857    fn ctx(&mut self) -> &mut WasiCtx {
858        &mut self.wasi
859    }
860}
861
862impl<H: Handler> WrpcView for Ctx<H> {
863    type Invoke = H;
864
865    fn context(&self) -> H::Context {
866        None
867    }
868
869    fn client(&self) -> &H {
870        &self.handler
871    }
872
873    fn shared_resources(&mut self) -> &mut SharedResourceTable {
874        &mut self.shared_resources
875    }
876
877    fn timeout(&self) -> Option<Duration> {
878        Some(self.timeout)
879    }
880}
881
882impl<H: Handler> Debug for Ctx<H> {
883    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
884        f.debug_struct("Ctx").field("runtime", &"wasmtime").finish()
885    }
886}
887
888impl<H: Handler> Ctx<H> {
889    fn attach_parent_context(&self) {
890        if let Some(context) = self.parent_context.as_ref() {
891            Span::current().set_parent(context.clone());
892        }
893    }
894}