// hyper_request_body/lib.rs

1pub use hyper::Body as HyperBody;
2#[cfg(feature = "warp-request-body")]
3pub use warp_request_body;
4
5use core::{
6    fmt,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_util::Stream;
13use hyper::Request as HyperRequest;
14use pin_project_lite::pin_project;
15
16pub mod error;
17mod utils;
18
19use error::Error;
20
21//
22pin_project! {
23    #[project = BodyProj]
24    pub enum Body {
25        HyperBody { #[pin] inner: HyperBody },
26        Stream { #[pin] inner: Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + 'static>> },
27    }
28}
29
30impl fmt::Debug for Body {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            Self::HyperBody { inner: _ } => write!(f, "HyperBody"),
34            Self::Stream { inner: _ } => write!(f, "Stream"),
35        }
36    }
37}
38
39impl fmt::Display for Body {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        write!(f, "{self:?}")
42    }
43}
44
45impl Default for Body {
46    fn default() -> Self {
47        Self::HyperBody {
48            inner: HyperBody::default(),
49        }
50    }
51}
52
53//
54impl Body {
55    pub fn with_hyper_body(hyper_body: HyperBody) -> Self {
56        Self::HyperBody { inner: hyper_body }
57    }
58
59    pub fn with_stream(stream: impl Stream<Item = Result<Bytes, Error>> + Send + 'static) -> Self {
60        Self::Stream {
61            inner: Box::pin(stream),
62        }
63    }
64
65    #[cfg(feature = "warp-request-body")]
66    pub fn from_warp_request_body(body: warp_request_body::Body) -> Self {
67        use futures_util::TryStreamExt as _;
68        use warp_request_body::error::Error as WarpRequestBodyError;
69
70        match body {
71            warp_request_body::Body::HyperBody { inner } => Self::with_hyper_body(inner),
72            body => Self::with_stream(body.map_err(|err| match err {
73                WarpRequestBodyError::HyperError(err) => Error::HyperError(err),
74                WarpRequestBodyError::WarpError(err) => Error::Other(err.into()),
75            })),
76        }
77    }
78
79    #[cfg(feature = "warp")]
80    // https://github.com/seanmonstar/warp/blob/v0.3.2/src/filters/body.rs#L291
81    pub fn from_warp_body_stream(
82        stream: impl Stream<Item = Result<impl warp::Buf + 'static, warp::Error>> + Send + 'static,
83    ) -> Self {
84        use futures_util::TryStreamExt as _;
85
86        // Copy from warp_request_body::utils::buf_to_bytes
87        fn buf_to_bytes(mut buf: impl warp::Buf) -> Bytes {
88            let mut bytes_mut = bytes::BytesMut::new();
89            while buf.has_remaining() {
90                bytes_mut.extend_from_slice(buf.chunk());
91                let cnt = buf.chunk().len();
92                buf.advance(cnt);
93            }
94            bytes_mut.freeze()
95        }
96
97        Self::with_stream(
98            stream
99                .map_ok(buf_to_bytes)
100                .map_err(|err| Error::Other(err.into())),
101        )
102    }
103}
104
105impl From<HyperBody> for Body {
106    fn from(body: HyperBody) -> Self {
107        Self::with_hyper_body(body)
108    }
109}
110
#[cfg(feature = "warp-request-body")]
impl From<warp_request_body::Body> for Body {
    /// Same conversion as [`Body::from_warp_request_body`].
    fn from(warp_body: warp_request_body::Body) -> Self {
        Body::from_warp_request_body(warp_body)
    }
}
117
118impl Body {
119    pub async fn to_bytes_async(self) -> Result<Bytes, Error> {
120        match self {
121            Self::HyperBody { inner } => {
122                utils::hyper_body_to_bytes(inner).await.map_err(Into::into)
123            }
124            Self::Stream { inner } => utils::bytes_stream_to_bytes(inner)
125                .await
126                .map_err(Into::into),
127        }
128    }
129}
130
131//
132impl Stream for Body {
133    type Item = Result<Bytes, Error>;
134
135    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136        match self.project() {
137            BodyProj::HyperBody { inner } => inner.poll_next(cx).map_err(Into::into),
138            BodyProj::Stream { inner } => inner.poll_next(cx).map_err(Into::into),
139        }
140    }
141}
142
143//
144pub fn hyper_body_request_to_body_request(req: HyperRequest<HyperBody>) -> HyperRequest<Body> {
145    let (parts, body) = req.into_parts();
146    HyperRequest::from_parts(parts, Body::with_hyper_body(body))
147}
148
149pub fn stream_request_to_body_request(
150    req: HyperRequest<impl Stream<Item = Result<Bytes, Error>> + Send + 'static>,
151) -> HyperRequest<Body> {
152    let (parts, body) = req.into_parts();
153    HyperRequest::from_parts(parts, Body::with_stream(body))
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    use futures_util::{stream::BoxStream, StreamExt as _, TryStreamExt};

    // Payload carried by every body built in these tests.
    const PAYLOAD: &str = "foo";

    // Error type that `BodyWrapper` requires from its inner stream.
    type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

    // The payload as the `Bytes` value the assertions compare against.
    fn payload_bytes() -> Bytes {
        Bytes::copy_from_slice(PAYLOAD.as_bytes())
    }

    // A boxed one-item stream yielding the payload; the error type is chosen by the caller.
    fn payload_stream<E: Send + 'static>() -> BoxStream<'static, Result<Bytes, E>> {
        futures_util::stream::once(async { Ok(payload_bytes()) }).boxed()
    }

    // Collects the whole body and checks that it equals the payload.
    async fn assert_collects_payload(body: Body) {
        let collected = body.to_bytes_async().await.unwrap();
        assert_eq!(collected, payload_bytes());
    }

    // Polls the body as a stream: one payload chunk, then end of stream.
    async fn assert_yields_payload_then_ends(mut body: Body) {
        let first_chunk = body.next().await.unwrap().unwrap();
        assert_eq!(first_chunk, payload_bytes());
        assert!(body.next().await.is_none());
    }

    #[tokio::test]
    async fn test_with_hyper_body() {
        // Constructor picks the HyperBody variant and the content survives collection.
        let body = Body::with_hyper_body(HyperBody::from(PAYLOAD));
        assert!(matches!(body, Body::HyperBody { .. }));
        assert_collects_payload(body).await;

        // Same body consumed chunk by chunk.
        let body = Body::with_hyper_body(HyperBody::from(PAYLOAD));
        assert_yields_payload_then_ends(body).await;

        // Whole-request conversion.
        let request = HyperRequest::new(HyperBody::from(PAYLOAD));
        let body = hyper_body_request_to_body_request(request).into_body();
        assert!(matches!(body, Body::HyperBody { .. }));
        assert_collects_payload(body).await;
    }

    #[tokio::test]
    async fn test_with_stream() {
        // Constructor picks the Stream variant and the content survives collection.
        let body = Body::with_stream(payload_stream::<Error>());
        assert!(matches!(body, Body::Stream { .. }));
        assert_collects_payload(body).await;

        // Same body consumed chunk by chunk.
        let body = Body::with_stream(payload_stream::<Error>());
        assert_yields_payload_then_ends(body).await;

        // Whole-request conversion.
        let request = HyperRequest::new(payload_stream::<Error>());
        let body = stream_request_to_body_request(request).into_body();
        assert!(matches!(body, Body::Stream { .. }));
        assert_collects_payload(body).await;
    }

    #[cfg(feature = "warp-request-body")]
    #[tokio::test]
    async fn test_from_warp_request_body() {
        // A hyper-backed warp body stays hyper-backed.
        let warp_body = warp_request_body::Body::with_hyper_body(HyperBody::from(PAYLOAD));
        let body = Body::from_warp_request_body(warp_body);
        assert!(matches!(body, Body::HyperBody { .. }));
        assert_collects_payload(body).await;

        // A bytes-backed warp body becomes a stream.
        let warp_body = warp_request_body::Body::with_bytes(payload_bytes());
        let body = Body::from_warp_request_body(warp_body);
        assert!(matches!(body, Body::Stream { .. }));
        assert_collects_payload(body).await;

        // A stream-backed warp body becomes a stream.
        let warp_body = warp_request_body::Body::with_stream(payload_stream());
        let body = Body::from_warp_request_body(warp_body);
        assert!(matches!(body, Body::Stream { .. }));
        assert_collects_payload(body).await;
    }

    #[cfg(feature = "warp")]
    #[tokio::test]
    async fn test_from_warp_body_stream() {
        // Extract a body stream from a warp test request, then convert it.
        let warp_stream = warp::test::request()
            .body(PAYLOAD)
            .filter(&warp::body::stream())
            .await
            .unwrap();
        let body = Body::from_warp_body_stream(warp_stream);
        assert!(matches!(body, Body::Stream { .. }));
        assert_collects_payload(body).await;
    }

    pin_project! {
        pub struct BodyWrapper {
            #[pin]
            inner: BoxStream<'static, Result<Bytes, BoxError>>
        }
    }

    #[tokio::test]
    async fn test_wrapper() {
        // Compile-time check: a hyper-backed body can be error-converted and boxed.
        let hyper_backed = Body::with_hyper_body(HyperBody::from(PAYLOAD));
        let _ = BodyWrapper {
            inner: hyper_backed.err_into().boxed(),
        };

        // Compile-time check: the same holds for a stream-backed body.
        let stream_backed = Body::with_stream(payload_stream::<Error>());
        let _ = BodyWrapper {
            inner: stream_backed.err_into().boxed(),
        };
    }
}