http_request_body/
lib.rs

1#[cfg(feature = "hyper-request-body")]
2pub use hyper_request_body::{self, Body as HyperRequestBody};
3#[cfg(feature = "warp-request-body")]
4pub use warp_request_body::{self, Body as WarpRequestBody};
5
6use core::{
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_util::Stream;
13use pin_project_lite::pin_project;
14
15pub mod error;
16mod utils;
17
18use error::Error;
19
20//
pin_project! {
    #[project = BodyProj]
    /// A request body that is either fully buffered in memory or produced
    /// incrementally by a stream of chunks.
    ///
    /// The pinned projection type generated for this enum is named `BodyProj`.
    pub enum Body {
        /// The whole body is already available as one `Bytes` buffer.
        Bytes { inner: Bytes },
        /// The body is yielded chunk by chunk from a boxed, pinned stream whose
        /// items are `Result<Bytes, Error>`.
        Stream { #[pin] inner: Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + 'static>> },
    }
}
28
29impl core::fmt::Debug for Body {
30    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
31        match self {
32            Self::Bytes { inner } => f.debug_tuple("Bytes").field(&inner).finish(),
33            Self::Stream { inner: _ } => write!(f, "Stream"),
34        }
35    }
36}
37
38impl core::fmt::Display for Body {
39    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
40        write!(f, "{self:?}")
41    }
42}
43
44impl Default for Body {
45    fn default() -> Self {
46        Self::Bytes {
47            inner: Bytes::default(),
48        }
49    }
50}
51
52//
53impl Body {
54    pub fn with_bytes_from<T>(v: T) -> Self
55    where
56        T: Into<Bytes>,
57    {
58        Self::Bytes { inner: v.into() }
59    }
60
61    pub fn with_bytes(bytes: Bytes) -> Self {
62        Self::Bytes { inner: bytes }
63    }
64
65    pub fn with_stream(stream: impl Stream<Item = Result<Bytes, Error>> + Send + 'static) -> Self {
66        Self::Stream {
67            inner: Box::pin(stream),
68        }
69    }
70
71    #[cfg(feature = "hyper-request-body")]
72    pub fn from_hyper_body(body: hyper_request_body::HyperBody) -> Self {
73        use futures_util::TryStreamExt as _;
74
75        Self::with_stream(body.map_err(|err| Error::Other(err.into())))
76    }
77
78    #[cfg(feature = "hyper-request-body")]
79    pub fn from_hyper_request_body(body: hyper_request_body::Body) -> Self {
80        use futures_util::TryStreamExt as _;
81        use hyper_request_body::error::Error as HyperRequestBodyError;
82
83        match body {
84            hyper_request_body::Body::HyperBody { inner } => {
85                Self::with_stream(inner.map_err(|err| Error::Other(err.into())))
86            }
87            body => Self::with_stream(body.map_err(|err| match err {
88                HyperRequestBodyError::HyperError(err) => Error::Other(err.into()),
89                HyperRequestBodyError::Other(err) => Error::Other(err),
90            })),
91        }
92    }
93
94    #[cfg(feature = "warp-request-body")]
95    pub fn from_warp_request_body(body: warp_request_body::Body) -> Self {
96        use futures_util::TryStreamExt as _;
97        use warp_request_body::error::Error as WarpRequestBodyError;
98
99        match body {
100            warp_request_body::Body::HyperBody { inner } => {
101                Self::with_stream(inner.map_err(|err| Error::Other(err.into())))
102            }
103            body => Self::with_stream(body.map_err(|err| match err {
104                WarpRequestBodyError::HyperError(err) => Error::Other(err.into()),
105                WarpRequestBodyError::WarpError(err) => Error::Other(err.into()),
106            })),
107        }
108    }
109}
110
#[cfg(feature = "hyper-request-body")]
impl From<hyper_request_body::HyperBody> for Body {
    /// Same conversion as `Body::from_hyper_body`.
    fn from(hyper_body: hyper_request_body::HyperBody) -> Self {
        Body::from_hyper_body(hyper_body)
    }
}
117
#[cfg(feature = "hyper-request-body")]
impl From<hyper_request_body::Body> for Body {
    /// Same conversion as `Body::from_hyper_request_body`.
    fn from(request_body: hyper_request_body::Body) -> Self {
        Body::from_hyper_request_body(request_body)
    }
}
124
#[cfg(feature = "warp-request-body")]
impl From<warp_request_body::Body> for Body {
    /// Same conversion as `Body::from_warp_request_body`.
    fn from(request_body: warp_request_body::Body) -> Self {
        Body::from_warp_request_body(request_body)
    }
}
131
132impl Body {
133    pub fn require_to_bytes_async(&self) -> bool {
134        matches!(self, Self::Stream { inner: _ })
135    }
136
137    pub fn to_bytes(self) -> Bytes {
138        match self {
139            Self::Bytes { inner } => inner,
140            Self::Stream { inner: _ } => panic!("Please call require_to_bytes_async first"),
141        }
142    }
143
144    pub async fn to_bytes_async(self) -> Result<Bytes, Error> {
145        match self {
146            Self::Bytes { inner } => Ok(inner),
147            Self::Stream { inner } => utils::bytes_stream_to_bytes(inner)
148                .await
149                .map_err(Into::into),
150        }
151    }
152}
153
154//
155impl Stream for Body {
156    type Item = Result<Bytes, Error>;
157
158    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
159        match self.project() {
160            BodyProj::Bytes { inner } => {
161                if !inner.is_empty() {
162                    let bytes = inner.clone();
163                    inner.clear();
164                    Poll::Ready(Some(Ok(bytes)))
165                } else {
166                    Poll::Ready(None)
167                }
168            }
169            BodyProj::Stream { inner } => inner.poll_next(cx).map_err(Into::into),
170        }
171    }
172}
173
#[cfg(test)]
mod tests {
    use futures_util::StreamExt as _;

