hyper_request_body/
lib.rs1pub use hyper::Body as HyperBody;
2#[cfg(feature = "warp-request-body")]
3pub use warp_request_body;
4
5use core::{
6 fmt,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_util::Stream;
13use hyper::Request as HyperRequest;
14use pin_project_lite::pin_project;
15
16pub mod error;
17mod utils;
18
19use error::Error;
20
21pin_project! {
23 #[project = BodyProj]
24 pub enum Body {
25 HyperBody { #[pin] inner: HyperBody },
26 Stream { #[pin] inner: Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + 'static>> },
27 }
28}
29
30impl fmt::Debug for Body {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 Self::HyperBody { inner: _ } => write!(f, "HyperBody"),
34 Self::Stream { inner: _ } => write!(f, "Stream"),
35 }
36 }
37}
38
39impl fmt::Display for Body {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 write!(f, "{self:?}")
42 }
43}
44
45impl Default for Body {
46 fn default() -> Self {
47 Self::HyperBody {
48 inner: HyperBody::default(),
49 }
50 }
51}
52
53impl Body {
55 pub fn with_hyper_body(hyper_body: HyperBody) -> Self {
56 Self::HyperBody { inner: hyper_body }
57 }
58
59 pub fn with_stream(stream: impl Stream<Item = Result<Bytes, Error>> + Send + 'static) -> Self {
60 Self::Stream {
61 inner: Box::pin(stream),
62 }
63 }
64
65 #[cfg(feature = "warp-request-body")]
66 pub fn from_warp_request_body(body: warp_request_body::Body) -> Self {
67 use futures_util::TryStreamExt as _;
68 use warp_request_body::error::Error as WarpRequestBodyError;
69
70 match body {
71 warp_request_body::Body::HyperBody { inner } => Self::with_hyper_body(inner),
72 body => Self::with_stream(body.map_err(|err| match err {
73 WarpRequestBodyError::HyperError(err) => Error::HyperError(err),
74 WarpRequestBodyError::WarpError(err) => Error::Other(err.into()),
75 })),
76 }
77 }
78
79 #[cfg(feature = "warp")]
80 pub fn from_warp_body_stream(
82 stream: impl Stream<Item = Result<impl warp::Buf + 'static, warp::Error>> + Send + 'static,
83 ) -> Self {
84 use futures_util::TryStreamExt as _;
85
86 fn buf_to_bytes(mut buf: impl warp::Buf) -> Bytes {
88 let mut bytes_mut = bytes::BytesMut::new();
89 while buf.has_remaining() {
90 bytes_mut.extend_from_slice(buf.chunk());
91 let cnt = buf.chunk().len();
92 buf.advance(cnt);
93 }
94 bytes_mut.freeze()
95 }
96
97 Self::with_stream(
98 stream
99 .map_ok(buf_to_bytes)
100 .map_err(|err| Error::Other(err.into())),
101 )
102 }
103}
104
105impl From<HyperBody> for Body {
106 fn from(body: HyperBody) -> Self {
107 Self::with_hyper_body(body)
108 }
109}
110
#[cfg(feature = "warp-request-body")]
impl From<warp_request_body::Body> for Body {
    /// Delegates to `Body::from_warp_request_body`.
    fn from(warp_body: warp_request_body::Body) -> Self {
        Body::from_warp_request_body(warp_body)
    }
}
117
118impl Body {
119 pub async fn to_bytes_async(self) -> Result<Bytes, Error> {
120 match self {
121 Self::HyperBody { inner } => {
122 utils::hyper_body_to_bytes(inner).await.map_err(Into::into)
123 }
124 Self::Stream { inner } => utils::bytes_stream_to_bytes(inner)
125 .await
126 .map_err(Into::into),
127 }
128 }
129}
130
131impl Stream for Body {
133 type Item = Result<Bytes, Error>;
134
135 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136 match self.project() {
137 BodyProj::HyperBody { inner } => inner.poll_next(cx).map_err(Into::into),
138 BodyProj::Stream { inner } => inner.poll_next(cx).map_err(Into::into),
139 }
140 }
141}
142
143pub fn hyper_body_request_to_body_request(req: HyperRequest<HyperBody>) -> HyperRequest<Body> {
145 let (parts, body) = req.into_parts();
146 HyperRequest::from_parts(parts, Body::with_hyper_body(body))
147}
148
149pub fn stream_request_to_body_request(
150 req: HyperRequest<impl Stream<Item = Result<Bytes, Error>> + Send + 'static>,
151) -> HyperRequest<Body> {
152 let (parts, body) = req.into_parts();
153 HyperRequest::from_parts(parts, Body::with_stream(body))
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    use futures_util::{stream::BoxStream, StreamExt as _, TryStreamExt};

    // Payload every test body is expected to yield.
    fn foo_bytes() -> Bytes {
        Bytes::copy_from_slice(b"foo")
    }

    // Boxed one-item stream yielding `foo`, typed for this crate's `Error`.
    fn foo_stream() -> BoxStream<'static, Result<Bytes, Error>> {
        futures_util::stream::once(async { Ok(foo_bytes()) }).boxed()
    }

    #[tokio::test]
    async fn test_with_hyper_body() {
        // Collect the whole body in one call.
        let body = Body::with_hyper_body(HyperBody::from("foo"));
        assert!(matches!(body, Body::HyperBody { .. }));
        assert_eq!(body.to_bytes_async().await.unwrap(), foo_bytes());

        // Drain the body chunk by chunk through the `Stream` impl.
        let mut body = Body::with_hyper_body(HyperBody::from("foo"));
        assert_eq!(body.next().await.unwrap().unwrap(), foo_bytes());
        assert!(body.next().await.is_none());

        // Convert a complete request.
        let req = HyperRequest::new(HyperBody::from("foo"));
        let (_, body) = hyper_body_request_to_body_request(req).into_parts();
        assert!(matches!(body, Body::HyperBody { .. }));
        assert_eq!(body.to_bytes_async().await.unwrap(), foo_bytes());
    }

    #[tokio::test]
    async fn test_with_stream() {
        // Collect the whole body in one call.
        let body = Body::with_stream(foo_stream());
        assert!(matches!(body, Body::Stream { .. }));
        assert_eq!(body.to_bytes_async().await.unwrap(), foo_bytes());

        // Drain the body chunk by chunk through the `Stream` impl.
        let mut body = Body::with_stream(foo_stream());
        assert_eq!(body.next().await.unwrap().unwrap(), foo_bytes());
        assert!(body.next().await.is_none());

        // Convert a complete request.
        let req = HyperRequest::new(foo_stream());
        let (_, body) = stream_request_to_body_request(req).into_parts();
        assert!(matches!(body, Body::Stream { .. }));
        assert_eq!(body.to_bytes_async().await.unwrap(), foo_bytes());
    }

    #[cfg(feature = "warp-request-body")]
    #[tokio::test]
    async fn test_from_warp_request_body() {
        // A hyper-backed warp body stays a hyper body.
        let warp_body = warp_request_body::Body::with_hyper_body(HyperBody::from("foo"));
        let body = Body::from_warp_request_body(warp_body);
        assert!(matches!(body, Body::HyperBody { .. }));
        assert_eq!(body.to_bytes_async().await.unwrap(), foo_bytes());

        // A bytes-backed warp body becomes a stream.
        let warp_body = warp_request_body::Body::with_bytes(foo_bytes());
        let body = Body::from_warp_request_body(warp_body);
        assert!(matches!(body, Body::Stream { .. }));
        assert_eq!(body.to_bytes_async().await.unwrap(), foo_bytes());

        // A stream-backed warp body becomes a stream. The error type of this
        // stream is inferred from `warp_request_body::Body::with_stream`.
        let stream = futures_util::stream::once(async { Ok(foo_bytes()) }).boxed();
        let warp_body = warp_request_body::Body::with_stream(stream);
        let body = Body::from_warp_request_body(warp_body);
        assert!(matches!(body, Body::Stream { .. }));
        assert_eq!(body.to_bytes_async().await.unwrap(), foo_bytes());
    }

    #[cfg(feature = "warp")]
    #[tokio::test]
    async fn test_from_warp_body_stream() {
        let stream = warp::test::request()
            .body("foo")
            .filter(&warp::body::stream())
            .await
            .unwrap();
        let body = Body::from_warp_body_stream(stream);
        assert!(matches!(body, Body::Stream { .. }));
        assert_eq!(body.to_bytes_async().await.unwrap(), foo_bytes());
    }

    // Stand-in for a downstream type that stores a `Body` as a boxed stream
    // with a boxed error type.
    pin_project! {
        pub struct BodyWrapper {
            #[pin]
            inner: BoxStream<'static, Result<Bytes, Box<dyn std::error::Error + Send + Sync + 'static>>>
        }
    }

    #[tokio::test]
    async fn test_wrapper() {
        // Both variants must be convertible via `err_into` and boxable.
        let body = Body::with_hyper_body(HyperBody::from("foo"));
        let _ = BodyWrapper {
            inner: body.err_into().boxed(),
        };

        let body = Body::with_stream(foo_stream());
        let _ = BodyWrapper {
            inner: body.err_into().boxed(),
        };
    }
}