volo_http/
body.rs

1//! HTTP Body implementation for [`http_body::Body`]
2//!
3//! See [`Body`] for more details.
4
5use std::{
6    convert::Infallible,
7    error::Error,
8    fmt,
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use bytes::Bytes;
15use faststr::FastStr;
16use futures_util::stream::Stream;
17use http_body::{Frame, SizeHint};
18use http_body_util::{BodyExt, Full, StreamBody, combinators::BoxBody};
19use hyper::body::Incoming;
20use linkedbytes::{LinkedBytes, Node};
21use pin_project::pin_project;
22#[cfg(feature = "json")]
23use serde::de::DeserializeOwned;
24
25use crate::error::BoxError;
26
// `futures_util::stream::BoxStream` only requires `Send`, not `Sync`, so we declare our own
// pinned, boxed stream alias that requires both `Send` and `Sync`.
type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;
29
/// An implementation for [`http_body::Body`].
///
/// The concrete representation is kept in the private `BodyRepr` enum, so the set of supported
/// body kinds is not exposed through this type's public interface.
#[pin_project]
pub struct Body {
    // Structurally pinned so that polling can project through to the wrapped body.
    #[pin]
    repr: BodyRepr,
}
36
/// Internal representation of [`Body`].
///
/// Every variant wraps a type implementing [`http_body::Body`]; each payload is marked `#[pin]`
/// so it can be polled through the `BodyProj` projection generated by `pin_project`.
#[pin_project(project = BodyProj)]
enum BodyRepr {
    /// Complete [`Bytes`], with a certain size and content
    Full(#[pin] Full<Bytes>),
    /// Wrapper of [`Incoming`], it usually appears in request of server or response of client.
    ///
    /// Although [`Incoming`] implements [`http_body::Body`], the type is so commonly used that
    /// we wrap it here as [`BodyRepr::Hyper`] to avoid the cost of [`Box`] with dynamic
    /// dispatch.
    Hyper(#[pin] Incoming),
    /// Boxed stream with `Item = Result<Frame<Bytes>, BoxError>`
    Stream(#[pin] StreamBody<BoxStream<'static, Result<Frame<Bytes>, BoxError>>>),
    /// Boxed [`http_body::Body`]
    Body(#[pin] BoxBody<Bytes, BoxError>),
}
51
52impl Default for Body {
53    fn default() -> Self {
54        Body::empty()
55    }
56}
57
58impl Body {
59    /// Create an empty body.
60    pub fn empty() -> Self {
61        Self {
62            repr: BodyRepr::Full(Full::new(Bytes::new())),
63        }
64    }
65
66    /// Create a body by [`Incoming`].
67    ///
68    /// Compared to [`Body::from_body`], this function avoids overhead of allocating by [`Box`]
69    /// and dynamic dispatch by [`dyn http_body::Body`][http_body::Body].
70    pub fn from_incoming(incoming: Incoming) -> Self {
71        Self {
72            repr: BodyRepr::Hyper(incoming),
73        }
74    }
75
76    /// Create a body by a [`Stream`] with `Item = Result<Frame<Bytes>, BoxError>`.
77    pub fn from_stream<S>(stream: S) -> Self
78    where
79        S: Stream<Item = Result<Frame<Bytes>, BoxError>> + Send + Sync + 'static,
80    {
81        Self {
82            repr: BodyRepr::Stream(StreamBody::new(Box::pin(stream))),
83        }
84    }
85
86    /// Create a body by another [`http_body::Body`] instance.
87    pub fn from_body<B>(body: B) -> Self
88    where
89        B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
90        B::Error: Into<BoxError>,
91    {
92        Self {
93            repr: BodyRepr::Body(BoxBody::new(body.map_err(Into::into))),
94        }
95    }
96}
97
98impl http_body::Body for Body {
99    type Data = Bytes;
100    type Error = BoxError;
101
102    fn poll_frame(
103        self: Pin<&mut Self>,
104        cx: &mut Context<'_>,
105    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
106        match self.project().repr.project() {
107            BodyProj::Full(full) => http_body::Body::poll_frame(full, cx).map_err(BoxError::from),
108            BodyProj::Hyper(incoming) => {
109                http_body::Body::poll_frame(incoming, cx).map_err(BoxError::from)
110            }
111            BodyProj::Stream(stream) => http_body::Body::poll_frame(stream, cx),
112            BodyProj::Body(body) => http_body::Body::poll_frame(body, cx),
113        }
114    }
115
116    fn is_end_stream(&self) -> bool {
117        match &self.repr {
118            BodyRepr::Full(full) => http_body::Body::is_end_stream(full),
119            BodyRepr::Hyper(incoming) => http_body::Body::is_end_stream(incoming),
120            BodyRepr::Stream(stream) => http_body::Body::is_end_stream(stream),
121            BodyRepr::Body(body) => http_body::Body::is_end_stream(body),
122        }
123    }
124
125    fn size_hint(&self) -> SizeHint {
126        match &self.repr {
127            BodyRepr::Full(full) => http_body::Body::size_hint(full),
128            BodyRepr::Hyper(incoming) => http_body::Body::size_hint(incoming),
129            BodyRepr::Stream(stream) => http_body::Body::size_hint(stream),
130            BodyRepr::Body(body) => http_body::Body::size_hint(body),
131        }
132    }
133}
134
135impl fmt::Debug for Body {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        match &self.repr {
138            BodyRepr::Full(_) => f.write_str("Body::Full"),
139            BodyRepr::Hyper(_) => f.write_str("Body::Hyper"),
140            BodyRepr::Stream(_) => f.write_str("Body::Stream"),
141            BodyRepr::Body(_) => f.write_str("Body::Body"),
142        }
143    }
144}
145
// Private module holding the supertrait of `BodyConversion`. Because the module itself is not
// public, code outside of this file cannot name `SealedBody`.
mod sealed {
    /// Marker trait collecting the bounds required by `BodyConversion`: a sized, `Send`
    /// [`http_body::Body`] whose data chunks are `Send` as well.
    pub trait SealedBody
    where
        Self: http_body::Body + Sized + Send,
        Self::Data: Send,
    {
    }

    // Blanket implementation: every type satisfying the bounds is a `SealedBody`.
    impl<T> SealedBody for T
    where
        T: http_body::Body + Send,
        T::Data: Send,
    {
    }
}
161
162/// An extend trait for [`http_body::Body`] that can converting a body to other types
163pub trait BodyConversion: sealed::SealedBody
164where
165    <Self as http_body::Body>::Data: Send,
166{
167    /// Consume a body and convert it into [`Bytes`].
168    fn into_bytes(self) -> impl Future<Output = Result<Bytes, BodyConvertError>> + Send {
169        async {
170            Ok(self
171                .collect()
172                .await
173                .map_err(|_| BodyConvertError::BodyCollectionError)?
174                .to_bytes())
175        }
176    }
177
178    /// Consume a body and convert it into [`Vec<u8>`].
179    fn into_vec(self) -> impl Future<Output = Result<Vec<u8>, BodyConvertError>> + Send {
180        async { Ok(self.into_bytes().await?.into()) }
181    }
182
183    /// Consume a body and convert it into [`String`].
184    fn into_string(self) -> impl Future<Output = Result<String, BodyConvertError>> + Send {
185        async {
186            let vec = self.into_vec().await?;
187
188            // SAFETY: The `Vec<u8>` is checked by `simdutf8` and it is a valid `String`
189            let _ =
190                simdutf8::basic::from_utf8(&vec).map_err(|_| BodyConvertError::StringUtf8Error)?;
191            Ok(unsafe { String::from_utf8_unchecked(vec) })
192        }
193    }
194
195    /// Consume a body and convert it into [`String`].
196    ///
197    /// # Safety
198    ///
199    /// It is up to the caller to guarantee that the value really is valid. Using this when the
200    /// content is invalid causes immediate undefined behavior.
201    unsafe fn into_string_unchecked(
202        self,
203    ) -> impl Future<Output = Result<String, BodyConvertError>> + Send {
204        async {
205            let vec = self.into_vec().await?;
206
207            Ok(unsafe { String::from_utf8_unchecked(vec) })
208        }
209    }
210
211    /// Consume a body and convert it into [`FastStr`].
212    fn into_faststr(self) -> impl Future<Output = Result<FastStr, BodyConvertError>> + Send {
213        async {
214            let bytes = self.into_bytes().await?;
215
216            // SAFETY: The `Vec<u8>` is checked by `simdutf8` and it is a valid `String`
217            let _ = simdutf8::basic::from_utf8(&bytes)
218                .map_err(|_| BodyConvertError::StringUtf8Error)?;
219            Ok(unsafe { FastStr::from_bytes_unchecked(bytes) })
220        }
221    }
222
223    /// Consume a body and convert it into [`FastStr`].
224    ///
225    /// # Safety
226    ///
227    /// It is up to the caller to guarantee that the value really is valid. Using this when the
228    /// content is invalid causes immediate undefined behavior.
229    unsafe fn into_faststr_unchecked(
230        self,
231    ) -> impl Future<Output = Result<FastStr, BodyConvertError>> + Send {
232        async {
233            let bytes = self.into_bytes().await?;
234
235            Ok(unsafe { FastStr::from_bytes_unchecked(bytes) })
236        }
237    }
238
239    /// Consume a body and convert it into an instance with [`DeserializeOwned`].
240    #[cfg(feature = "json")]
241    fn into_json<T>(self) -> impl Future<Output = Result<T, BodyConvertError>> + Send
242    where
243        T: DeserializeOwned,
244    {
245        async {
246            let bytes = self.into_bytes().await?;
247            crate::utils::json::deserialize(&bytes).map_err(BodyConvertError::JsonDeserializeError)
248        }
249    }
250}
251
// Blanket implementation: every `SealedBody` whose data is `Send` gets the conversion methods
// through the trait's default method bodies.
impl<T> BodyConversion for T
where
    T: sealed::SealedBody,
    <T as http_body::Body>::Data: Send,
{
}
258
/// General error for polling [`http_body::Body`] or converting the [`Bytes`] just polled.
///
/// This is the error type returned by the methods of [`BodyConversion`].
#[derive(Debug)]
pub enum BodyConvertError {
    /// Failed to collect the body
    BodyCollectionError,
    /// The body is not a valid UTF-8 string
    StringUtf8Error,
    /// Failed to deserialize the JSON; carries the underlying deserialization error
    #[cfg(feature = "json")]
    JsonDeserializeError(crate::utils::json::Error),
}
270
271impl fmt::Display for BodyConvertError {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        match self {
274            Self::BodyCollectionError => f.write_str("failed to collect body"),
275            Self::StringUtf8Error => f.write_str("body is not a valid string"),
276            #[cfg(feature = "json")]
277            Self::JsonDeserializeError(e) => write!(f, "failed to deserialize body: {e}"),
278        }
279    }
280}
281
282impl Error for BodyConvertError {
283    fn source(&self) -> Option<&(dyn Error + 'static)> {
284        match self {
285            #[cfg(feature = "json")]
286            Self::JsonDeserializeError(e) => Some(e),
287            _ => None,
288        }
289    }
290}
291
292impl From<()> for Body {
293    fn from(_: ()) -> Self {
294        Self::empty()
295    }
296}
297
298impl From<&'static str> for Body {
299    fn from(value: &'static str) -> Self {
300        Self {
301            repr: BodyRepr::Full(Full::new(Bytes::from_static(value.as_bytes()))),
302        }
303    }
304}
305
306impl From<Vec<u8>> for Body {
307    fn from(value: Vec<u8>) -> Self {
308        Self {
309            repr: BodyRepr::Full(Full::new(Bytes::from(value))),
310        }
311    }
312}
313
314impl From<Bytes> for Body {
315    fn from(value: Bytes) -> Self {
316        Self {
317            repr: BodyRepr::Full(Full::new(value)),
318        }
319    }
320}
321
322impl From<FastStr> for Body {
323    fn from(value: FastStr) -> Self {
324        Self {
325            repr: BodyRepr::Full(Full::new(value.into_bytes())),
326        }
327    }
328}
329
330impl From<String> for Body {
331    fn from(value: String) -> Self {
332        Self {
333            repr: BodyRepr::Full(Full::new(Bytes::from(value))),
334        }
335    }
336}
337
/// Adapter that exposes an iterator (of [`Node`]s, per the trait implementation in this file)
/// as an [`http_body::Body`].
struct LinkedBytesBody<I> {
    // The wrapped iterator; each item it yields becomes one data frame.
    inner: I,
}
341
342impl<I> http_body::Body for LinkedBytesBody<I>
343where
344    I: Iterator<Item = Node> + Unpin,
345{
346    type Data = Bytes;
347    type Error = Infallible;
348
349    fn poll_frame(
350        self: Pin<&mut Self>,
351        _: &mut Context<'_>,
352    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
353        let this = self.get_mut();
354        let Some(node) = this.inner.next() else {
355            return Poll::Ready(None);
356        };
357        let bytes = match node {
358            Node::Bytes(bytes) => bytes,
359            Node::BytesMut(bytesmut) => bytesmut.freeze(),
360            Node::FastStr(faststr) => faststr.into_bytes(),
361        };
362        Poll::Ready(Some(Ok(Frame::data(bytes))))
363    }
364
365    fn is_end_stream(&self) -> bool {
366        false
367    }
368
369    fn size_hint(&self) -> SizeHint {
370        let (lower, upper) = self.inner.size_hint();
371        let mut size_hint = SizeHint::new();
372        size_hint.set_lower(lower as u64);
373        if let Some(upper) = upper {
374            size_hint.set_upper(upper as u64);
375        }
376        size_hint
377    }
378}
379
380impl From<LinkedBytes> for Body {
381    fn from(value: LinkedBytes) -> Self {
382        Body::from_body(LinkedBytesBody {
383            inner: value.into_iter_list(),
384        })
385    }
386}
387
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use faststr::FastStr;
    use linkedbytes::LinkedBytes;

    use super::Body;
    use crate::body::BodyConversion;

    /// A body built from a `LinkedBytes` holding several nodes collects into the
    /// concatenation of those nodes.
    #[tokio::test]
    async fn test_from_linked_bytes() {
        let mut linked = LinkedBytes::new();
        linked.insert(Bytes::from_static(b"Hello, "));
        linked.insert_faststr(FastStr::new("world!"));

        let collected = Body::from(linked).into_string().await.unwrap();
        assert_eq!(collected, "Hello, world!");
    }
}
405}