    use super::*;

    /// Asserts that `body` is the streaming variant, reports that it needs the
    /// async collector, and collects to the bytes `foo`.
    async fn assert_streams_foo(body: Body) {
        assert!(matches!(body, Body::Stream { .. }));
        assert!(body.require_to_bytes_async());

        let collected = body.to_bytes_async().await.unwrap();
        assert_eq!(collected, Bytes::copy_from_slice(b"foo"));
    }

    #[tokio::test]
    async fn test_with_bytes() {
        let body = Body::with_bytes(Bytes::from_static(b"foo"));

        assert!(matches!(body, Body::Bytes { .. }));
        assert!(!body.require_to_bytes_async());
        assert_eq!(body.to_bytes(), Bytes::copy_from_slice(b"foo"));
    }

    #[tokio::test]
    async fn test_with_stream() {
        let stream =
            futures_util::stream::once(async { Ok(Bytes::copy_from_slice(b"foo")) }).boxed();

        assert_streams_foo(Body::with_stream(stream)).await;
    }

    #[cfg(feature = "hyper-request-body")]
    #[tokio::test]
    async fn test_from_hyper_body() {
        let hyper_body = hyper_request_body::HyperBody::from("foo");

        assert_streams_foo(Body::from_hyper_body(hyper_body)).await;
    }

    #[cfg(feature = "hyper-request-body")]
    #[tokio::test]
    async fn test_from_hyper_request_body() {
        let request_body = hyper_request_body::Body::HyperBody {
            inner: hyper_request_body::HyperBody::from("foo"),
        };

        assert_streams_foo(Body::from_hyper_request_body(request_body)).await;
    }

    #[cfg(feature = "warp-request-body")]
    #[tokio::test]
    async fn test_from_warp_request_body() {
        // An already-buffered warp body is still converted into a streaming body.
        let request_body = warp_request_body::Body::Bytes {
            inner: Bytes::from_static(b"foo"),
        };

        assert_streams_foo(Body::from_warp_request_body(request_body)).await;
    }
}