1use core::fmt::{self, Debug};
2use core::future::Future;
3use core::ops::Deref;
4use core::pin::Pin;
5use core::time::Duration;
6
7use anyhow::{ensure, Context as _};
8use futures::{Stream, TryStreamExt as _};
9use tokio::io::{AsyncRead, AsyncReadExt as _};
10use tokio::sync::mpsc;
11use tracing::{debug, info_span, instrument, warn, Instrument as _, Span};
12use tracing_opentelemetry::OpenTelemetrySpanExt;
13use wascap::jwt;
14use wascap::wasm::extract_claims;
15use wasi_preview1_component_adapter_provider::{
16 WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
17};
18use wasmtime::component::{types, Linker, ResourceTable, ResourceTableError};
19use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiView};
20use wasmtime_wasi_http::WasiHttpCtx;
21use wrpc_runtime_wasmtime::{
22 collect_component_resources, link_item, ServeExt as _, SharedResourceTable, WrpcView,
23};
24
25use crate::capability::{self, wrpc};
26use crate::experimental::Features;
27use crate::Runtime;
28
29pub use bus::Bus;
30pub use bus1_0_0::Bus as Bus1_0_0;
31pub use config::Config;
32pub use identity::Identity;
33pub use logging::Logging;
34pub use messaging::v0_2::Messaging as Messaging0_2;
35pub use messaging::v0_3::{
36 Client as MessagingClient0_3, GuestMessage as MessagingGuestMessage0_3,
37 HostMessage as MessagingHostMessage0_3, Messaging as Messaging0_3,
38};
39pub use secrets::Secrets;
40
41pub(crate) mod blobstore;
42mod bus;
43mod bus1_0_0;
44mod config;
45mod http;
46mod identity;
47mod keyvalue;
48mod logging;
49pub(crate) mod messaging;
50mod secrets;
51
/// Identifies a well-known interface instance in the context of an outgoing invocation.
///
/// `Option<ReplacedInstanceTarget>` is the `wrpc_transport::Invoke::Context` required of every
/// [`Handler`].
// NOTE(review): the variant names suggest each one stands for the interface whose import is
// replaced by a host-provided implementation — confirm against the capability modules.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum ReplacedInstanceTarget {
    /// Presumably the `wasi:blobstore/blobstore` instance — TODO confirm.
    BlobstoreBlobstore,
    /// Presumably the `wasi:blobstore/container` instance — TODO confirm.
    BlobstoreContainer,
    /// Presumably the `wasi:keyvalue/atomics` instance — TODO confirm.
    KeyvalueAtomics,
    /// Presumably the `wasi:keyvalue/store` instance — TODO confirm.
    KeyvalueStore,
    /// Presumably the `wasi:keyvalue/batch` instance — TODO confirm.
    KeyvalueBatch,
    /// Presumably the `wasi:keyvalue` watch-related instance — TODO confirm.
    KeyvalueWatch,
    /// Presumably the `wasi:http/incoming-handler` instance — TODO confirm.
    HttpIncomingHandler,
    /// Presumably the `wasi:http/outgoing-handler` instance — TODO confirm.
    HttpOutgoingHandler,
}
74
75fn is_0_2(version: &str, min_patch: u64) -> bool {
76 if let Ok(semver::Version {
77 major,
78 minor,
79 patch,
80 pre,
81 build,
82 }) = version.parse()
83 {
84 major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
85 } else {
86 false
87 }
88}
89
/// Classification of an invocation error, as reported by
/// [`InvocationErrorIntrospect::invocation_error_kind`].
pub enum InvocationErrorKind {
    /// NOTE(review): presumably the invoked target (e.g. function or interface) could not be
    /// found on the remote side — confirm with `Handler` implementations.
    NotFound,

