wasmcloud_runtime/component/messaging/
mod.rs1use core::ops::Deref;
2
3use anyhow::Context as _;
4use tracing::{instrument, warn, Span};
5use tracing_opentelemetry::OpenTelemetrySpanExt as _;
6
7use crate::capability::wrpc;
8use crate::component::{new_store, Handler, Instance, WrpcServeEvent};
9
10pub mod v0_2;
11pub mod v0_3;
12
13impl<H, C> wrpc::exports::wasmcloud::messaging0_2_0::handler::Handler<C> for Instance<H, C>
14where
15 H: Handler,
16 C: Send + Deref<Target = Span>,
17{
18 #[instrument(level = "debug", skip_all)]
19 async fn handle_message(
20 &self,
21 cx: C,
22 msg: wrpc::wasmcloud::messaging0_2_0::types::BrokerMessage,
23 ) -> anyhow::Result<Result<(), String>> {
24 Span::current().set_parent(cx.deref().context());
26 let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
27
28 let res = if self.experimental_features.wasmcloud_messaging_v3 {
31 if let Ok(pre) = v0_3::bindings::MessagingHandlerPre::new(self.pre.clone()) {
32 v0_3::handle_message(pre, &mut store, msg).await
33 } else {
34 let pre = v0_2::bindings::MessagingHandlerOhTwoPre::new(self.pre.clone())
35 .context("failed to pre-instantiate `wasmcloud:messaging/handler`")?;
36 v0_2::handle_message(pre, &mut store, msg).await
37 }
38 } else {
39 let pre = v0_2::bindings::MessagingHandlerOhTwoPre::new(self.pre.clone())
40 .context("failed to pre-instantiate `wasmcloud:messaging/handler`")?;
41 v0_2::handle_message(pre, &mut store, msg).await
42 };
43
44 let success = res.is_ok();
45 if let Err(err) =
46 self.events
47 .try_send(WrpcServeEvent::MessagingHandlerHandleMessageReturned {
48 context: cx,
49 success,
50 })
51 {
52 warn!(
53 ?err,
54 success, "failed to send `wasmcloud:messaging/handler.handle-message` return event"
55 );
56 }
57 res
58 }
59}