1#[cfg(feature = "hyper-request-body")]
2pub use hyper_request_body::{self, Body as HyperRequestBody};
3#[cfg(feature = "warp-request-body")]
4pub use warp_request_body::{self, Body as WarpRequestBody};
5
6use core::{
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_util::Stream;
13use pin_project_lite::pin_project;
14
15pub mod error;
16mod utils;
17
18use error::Error;
19
20pin_project! {
22 #[project = BodyProj]
23 pub enum Body {
24 Bytes { inner: Bytes },
25 Stream { #[pin] inner: Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send + 'static>> },
26 }
27}
28
29impl core::fmt::Debug for Body {
30 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
31 match self {
32 Self::Bytes { inner } => f.debug_tuple("Bytes").field(&inner).finish(),
33 Self::Stream { inner: _ } => write!(f, "Stream"),
34 }
35 }
36}
37
38impl core::fmt::Display for Body {
39 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
40 write!(f, "{self:?}")
41 }
42}
43
44impl Default for Body {
45 fn default() -> Self {
46 Self::Bytes {
47 inner: Bytes::default(),
48 }
49 }
50}
51
52impl Body {
54 pub fn with_bytes_from<T>(v: T) -> Self
55 where
56 T: Into<Bytes>,
57 {
58 Self::Bytes { inner: v.into() }
59 }
60
61 pub fn with_bytes(bytes: Bytes) -> Self {
62 Self::Bytes { inner: bytes }
63 }
64
65 pub fn with_stream(stream: impl Stream<Item = Result<Bytes, Error>> + Send + 'static) -> Self {
66 Self::Stream {
67 inner: Box::pin(stream),
68 }
69 }
70
71 #[cfg(feature = "hyper-request-body")]
72 pub fn from_hyper_body(body: hyper_request_body::HyperBody) -> Self {
73 use futures_util::TryStreamExt as _;
74
75 Self::with_stream(body.map_err(|err| Error::Other(err.into())))
76 }
77
78 #[cfg(feature = "hyper-request-body")]
79 pub fn from_hyper_request_body(body: hyper_request_body::Body) -> Self {
80 use futures_util::TryStreamExt as _;
81 use hyper_request_body::error::Error as HyperRequestBodyError;
82
83 match body {
84 hyper_request_body::Body::HyperBody { inner } => {
85 Self::with_stream(inner.map_err(|err| Error::Other(err.into())))
86 }
87 body => Self::with_stream(body.map_err(|err| match err {
88 HyperRequestBodyError::HyperError(err) => Error::Other(err.into()),
89 HyperRequestBodyError::Other(err) => Error::Other(err),
90 })),
91 }
92 }
93
94 #[cfg(feature = "warp-request-body")]
95 pub fn from_warp_request_body(body: warp_request_body::Body) -> Self {
96 use futures_util::TryStreamExt as _;
97 use warp_request_body::error::Error as WarpRequestBodyError;
98
99 match body {
100 warp_request_body::Body::HyperBody { inner } => {
101 Self::with_stream(inner.map_err(|err| Error::Other(err.into())))
102 }
103 body => Self::with_stream(body.map_err(|err| match err {
104 WarpRequestBodyError::HyperError(err) => Error::Other(err.into()),
105 WarpRequestBodyError::WarpError(err) => Error::Other(err.into()),
106 })),
107 }
108 }
109}
110
111#[cfg(feature = "hyper-request-body")]
112impl From<hyper_request_body::HyperBody> for Body {
113 fn from(body: hyper_request_body::HyperBody) -> Self {
114 Self::from_hyper_body(body)
115 }
116}
117
118#[cfg(feature = "hyper-request-body")]
119impl From<hyper_request_body::Body> for Body {
120 fn from(body: hyper_request_body::Body) -> Self {
121 Self::from_hyper_request_body(body)
122 }
123}
124
125#[cfg(feature = "warp-request-body")]
126impl From<warp_request_body::Body> for Body {
127 fn from(body: warp_request_body::Body) -> Self {
128 Self::from_warp_request_body(body)
129 }
130}
131
132impl Body {
133 pub fn require_to_bytes_async(&self) -> bool {
134 matches!(self, Self::Stream { inner: _ })
135 }
136
137 pub fn to_bytes(self) -> Bytes {
138 match self {
139 Self::Bytes { inner } => inner,
140 Self::Stream { inner: _ } => panic!("Please call require_to_bytes_async first"),
141 }
142 }
143
144 pub async fn to_bytes_async(self) -> Result<Bytes, Error> {
145 match self {
146 Self::Bytes { inner } => Ok(inner),
147 Self::Stream { inner } => utils::bytes_stream_to_bytes(inner)
148 .await
149 .map_err(Into::into),
150 }
151 }
152}
153
154impl Stream for Body {
156 type Item = Result<Bytes, Error>;
157
158 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
159 match self.project() {
160 BodyProj::Bytes { inner } => {
161 if !inner.is_empty() {
162 let bytes = inner.clone();
163 inner.clear();
164 Poll::Ready(Some(Ok(bytes)))
165 } else {
166 Poll::Ready(None)
167 }
168 }
169 BodyProj::Stream { inner } => inner.poll_next(cx).map_err(Into::into),
170 }
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use futures_util::StreamExt as _;
177
178 use super::*;
179
180 #[tokio::test]
181 async fn test_with_bytes() {
182 let bytes = Bytes::from_static(b"foo");
184 let body = Body::with_bytes(bytes);
185 assert!(matches!(body, Body::Bytes { inner: _ }));
186 assert!(!body.require_to_bytes_async());
187 assert_eq!(body.to_bytes(), Bytes::copy_from_slice(b"foo"));
188 }
189
190 #[tokio::test]
191 async fn test_with_stream() {
192 let stream =
194 futures_util::stream::once(async { Ok(Bytes::copy_from_slice(b"foo")) }).boxed();
195 let body = Body::with_stream(stream);
196 assert!(matches!(body, Body::Stream { inner: _ }));
197 assert!(body.require_to_bytes_async());
198 assert_eq!(
199 body.to_bytes_async().await.unwrap(),
200 Bytes::copy_from_slice(b"foo")
201 );
202 }
203
204 #[cfg(feature = "hyper-request-body")]
205 #[tokio::test]
206 async fn test_from_hyper_body() {
207 let body = hyper_request_body::HyperBody::from("foo");
209 let body = Body::from_hyper_body(body);
210 assert!(matches!(body, Body::Stream { inner: _ }));
211 assert!(body.require_to_bytes_async());
212 assert_eq!(
213 body.to_bytes_async().await.unwrap(),
214 Bytes::copy_from_slice(b"foo")
215 );
216 }
217
218 #[cfg(feature = "hyper-request-body")]
219 #[tokio::test]
220 async fn test_from_hyper_request_body() {
221 let body = hyper_request_body::Body::HyperBody {
223 inner: hyper_request_body::HyperBody::from("foo"),
224 };
225 let body = Body::from_hyper_request_body(body);
226 assert!(matches!(body, Body::Stream { inner: _ }));
227 assert!(body.require_to_bytes_async());
228 assert_eq!(
229 body.to_bytes_async().await.unwrap(),
230 Bytes::copy_from_slice(b"foo")
231 );
232 }
233
234 #[cfg(feature = "warp-request-body")]
235 #[tokio::test]
236 async fn test_from_warp_request_body() {
237 let body = warp_request_body::Body::Bytes {
239 inner: Bytes::from_static(b"foo"),
240 };
241 let body = Body::from_warp_request_body(body);
242 assert!(matches!(body, Body::Stream { inner: _ }));
243 assert!(body.require_to_bytes_async());
244 assert_eq!(
245 body.to_bytes_async().await.unwrap(),
246 Bytes::copy_from_slice(b"foo")
247 );
248 }
249}