Skip to main content

spin_sdk/http/
body.rs

1//! Utilities for working with HTTP message bodies.
2//!
3//! This module provides extension traits and utilities for working with
4//! [`wasip3::http_compat::IncomingBody`] instances, such as streaming or collecting the entire
5//! body into memory.
6//!
7//! These helpers make it easier to transform low-level streaming body types
8//! into higher-level forms (e.g., [`bytes::Bytes`]) for simplified data handling.
9
10use bytes::Bytes;
11use futures::{
12    channel::mpsc::{channel, Sender},
13    StreamExt,
14};
15use http_body_util::{BodyDataStream, BodyExt};
16use wasip3::{
17    http::types::ErrorCode,
18    http_compat::{IncomingBody, IncomingMessage},
19};
20
21/// Extension trait providing convenient methods for consuming an [`IncomingBody`].
22///
23/// This trait defines common patterns for handling HTTP body data in
24/// asynchronous contexts. It allows converting the body into a stream
25/// or fully collecting it into memory as a [`Bytes`] buffer.
26#[allow(async_fn_in_trait)]
27pub trait IncomingBodyExt {
28    /// Convert this [`IncomingBody`] into a [`BodyDataStream`].
29    ///
30    /// This method enables iteration over the body's data chunks as they
31    /// arrive, without collecting them all into memory at once. It is
32    /// suitable for processing large or streaming payloads efficiently.
33    fn stream(self) -> BodyDataStream<Self>
34    where
35        Self: Sized;
36
37    /// Consume this [`IncomingBody`] and collect it into a single [`Bytes`] buffer.
38    ///
39    /// This method reads the entire body asynchronously and returns the
40    /// concatenated contents. It is best suited for small or bounded-size
41    /// payloads where holding all data in memory is acceptable.
42    async fn bytes(self) -> Result<Bytes, ErrorCode>;
43}
44
45impl<T: IncomingMessage> IncomingBodyExt for IncomingBody<T> {
46    /// Convert this [`IncomingBody`] into a [`BodyDataStream`].
47    fn stream(self) -> BodyDataStream<Self>
48    where
49        Self: Sized,
50    {
51        BodyDataStream::new(self)
52    }
53
54    /// Collect the [`IncomingBody`] into a single [`Bytes`] buffer.
55    async fn bytes(self) -> Result<Bytes, ErrorCode> {
56        self.collect().await.map(|c| c.to_bytes())
57    }
58}
59
60/// Create a streaming body, with a `Sender` for writing to the body.
61/// This supports strings, `Bytes`, `Vec<u8>`, and any `IntoIterator<Item = u8>`.
62/// For types which are not `Into<Bytes>`, use [`stream_any`].
63///
64/// # Examples
65///
66/// ```no_run
67/// # use spin_sdk::http::Response;
68/// # use spin_sdk::http::body::stream;
69/// use futures::SinkExt;
70///
71/// let (mut tx, body) = stream::<String>();
72///
73/// spin_sdk::wasip3::spawn(async move {
74///     for i in 0..10000 {
75///         if tx.send(format!("{i}\n")).await.is_err() {
76///             break;
77///         }
78///     }
79/// });
80///
81/// let response = Response::new(body);
82/// ```
83pub fn stream<T: Into<Bytes>>() -> (
84    Sender<T>,
85    impl http_body::Body<Data = Bytes, Error = anyhow::Error>,
86) {
87    stream_any::<T>(|t| t.into())
88}
89
90/// Create a streaming body, with a `Sender` for writing to the body.
91/// This supports any type, but requires you to provider a converter from your
92/// type to `Bytes`.  (For types that implement `Into<Bytes>`, use [`stream`]).
93pub fn stream_any<T>(
94    f: impl Fn(T) -> Bytes,
95) -> (
96    Sender<T>,
97    impl http_body::Body<Data = Bytes, Error = anyhow::Error>,
98) {
99    let (tx, rx) = channel::<T>(1024);
100    let stm = rx.map(move |value| Ok(http_body::Frame::data(f(value))));
101    (tx, http_body_util::StreamBody::new(stm))
102}