    /// NOTE(review): presumably an error that should surface as a trap in the component —
    /// confirm with the call sites that consume this kind.
    Trap,
}
99
/// Lets an implementor classify an error produced by an invocation.
///
/// Required of every [`Handler`].
pub trait InvocationErrorIntrospect {
    /// Inspects `err` and returns the [`InvocationErrorKind`] it corresponds to.
    fn invocation_error_kind(&self, err: &anyhow::Error) -> InvocationErrorKind;
}
105
/// Aggregate of every capability a component handler must provide.
///
/// The trait has no methods of its own; it only bundles the supertraits below so that the rest
/// of the module can use a single `H: Handler` bound. It is implemented automatically for every
/// type satisfying all of the supertraits.
pub trait Handler:
    // Outgoing invocations carry an optional `ReplacedInstanceTarget` as their context.
    wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
    + Bus
    + Config
    + Logging
    + Secrets
    + Messaging0_2
    + Messaging0_3
    + Identity
    + InvocationErrorIntrospect
    + Send
    + Sync
    + Clone
    + 'static
{
}
123
124impl<
125 T: wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
126 + Bus
127 + Config
128 + Logging
129 + Secrets
130 + Messaging0_2
131 + Messaging0_3
132 + Identity
133 + InvocationErrorIntrospect
134 + Send
135 + Sync
136 + Clone
137 + 'static,
138 > Handler for T
139{
140}
141
/// Configuration options for a component.
///
/// The `Default` value has `require_signature` set to `false`.
#[derive(Clone, Debug, Default)]
pub struct ComponentConfig {
    /// NOTE(review): presumably whether a component must carry a valid signature to be
    /// accepted — this field is not read anywhere in this file; confirm against its users.
    pub require_signature: bool,
}
148
149pub fn claims_token(wasm: impl AsRef<[u8]>) -> anyhow::Result<Option<jwt::Token<jwt::Component>>> {
162 let Some(claims) = extract_claims(wasm).context("failed to extract module claims")? else {
163 return Ok(None);
164 };
165 let v = jwt::validate_token::<jwt::Component>(&claims.jwt)
166 .context("failed to validate module token")?;
167 ensure!(!v.expired, "token expired at `{}`", v.expires_human);
168 ensure!(
169 !v.cannot_use_yet,
170 "token cannot be used before `{}`",
171 v.not_before_human
172 );
173 ensure!(v.signature_valid, "signature is not valid");
174 Ok(Some(claims))
175}
176
/// A compiled and pre-linked component, ready to be instantiated or served.
///
/// Built by [`Component::new`] (or the `read*` helpers).
#[derive(Clone)]
pub struct Component<H>
where
    H: Handler,
{
    /// Engine the component was compiled with; reused for every store created for it.
    engine: wasmtime::Engine,
    /// Claims extracted from the binary at construction, if it carried any.
    claims: Option<jwt::Claims<jwt::Component>>,
    /// Component with all imports already resolved by the linker.
    instance_pre: wasmtime::component::InstancePre<Ctx<H>>,
    /// Execution time budget handed to every store created for this component.
    max_execution_time: Duration,
    /// Experimental feature flags copied from the runtime at construction.
    experimental_features: Features,
}
189
190impl<H> Debug for Component<H>
191where
192 H: Handler,
193{
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 f.debug_struct("Component")
196 .field("claims", &self.claims)
197 .field("runtime", &"wasmtime")
198 .field("max_execution_time", &self.max_execution_time)
199 .finish_non_exhaustive()
200 }
201}
202
203fn new_store<H: Handler>(
204 engine: &wasmtime::Engine,
205 handler: H,
206 max_execution_time: Duration,
207) -> wasmtime::Store<Ctx<H>> {
208 let table = ResourceTable::new();
209 let wasi = WasiCtxBuilder::new()
210 .args(&["main.wasm"]) .inherit_stderr()
212 .build();
213
214 let mut store = wasmtime::Store::new(
215 engine,
216 Ctx {
217 handler,
218 wasi,
219 http: WasiHttpCtx::new(),
220 table,
221 shared_resources: SharedResourceTable::default(),
222 timeout: max_execution_time,
223 parent_context: None,
224 },
225 );
226 store.set_epoch_deadline(max_execution_time.as_secs());
227 store
228}
229
/// Events emitted on the `events` channel passed to [`Component::serve_wrpc`] and
/// [`Component::instantiate`].
///
/// `C` is the invocation context type (the `Context` of the `wrpc_transport::Serve`
/// implementation in `serve_wrpc`).
#[derive(Clone, Debug)]
pub enum WrpcServeEvent<C> {
    /// NOTE(review): presumably emitted after the `wasi:http/incoming-handler` export returned;
    /// the sender is not in this file — confirm in the `http` module.
    HttpIncomingHandlerHandleReturned {
        /// Invocation context.
        context: C,
        /// Whether the invocation completed successfully.
        success: bool,
    },
    /// NOTE(review): presumably emitted after the messaging handler export returned;
    /// the sender is not in this file — confirm in the `messaging` module.
    MessagingHandlerHandleMessageReturned {
        /// Invocation context.
        context: C,
        /// Whether the invocation completed successfully.
        success: bool,
    },
    /// Emitted by `serve_wrpc` after a dynamically served root or instance function export
    /// returned.
    DynamicExportReturned {
        /// Invocation context.
        context: C,
        /// `true` if the invocation future resolved to `Ok`.
        success: bool,
    },
}
255
/// A boxed stream of incoming invocations, as returned by [`Component::serve_wrpc`].
///
/// Each successfully received item is a boxed future which, when awaited, resolves to the
/// `anyhow::Result<()>` outcome of that single invocation. Both the stream and the futures
/// are `Send + 'static`.
pub type InvocationStream = Pin<
    Box<
        dyn Stream<
                Item = anyhow::Result<
                    Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
                >,
            > + Send
            + 'static,
    >,
