wasmcloud_runtime/component/
http.rs

1use core::ops::Deref;
2
3use anyhow::{bail, Context as _};
4use futures::stream::StreamExt as _;
5use tokio::sync::oneshot;
6use tokio::{join, spawn};
7use tracing::{debug, debug_span, info_span, instrument, trace, warn, Instrument as _, Span};
8use tracing_opentelemetry::OpenTelemetrySpanExt as _;
9use wasmtime::component::ResourceTable;
10use wasmtime_wasi_http::body::{HostIncomingBody, HyperOutgoingBody};
11use wasmtime_wasi_http::types::{
12    HostFutureIncomingResponse, HostIncomingRequest, IncomingResponse, OutgoingRequestConfig,
13};
14use wasmtime_wasi_http::{HttpResult, WasiHttpCtx, WasiHttpView};
15use wrpc_interface_http::ServeIncomingHandlerWasmtime;
16
17use crate::capability::http::types;
18
19use super::{new_store, Ctx, Handler, Instance, ReplacedInstanceTarget, WrpcServeEvent};
20
21pub mod incoming_http_bindings {
22    wasmtime::component::bindgen!({
23        world: "incoming-http",
24        async: true,
25        trappable_imports: true,
26        with: {
27           "wasi:http/types": wasmtime_wasi_http::bindings::http::types,
28        },
29    });
30}
31
32#[instrument(level = "debug", skip_all)]
33async fn invoke_outgoing_handle<H>(
34    handler: H,
35    request: http::Request<HyperOutgoingBody>,
36    config: OutgoingRequestConfig,
37) -> anyhow::Result<Result<IncomingResponse, types::ErrorCode>>
38where
39    H: Handler,
40{
41    use wrpc_interface_http::InvokeOutgoingHandler as _;
42
43    let between_bytes_timeout = config.between_bytes_timeout;
44    debug!("invoking `wrpc:http/outgoing-handler.handle`");
45    match handler
46        .invoke_handle_wasmtime(
47            Some(ReplacedInstanceTarget::HttpOutgoingHandler),
48            request,
49            config,
50        )
51        .await?
52    {
53        (Ok(resp), errs, io) => {
54            debug!("`wrpc:http/outgoing-handler.handle` succeeded");
55            let worker = wasmtime_wasi::runtime::spawn(
56                async move {
57                    // TODO: Do more than just log errors
58                    join!(
59                        errs.for_each(|err| async move {
60                            warn!(?err, "body processing error encountered");
61                        }),
62                        async move {
63                            if let Some(io) = io {
64                                debug!("performing async I/O");
65                                if let Err(err) = io.await {
66                                    warn!(?err, "failed to complete async I/O");
67                                }
68                                debug!("async I/O completed");
69                            }
70                        }
71                    );
72                }
73                .in_current_span(),
74            );
75            Ok(Ok(IncomingResponse {
76                resp,
77                worker: Some(worker),
78                between_bytes_timeout,
79            }))
80        }
81        (Err(err), _, _) => {
82            debug!(
83                ?err,
84                "`wrpc:http/outgoing-handler.handle` returned an error code"
85            );
86            Ok(Err(err))
87        }
88    }
89}
90
91impl<H> WasiHttpView for Ctx<H>
92where
93    H: Handler,
94{
95    fn ctx(&mut self) -> &mut WasiHttpCtx {
96        &mut self.http
97    }
98
99    fn table(&mut self) -> &mut ResourceTable {
100        &mut self.table
101    }
102
103    #[instrument(level = "debug", skip_all)]
104    fn send_request(
105        &mut self,
106        request: http::Request<HyperOutgoingBody>,
107        config: OutgoingRequestConfig,
108    ) -> HttpResult<HostFutureIncomingResponse>
109    where
110        Self: Sized,
111    {
112        self.attach_parent_context();
113        Ok(HostFutureIncomingResponse::pending(
114            wasmtime_wasi::runtime::spawn(
115                invoke_outgoing_handle(self.handler.clone(), request, config).in_current_span(),
116            ),
117        ))
118    }
119}
120
121impl<H, C> ServeIncomingHandlerWasmtime<C> for Instance<H, C>
122where
123    H: Handler,
124    C: Send + Deref<Target = tracing::Span>,
125{
126    #[instrument(level = "debug", skip_all)]
127    async fn handle(
128        &self,
129        cx: C,
130        request: ::http::Request<wasmtime_wasi_http::body::HyperIncomingBody>,
131    ) -> anyhow::Result<
132        Result<
133            http::Response<wasmtime_wasi_http::body::HyperOutgoingBody>,
134            wasmtime_wasi_http::bindings::http::types::ErrorCode,
135        >,
136    > {
137        // Set the parent of the current context to the span passed in
138        Span::current().set_parent(cx.deref().context());
139        let scheme = request.uri().scheme().context("scheme missing")?;
140        let scheme = wrpc_interface_http::bindings::wrpc::http::types::Scheme::from(scheme).into();
141
142        let (tx, rx) = oneshot::channel();
143        let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
144        let pre = incoming_http_bindings::IncomingHttpPre::new(self.pre.clone())
145            .context("failed to pre-instantiate `wasi:http/incoming-handler`")?;
146        trace!("instantiating `wasi:http/incoming-handler`");
147        let bindings = pre
148            .instantiate_async(&mut store)
149            .instrument(debug_span!("instantiate_async"))
150            .await
151            .context("failed to instantiate `wasi:http/incoming-handler`")?;
152        let data = store.data_mut();
153
154        // The below is adapted from `WasiHttpView::new_incoming_request`, which is unusable for
155        // us, since it requires a `hyper::Error`
156
157        let (parts, body) = request.into_parts();
158        let body = HostIncomingBody::new(
159            body,
160            // TODO: this needs to be plumbed through
161            std::time::Duration::from_millis(600 * 1000),
162        );
163        let incoming_req = HostIncomingRequest::new(data, parts, scheme, Some(body))?;
164        let request = data.table().push(incoming_req)?;
165
166        let response = data
167            .new_response_outparam(tx)
168            .context("failed to create response")?;
169
170        // TODO: Replicate this for custom interface
171        // Set the current invocation parent context for injection on outgoing wRPC requests
172        let call_incoming_handle = info_span!("call_http_incoming_handle");
173        store.data_mut().parent_context = Some(call_incoming_handle.context());
174        let handle = spawn(
175            async move {
176                debug!("invoking `wasi:http/incoming-handler.handle`");
177                if let Err(err) = bindings
178                    .wasi_http_incoming_handler()
179                    .call_handle(&mut store, request, response)
180                    .instrument(call_incoming_handle)
181                    .await
182                {
183                    warn!(?err, "failed to call `wasi:http/incoming-handler.handle`");
184                    bail!(err.context("failed to call `wasi:http/incoming-handler.handle`"));
185                }
186                Ok(())
187            }
188            .in_current_span(),
189        );
190        let res = async {
191            debug!("awaiting `wasi:http/incoming-handler.handle` response");
192            match rx.await {
193                Ok(Ok(res)) => {
194                    debug!("successful `wasi:http/incoming-handler.handle` response received");
195                    Ok(Ok(res))
196                }
197                Ok(Err(err)) => {
198                    debug!(
199                        ?err,
200                        "unsuccessful `wasi:http/incoming-handler.handle` response received"
201                    );
202                    Ok(Err(err))
203                }
204                Err(_) => {
205                    debug!("`wasi:http/incoming-handler.handle` response sender dropped");
206                    handle
207                        .instrument(debug_span!("await_response"))
208                        .await
209                        .context("failed to join handle task")??;
210                    bail!("component did not call `response-outparam::set`")
211                }
212            }
213        }
214        .in_current_span()
215        .await;
216        let success = res.as_ref().is_ok_and(Result::is_ok);
217        if let Err(err) = self
218            .events
219            .try_send(WrpcServeEvent::HttpIncomingHandlerHandleReturned {
220                context: cx,
221                success,
222            })
223        {
224            warn!(
225                ?err,
226                success, "failed to send `wasi:http/incoming-handler.handle` return event"
227            );
228        }
229        res
230    }
231}