spin_sdk/http/body.rs
1//! Utilities for working with HTTP message bodies.
2//!
3//! This module provides extension traits and utilities for working with
4//! [`wasip3::http_compat::IncomingBody`] instances, such as streaming or collecting the entire
5//! body into memory.
6//!
7//! These helpers make it easier to transform low-level streaming body types
8//! into higher-level forms (e.g., [`bytes::Bytes`]) for simplified data handling.
9
10use bytes::Bytes;
11use futures::{
12 channel::mpsc::{channel, Sender},
13 StreamExt,
14};
15use http_body_util::{BodyDataStream, BodyExt};
16use wasip3::{
17 http::types::ErrorCode,
18 http_compat::{IncomingBody, IncomingMessage},
19};
20
21/// Extension trait providing convenient methods for consuming an [`IncomingBody`].
22///
23/// This trait defines common patterns for handling HTTP body data in
24/// asynchronous contexts. It allows converting the body into a stream
25/// or fully collecting it into memory as a [`Bytes`] buffer.
26#[allow(async_fn_in_trait)]
27pub trait IncomingBodyExt {
28 /// Convert this [`IncomingBody`] into a [`BodyDataStream`].
29 ///
30 /// This method enables iteration over the body's data chunks as they
31 /// arrive, without collecting them all into memory at once. It is
32 /// suitable for processing large or streaming payloads efficiently.
33 fn stream(self) -> BodyDataStream<Self>
34 where
35 Self: Sized;
36
37 /// Consume this [`IncomingBody`] and collect it into a single [`Bytes`] buffer.
38 ///
39 /// This method reads the entire body asynchronously and returns the
40 /// concatenated contents. It is best suited for small or bounded-size
41 /// payloads where holding all data in memory is acceptable.
42 async fn bytes(self) -> Result<Bytes, ErrorCode>;
43}
44
45impl<T: IncomingMessage> IncomingBodyExt for IncomingBody<T> {
46 /// Convert this [`IncomingBody`] into a [`BodyDataStream`].
47 fn stream(self) -> BodyDataStream<Self>
48 where
49 Self: Sized,
50 {
51 BodyDataStream::new(self)
52 }
53
54 /// Collect the [`IncomingBody`] into a single [`Bytes`] buffer.
55 async fn bytes(self) -> Result<Bytes, ErrorCode> {
56 self.collect().await.map(|c| c.to_bytes())
57 }
58}
59
60/// Create a streaming body, with a `Sender` for writing to the body.
61/// This supports strings, `Bytes`, `Vec<u8>`, and any `IntoIterator<Item = u8>`.
62/// For types which are not `Into<Bytes>`, use [`stream_any`].
63///
64/// # Examples
65///
66/// ```no_run
67/// # use spin_sdk::http::Response;
68/// # use spin_sdk::http::body::stream;
69/// use futures::SinkExt;
70///
71/// let (mut tx, body) = stream::<String>();
72///
73/// spin_sdk::wasip3::spawn(async move {
74/// for i in 0..10000 {
75/// if tx.send(format!("{i}\n")).await.is_err() {
76/// break;
77/// }
78/// }
79/// });
80///
81/// let response = Response::new(body);
82/// ```
83pub fn stream<T: Into<Bytes>>() -> (
84 Sender<T>,
85 impl http_body::Body<Data = Bytes, Error = anyhow::Error>,
86) {
87 stream_any::<T>(|t| t.into())
88}
89
90/// Create a streaming body, with a `Sender` for writing to the body.
91/// This supports any type, but requires you to provider a converter from your
92/// type to `Bytes`. (For types that implement `Into<Bytes>`, use [`stream`]).
93pub fn stream_any<T>(
94 f: impl Fn(T) -> Bytes,
95) -> (
96 Sender<T>,
97 impl http_body::Body<Data = Bytes, Error = anyhow::Error>,
98) {
99 let (tx, rx) = channel::<T>(1024);
100 let stm = rx.map(move |value| Ok(http_body::Frame::data(f(value))));
101 (tx, http_body_util::StreamBody::new(stm))
102}