wasmcloud_runtime/component/
http.rs1use core::ops::Deref;
2
3use anyhow::{bail, Context as _};
4use futures::stream::StreamExt as _;
5use tokio::sync::oneshot;
6use tokio::{join, spawn};
7use tracing::{debug, debug_span, info_span, instrument, trace, warn, Instrument as _, Span};
8use tracing_opentelemetry::OpenTelemetrySpanExt as _;
9use wasmtime::component::ResourceTable;
10use wasmtime_wasi_http::body::{HostIncomingBody, HyperOutgoingBody};
11use wasmtime_wasi_http::types::{
12 HostFutureIncomingResponse, HostIncomingRequest, IncomingResponse, OutgoingRequestConfig,
13};
14use wasmtime_wasi_http::{HttpResult, WasiHttpCtx, WasiHttpView};
15use wrpc_interface_http::ServeIncomingHandlerWasmtime;
16
17use crate::capability::http::types;
18
19use super::{new_store, Ctx, Handler, Instance, ReplacedInstanceTarget, WrpcServeEvent};
20
21pub mod incoming_http_bindings {
22 wasmtime::component::bindgen!({
23 world: "incoming-http",
24 async: true,
25 trappable_imports: true,
26 with: {
27 "wasi:http/types": wasmtime_wasi_http::bindings::http::types,
28 },
29 });
30}
31
32#[instrument(level = "debug", skip_all)]
33async fn invoke_outgoing_handle<H>(
34 handler: H,
35 request: http::Request<HyperOutgoingBody>,
36 config: OutgoingRequestConfig,
37) -> anyhow::Result<Result<IncomingResponse, types::ErrorCode>>
38where
39 H: Handler,
40{
41 use wrpc_interface_http::InvokeOutgoingHandler as _;
42
43 let between_bytes_timeout = config.between_bytes_timeout;
44 debug!("invoking `wrpc:http/outgoing-handler.handle`");
45 match handler
46 .invoke_handle_wasmtime(
47 Some(ReplacedInstanceTarget::HttpOutgoingHandler),
48 request,
49 config,
50 )
51 .await?
52 {
53 (Ok(resp), errs, io) => {
54 debug!("`wrpc:http/outgoing-handler.handle` succeeded");
55 let worker = wasmtime_wasi::runtime::spawn(
56 async move {
57 join!(
59 errs.for_each(|err| async move {
60 warn!(?err, "body processing error encountered");
61 }),
62 async move {
63 if let Some(io) = io {
64 debug!("performing async I/O");
65 if let Err(err) = io.await {
66 warn!(?err, "failed to complete async I/O");
67 }
68 debug!("async I/O completed");
69 }
70 }
71 );
72 }
73 .in_current_span(),
74 );
75 Ok(Ok(IncomingResponse {
76 resp,
77 worker: Some(worker),
78 between_bytes_timeout,
79 }))
80 }
81 (Err(err), _, _) => {
82 debug!(
83 ?err,
84 "`wrpc:http/outgoing-handler.handle` returned an error code"
85 );
86 Ok(Err(err))
87 }
88 }
89}
90
91impl<H> WasiHttpView for Ctx<H>
92where
93 H: Handler,
94{
95 fn ctx(&mut self) -> &mut WasiHttpCtx {
96 &mut self.http
97 }
98
99 fn table(&mut self) -> &mut ResourceTable {
100 &mut self.table
101 }
102
103 #[instrument(level = "debug", skip_all)]
104 fn send_request(
105 &mut self,
106 request: http::Request<HyperOutgoingBody>,
107 config: OutgoingRequestConfig,
108 ) -> HttpResult<HostFutureIncomingResponse>
109 where
110 Self: Sized,
111 {
112 self.attach_parent_context();
113 Ok(HostFutureIncomingResponse::pending(
114 wasmtime_wasi::runtime::spawn(
115 invoke_outgoing_handle(self.handler.clone(), request, config).in_current_span(),
116 ),
117 ))
118 }
119}
120
121impl<H, C> ServeIncomingHandlerWasmtime<C> for Instance<H, C>
122where
123 H: Handler,
124 C: Send + Deref<Target = tracing::Span>,
125{
126 #[instrument(level = "debug", skip_all)]
127 async fn handle(
128 &self,
129 cx: C,
130 request: ::http::Request<wasmtime_wasi_http::body::HyperIncomingBody>,
131 ) -> anyhow::Result<
132 Result<
133 http::Response<wasmtime_wasi_http::body::HyperOutgoingBody>,
134 wasmtime_wasi_http::bindings::http::types::ErrorCode,
135 >,
136 > {
137 Span::current().set_parent(cx.deref().context());
139 let scheme = request.uri().scheme().context("scheme missing")?;
140 let scheme = wrpc_interface_http::bindings::wrpc::http::types::Scheme::from(scheme).into();
141
142 let (tx, rx) = oneshot::channel();
143 let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
144 let pre = incoming_http_bindings::IncomingHttpPre::new(self.pre.clone())
145 .context("failed to pre-instantiate `wasi:http/incoming-handler`")?;
146 trace!("instantiating `wasi:http/incoming-handler`");
147 let bindings = pre
148 .instantiate_async(&mut store)
149 .instrument(debug_span!("instantiate_async"))
150 .await
151 .context("failed to instantiate `wasi:http/incoming-handler`")?;
152 let data = store.data_mut();
153
154 let (parts, body) = request.into_parts();
158 let body = HostIncomingBody::new(
159 body,
160 std::time::Duration::from_millis(600 * 1000),
162 );
163 let incoming_req = HostIncomingRequest::new(data, parts, scheme, Some(body))?;
164 let request = data.table().push(incoming_req)?;
165
166 let response = data
167 .new_response_outparam(tx)
168 .context("failed to create response")?;
169
170 let call_incoming_handle = info_span!("call_http_incoming_handle");
173 store.data_mut().parent_context = Some(call_incoming_handle.context());
174 let handle = spawn(
175 async move {
176 debug!("invoking `wasi:http/incoming-handler.handle`");
177 if let Err(err) = bindings
178 .wasi_http_incoming_handler()
179 .call_handle(&mut store, request, response)
180 .instrument(call_incoming_handle)
181 .await
182 {
183 warn!(?err, "failed to call `wasi:http/incoming-handler.handle`");
184 bail!(err.context("failed to call `wasi:http/incoming-handler.handle`"));
185 }
186 Ok(())
187 }
188 .in_current_span(),
189 );
190 let res = async {
191 debug!("awaiting `wasi:http/incoming-handler.handle` response");
192 match rx.await {
193 Ok(Ok(res)) => {
194 debug!("successful `wasi:http/incoming-handler.handle` response received");
195 Ok(Ok(res))
196 }
197 Ok(Err(err)) => {
198 debug!(
199 ?err,
200 "unsuccessful `wasi:http/incoming-handler.handle` response received"
201 );
202 Ok(Err(err))
203 }
204 Err(_) => {
205 debug!("`wasi:http/incoming-handler.handle` response sender dropped");
206 handle
207 .instrument(debug_span!("await_response"))
208 .await
209 .context("failed to join handle task")??;
210 bail!("component did not call `response-outparam::set`")
211 }
212 }
213 }
214 .in_current_span()
215 .await;
216 let success = res.as_ref().is_ok_and(Result::is_ok);
217 if let Err(err) = self
218 .events
219 .try_send(WrpcServeEvent::HttpIncomingHandlerHandleReturned {
220 context: cx,
221 success,
222 })
223 {
224 warn!(
225 ?err,
226 success, "failed to send `wasi:http/incoming-handler.handle` return event"
227 );
228 }
229 res
230 }
231}