Skip to main content

tako_rs_core/
body.rs

1//! HTTP request and response body handling utilities for efficient data processing.
2//!
3//! This module provides `TakoBody`, a flexible wrapper around HTTP body implementations
4//! that supports various data sources including static content, streams, and dynamic
5//! generation. It integrates with Hyper's body system while providing convenience methods
6//! for common use cases like creating empty bodies, streaming data, and converting from
7//! different input types with efficient memory management.
8//!
9//! # Examples
10//!
11//! ```rust
12//! use tako::body::TakoBody;
13//! use bytes::Bytes;
14//! use futures_util::stream;
15//!
16//! // Create empty body
17//! let empty = TakoBody::empty();
18//!
19//! // Create from string
20//! let text_body = TakoBody::from("Hello, World!");
21//!
22//! // Create from bytes
23//! let bytes_body = TakoBody::from(Bytes::from("Binary data"));
24//!
25//! // Create from stream
26//! let stream_data = stream::iter(vec![
27//!     Ok(Bytes::from("chunk1")),
28//!     Ok(Bytes::from("chunk2")),
29//! ]);
30//! let stream_body = TakoBody::from_stream(stream_data);
31//! ```
32
33use std::convert::Infallible;
34use std::fmt::Debug;
35use std::pin::Pin;
36use std::task::Context;
37use std::task::Poll;
38
39use anyhow::Result;
40use bytes::Bytes;
41use futures_util::Stream;
42use futures_util::TryStream;
43use futures_util::TryStreamExt;
44use http_body::Body;
45use http_body::Frame;
46use http_body::SizeHint;
47use http_body_util::BodyExt;
48use http_body_util::Empty;
49use http_body_util::Full;
50use http_body_util::StreamBody;
51
52use crate::types::BoxBody;
53use crate::types::BoxError;
54
55/// Internal enum to avoid heap-boxing for the most common body kinds.
56/// `Full`, `Empty`, and `Incoming` are stored inline (zero allocations).
57/// Anything else (streams, mapped bodies, etc.) goes through the `Boxed` variant.
58#[allow(dead_code)]
59enum BodyInner {
60  Full(Full<Bytes>),
61  Empty(Empty<Bytes>),
62  /// Hyper's incoming request body — stored inline to avoid boxing on every request.
63  Incoming(hyper::body::Incoming),
64  Boxed(BoxBody),
65}
66
67/// HTTP body wrapper with streaming and conversion support.
68///
69/// `TakoBody` provides a unified interface for handling HTTP request and response bodies
70/// with support for various data sources. It wraps Hyper's body system with additional
71/// convenience methods and efficient conversion capabilities. The implementation supports
72/// both static content and streaming data while maintaining performance through zero-copy
73/// operations where possible.
74///
75/// # Examples
76///
77/// ```rust
78/// use tako::body::TakoBody;
79/// use http_body_util::Full;
80/// use bytes::Bytes;
81///
82/// // Static content
83/// let static_body = TakoBody::from("Static response");
84///
85/// // Dynamic content
86/// let dynamic = format!("User count: {}", 42);
87/// let dynamic_body = TakoBody::from(dynamic);
88///
89/// // Binary data
90/// let binary_data = vec![0u8, 1, 2, 3, 4];
91/// let binary_body = TakoBody::from(binary_data);
92///
93/// // Empty response
94/// let empty_body = TakoBody::empty();
95/// ```
96pub struct TakoBody(BodyInner);
97
98impl std::fmt::Debug for TakoBody {
99  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100    f.debug_struct("TakoBody").finish_non_exhaustive()
101  }
102}
103
104impl TakoBody {
105  /// Creates a new body from any type implementing the `Body` trait.
106  ///
107  /// This is the generic (boxing) path — prefer [`full`](Self::full) or
108  /// [`empty`](Self::empty) when the concrete type is known.
109  #[inline]
110  pub fn new<B>(body: B) -> Self
111  where
112    B: Body<Data = Bytes> + Send + 'static,
113    B::Error: Into<BoxError>,
114  {
115    Self(BodyInner::Boxed(
116      body.map_err(std::convert::Into::into).boxed_unsync(),
117    ))
118  }
119
120  /// Creates a body from a `Full<Bytes>` **without heap-boxing**.
121  #[inline]
122  pub fn full(body: Full<Bytes>) -> Self {
123    Self(BodyInner::Full(body))
124  }
125
126  /// Wraps a `hyper::body::Incoming` **without heap-boxing**.
127  #[inline]
128  #[doc(hidden)]
129  pub fn incoming(body: hyper::body::Incoming) -> Self {
130    Self(BodyInner::Incoming(body))
131  }
132
133  /// Creates a body from a stream of byte results.
134  #[inline]
135  pub fn from_stream<S, E>(stream: S) -> Self
136  where
137    S: Stream<Item = Result<Bytes, E>> + Send + 'static,
138    E: Into<BoxError> + Debug + 'static,
139  {
140    let stream = stream.map_err(Into::into).map_ok(http_body::Frame::data);
141    let body = StreamBody::new(stream).boxed_unsync();
142    Self(BodyInner::Boxed(body))
143  }
144
145  /// Creates a body from a stream of HTTP frames.
146  #[inline]
147  pub fn from_try_stream<S, E>(stream: S) -> Self
148  where
149    S: TryStream<Ok = Frame<Bytes>, Error = E> + Send + 'static,
150    E: Into<BoxError> + 'static,
151  {
152    let body = StreamBody::new(stream.map_err(Into::into)).boxed_unsync();
153    Self(BodyInner::Boxed(body))
154  }
155
156  /// Creates an empty body with no content **without heap-boxing**.
157  #[inline]
158  #[must_use]
159  pub fn empty() -> Self {
160    Self(BodyInner::Empty(Empty::new()))
161  }
162}
163
164/// Provides a default empty body implementation.
165impl Default for TakoBody {
166  fn default() -> Self {
167    Self::empty()
168  }
169}
170
171impl From<()> for TakoBody {
172  fn from((): ()) -> Self {
173    Self::empty()
174  }
175}
176
177impl From<&str> for TakoBody {
178  fn from(buf: &str) -> Self {
179    Self::full(Full::from(Bytes::from(buf.to_owned())))
180  }
181}
182
183impl From<String> for TakoBody {
184  fn from(buf: String) -> Self {
185    Self::full(Full::from(Bytes::from(buf)))
186  }
187}
188
189impl From<Vec<u8>> for TakoBody {
190  fn from(buf: Vec<u8>) -> Self {
191    Self::full(Full::from(Bytes::from(buf)))
192  }
193}
194
195impl From<Bytes> for TakoBody {
196  fn from(buf: Bytes) -> Self {
197    Self::full(Full::from(buf))
198  }
199}
200
201/// Converts an `Infallible` poll result into a `BoxError` poll result at zero cost.
202#[inline]
203fn map_infallible_frame(
204  poll: Poll<Option<core::result::Result<Frame<Bytes>, Infallible>>>,
205) -> Poll<Option<core::result::Result<Frame<Bytes>, BoxError>>> {
206  poll.map(|opt| opt.map(|res| res.map_err(|e| match e {})))
207}
208
209/// Converts a `hyper::Error` poll result into a `BoxError` poll result.
210#[inline]
211fn map_hyper_frame(
212  poll: Poll<Option<core::result::Result<Frame<Bytes>, hyper::Error>>>,
213) -> Poll<Option<core::result::Result<Frame<Bytes>, BoxError>>> {
214  poll.map(|opt| opt.map(|res| res.map_err(Into::into)))
215}
216
217impl Body for TakoBody {
218  type Data = Bytes;
219  type Error = BoxError;
220
221  #[inline]
222  fn poll_frame(
223    self: Pin<&mut Self>,
224    cx: &mut Context<'_>,
225  ) -> Poll<Option<core::result::Result<Frame<Self::Data>, Self::Error>>> {
226    // All variants are Unpin, so get_mut is safe.
227    match &mut self.get_mut().0 {
228      BodyInner::Full(body) => map_infallible_frame(Pin::new(body).poll_frame(cx)),
229      BodyInner::Empty(body) => map_infallible_frame(Pin::new(body).poll_frame(cx)),
230      BodyInner::Incoming(body) => map_hyper_frame(Pin::new(body).poll_frame(cx)),
231      BodyInner::Boxed(body) => Pin::new(body).poll_frame(cx),
232    }
233  }
234
235  #[inline]
236  fn size_hint(&self) -> SizeHint {
237    match &self.0 {
238      BodyInner::Full(body) => body.size_hint(),
239      BodyInner::Empty(body) => body.size_hint(),
240      BodyInner::Incoming(body) => body.size_hint(),
241      BodyInner::Boxed(body) => body.size_hint(),
242    }
243  }
244
245  #[inline]
246  fn is_end_stream(&self) -> bool {
247    match &self.0 {
248      BodyInner::Full(body) => body.is_end_stream(),
249      BodyInner::Empty(body) => body.is_end_stream(),
250      BodyInner::Incoming(body) => body.is_end_stream(),
251      BodyInner::Boxed(body) => body.is_end_stream(),
252    }
253  }
254}