bulwark_sdk/
http.rs

1//! The `http` module provides HTTP type definitions and sends outgoing HTTP requests.
2
3/// Reexport from the `http` crate.
4pub use http::{
5    uri, Extensions, HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri, Version,
6};
7/// Reexport from the `http` crate.
8pub mod request {
9    pub use http::request::{Builder, Parts};
10}
11/// Reexport from the `http` crate.
12pub mod response {
13    pub use http::response::{Builder, Parts};
14}
15/// An HTTP request combines a head consisting of a [`Method`], [`Uri`], and headers with [`Bytes`](crate::Bytes), which provides
16/// access to the first chunk of a request body.
17pub type Request = http::Request<crate::Bytes>;
18/// An HTTP response combines a head consisting of a [`StatusCode`] and headers with [`Bytes`](crate::Bytes), which provides
19/// access to the first chunk of a response body.
20pub type Response = http::Response<crate::Bytes>;
21
22use crate::wit::wasi::http::outgoing_handler;
23use crate::wit::wasi::http::types::{
24    ErrorCode, Headers, IncomingBody, IncomingResponse, OutgoingBody, OutgoingRequest, Scheme,
25};
26use crate::wit::wasi::io::streams::{InputStream, OutputStream, StreamError};
27use futures::SinkExt;
28use std::cell::RefCell;
29use std::future::Future;
30use std::rc::Rc;
31use std::sync::{Arc, Mutex};
32use std::task::{Poll, Wake, Waker};
33
34const READ_SIZE: u64 = 16 * 1024;
35
36/// Returned when there is an error sending an outgoing HTTP request.
37#[derive(thiserror::Error, Debug)]
38pub enum SendError {
39    /// A generic I/O error
40    #[error("i/o error: {0}")]
41    GenericIo(String),
42    /// A stream I/O error
43    #[error(transparent)]
44    StreamIo(#[from] crate::wit::wasi::io::streams::StreamError),
45    /// An HTTP error originating from the WASI HTTP bindings
46    #[error(transparent)]
47    WasiHttp(#[from] crate::wit::wasi::http::types::ErrorCode),
48    /// An HTTP error originating from the HTTP client
49    #[error(transparent)]
50    ClientHttp(#[from] http::Error),
51    /// A request conversion error
52    #[error("could not convert request: {0}")]
53    RequestConversion(String),
54    /// A response conversion error
55    #[error("could not convert response")]
56    ResponseConversion,
57}
58
59/// Send an outgoing request.
60///
61/// Requires `http` permissions to be set in the plugin configuration for any domain or host this function will be
62/// making requests to. In the example below, this would require the `http` permission value to be set to at least
63/// `["www.example.com"]`.
64///
65/// Note that if an HTTP request version is set, it will be ignored because the underlying `wasi-http` API abstracts
66/// HTTP version and transport protocol choices.
67///
68/// # Example
69///
70#[cfg_attr(doctest, doc = " ````no_test")] // highlight, but don't run the test (rust/issues/63193)
71/// ```rust
72/// use bulwark_sdk::*;
73/// use std::collections::HashMap;
74///
75/// pub struct HttpExample;
76///
77/// #[bulwark_plugin]
78/// impl HttpHandlers for HttpExample {
79///     fn handle_request_decision(
80///         _request: http::Request,
81///         _labels: HashMap<String, String>,
82///     ) -> Result<HandlerOutput, Error> {
83///         let _response = http::send(
84///             http::request::Builder::new()
85///                 .method("GET")
86///                 .uri("http://www.example.com/")
87///                 .body(Bytes::new())?,
88///         )?;
89///         // Do something with the response
90///
91///         Ok(HandlerOutput::default())
92///     }
93/// }
94/// ```
95pub fn send(request: Request) -> Result<Response, SendError> {
96    run(send_async(request))
97}
98
99impl OutgoingRequest {
100    /// Construct a `Sink` which writes chunks to the body of the specified response.
101    ///
102    /// # Panics
103    ///
104    /// Panics if the body was already taken.
105    pub fn take_body(&self) -> impl futures::Sink<Vec<u8>, Error = StreamError> {
106        outgoing_body(self.body().expect("request body was already taken"))
107    }
108}
109
110impl IncomingResponse {
111    /// Return a `Stream` from which the body of the specified response may be read.
112    ///
113    /// # Panics
114    ///
115    /// Panics if the body was already consumed.
116    fn take_body_stream(
117        self,
118    ) -> impl futures::Stream<Item = Result<Vec<u8>, crate::wit::wasi::io::streams::Error>> {
119        incoming_body(self.consume().expect("response body was already consumed"))
120    }
121
122    /// Return a `Vec<u8>` of the body or fails
123    ///
124    /// # Panics
125    ///
126    /// Panics if the body was already consumed.
127    async fn into_body(self) -> Result<Vec<u8>, crate::wit::wasi::io::streams::Error> {
128        use futures::TryStreamExt;
129        let mut stream = self.take_body_stream();
130        let mut body = Vec::new();
131        while let Some(chunk) = stream.try_next().await? {
132            body.extend(chunk);
133        }
134        Ok(body)
135    }
136}
137
138static WAKERS: Mutex<Vec<(crate::wit::wasi::io::poll::Pollable, Waker)>> = Mutex::new(Vec::new());
139
140fn push_waker(pollable: crate::wit::wasi::io::poll::Pollable, waker: Waker) {
141    WAKERS
142        .lock()
143        .expect("poisoned mutex")
144        .push((pollable, waker));
145}
146
147/// Run the specified future to completion blocking until it yields a result.
148///
149/// Based on an executor using `wasi::io/poll/poll-list`,
150fn run<T>(future: impl Future<Output = T>) -> T {
151    futures::pin_mut!(future);
152    struct DummyWaker;
153
154    impl Wake for DummyWaker {
155        fn wake(self: Arc<Self>) {}
156    }
157
158    let waker = Arc::new(DummyWaker).into();
159
160    loop {
161        match future
162            .as_mut()
163            .poll(&mut std::task::Context::from_waker(&waker))
164        {
165            Poll::Pending => {
166                let mut new_wakers = Vec::new();
167
168                let wakers = std::mem::take::<Vec<_>>(&mut WAKERS.lock().expect("poisoned mutex"));
169
170                assert!(!wakers.is_empty());
171
172                let pollables = wakers
173                    .iter()
174                    .map(|(pollable, _)| pollable)
175                    .collect::<Vec<_>>();
176
177                let mut ready = vec![false; wakers.len()];
178
179                for index in crate::wit::wasi::io::poll::poll(&pollables) {
180                    ready[usize::try_from(index).unwrap()] = true;
181                }
182
183                for (ready, (pollable, waker)) in ready.into_iter().zip(wakers) {
184                    if ready {
185                        waker.wake()
186                    } else {
187                        new_wakers.push((pollable, waker));
188                    }
189                }
190
191                *WAKERS.lock().expect("poisoned mutex") = new_wakers;
192            }
193            Poll::Ready(result) => break result,
194        }
195    }
196}
197
198/// Send an outgoing request
199async fn send_async(request: Request) -> Result<Response, SendError> {
200    // Convert the request into an `OutgoingRequest`.
201    let (parts, body) = request.into_parts();
202    let (method, uri, headers) = (parts.method, parts.uri, parts.headers);
203    let is_https = if let Some(scheme) = uri.scheme() {
204        scheme == &http::uri::Scheme::HTTPS
205    } else {
206        false
207    };
208    let headers = headers
209        .iter()
210        .map(|(k, v)| (k.to_string(), v.as_bytes().to_vec()))
211        .collect::<Vec<_>>();
212    let out_req = OutgoingRequest::new(
213        Headers::from_list(&headers)
214            .map_err(|e| SendError::RequestConversion(format!("header error: {}", e)))?,
215    );
216    out_req
217        .set_method(match method {
218            http::Method::GET => &crate::wit::wasi::http::types::Method::Get,
219            http::Method::HEAD => &crate::wit::wasi::http::types::Method::Head,
220            http::Method::POST => &crate::wit::wasi::http::types::Method::Post,
221            http::Method::PUT => &crate::wit::wasi::http::types::Method::Put,
222            http::Method::DELETE => &crate::wit::wasi::http::types::Method::Delete,
223            http::Method::PATCH => &crate::wit::wasi::http::types::Method::Patch,
224            http::Method::CONNECT => &crate::wit::wasi::http::types::Method::Connect,
225            http::Method::OPTIONS => &crate::wit::wasi::http::types::Method::Options,
226            http::Method::TRACE => &crate::wit::wasi::http::types::Method::Trace,
227            _ => {
228                return Err(
229                    crate::wit::wasi::http::types::ErrorCode::HttpRequestMethodInvalid.into(),
230                )
231            }
232        })
233        .map_err(|()| {
234            SendError::RequestConversion(format!("could not set method to {}", method))
235        })?;
236    out_req
237        .set_path_with_query(uri.path_and_query().map(|path| path.as_str()))
238        .map_err(|()| {
239            SendError::RequestConversion(format!(
240                "error setting path to {:?}",
241                uri.path_and_query()
242            ))
243        })?;
244    out_req
245        .set_scheme(Some(if is_https {
246            &Scheme::Https
247        } else {
248            &Scheme::Http
249        }))
250        // According to the documentation, `Request::set_scheme` can only fail due to a malformed
251        // `Scheme::Other` payload, but we never pass `Scheme::Other` above, hence the `expect`.
252        .expect("unexpected scheme");
253    let authority = uri
254        .authority()
255        .map(|authority| authority.as_str())
256        // `wasi-http` requires an authority for outgoing requests, so we always supply one:
257        .or(Some(if is_https { ":443" } else { ":80" }));
258    out_req.set_authority(authority).map_err(|()| {
259        SendError::RequestConversion(format!("error setting authority to {authority:?}"))
260    })?;
261
262    let (request, body_buffer) = (out_req, Some(body.to_vec()));
263
264    let response = if let Some(body_buffer) = body_buffer {
265        // It is part of the contract of the trait that implementors of `TryIntoOutgoingRequest`
266        // do not call `OutgoingRequest::write`` if they return a buffered body.
267        let mut body_sink = request.take_body();
268        let response = outgoing_request_send(request);
269        body_sink
270            .send(body_buffer)
271            .await
272            .map_err(SendError::StreamIo)?;
273        drop(body_sink);
274        response.await.map_err(SendError::WasiHttp)?
275    } else {
276        outgoing_request_send(request)
277            .await
278            .map_err(SendError::WasiHttp)?
279    };
280
281    // Convert the `IncomingResponse` into a `Response`.
282    let mut new_resp = http::response::Builder::new()
283        .status(response.status())
284        .status(response.status());
285    for (name, value) in response.headers().entries() {
286        new_resp = new_resp.header(name, value);
287    }
288    let body = bytes::Bytes::from(
289        response
290            .into_body()
291            .await
292            .map_err(|e| SendError::GenericIo(e.to_debug_string()))?,
293    );
294    Ok(new_resp.body(body)?)
295
296    // TryFromIncomingResponse::try_from_incoming_response(response)
297    //     .await
298    //     .map_err(|_| SendError::ResponseConversion)
299}
300
301fn outgoing_body(body: OutgoingBody) -> impl futures::Sink<Vec<u8>, Error = StreamError> {
302    struct Outgoing(Option<(OutputStream, OutgoingBody)>);
303
304    impl Drop for Outgoing {
305        fn drop(&mut self) {
306            if let Some((stream, body)) = self.0.take() {
307                drop(stream);
308                _ = OutgoingBody::finish(body, None);
309            }
310        }
311    }
312
313    let stream = body.write().expect("response body should be writable");
314    let pair = Rc::new(RefCell::new(Outgoing(Some((stream, body)))));
315
316    futures::sink::unfold((), {
317        move |(), chunk: Vec<u8>| {
318            futures::future::poll_fn({
319                let mut offset = 0;
320                let mut flushing = false;
321                let pair = pair.clone();
322
323                move |context| {
324                    let pair = pair.borrow();
325                    let (stream, _) = &pair.0.as_ref().unwrap();
326                    loop {
327                        match stream.check_write() {
328                            Ok(0) => {
329                                push_waker(stream.subscribe(), context.waker().clone());
330                                break Poll::Pending;
331                            }
332                            Ok(count) => {
333                                if offset == chunk.len() {
334                                    if flushing {
335                                        break Poll::Ready(Ok(()));
336                                    } else {
337                                        match stream.flush() {
338                                            Ok(()) => flushing = true,
339                                            Err(StreamError::Closed) => break Poll::Ready(Ok(())),
340                                            Err(e) => break Poll::Ready(Err(e)),
341                                        }
342                                    }
343                                } else {
344                                    let count =
345                                        usize::try_from(count).unwrap().min(chunk.len() - offset);
346
347                                    match stream.write(&chunk[offset..][..count]) {
348                                        Ok(()) => {
349                                            offset += count;
350                                        }
351                                        Err(e) => break Poll::Ready(Err(e)),
352                                    }
353                                }
354                            }
355                            // If the stream is closed but the entire chunk was
356                            // written then we've done all we could so this
357                            // chunk is now complete.
358                            Err(StreamError::Closed) if offset == chunk.len() => {
359                                break Poll::Ready(Ok(()))
360                            }
361                            Err(e) => break Poll::Ready(Err(e)),
362                        }
363                    }
364                }
365            })
366        }
367    })
368}
369
370fn incoming_body(
371    body: IncomingBody,
372) -> impl futures::Stream<Item = Result<Vec<u8>, crate::wit::wasi::io::streams::Error>> {
373    struct Incoming(Option<(InputStream, IncomingBody)>);
374
375    impl Drop for Incoming {
376        fn drop(&mut self) {
377            if let Some((stream, body)) = self.0.take() {
378                drop(stream);
379                IncomingBody::finish(body);
380            }
381        }
382    }
383
384    futures::stream::poll_fn({
385        let stream = body.stream().expect("response body should be readable");
386        let pair = Incoming(Some((stream, body)));
387
388        move |context| {
389            if let Some((stream, _)) = &pair.0 {
390                match stream.read(READ_SIZE) {
391                    Ok(buffer) => {
392                        if buffer.is_empty() {
393                            push_waker(stream.subscribe(), context.waker().clone());
394                            Poll::Pending
395                        } else {
396                            Poll::Ready(Some(Ok(buffer)))
397                        }
398                    }
399                    Err(StreamError::Closed) => Poll::Ready(None),
400                    Err(StreamError::LastOperationFailed(error)) => Poll::Ready(Some(Err(error))),
401                }
402            } else {
403                Poll::Ready(None)
404            }
405        }
406    })
407}
408
409/// Send the specified request and return the response.
410fn outgoing_request_send(
411    request: OutgoingRequest,
412) -> impl Future<Output = Result<IncomingResponse, ErrorCode>> {
413    let response = outgoing_handler::handle(request, None);
414    futures::future::poll_fn({
415        move |context| match &response {
416            Ok(response) => {
417                if let Some(response) = response.get() {
418                    Poll::Ready(response.unwrap())
419                } else {
420                    push_waker(response.subscribe(), context.waker().clone());
421                    Poll::Pending
422                }
423            }
424            Err(error) => Poll::Ready(Err(error.clone())),
425        }
426    })
427}