wasmcloud_runtime/component/
mod.rs

1use core::fmt::{self, Debug};
2use core::future::Future;
3use core::ops::Deref;
4use core::pin::Pin;
5use core::time::Duration;
6
7use anyhow::{ensure, Context as _};
8use futures::{Stream, TryStreamExt as _};
9use tokio::io::{AsyncRead, AsyncReadExt as _};
10use tokio::sync::mpsc;
11use tracing::{debug, info_span, instrument, warn, Instrument as _, Span};
12use tracing_opentelemetry::OpenTelemetrySpanExt;
13use wascap::jwt;
14use wascap::wasm::extract_claims;
15use wasi_preview1_component_adapter_provider::{
16    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
17};
18use wasmtime::component::{types, Linker, ResourceTable, ResourceTableError};
19use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiView};
20use wasmtime_wasi_http::WasiHttpCtx;
21use wrpc_runtime_wasmtime::{
22    collect_component_resources, link_item, ServeExt as _, SharedResourceTable, WrpcView,
23};
24
25use crate::capability::{self, wrpc};
26use crate::experimental::Features;
27use crate::Runtime;
28
29pub use bus::Bus;
30pub use bus1_0_0::Bus as Bus1_0_0;
31pub use config::Config;
32pub use identity::Identity;
33pub use logging::Logging;
34pub use messaging::v0_2::Messaging as Messaging0_2;
35pub use messaging::v0_3::{
36    Client as MessagingClient0_3, GuestMessage as MessagingGuestMessage0_3,
37    HostMessage as MessagingHostMessage0_3, Messaging as Messaging0_3,
38};
39pub use secrets::Secrets;
40
41pub(crate) mod blobstore;
42mod bus;
43mod bus1_0_0;
44mod config;
45mod http;
46mod identity;
47mod keyvalue;
48mod logging;
49pub(crate) mod messaging;
50mod secrets;
51
52/// Instance target, which is replaced in wRPC
53///
54/// This enum represents the original instance import invoked by the component
55#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
56pub enum ReplacedInstanceTarget {
57    /// `wasi:blobstore/blobstore` instance replacement
58    BlobstoreBlobstore,
59    /// `wasi:blobstore/container` instance replacement
60    BlobstoreContainer,
61    /// `wasi:keyvalue/atomic` instance replacement
62    KeyvalueAtomics,
63    /// `wasi:keyvalue/store` instance replacement
64    KeyvalueStore,
65    /// `wasi:keyvalue/batch` instance replacement
66    KeyvalueBatch,
67    /// `wasi:keyvalue/watch` instance replacment
68    KeyvalueWatch,
69    /// `wasi:http/incoming-handler` instance replacement
70    HttpIncomingHandler,
71    /// `wasi:http/outgoing-handler` instance replacement
72    HttpOutgoingHandler,
73}
74
75fn is_0_2(version: &str, min_patch: u64) -> bool {
76    if let Ok(semver::Version {
77        major,
78        minor,
79        patch,
80        pre,
81        build,
82    }) = version.parse()
83    {
84        major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
85    } else {
86        false
87    }
88}
89
90/// This represents a kind of wRPC invocation error
91pub enum InvocationErrorKind {
92    /// This occurs when the endpoint is not found, for example as would happen when the runtime
93    /// would attempt to call `foo:bar/baz@0.2.0`, but the peer served `foo:bar/baz@0.1.0`.
94    NotFound,
95
96    /// An error kind, which will result in a trap in the component
97    Trap,
98}
99
100/// Implementations of this trait are able to introspect an error returned by wRPC invocations
101pub trait InvocationErrorIntrospect {
102    /// Classify [`InvocationErrorKind`] of an error returned by wRPC
103    fn invocation_error_kind(&self, err: &anyhow::Error) -> InvocationErrorKind;
104}
105
106/// A collection of traits that the host must implement
107pub trait Handler:
108    wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
109    + Bus
110    + Config
111    + Logging
112    + Secrets
113    + Messaging0_2
114    + Messaging0_3
115    + Identity
116    + InvocationErrorIntrospect
117    + Send
118    + Sync
119    + Clone
120    + 'static
121{
122}
123
124impl<
125        T: wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
126            + Bus
127            + Config
128            + Logging
129            + Secrets
130            + Messaging0_2
131            + Messaging0_3
132            + Identity
133            + InvocationErrorIntrospect
134            + Send
135            + Sync
136            + Clone
137            + 'static,
138    > Handler for T
139{
140}
141
142/// Component instance configuration
143#[derive(Clone, Debug, Default)]
144pub struct ComponentConfig {
145    /// Whether components are required to be signed to be executed
146    pub require_signature: bool,
147}
148
149/// Extracts and validates claims contained within a WebAssembly binary, if present
150///
151/// # Arguments
152///
153/// * `wasm` - Bytes that constitute a valid WebAssembly binary
154///
155/// # Errors
156///
157/// Fails if either parsing fails, or claims are not valid
158///
159/// # Returns
160/// The token embedded in the component, including the [`jwt::Claims`] and the raw JWT
161pub fn claims_token(wasm: impl AsRef<[u8]>) -> anyhow::Result<Option<jwt::Token<jwt::Component>>> {
162    let Some(claims) = extract_claims(wasm).context("failed to extract module claims")? else {
163        return Ok(None);
164    };
165    let v = jwt::validate_token::<jwt::Component>(&claims.jwt)
166        .context("failed to validate module token")?;
167    ensure!(!v.expired, "token expired at `{}`", v.expires_human);
168    ensure!(
169        !v.cannot_use_yet,
170        "token cannot be used before `{}`",
171        v.not_before_human
172    );
173    ensure!(v.signature_valid, "signature is not valid");
174    Ok(Some(claims))
175}
176
177/// Pre-compiled component [Component], which is cheapily-[Cloneable](Clone)
178#[derive(Clone)]
179pub struct Component<H>
180where
181    H: Handler,
182{
183    engine: wasmtime::Engine,
184    claims: Option<jwt::Claims<jwt::Component>>,
185    instance_pre: wasmtime::component::InstancePre<Ctx<H>>,
186    max_execution_time: Duration,
187    experimental_features: Features,
188}
189
190impl<H> Debug for Component<H>
191where
192    H: Handler,
193{
194    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195        f.debug_struct("Component")
196            .field("claims", &self.claims)
197            .field("runtime", &"wasmtime")
198            .field("max_execution_time", &self.max_execution_time)
199            .finish_non_exhaustive()
200    }
201}
202
203fn new_store<H: Handler>(
204    engine: &wasmtime::Engine,
205    handler: H,
206    max_execution_time: Duration,
207) -> wasmtime::Store<Ctx<H>> {
208    let table = ResourceTable::new();
209    let wasi = WasiCtxBuilder::new()
210        .args(&["main.wasm"]) // TODO: Configure argv[0]
211        .inherit_stderr()
212        .build();
213
214    let mut store = wasmtime::Store::new(
215        engine,
216        Ctx {
217            handler,
218            wasi,
219            http: WasiHttpCtx::new(),
220            table,
221            shared_resources: SharedResourceTable::default(),
222            timeout: max_execution_time,
223            parent_context: None,
224        },
225    );
226    store.set_epoch_deadline(max_execution_time.as_secs());
227    store
228}
229
230/// Events sent by [`Component::serve_wrpc`]
231#[derive(Clone, Debug)]
232pub enum WrpcServeEvent<C> {
233    /// `wasi:http/incoming-handler.handle` return event
234    HttpIncomingHandlerHandleReturned {
235        /// Invocation context
236        context: C,
237        /// Whether the invocation was successfully handled
238        success: bool,
239    },
240    /// `wasmcloud:messaging/handler.handle-message` return event
241    MessagingHandlerHandleMessageReturned {
242        /// Invocation context
243        context: C,
244        /// Whether the invocation was successfully handled
245        success: bool,
246    },
247    /// dynamic export return event
248    DynamicExportReturned {
249        /// Invocation context
250        context: C,
251        /// Whether the invocation was successfully handled
252        success: bool,
253    },
254}
255
256/// This represents a [Stream] of incoming invocations.
257/// Each item represents processing of a single invocation.
258pub type InvocationStream = Pin<
259    Box<
260        dyn Stream<
261                Item = anyhow::Result<
262                    Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
263                >,
264            > + Send
265            + 'static,
266    >,
267>;
268
269impl<H> Component<H>
270where
271    H: Handler,
272{
273    /// Extracts [Claims](jwt::Claims) from WebAssembly component and compiles it using [Runtime].
274    ///
275    /// If `wasm` represents a core Wasm module, then it will first be turned into a component.
276    #[instrument(level = "trace", skip_all)]
277    pub fn new(rt: &Runtime, wasm: &[u8]) -> anyhow::Result<Self> {
278        if wasmparser::Parser::is_core_wasm(wasm) {
279            let wasm = wit_component::ComponentEncoder::default()
280                .module(wasm)
281                .context("failed to set core component module")?
282                .adapter(
283                    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME,
284                    WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
285                )
286                .context("failed to add WASI preview1 adapter")?
287                .encode()
288                .context("failed to encode a component from module")?;
289            return Self::new(rt, &wasm);
290        }
291        let engine = rt.engine.clone();
292        let claims_token = claims_token(wasm)?;
293        let claims = claims_token.map(|c| c.claims);
294        let component = wasmtime::component::Component::new(&engine, wasm)
295            .context("failed to compile component")?;
296
297        let mut linker = Linker::new(&engine);
298
299        wasmtime_wasi::add_to_linker_async(&mut linker)
300            .context("failed to link core WASI interfaces")?;
301        wasmtime_wasi_http::add_only_http_to_linker_async(&mut linker)
302            .context("failed to link `wasi:http`")?;
303
304        capability::blobstore::blobstore::add_to_linker(&mut linker, |ctx| ctx)
305            .context("failed to link `wasi:blobstore/blobstore`")?;
306        capability::blobstore::container::add_to_linker(&mut linker, |ctx| ctx)
307            .context("failed to link `wasi:blobstore/container`")?;
308        capability::blobstore::types::add_to_linker(&mut linker, |ctx| ctx)
309            .context("failed to link `wasi:blobstore/types`")?;
310        capability::config::runtime::add_to_linker(&mut linker, |ctx| ctx)
311            .context("failed to link `wasi:config/runtime`")?;
312        capability::config::store::add_to_linker(&mut linker, |ctx| ctx)
313            .context("failed to link `wasi:config/store`")?;
314        capability::keyvalue::atomics::add_to_linker(&mut linker, |ctx| ctx)
315            .context("failed to link `wasi:keyvalue/atomics`")?;
316        capability::keyvalue::store::add_to_linker(&mut linker, |ctx| ctx)
317            .context("failed to link `wasi:keyvalue/store`")?;
318        capability::keyvalue::batch::add_to_linker(&mut linker, |ctx| ctx)
319            .context("failed to link `wasi:keyvalue/batch`")?;
320        capability::logging::logging::add_to_linker(&mut linker, |ctx| ctx)
321            .context("failed to link `wasi:logging/logging`")?;
322        capability::unversioned_logging::logging::add_to_linker(&mut linker, |ctx| ctx)
323            .context("failed to link unversioned `wasi:logging/logging`")?;
324
325        capability::bus1_0_0::lattice::add_to_linker(&mut linker, |ctx| ctx)
326            .context("failed to link `wasmcloud:bus/lattice@1.0.0`")?;
327        capability::bus2_0_0::lattice::add_to_linker(&mut linker, |ctx| ctx)
328            .context("failed to link `wasmcloud:bus/lattice@2.0.0`")?;
329        capability::messaging0_2_0::types::add_to_linker(&mut linker, |ctx| ctx)
330            .context("failed to link `wasmcloud:messaging/types@0.2.0`")?;
331        capability::messaging0_2_0::consumer::add_to_linker(&mut linker, |ctx| ctx)
332            .context("failed to link `wasmcloud:messaging/consumer@0.2.0`")?;
333        capability::secrets::reveal::add_to_linker(&mut linker, |ctx| ctx)
334            .context("failed to link `wasmcloud:secrets/reveal`")?;
335        capability::secrets::store::add_to_linker(&mut linker, |ctx| ctx)
336            .context("failed to link `wasmcloud:secrets/store`")?;
337        // Only link wasmcloud:messaging@v3 if the feature is enabled
338        if rt.experimental_features.wasmcloud_messaging_v3 {
339            capability::messaging0_3_0::types::add_to_linker(&mut linker, |ctx| ctx)
340                .context("failed to link `wasmcloud:messaging/types@0.3.0`")?;
341            capability::messaging0_3_0::producer::add_to_linker(&mut linker, |ctx| ctx)
342                .context("failed to link `wasmcloud:messaging/producer@0.3.0`")?;
343            capability::messaging0_3_0::request_reply::add_to_linker(&mut linker, |ctx| ctx)
344                .context("failed to link `wasmcloud:messaging/request-reply@0.3.0`")?;
345        }
346        // Only link wasmcloud:identity if the workload identity feature is enabled
347        if rt.experimental_features.workload_identity_interface {
348            capability::identity::store::add_to_linker(&mut linker, |ctx| ctx)
349                .context("failed to link `wasmcloud:identity/store`")?;
350        }
351
352        let ty = component.component_type();
353        let mut guest_resources = Vec::new();
354        collect_component_resources(&engine, &ty, &mut guest_resources);
355        if !guest_resources.is_empty() {
356            warn!("exported component resources are not supported in wasmCloud runtime and will be ignored, use a provider instead to enable this functionality");
357        }
358        for (name, ty) in ty.imports(&engine) {
359            // Don't link builtin instances or feature-gated instances if the feature is disabled
360            match name.split_once('/').map(|(pkg, suffix)| {
361                suffix
362                    .split_once('@')
363                    .map_or((pkg, suffix, None), |(iface, version)| {
364                        (pkg, iface, Some(version))
365                    })
366            }) {
367                Some(
368                    ("wasi:blobstore", "blobstore" | "container" | "types", Some("0.2.0-draft"))
369                    | ("wasi:config", "runtime" | "store", Some("0.2.0-draft"))
370                    | ("wasi:keyvalue", "atomics" | "batch" | "store", Some("0.2.0-draft"))
371                    | ("wasi:logging", "logging", None | Some("0.1.0-draft"))
372                    | ("wasmcloud:bus", "lattice", Some("1.0.0" | "2.0.0"))
373                    | ("wasmcloud:messaging", "consumer" | "types", Some("0.2.0"))
374                    | ("wasmcloud:secrets", "reveal" | "store", Some("0.1.0-draft")),
375                ) => {}
376                Some((
377                    "wasi:cli",
378                    "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
379                    | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
380                    Some(version),
381                )) if is_0_2(version, 0) => {}
382                Some(("wasi:clocks", "monotonic-clock" | "wall-clock", Some(version)))
383                    if is_0_2(version, 0) => {}
384                Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
385                Some(("wasi:filesystem", "preopens" | "types", Some(version)))
386                    if is_0_2(version, 0) => {}
387                Some((
388                    "wasi:http",
389                    "incoming-handler" | "outgoing-handler" | "types",
390                    Some(version),
391                )) if is_0_2(version, 0) => {}
392                Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
393                    if is_0_2(version, 0) => {}
394                Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
395                    if is_0_2(version, 0) => {}
396                Some((
397                    "wasi:sockets",
398                    "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
399                    | "udp-create-socket" | "udp",
400                    Some(version),
401                )) if is_0_2(version, 0) => {}
402                _ if rt.skip_feature_gated_instance(name) => {}
403                _ => link_item(&engine, &mut linker.root(), [], ty, "", name, None)
404                    .context("failed to link item")?,
405            };
406        }
407        let instance_pre = linker.instantiate_pre(&component)?;
408        Ok(Self {
409            engine,
410            claims,
411            instance_pre,
412            max_execution_time: rt.max_execution_time,
413            experimental_features: rt.experimental_features,
414        })
415    }
416
417    /// Sets maximum execution time for functionality exported by this component.
418    /// Values below 1 second will be interpreted as 1 second.
419    #[instrument(level = "trace", skip_all)]
420    pub fn set_max_execution_time(&mut self, max_execution_time: Duration) -> &mut Self {
421        self.max_execution_time = max_execution_time.max(Duration::from_secs(1));
422        self
423    }
424
425    /// Reads the WebAssembly binary asynchronously and calls [Component::new].
426    ///
427    /// # Errors
428    ///
429    /// Fails if either reading `wasm` fails or [Self::new] fails
430    #[instrument(level = "trace", skip_all)]
431    pub async fn read(rt: &Runtime, mut wasm: impl AsyncRead + Unpin) -> anyhow::Result<Self> {
432        let mut buf = Vec::new();
433        wasm.read_to_end(&mut buf)
434            .await
435            .context("failed to read Wasm")?;
436        Self::new(rt, &buf)
437    }
438
439    /// Reads the WebAssembly binary synchronously and calls [Component::new].
440    ///
441    /// # Errors
442    ///
443    /// Fails if either reading `wasm` fails or [Self::new] fails
444    #[instrument(level = "trace", skip_all)]
445    pub fn read_sync(rt: &Runtime, mut wasm: impl std::io::Read) -> anyhow::Result<Self> {
446        let mut buf = Vec::new();
447        wasm.read_to_end(&mut buf).context("failed to read Wasm")?;
448        Self::new(rt, &buf)
449    }
450
451    /// [Claims](jwt::Claims) associated with this [Component].
452    #[instrument(level = "trace")]
453    pub fn claims(&self) -> Option<&jwt::Claims<jwt::Component>> {
454        self.claims.as_ref()
455    }
456
457    /// Instantiates the component given a handler and event channel
458    pub fn instantiate<C>(
459        &self,
460        handler: H,
461        events: mpsc::Sender<WrpcServeEvent<C>>,
462    ) -> Instance<H, C> {
463        Instance {
464            engine: self.engine.clone(),
465            pre: self.instance_pre.clone(),
466            handler,
467            max_execution_time: self.max_execution_time,
468            events,
469            experimental_features: self.experimental_features,
470        }
471    }
472
473    /// Serve all exports of this [Component] using supplied [`wrpc_transport::Serve`]
474    ///
475    /// The returned [Vec] contains an [InvocationStream] per each function exported by the component.
476    /// A [`WrpcServeEvent`] containing the incoming [`wrpc_transport::Serve::Context`] will be sent
477    /// on completion of each invocation.
478    /// The supplied [`Handler`] will be used to satisfy imports.
479    #[instrument(level = "debug", skip_all)]
480    pub async fn serve_wrpc<S>(
481        &self,
482        srv: &S,
483        handler: H,
484        events: mpsc::Sender<WrpcServeEvent<S::Context>>,
485    ) -> anyhow::Result<Vec<InvocationStream>>
486    where
487        S: wrpc_transport::Serve,
488        S::Context: Deref<Target = tracing::Span>,
489    {
490        let max_execution_time = self.max_execution_time;
491        let mut invocations = vec![];
492        let instance = self.instantiate(handler.clone(), events.clone());
493        for (name, ty) in self
494            .instance_pre
495            .component()
496            .component_type()
497            .exports(&self.engine)
498        {
499            match (name, ty) {
500                (_, types::ComponentItem::ComponentInstance(..))
501                    if name.starts_with("wasi:http/incoming-handler@0.2") =>
502                {
503                    let instance = instance.clone();
504                    let [(_, _, handle)] = wrpc_interface_http::bindings::exports::wrpc::http::incoming_handler::serve_interface(
505                        srv,
506                        wrpc_interface_http::ServeWasmtime(instance),
507                    )
508                    .await
509                    .context("failed to serve `wrpc:http/incoming-handler`")?;
510                    invocations.push(handle);
511                }
512                (
513                    "wasmcloud:messaging/handler@0.2.0"
514                    | "wasmcloud:messaging/incoming-handler@0.3.0",
515                    types::ComponentItem::ComponentInstance(..),
516                ) => {
517                    let instance = instance.clone();
518                    let [(_, _, handle_message)] =
519                        wrpc::exports::wasmcloud::messaging0_2_0::handler::serve_interface(
520                            srv, instance,
521                        )
522                        .await
523                        .context("failed to serve `wasmcloud:messaging/handler`")?;
524                    invocations.push(handle_message);
525                }
526                (
527                    "wasi:keyvalue/watcher@0.2.0-draft",
528                    types::ComponentItem::ComponentInstance(..),
529                ) => {
530                    let instance = instance.clone();
531                    let [(_, _, on_set), (_, _, on_delete)] =
532                        wrpc::exports::wrpc::keyvalue::watcher::serve_interface(srv, instance)
533                            .await
534                            .context("failed to serve `wrpc:keyvalue/watcher`")?;
535                    invocations.push(on_set);
536                    invocations.push(on_delete);
537                }
538                (name, types::ComponentItem::ComponentFunc(ty)) => {
539                    let engine = self.engine.clone();
540                    let handler = handler.clone();
541                    let pre = self.instance_pre.clone();
542                    debug!(?name, "serving root function");
543                    let func = srv
544                        .serve_function(
545                            move || {
546                                let span = info_span!("call_instance_function");
547                                let mut store =
548                                    new_store(&engine, handler.clone(), max_execution_time);
549                                store.data_mut().parent_context = Some(span.context());
550                                store
551                            },
552                            pre,
553                            ty,
554                            "",
555                            name,
556                        )
557                        .await
558                        .context("failed to serve root function")?;
559                    let events = events.clone();
560                    invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
561                        let events = events.clone();
562                        let span = cx.deref().clone();
563                        Box::pin(
564                            async move {
565                                let res =
566                                    res.instrument(info_span!("handle_instance_function")).await;
567                                let success = res.is_ok();
568                                if let Err(err) =
569                                    events.try_send(WrpcServeEvent::DynamicExportReturned {
570                                        context: cx,
571                                        success,
572                                    })
573                                {
574                                    warn!(
575                                        ?err,
576                                        success, "failed to send dynamic root export return event"
577                                    );
578                                }
579                                res
580                            }
581                            .instrument(span),
582                        )
583                            as Pin<Box<dyn Future<Output = _> + Send + 'static>>
584                    })));
585                }
586                (_, types::ComponentItem::CoreFunc(_)) => {
587                    warn!(name, "serving root core function exports not supported yet");
588                }
589                (_, types::ComponentItem::Module(_)) => {
590                    warn!(name, "serving root module exports not supported yet");
591                }
592                (_, types::ComponentItem::Component(_)) => {
593                    warn!(name, "serving root component exports not supported yet");
594                }
595                (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
596                    for (name, ty) in ty.exports(&self.engine) {
597                        match ty {
598                            types::ComponentItem::ComponentFunc(ty) => {
599                                let engine = self.engine.clone();
600                                let handler = handler.clone();
601                                let pre = self.instance_pre.clone();
602                                debug!(?instance_name, ?name, "serving instance function");
603                                let func = srv
604                                    .serve_function(
605                                        move || {
606                                            let span = info_span!("call_instance_function");
607                                            let mut store = new_store(
608                                                &engine,
609                                                handler.clone(),
610                                                max_execution_time,
611                                            );
612                                            store.data_mut().parent_context = Some(span.context());
613                                            store
614                                        },
615                                        pre,
616                                        ty,
617                                        instance_name,
618                                        name,
619                                    )
620                                    .await
621                                    .context("failed to serve instance function")?;
622                                let events = events.clone();
623                                invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
624                                    let events = events.clone();
625                                    let span = cx.deref().clone();
626                                    Box::pin(
627                                        async move {
628                                            let res = res.await;
629                                            let success = res.is_ok();
630                                            if let Err(err) = events.try_send(
631                                                WrpcServeEvent::DynamicExportReturned {
632                                                    context: cx,
633                                                    success,
634                                                },
635                                            ) {
636                                                warn!(
637                                                    ?err,
638                                                    success,
639                                                    "failed to send dynamic instance export return event"
640                                                );
641                                            }
642                                            res
643                                        }
644                                        .instrument(span),
645                                    )
646                                        as Pin<Box<dyn Future<Output = _> + Send + 'static>>
647                                })));
648                            }
649                            types::ComponentItem::CoreFunc(_) => {
650                                warn!(
651                                    instance_name,
652                                    name,
653                                    "serving instance core function exports not supported yet"
654                                );
655                            }
656                            types::ComponentItem::Module(_) => {
657                                warn!(
658                                    instance_name,
659                                    name, "serving instance module exports not supported yet"
660                                );
661                            }
662                            types::ComponentItem::Component(_) => {
663                                warn!(
664                                    instance_name,
665                                    name, "serving instance component exports not supported yet"
666                                );
667                            }
668                            types::ComponentItem::ComponentInstance(_) => {
669                                warn!(
670                                    instance_name,
671                                    name, "serving nested instance exports not supported yet"
672                                );
673                            }
674                            types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
675                        }
676                    }
677                }
678                (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
679            }
680        }
681        Ok(invocations)
682    }
683}
684
685impl<H> From<Component<H>> for Option<jwt::Claims<jwt::Component>>
686where
687    H: Handler,
688{
689    fn from(Component { claims, .. }: Component<H>) -> Self {
690        claims
691    }
692}
693
694/// Instantiated component
695pub struct Instance<H, C>
696where
697    H: Handler,
698{
699    engine: wasmtime::Engine,
700    pre: wasmtime::component::InstancePre<Ctx<H>>,
701    handler: H,
702    max_execution_time: Duration,
703    events: mpsc::Sender<WrpcServeEvent<C>>,
704    experimental_features: Features,
705}
706
707impl<H, C> Clone for Instance<H, C>
708where
709    H: Handler,
710{
711    fn clone(&self) -> Self {
712        Self {
713            engine: self.engine.clone(),
714            pre: self.pre.clone(),
715            handler: self.handler.clone(),
716            max_execution_time: self.max_execution_time,
717            events: self.events.clone(),
718            experimental_features: self.experimental_features,
719        }
720    }
721}
722
723type TableResult<T> = Result<T, ResourceTableError>;
724
725pub(crate) struct Ctx<H>
726where
727    H: Handler,
728{
729    handler: H,
730    wasi: WasiCtx,
731    http: WasiHttpCtx,
732    table: ResourceTable,
733    shared_resources: SharedResourceTable,
734    timeout: Duration,
735    parent_context: Option<opentelemetry::Context>,
736}
737
738impl<H: Handler> WasiView for Ctx<H> {
739    fn table(&mut self) -> &mut ResourceTable {
740        &mut self.table
741    }
742
743    fn ctx(&mut self) -> &mut WasiCtx {
744        &mut self.wasi
745    }
746}
747
748impl<H: Handler> WrpcView for Ctx<H> {
749    type Invoke = H;
750
751    fn client(&self) -> &H {
752        &self.handler
753    }
754
755    fn shared_resources(&mut self) -> &mut SharedResourceTable {
756        &mut self.shared_resources
757    }
758
759    fn timeout(&self) -> Option<Duration> {
760        Some(self.timeout)
761    }
762}
763
764impl<H: Handler> Debug for Ctx<H> {
765    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
766        f.debug_struct("Ctx").field("runtime", &"wasmtime").finish()
767    }
768}
769
770impl<H: Handler> Ctx<H> {
771    fn attach_parent_context(&self) {
772        if let Some(context) = self.parent_context.as_ref() {
773            Span::current().set_parent(context.clone());
774        }
775    }
776}