>;
268
269impl<H> Component<H>
270where
271 H: Handler,
272{
    /// Compiles a component from raw Wasm bytes and pre-links all of its imports.
    ///
    /// A core Wasm module is first encoded into a component using the WASI preview1 reactor
    /// adapter, after which construction restarts on the encoded bytes.
    ///
    /// Embedded claims, if any, are extracted and validated via [`claims_token`].
    ///
    /// # Errors
    ///
    /// Fails if the module cannot be encoded as a component, the claims are invalid, the
    /// component does not compile, or any interface fails to link or pre-instantiate.
    #[instrument(level = "trace", skip_all)]
    pub fn new(rt: &Runtime, wasm: &[u8]) -> anyhow::Result<Self> {
        // Core modules are wrapped into a component, then handled by the component path below.
        if wasmparser::Parser::is_core_wasm(wasm) {
            let wasm = wit_component::ComponentEncoder::default()
                .module(wasm)
                .context("failed to set core component module")?
                .adapter(
                    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME,
                    WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
                )
                .context("failed to add WASI preview1 adapter")?
                .encode()
                .context("failed to encode a component from module")?;
            return Self::new(rt, &wasm);
        }
        let engine = rt.engine.clone();
        // Only the claims are retained; the raw JWT of the token is dropped.
        let claims_token = claims_token(wasm)?;
        let claims = claims_token.map(|c| c.claims);
        let component = wasmtime::component::Component::new(&engine, wasm)
            .context("failed to compile component")?;

        let mut linker = Linker::new(&engine);

        // Standard WASI and `wasi:http` host implementations.
        wasmtime_wasi::add_to_linker_async(&mut linker)
            .context("failed to link core WASI interfaces")?;
        wasmtime_wasi_http::add_only_http_to_linker_async(&mut linker)
            .context("failed to link `wasi:http`")?;

        // Capability interfaces implemented by `Ctx` in the sibling modules.
        capability::blobstore::blobstore::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasi:blobstore/blobstore`")?;
        capability::blobstore::container::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasi:blobstore/container`")?;
        capability::blobstore::types::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasi:blobstore/types`")?;
        capability::config::runtime::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasi:config/runtime`")?;
        capability::config::store::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasi:config/store`")?;
        capability::keyvalue::atomics::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasi:keyvalue/atomics`")?;
        capability::keyvalue::store::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasi:keyvalue/store`")?;
        capability::keyvalue::batch::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasi:keyvalue/batch`")?;
        capability::logging::logging::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasi:logging/logging`")?;
        capability::unversioned_logging::logging::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link unversioned `wasi:logging/logging`")?;

        capability::bus1_0_0::lattice::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasmcloud:bus/lattice@1.0.0`")?;
        capability::bus2_0_0::lattice::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasmcloud:bus/lattice@2.0.0`")?;
        capability::messaging0_2_0::types::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasmcloud:messaging/types@0.2.0`")?;
        capability::messaging0_2_0::consumer::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasmcloud:messaging/consumer@0.2.0`")?;
        capability::secrets::reveal::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasmcloud:secrets/reveal`")?;
        capability::secrets::store::add_to_linker(&mut linker, |ctx| ctx)
            .context("failed to link `wasmcloud:secrets/store`")?;
        // Messaging 0.3.0 is only linked when the experimental feature is enabled.
        if rt.experimental_features.wasmcloud_messaging_v3 {
            capability::messaging0_3_0::types::add_to_linker(&mut linker, |ctx| ctx)
                .context("failed to link `wasmcloud:messaging/types@0.3.0`")?;
            capability::messaging0_3_0::producer::add_to_linker(&mut linker, |ctx| ctx)
                .context("failed to link `wasmcloud:messaging/producer@0.3.0`")?;
            capability::messaging0_3_0::request_reply::add_to_linker(&mut linker, |ctx| ctx)
                .context("failed to link `wasmcloud:messaging/request-reply@0.3.0`")?;
        }
        // The identity store is likewise gated behind an experimental feature.
        if rt.experimental_features.workload_identity_interface {
            capability::identity::store::add_to_linker(&mut linker, |ctx| ctx)
                .context("failed to link `wasmcloud:identity/store`")?;
        }

        let ty = component.component_type();
        let mut guest_resources = Vec::new();
        collect_component_resources(&engine, &ty, &mut guest_resources);
        if !guest_resources.is_empty() {
            warn!("exported component resources are not supported in wasmCloud runtime and will be ignored, use a provider instead to enable this functionality");
        }
        // Imports already satisfied by the linker setup above are skipped; everything else is
        // linked individually through `link_item`.
        for (name, ty) in ty.imports(&engine) {
            // Split `<package>/<interface>[@<version>]` into its three parts.
            match name.split_once('/').map(|(pkg, suffix)| {
                suffix
                    .split_once('@')
                    .map_or((pkg, suffix, None), |(iface, version)| {
                        (pkg, iface, Some(version))
                    })
            }) {
                // Capability interfaces linked above, matched on their exact versions.
                Some(
                    ("wasi:blobstore", "blobstore" | "container" | "types", Some("0.2.0-draft"))
                    | ("wasi:config", "runtime" | "store", Some("0.2.0-draft"))
                    | ("wasi:keyvalue", "atomics" | "batch" | "store", Some("0.2.0-draft"))
                    | ("wasi:logging", "logging", None | Some("0.1.0-draft"))
                    | ("wasmcloud:bus", "lattice", Some("1.0.0" | "2.0.0"))
                    | ("wasmcloud:messaging", "consumer" | "types", Some("0.2.0"))
                    | ("wasmcloud:secrets", "reveal" | "store", Some("0.1.0-draft")),
                ) => {}
                // Standard WASI 0.2.x interfaces (release versions only, see `is_0_2`).
                Some((
                    "wasi:cli",
                    "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
                    | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
                    Some(version),
                )) if is_0_2(version, 0) => {}
                Some(("wasi:clocks", "monotonic-clock" | "wall-clock", Some(version)))
                    if is_0_2(version, 0) => {}
                // `timezone` is only accepted from patch version 0.2.1 onwards.
                Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
                Some(("wasi:filesystem", "preopens" | "types", Some(version)))
                    if is_0_2(version, 0) => {}
                Some((
                    "wasi:http",
                    "incoming-handler" | "outgoing-handler" | "types",
                    Some(version),
                )) if is_0_2(version, 0) => {}
                Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
                    if is_0_2(version, 0) => {}
                Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
                    if is_0_2(version, 0) => {}
                Some((
                    "wasi:sockets",
                    "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
                    | "udp-create-socket" | "udp",
                    Some(version),
                )) if is_0_2(version, 0) => {}
                // The runtime decides whether a feature-gated instance is skipped.
                _ if rt.skip_feature_gated_instance(name) => {}
                // NOTE(review): presumably links the import so that calls are forwarded through
                // the handler's wRPC client — confirm in `wrpc_runtime_wasmtime::link_item`.
                _ => link_item(&engine, &mut linker.root(), [], ty, "", name, None)
                    .context("failed to link item")?,
            };
        }
        let instance_pre = linker.instantiate_pre(&component)?;
        Ok(Self {
            engine,
            claims,
            instance_pre,
            max_execution_time: rt.max_execution_time,
            experimental_features: rt.experimental_features,
        })
    }
416
417 #[instrument(level = "trace", skip_all)]
420 pub fn set_max_execution_time(&mut self, max_execution_time: Duration) -> &mut Self {
421 self.max_execution_time = max_execution_time.max(Duration::from_secs(1));
422 self
423 }
424
425 #[instrument(level = "trace", skip_all)]
431 pub async fn read(rt: &Runtime, mut wasm: impl AsyncRead + Unpin) -> anyhow::Result<Self> {
432 let mut buf = Vec::new();
433 wasm.read_to_end(&mut buf)
434 .await
435 .context("failed to read Wasm")?;
436 Self::new(rt, &buf)
437 }
438
439 #[instrument(level = "trace", skip_all)]
445 pub fn read_sync(rt: &Runtime, mut wasm: impl std::io::Read) -> anyhow::Result<Self> {
446 let mut buf = Vec::new();
447 wasm.read_to_end(&mut buf).context("failed to read Wasm")?;
448 Self::new(rt, &buf)
449 }
450
    /// Returns a reference to the claims stored for this component, or `None` if it has none.
    // NOTE: unlike the sibling methods this span does not use `skip_all`, so `self` is recorded
    // in the span through its `Debug` implementation.
    #[instrument(level = "trace")]
    pub fn claims(&self) -> Option<&jwt::Claims<jwt::Component>> {
        self.claims.as_ref()
    }
456
457 pub fn instantiate<C>(
459 &self,
460 handler: H,
461 events: mpsc::Sender<WrpcServeEvent<C>>,
462 ) -> Instance<H, C> {
463 Instance {
464 engine: self.engine.clone(),
465 pre: self.instance_pre.clone(),
466 handler,
467 max_execution_time: self.max_execution_time,
468 events,
469 experimental_features: self.experimental_features,
470 }
471 }
472
    /// Serves the exports of this component on `srv`, using `handler` to satisfy imports.
    ///
    /// Walks every export of the component and registers it with `srv`:
    /// - `wasi:http/incoming-handler@0.2*`, the messaging handler and the key-value watcher
    ///   instances are served through their dedicated wRPC bindings;
    /// - root function exports and function exports of any other instance are served
    ///   dynamically, each invocation reporting a [`WrpcServeEvent::DynamicExportReturned`]
    ///   on `events`;
    /// - other export kinds are either logged as unsupported or silently ignored.
    ///
    /// Returns one [`InvocationStream`] per served function.
    ///
    /// # Errors
    ///
    /// Fails if any export cannot be registered with `srv`.
    #[instrument(level = "debug", skip_all)]
    pub async fn serve_wrpc<S>(
        &self,
        srv: &S,
        handler: H,
        events: mpsc::Sender<WrpcServeEvent<S::Context>>,
    ) -> anyhow::Result<Vec<InvocationStream>>
    where
        S: wrpc_transport::Serve,
        S::Context: Deref<Target = tracing::Span>,
    {
        let max_execution_time = self.max_execution_time;
        let mut invocations = vec![];
        // Shared template; each statically-bound interface below gets its own clone.
        let instance = self.instantiate(handler.clone(), events.clone());
        for (name, ty) in self
            .instance_pre
            .component()
            .component_type()
            .exports(&self.engine)
        {
            // Arm order matters: the well-known instance exports must be matched before the
            // generic `ComponentInstance` arm further down.
            match (name, ty) {
                (_, types::ComponentItem::ComponentInstance(..))
                    if name.starts_with("wasi:http/incoming-handler@0.2") =>
                {
                    let instance = instance.clone();
                    let [(_, _, handle)] = wrpc_interface_http::bindings::exports::wrpc::http::incoming_handler::serve_interface(
                        srv,
                        wrpc_interface_http::ServeWasmtime(instance),
                    )
                    .await
                    .context("failed to serve `wrpc:http/incoming-handler`")?;
                    invocations.push(handle);
                }
                // Both messaging export names are served through the 0.2.0 handler binding.
                (
                    "wasmcloud:messaging/handler@0.2.0"
                    | "wasmcloud:messaging/incoming-handler@0.3.0",
                    types::ComponentItem::ComponentInstance(..),
                ) => {
                    let instance = instance.clone();
                    let [(_, _, handle_message)] =
                        wrpc::exports::wasmcloud::messaging0_2_0::handler::serve_interface(
                            srv, instance,
                        )
                        .await
                        .context("failed to serve `wasmcloud:messaging/handler`")?;
                    invocations.push(handle_message);
                }
                (
                    "wasi:keyvalue/watcher@0.2.0-draft",
                    types::ComponentItem::ComponentInstance(..),
                ) => {
                    let instance = instance.clone();
                    let [(_, _, on_set), (_, _, on_delete)] =
                        wrpc::exports::wrpc::keyvalue::watcher::serve_interface(srv, instance)
                            .await
                            .context("failed to serve `wrpc:keyvalue/watcher`")?;
                    invocations.push(on_set);
                    invocations.push(on_delete);
                }
                // Root-level function export, served under an empty instance name.
                (name, types::ComponentItem::ComponentFunc(ty)) => {
                    let engine = self.engine.clone();
                    let handler = handler.clone();
                    let pre = self.instance_pre.clone();
                    debug!(?name, "serving root function");
                    let func = srv
                        .serve_function(
                            // Store factory: builds a fresh store and records the context of a
                            // new span as the store's parent tracing context.
                            move || {
                                let span = info_span!("call_instance_function");
                                let mut store =
                                    new_store(&engine, handler.clone(), max_execution_time);
                                store.data_mut().parent_context = Some(span.context());
                                store
                            },
                            pre,
                            ty,
                            "",
                            name,
                        )
                        .await
                        .context("failed to serve root function")?;
                    let events = events.clone();
                    invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
                        let events = events.clone();
                        // The invocation context dereferences to the span to run under.
                        let span = cx.deref().clone();
                        Box::pin(
                            async move {
                                let res =
                                    res.instrument(info_span!("handle_instance_function")).await;
                                let success = res.is_ok();
                                // Best-effort notification: a full or closed channel is only
                                // logged and never fails the invocation.
                                if let Err(err) =
                                    events.try_send(WrpcServeEvent::DynamicExportReturned {
                                        context: cx,
                                        success,
                                    })
                                {
                                    warn!(
                                        ?err,
                                        success, "failed to send dynamic root export return event"
                                    );
                                }
                                res
                            }
                            .instrument(span),
                        )
                            as Pin<Box<dyn Future<Output = _> + Send + 'static>>
                    })));
                }
                (_, types::ComponentItem::CoreFunc(_)) => {
                    warn!(name, "serving root core function exports not supported yet");
                }
                (_, types::ComponentItem::Module(_)) => {
                    warn!(name, "serving root module exports not supported yet");
                }
                (_, types::ComponentItem::Component(_)) => {
                    warn!(name, "serving root component exports not supported yet");
                }
                // Any other exported instance: serve each of its function exports dynamically.
                (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
                    for (name, ty) in ty.exports(&self.engine) {
                        match ty {
                            types::ComponentItem::ComponentFunc(ty) => {
                                let engine = self.engine.clone();
                                let handler = handler.clone();
                                let pre = self.instance_pre.clone();
                                debug!(?instance_name, ?name, "serving instance function");
                                let func = srv
                                    .serve_function(
                                        // Same store factory as for root functions.
                                        move || {
                                            let span = info_span!("call_instance_function");
                                            let mut store = new_store(
                                                &engine,
                                                handler.clone(),
                                                max_execution_time,
                                            );
                                            store.data_mut().parent_context = Some(span.context());
                                            store
                                        },
                                        pre,
                                        ty,
                                        instance_name,
                                        name,
                                    )
                                    .await
                                    .context("failed to serve instance function")?;
                                let events = events.clone();
                                invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
                                    let events = events.clone();
                                    let span = cx.deref().clone();
                                    Box::pin(
                                        async move {
                                            // Unlike the root function path, no additional
                                            // `handle_instance_function` span is entered here.
                                            let res = res.await;
                                            let success = res.is_ok();
                                            // Best-effort notification, as for root functions.
                                            if let Err(err) = events.try_send(
                                                WrpcServeEvent::DynamicExportReturned {
                                                    context: cx,
                                                    success,
                                                },
                                            ) {
                                                warn!(
                                                    ?err,
                                                    success,
                                                    "failed to send dynamic instance export return event"
                                                );
                                            }
                                            res
                                        }
                                        .instrument(span),
                                    )
                                        as Pin<Box<dyn Future<Output = _> + Send + 'static>>
                                })));
                            }
                            types::ComponentItem::CoreFunc(_) => {
                                warn!(
                                    instance_name,
                                    name,
                                    "serving instance core function exports not supported yet"
                                );
                            }
                            types::ComponentItem::Module(_) => {
                                warn!(
                                    instance_name,
                                    name, "serving instance module exports not supported yet"
                                );
                            }
                            types::ComponentItem::Component(_) => {
                                warn!(
                                    instance_name,
                                    name, "serving instance component exports not supported yet"
                                );
                            }
                            types::ComponentItem::ComponentInstance(_) => {
                                warn!(
                                    instance_name,
                                    name, "serving nested instance exports not supported yet"
                                );
                            }
                            // Types and resources carry nothing to serve.
                            types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
                        }
                    }
                }
                // Types and resources carry nothing to serve.
                (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
            }
        }
        Ok(invocations)
    }
