1use core::fmt::{self, Debug};
2use core::future::Future;
3use core::ops::{Bound, Deref};
4use core::pin::Pin;
5use core::time::Duration;
6
7use std::collections::{BTreeMap, HashMap};
8use std::sync::Arc;
9
10use anyhow::{ensure, Context as _};
11use futures::{Stream, TryStreamExt as _};
12use tokio::io::{AsyncRead, AsyncReadExt as _};
13use tokio::sync::mpsc;
14use tracing::{debug, info_span, instrument, warn, Instrument as _, Span};
15use tracing_opentelemetry::OpenTelemetrySpanExt;
16use wascap::jwt;
17use wascap::wasm::extract_claims;
18use wasi_preview1_component_adapter_provider::{
19 WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
20};
21use wasmtime::component::{types, Linker, ResourceTable, ResourceTableError, ResourceType};
22use wasmtime_wasi::{IoView, WasiCtx, WasiCtxBuilder, WasiView};
23use wasmtime_wasi_http::WasiHttpCtx;
24use wrpc_runtime_wasmtime::{
25 collect_component_resource_exports, collect_component_resource_imports, link_item, rpc,
26 RemoteResource, ServeExt as _, SharedResourceTable, WrpcView,
27};
28
29use crate::capability::{self, wrpc};
30use crate::experimental::Features;
31use crate::Runtime;
32
33pub use bus::{Bus, Error};
34pub use bus1_0_0::Bus as Bus1_0_0;
35pub use config::Config;
36pub use identity::Identity;
37pub use logging::Logging;
38pub use messaging::v0_2::Messaging as Messaging0_2;
39pub use messaging::v0_3::{
40 Client as MessagingClient0_3, GuestMessage as MessagingGuestMessage0_3,
41 HostMessage as MessagingHostMessage0_3, Messaging as Messaging0_3,
42};
43pub use secrets::Secrets;
44
45pub(crate) mod blobstore;
46mod bus;
47mod bus1_0_0;
48mod config;
49mod http;
50mod identity;
51mod keyvalue;
52mod logging;
53pub(crate) mod messaging;
54mod secrets;
55
56#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
60pub enum ReplacedInstanceTarget {
61 BlobstoreBlobstore,
63 BlobstoreContainer,
65 KeyvalueAtomics,
67 KeyvalueStore,
69 KeyvalueBatch,
71 KeyvalueWatch,
73 HttpIncomingHandler,
75 HttpOutgoingHandler,
77}
78
79fn is_0_2(version: &str, min_patch: u64) -> bool {
80 if let Ok(semver::Version {
81 major,
82 minor,
83 patch,
84 pre,
85 build,
86 }) = version.parse()
87 {
88 major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
89 } else {
90 false
91 }
92}
93
94pub enum InvocationErrorKind {
96 NotFound,
99
100 Trap,
102}
103
104pub trait InvocationErrorIntrospect {
106 fn invocation_error_kind(&self, err: &anyhow::Error) -> InvocationErrorKind;
108}
109
110pub trait Handler:
112 wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
113 + Bus
114 + Config
115 + Logging
116 + Secrets
117 + Messaging0_2
118 + Messaging0_3
119 + Identity
120 + InvocationErrorIntrospect
121 + Send
122 + Sync
123 + Clone
124 + 'static
125{
126}
127
128impl<
129 T: wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
130 + Bus
131 + Config
132 + Logging
133 + Secrets
134 + Messaging0_2
135 + Messaging0_3
136 + Identity
137 + InvocationErrorIntrospect
138 + Send
139 + Sync
140 + Clone
141 + 'static,
142 > Handler for T
143{
144}
145
146#[derive(Clone, Debug, Default)]
148pub struct ComponentConfig {
149 pub require_signature: bool,
151}
152
153pub fn claims_token(wasm: impl AsRef<[u8]>) -> anyhow::Result<Option<jwt::Token<jwt::Component>>> {
166 let Some(claims) = extract_claims(wasm).context("failed to extract module claims")? else {
167 return Ok(None);
168 };
169 let v = jwt::validate_token::<jwt::Component>(&claims.jwt)
170 .context("failed to validate module token")?;
171 ensure!(!v.expired, "token expired at `{}`", v.expires_human);
172 ensure!(
173 !v.cannot_use_yet,
174 "token cannot be used before `{}`",
175 v.not_before_human
176 );
177 ensure!(v.signature_valid, "signature is not valid");
178 Ok(Some(claims))
179}
180
181#[derive(Clone)]
183pub struct Component<H>
184where
185 H: Handler,
186{
187 engine: wasmtime::Engine,
188 claims: Option<jwt::Claims<jwt::Component>>,
189 instance_pre: wasmtime::component::InstancePre<Ctx<H>>,
190 host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
191 max_execution_time: Duration,
192 experimental_features: Features,
193}
194
195impl<H> Debug for Component<H>
196where
197 H: Handler,
198{
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 f.debug_struct("Component")
201 .field("claims", &self.claims)
202 .field("runtime", &"wasmtime")
203 .field("max_execution_time", &self.max_execution_time)
204 .finish_non_exhaustive()
205 }
206}
207
208fn new_store<H: Handler>(
209 engine: &wasmtime::Engine,
210 handler: H,
211 max_execution_time: Duration,
212) -> wasmtime::Store<Ctx<H>> {
213 let table = ResourceTable::new();
214 let wasi = WasiCtxBuilder::new()
215 .args(&["main.wasm"]) .inherit_stderr()
217 .build();
218
219 let mut store = wasmtime::Store::new(
220 engine,
221 Ctx {
222 handler,
223 wasi,
224 http: WasiHttpCtx::new(),
225 table,
226 shared_resources: SharedResourceTable::default(),
227 timeout: max_execution_time,
228 parent_context: None,
229 },
230 );
231 store.set_epoch_deadline(max_execution_time.as_secs());
232 store
233}
234
235#[derive(Clone, Debug)]
237pub enum WrpcServeEvent<C> {
238 HttpIncomingHandlerHandleReturned {
240 context: C,
242 success: bool,
244 },
245 MessagingHandlerHandleMessageReturned {
247 context: C,
249 success: bool,
251 },
252 DynamicExportReturned {
254 context: C,
256 success: bool,
258 },
259}
260
261pub type InvocationStream = Pin<
264 Box<
265 dyn Stream<
266 Item = anyhow::Result<
267 Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
268 >,
269 > + Send
270 + 'static,
271 >,
272>;
273
274impl<H> Component<H>
275where
276 H: Handler,
277{
278 #[instrument(level = "trace", skip_all)]
282 pub fn new(rt: &Runtime, wasm: &[u8]) -> anyhow::Result<Self> {
283 if wasmparser::Parser::is_core_wasm(wasm) {
284 let wasm = wit_component::ComponentEncoder::default()
285 .module(wasm)
286 .context("failed to set core component module")?
287 .adapter(
288 WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME,
289 WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
290 )
291 .context("failed to add WASI preview1 adapter")?
292 .encode()
293 .context("failed to encode a component from module")?;
294 return Self::new(rt, &wasm);
295 }
296 let engine = rt.engine.clone();
297 let claims_token = claims_token(wasm)?;
298 let claims = claims_token.map(|c| c.claims);
299 let component = wasmtime::component::Component::new(&engine, wasm)
300 .context("failed to compile component")?;
301
302 let mut linker = Linker::new(&engine);
303
304 wasmtime_wasi::add_to_linker_async(&mut linker)
305 .context("failed to link core WASI interfaces")?;
306 wasmtime_wasi_http::add_only_http_to_linker_async(&mut linker)
307 .context("failed to link `wasi:http`")?;
308
309 capability::blobstore::blobstore::add_to_linker(&mut linker, |ctx| ctx)
310 .context("failed to link `wasi:blobstore/blobstore`")?;
311 capability::blobstore::container::add_to_linker(&mut linker, |ctx| ctx)
312 .context("failed to link `wasi:blobstore/container`")?;
313 capability::blobstore::types::add_to_linker(&mut linker, |ctx| ctx)
314 .context("failed to link `wasi:blobstore/types`")?;
315 capability::config::runtime::add_to_linker(&mut linker, |ctx| ctx)
316 .context("failed to link `wasi:config/runtime`")?;
317 capability::config::store::add_to_linker(&mut linker, |ctx| ctx)
318 .context("failed to link `wasi:config/store`")?;
319 capability::keyvalue::atomics::add_to_linker(&mut linker, |ctx| ctx)
320 .context("failed to link `wasi:keyvalue/atomics`")?;
321 capability::keyvalue::store::add_to_linker(&mut linker, |ctx| ctx)
322 .context("failed to link `wasi:keyvalue/store`")?;
323 capability::keyvalue::batch::add_to_linker(&mut linker, |ctx| ctx)
324 .context("failed to link `wasi:keyvalue/batch`")?;
325 capability::logging::logging::add_to_linker(&mut linker, |ctx| ctx)
326 .context("failed to link `wasi:logging/logging`")?;
327 capability::unversioned_logging::logging::add_to_linker(&mut linker, |ctx| ctx)
328 .context("failed to link unversioned `wasi:logging/logging`")?;
329
330 capability::bus1_0_0::lattice::add_to_linker(&mut linker, |ctx| ctx)
331 .context("failed to link `wasmcloud:bus/lattice@1.0.0`")?;
332 capability::bus2_0_1::lattice::add_to_linker(&mut linker, |ctx| ctx)
333 .context("failed to link `wasmcloud:bus/lattice@2.0.1`")?;
334 capability::bus2_0_1::error::add_to_linker(&mut linker, |ctx| ctx)
335 .context("failed to link `wasmcloud:bus/error@2.0.1`")?;
336 capability::messaging0_2_0::types::add_to_linker(&mut linker, |ctx| ctx)
337 .context("failed to link `wasmcloud:messaging/types@0.2.0`")?;
338 capability::messaging0_2_0::consumer::add_to_linker(&mut linker, |ctx| ctx)
339 .context("failed to link `wasmcloud:messaging/consumer@0.2.0`")?;
340 capability::secrets::reveal::add_to_linker(&mut linker, |ctx| ctx)
341 .context("failed to link `wasmcloud:secrets/reveal`")?;
342 capability::secrets::store::add_to_linker(&mut linker, |ctx| ctx)
343 .context("failed to link `wasmcloud:secrets/store`")?;
344 if rt.experimental_features.wasmcloud_messaging_v3 {
346 capability::messaging0_3_0::types::add_to_linker(&mut linker, |ctx| ctx)
347 .context("failed to link `wasmcloud:messaging/types@0.3.0`")?;
348 capability::messaging0_3_0::producer::add_to_linker(&mut linker, |ctx| ctx)
349 .context("failed to link `wasmcloud:messaging/producer@0.3.0`")?;
350 capability::messaging0_3_0::request_reply::add_to_linker(&mut linker, |ctx| ctx)
351 .context("failed to link `wasmcloud:messaging/request-reply@0.3.0`")?;
352 }
353 if rt.experimental_features.workload_identity_interface {
355 capability::identity::store::add_to_linker(&mut linker, |ctx| ctx)
356 .context("failed to link `wasmcloud:identity/store`")?;
357 }
358
359 if rt.experimental_features.rpc_interface {
361 rpc::add_to_linker(&mut linker).context("failed to link `wrpc:rpc`")?;
362 }
363
364 let ty = component.component_type();
365 let mut guest_resources = Vec::new();
366 let mut host_resources = BTreeMap::new();
367 collect_component_resource_exports(&engine, &ty, &mut guest_resources);
368 collect_component_resource_imports(&engine, &ty, &mut host_resources);
369 let io_err_tys = host_resources
370 .range::<str, _>((
371 Bound::Included("wasi:io/error@0.2"),
372 Bound::Excluded("wasi:io/error@0.3"),
373 ))
374 .map(|(name, instance)| {
375 instance
376 .get("error")
377 .copied()
378 .with_context(|| format!("{name} instance import missing `error` resource"))
379 })
380 .collect::<anyhow::Result<Box<[_]>>>()?;
381 let io_pollable_tys = host_resources
382 .range::<str, _>((
383 Bound::Included("wasi:io/poll@0.2"),
384 Bound::Excluded("wasi:io/poll@0.3"),
385 ))
386 .map(|(name, instance)| {
387 instance
388 .get("pollable")
389 .copied()
390 .with_context(|| format!("{name} instance import missing `pollable` resource"))
391 })
392 .collect::<anyhow::Result<Box<[_]>>>()?;
393 let io_input_stream_tys = host_resources
394 .range::<str, _>((
395 Bound::Included("wasi:io/streams@0.2"),
396 Bound::Excluded("wasi:io/streams@0.3"),
397 ))
398 .map(|(name, instance)| {
399 instance.get("input-stream").copied().with_context(|| {
400 format!("{name} instance import missing `input-stream` resource")
401 })
402 })
403 .collect::<anyhow::Result<Box<[_]>>>()?;
404 let io_output_stream_tys = host_resources
405 .range::<str, _>((
406 Bound::Included("wasi:io/streams@0.2"),
407 Bound::Excluded("wasi:io/streams@0.3"),
408 ))
409 .map(|(name, instance)| {
410 instance.get("output-stream").copied().with_context(|| {
411 format!("{name} instance import missing `output-stream` resource")
412 })
413 })
414 .collect::<anyhow::Result<Box<[_]>>>()?;
415 let rpc_err_ty = host_resources
416 .get("wrpc:rpc/error@0.1.0")
417 .map(|instance| {
418 instance
419 .get("error")
420 .copied()
421 .context("`wrpc:rpc/error@0.1.0` instance import missing `error` resource")
422 })
423 .transpose()?;
424 let host_resources = host_resources
426 .into_iter()
427 .map(|(name, instance)| {
428 let instance = instance
429 .into_iter()
430 .map(|(name, ty)| {
431 let host_ty = match ty {
432 ty if Some(ty) == rpc_err_ty => ResourceType::host::<rpc::Error>(),
433 ty if io_err_tys.contains(&ty) => {
434 ResourceType::host::<wasmtime_wasi::bindings::io::error::Error>()
435 }
436 ty if io_input_stream_tys.contains(&ty) => ResourceType::host::<
437 wasmtime_wasi::bindings::io::streams::InputStream,
438 >(
439 ),
440 ty if io_output_stream_tys.contains(&ty) => ResourceType::host::<
441 wasmtime_wasi::bindings::io::streams::OutputStream,
442 >(
443 ),
444 ty if io_pollable_tys.contains(&ty) => {
445 ResourceType::host::<wasmtime_wasi::bindings::io::poll::Pollable>()
446 }
447 _ => ResourceType::host::<RemoteResource>(),
448 };
449 (name, (ty, host_ty))
450 })
451 .collect::<HashMap<_, _>>();
452 (name, instance)
453 })
454 .collect::<HashMap<_, _>>();
455 let host_resources = Arc::from(host_resources);
456 if !guest_resources.is_empty() {
457 warn!("exported component resources are not supported in wasmCloud runtime and will be ignored, use a provider instead to enable this functionality");
458 }
459 for (name, ty) in ty.imports(&engine) {
460 match name.split_once('/').map(|(pkg, suffix)| {
462 suffix
463 .split_once('@')
464 .map_or((pkg, suffix, None), |(iface, version)| {
465 (pkg, iface, Some(version))
466 })
467 }) {
468 Some(
469 ("wasi:blobstore", "blobstore" | "container" | "types", Some("0.2.0-draft"))
470 | ("wasi:config", "runtime" | "store", Some("0.2.0-draft"))
471 | ("wasi:keyvalue", "atomics" | "batch" | "store", Some("0.2.0-draft"))
472 | ("wasi:logging", "logging", None | Some("0.1.0-draft"))
473 | ("wasmcloud:bus", "lattice", Some("1.0.0" | "2.0.0"))
474 | ("wasmcloud:messaging", "consumer" | "types", Some("0.2.0"))
475 | ("wasmcloud:secrets", "reveal" | "store", Some("0.1.0-draft")),
476 ) => {}
477 Some((
478 "wasi:cli",
479 "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
480 | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
481 Some(version),
482 )) if is_0_2(version, 0) => {}
483 Some(("wasi:clocks", "monotonic-clock" | "wall-clock", Some(version)))
484 if is_0_2(version, 0) => {}
485 Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
486 Some(("wasi:filesystem", "preopens" | "types", Some(version)))
487 if is_0_2(version, 0) => {}
488 Some((
489 "wasi:http",
490 "incoming-handler" | "outgoing-handler" | "types",
491 Some(version),
492 )) if is_0_2(version, 0) => {}
493 Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
494 if is_0_2(version, 0) => {}
495 Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
496 if is_0_2(version, 0) => {}
497 Some((
498 "wasi:sockets",
499 "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
500 | "udp-create-socket" | "udp",
501 Some(version),
502 )) if is_0_2(version, 0) => {}
503 _ if rt.skip_feature_gated_instance(name) => {}
504 _ => link_item(
505 &engine,
506 &mut linker.root(),
507 [],
508 Arc::clone(&host_resources),
509 ty,
510 "",
511 name,
512 )
513 .context("failed to link item")?,
514 }
515 }
516 let instance_pre = linker.instantiate_pre(&component)?;
517 Ok(Self {
518 engine,
519 claims,
520 instance_pre,
521 host_resources,
522 max_execution_time: rt.max_execution_time,
523 experimental_features: rt.experimental_features,
524 })
525 }
526
527 #[instrument(level = "trace", skip_all)]
530 pub fn set_max_execution_time(&mut self, max_execution_time: Duration) -> &mut Self {
531 self.max_execution_time = max_execution_time.max(Duration::from_secs(1));
532 self
533 }
534
535 #[instrument(level = "trace", skip_all)]
541 pub async fn read(rt: &Runtime, mut wasm: impl AsyncRead + Unpin) -> anyhow::Result<Self> {
542 let mut buf = Vec::new();
543 wasm.read_to_end(&mut buf)
544 .await
545 .context("failed to read Wasm")?;
546 Self::new(rt, &buf)
547 }
548
549 #[instrument(level = "trace", skip_all)]
555 pub fn read_sync(rt: &Runtime, mut wasm: impl std::io::Read) -> anyhow::Result<Self> {
556 let mut buf = Vec::new();
557 wasm.read_to_end(&mut buf).context("failed to read Wasm")?;
558 Self::new(rt, &buf)
559 }
560
561 #[instrument(level = "trace")]
563 pub fn claims(&self) -> Option<&jwt::Claims<jwt::Component>> {
564 self.claims.as_ref()
565 }
566
567 pub fn instantiate<C>(
569 &self,
570 handler: H,
571 events: mpsc::Sender<WrpcServeEvent<C>>,
572 ) -> Instance<H, C> {
573 Instance {
574 engine: self.engine.clone(),
575 pre: self.instance_pre.clone(),
576 handler,
577 max_execution_time: self.max_execution_time,
578 events,
579 experimental_features: self.experimental_features,
580 }
581 }
582
583 #[instrument(level = "debug", skip_all)]
590 pub async fn serve_wrpc<S>(
591 &self,
592 srv: &S,
593 handler: H,
594 events: mpsc::Sender<WrpcServeEvent<S::Context>>,
595 ) -> anyhow::Result<Vec<InvocationStream>>
596 where
597 S: wrpc_transport::Serve,
598 S::Context: Deref<Target = tracing::Span>,
599 {
600 let max_execution_time = self.max_execution_time;
601 let mut invocations = vec![];
602 let instance = self.instantiate(handler.clone(), events.clone());
603 for (name, ty) in self
604 .instance_pre
605 .component()
606 .component_type()
607 .exports(&self.engine)
608 {
609 match (name, ty) {
610 (_, types::ComponentItem::ComponentInstance(..))
611 if name.starts_with("wasi:http/incoming-handler@0.2") =>
612 {
613 let instance = instance.clone();
614 let [(_, _, handle)] = wrpc_interface_http::bindings::exports::wrpc::http::incoming_handler::serve_interface(
615 srv,
616 wrpc_interface_http::ServeWasmtime(instance),
617 )
618 .await
619 .context("failed to serve `wrpc:http/incoming-handler`")?;
620 invocations.push(handle);
621 }
622 (
623 "wasmcloud:messaging/handler@0.2.0"
624 | "wasmcloud:messaging/incoming-handler@0.3.0",
625 types::ComponentItem::ComponentInstance(..),
626 ) => {
627 let instance = instance.clone();
628 let [(_, _, handle_message)] =
629 wrpc::exports::wasmcloud::messaging0_2_0::handler::serve_interface(
630 srv, instance,
631 )
632 .await
633 .context("failed to serve `wasmcloud:messaging/handler`")?;
634 invocations.push(handle_message);
635 }
636 (
637 "wasi:keyvalue/watcher@0.2.0-draft",
638 types::ComponentItem::ComponentInstance(..),
639 ) => {
640 let instance = instance.clone();
641 let [(_, _, on_set), (_, _, on_delete)] =
642 wrpc::exports::wrpc::keyvalue::watcher::serve_interface(srv, instance)
643 .await
644 .context("failed to serve `wrpc:keyvalue/watcher`")?;
645 invocations.push(on_set);
646 invocations.push(on_delete);
647 }
648 (name, types::ComponentItem::ComponentFunc(ty)) => {
649 let engine = self.engine.clone();
650 let handler = handler.clone();
651 let pre = self.instance_pre.clone();
652 debug!(?name, "serving root function");
653 let func = srv
654 .serve_function(
655 move || {
656 let span = info_span!("call_instance_function");
657 let mut store =
658 new_store(&engine, handler.clone(), max_execution_time);
659 store.data_mut().parent_context = Some(span.context());
660 store
661 },
662 pre,
663 Arc::clone(&self.host_resources),
664 ty,
665 "",
666 name,
667 )
668 .await
669 .context("failed to serve root function")?;
670 let events = events.clone();
671 invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
672 let events = events.clone();
673 let span = cx.deref().clone();
674 Box::pin(
675 async move {
676 let res =
677 res.instrument(info_span!("handle_instance_function")).await;
678 let success = res.is_ok();
679 if let Err(err) =
680 events.try_send(WrpcServeEvent::DynamicExportReturned {
681 context: cx,
682 success,
683 })
684 {
685 warn!(
686 ?err,
687 success, "failed to send dynamic root export return event"
688 );
689 }
690 res
691 }
692 .instrument(span),
693 )
694 as Pin<Box<dyn Future<Output = _> + Send + 'static>>
695 })));
696 }
697 (_, types::ComponentItem::CoreFunc(_)) => {
698 warn!(name, "serving root core function exports not supported yet");
699 }
700 (_, types::ComponentItem::Module(_)) => {
701 warn!(name, "serving root module exports not supported yet");
702 }
703 (_, types::ComponentItem::Component(_)) => {
704 warn!(name, "serving root component exports not supported yet");
705 }
706 (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
707 for (name, ty) in ty.exports(&self.engine) {
708 match ty {
709 types::ComponentItem::ComponentFunc(ty) => {
710 let engine = self.engine.clone();
711 let handler = handler.clone();
712 let pre = self.instance_pre.clone();
713 debug!(?instance_name, ?name, "serving instance function");
714 let func = srv
715 .serve_function(
716 move || {
717 let span = info_span!("call_instance_function");
718 let mut store = new_store(
719 &engine,
720 handler.clone(),
721 max_execution_time,
722 );
723 store.data_mut().parent_context = Some(span.context());
724 store
725 },
726 pre,
727 Arc::clone(&self.host_resources),
728 ty,
729 instance_name,
730 name,
731 )
732 .await
733 .context("failed to serve instance function")?;
734 let events = events.clone();
735 invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
736 let events = events.clone();
737 let span = cx.deref().clone();
738 Box::pin(
739 async move {
740 let res = res.await;
741 let success = res.is_ok();
742 if let Err(err) = events.try_send(
743 WrpcServeEvent::DynamicExportReturned {
744 context: cx,
745 success,
746 },
747 ) {
748 warn!(
749 ?err,
750 success,
751 "failed to send dynamic instance export return event"
752 );
753 }
754 res
755 }
756 .instrument(span),
757 )
758 as Pin<Box<dyn Future<Output = _> + Send + 'static>>
759 })));
760 }
761 types::ComponentItem::CoreFunc(_) => {
762 warn!(
763 instance_name,
764 name,
765 "serving instance core function exports not supported yet"
766 );
767 }
768 types::ComponentItem::Module(_) => {
769 warn!(
770 instance_name,
771 name, "serving instance module exports not supported yet"
772 );
773 }
774 types::ComponentItem::Component(_) => {
775 warn!(
776 instance_name,
777 name, "serving instance component exports not supported yet"
778 );
779 }
780 types::ComponentItem::ComponentInstance(_) => {
781 warn!(
782 instance_name,
783 name, "serving nested instance exports not supported yet"
784 );
785 }
786 types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
787 }
788 }
789 }
790 (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
791 }
792 }
793 Ok(invocations)
794 }
795}
796
797impl<H> From<Component<H>> for Option<jwt::Claims<jwt::Component>>
798where
799 H: Handler,
800{
801 fn from(Component { claims, .. }: Component<H>) -> Self {
802 claims
803 }
804}
805
806pub struct Instance<H, C>
808where
809 H: Handler,
810{
811 engine: wasmtime::Engine,
812 pre: wasmtime::component::InstancePre<Ctx<H>>,
813 handler: H,
814 max_execution_time: Duration,
815 events: mpsc::Sender<WrpcServeEvent<C>>,
816 experimental_features: Features,
817}
818
819impl<H, C> Clone for Instance<H, C>
820where
821 H: Handler,
822{
823 fn clone(&self) -> Self {
824 Self {
825 engine: self.engine.clone(),
826 pre: self.pre.clone(),
827 handler: self.handler.clone(),
828 max_execution_time: self.max_execution_time,
829 events: self.events.clone(),
830 experimental_features: self.experimental_features,
831 }
832 }
833}
834
835type TableResult<T> = Result<T, ResourceTableError>;
836
837pub(crate) struct Ctx<H>
838where
839 H: Handler,
840{
841 handler: H,
842 wasi: WasiCtx,
843 http: WasiHttpCtx,
844 table: ResourceTable,
845 shared_resources: SharedResourceTable,
846 timeout: Duration,
847 parent_context: Option<opentelemetry::Context>,
848}
849
850impl<H: Handler> IoView for Ctx<H> {
851 fn table(&mut self) -> &mut ResourceTable {
852 &mut self.table
853 }
854}
855
856impl<H: Handler> WasiView for Ctx<H> {
857 fn ctx(&mut self) -> &mut WasiCtx {
858 &mut self.wasi
859 }
860}
861
862impl<H: Handler> WrpcView for Ctx<H> {
863 type Invoke = H;
864
865 fn context(&self) -> H::Context {
866 None
867 }
868
869 fn client(&self) -> &H {
870 &self.handler
871 }
872
873 fn shared_resources(&mut self) -> &mut SharedResourceTable {
874 &mut self.shared_resources
875 }
876
877 fn timeout(&self) -> Option<Duration> {
878 Some(self.timeout)
879 }
880}
881
882impl<H: Handler> Debug for Ctx<H> {
883 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
884 f.debug_struct("Ctx").field("runtime", &"wasmtime").finish()
885 }
886}
887
888impl<H: Handler> Ctx<H> {
889 fn attach_parent_context(&self) {
890 if let Some(context) = self.parent_context.as_ref() {
891 Span::current().set_parent(context.clone());
892 }
893 }
894}