683}
684
685impl<H> From<Component<H>> for Option<jwt::Claims<jwt::Component>>
686where
687 H: Handler,
688{
689 fn from(Component { claims, .. }: Component<H>) -> Self {
690 claims
691 }
692}
693
/// A component bound to a concrete handler and event channel, as created by
/// [`Component::instantiate`].
///
/// `C` is the invocation context type carried by the emitted [`WrpcServeEvent`]s.
pub struct Instance<H, C>
where
    H: Handler,
{
    /// Engine shared with the originating [`Component`].
    engine: wasmtime::Engine,
    /// Pre-linked component shared with the originating [`Component`].
    pre: wasmtime::component::InstancePre<Ctx<H>>,
    /// Handler bound to this instance.
    handler: H,
    /// Execution time budget inherited from the originating [`Component`].
    max_execution_time: Duration,
    /// Channel on which invocation events are reported.
    events: mpsc::Sender<WrpcServeEvent<C>>,
    /// Experimental feature flags inherited from the originating [`Component`].
    experimental_features: Features,
}
706
707impl<H, C> Clone for Instance<H, C>
708where
709 H: Handler,
710{
711 fn clone(&self) -> Self {
712 Self {
713 engine: self.engine.clone(),
714 pre: self.pre.clone(),
715 handler: self.handler.clone(),
716 max_execution_time: self.max_execution_time,
717 events: self.events.clone(),
718 experimental_features: self.experimental_features,
719 }
720 }
721}
722
/// Shorthand for a `Result` whose error type is [`ResourceTableError`].
type TableResult<T> = Result<T, ResourceTableError>;
724
/// Per-store state: the data type of every `wasmtime::Store` created by `new_store`.
pub(crate) struct Ctx<H>
where
    H: Handler,
{
    /// Handler used to satisfy the component's imports; also exposed as the wRPC client.
    handler: H,
    /// WASI context of the store.
    wasi: WasiCtx,
    /// `wasi:http` context of the store.
    http: WasiHttpCtx,
    /// Resource table backing the WASI view.
    table: ResourceTable,
    /// Resource table exposed through `WrpcView::shared_resources`.
    shared_resources: SharedResourceTable,
    /// Timeout reported through `WrpcView::timeout`; set from the execution time budget.
    timeout: Duration,
    /// OpenTelemetry context to attach as the parent of the current span, if one was recorded.
    parent_context: Option<opentelemetry::Context>,
}
737
/// Exposes the store's resource table and WASI context to `wasmtime_wasi`.
impl<H: Handler> WasiView for Ctx<H> {
    /// Returns the store's resource table.
    fn table(&mut self) -> &mut ResourceTable {
        &mut self.table
    }

    /// Returns the store's WASI context.
    fn ctx(&mut self) -> &mut WasiCtx {
        &mut self.wasi
    }
}
747
/// Exposes the handler, shared resources and timeout to `wrpc_runtime_wasmtime`.
impl<H: Handler> WrpcView for Ctx<H> {
    type Invoke = H;

    /// Returns the handler, which doubles as the wRPC client.
    fn client(&self) -> &H {
        &self.handler
    }

    /// Returns the table of resources shared over wRPC.
    fn shared_resources(&mut self) -> &mut SharedResourceTable {
        &mut self.shared_resources
    }

    /// Always returns `Some` with the timeout the context was created with.
    fn timeout(&self) -> Option<Duration> {
        Some(self.timeout)
    }
}
763
764impl<H: Handler> Debug for Ctx<H> {
765 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
766 f.debug_struct("Ctx").field("runtime", &"wasmtime").finish()
767 }
768}
769
770impl<H: Handler> Ctx<H> {
771 fn attach_parent_context(&self) {
772 if let Some(context) = self.parent_context.as_ref() {
773 Span::current().set_parent(context.clone());
774 }
775 }
776}