1mod sync_reader;
2use sync_reader::sync_reader_into_bytes;
3pub use sync_reader::BodySyncReader;
4
5mod async_reader;
6use async_reader::async_reader_into_bytes;
7pub use async_reader::BodyAsyncReader;
8
9mod async_bytes_streamer;
10use async_bytes_streamer::async_bytes_streamer_into_bytes;
11pub use async_bytes_streamer::BodyAsyncBytesStreamer;
12
13mod body_http;
14pub use body_http::BodyHttp;
15use body_http::HyperBodyAsAsyncBytesStream;
16
17use std::io::Read as SyncRead;
18use std::pin::Pin;
19use std::time::Duration;
20use std::{fmt, io, mem};
21
22use tokio::io::AsyncRead;
23use tokio::task;
24
25use futures_core::Stream as AsyncStream;
26
27use bytes::Bytes;
28use hyper::body::Incoming;
29
30type PinnedAsyncRead = Pin<Box<dyn AsyncRead + Send + Sync>>;
31type BoxedSyncRead = Box<dyn SyncRead + Send + Sync>;
32type PinnedAsyncBytesStream =
33 Pin<Box<dyn AsyncStream<Item = io::Result<Bytes>> + Send + Sync>>;
34
35enum Inner {
36 Empty,
37 Bytes(Bytes),
39 Hyper(Incoming),
40 SyncReader(BoxedSyncRead),
41 AsyncReader(PinnedAsyncRead),
42 AsyncBytesStreamer(PinnedAsyncBytesStream),
43}
44
45impl fmt::Debug for Inner {
46 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
47 match self {
48 Self::Empty => f.write_str("Empty"),
49 Self::Bytes(b) => f.debug_tuple("Bytes").field(&b.len()).finish(),
50 Self::Hyper(_) => f.write_str("Hyper"),
51 Self::SyncReader(_) => f.write_str("SyncReader"),
52 Self::AsyncReader(_) => f.write_str("AsyncReader"),
53 Self::AsyncBytesStreamer(_) => f.write_str("AsyncBytesStreamer"),
54 }
55 }
56}
57
58impl Default for Inner {
59 fn default() -> Self {
60 Self::Empty
61 }
62}
63
/// Optional limits attached to a body.
///
/// Both limits default to `None` (unset). They are stored by
/// `Body::set_timeout` / `Body::set_size_limit` and handed to the reader and
/// streamer helpers when the body is consumed.
#[derive(Clone, Debug, Default)]
struct Constraints {
    /// Time limit, if any. How it is enforced is up to the helper that
    /// receives these constraints and is not visible in this file.
    timeout: Option<Duration>,
    /// Size limit, if any. `Body::into_bytes` compares the length of an
    /// in-memory buffer against this value.
    size: Option<usize>,
}
69
70#[derive(Debug, Default)]
71pub struct Body {
72 inner: Inner,
73 constraints: Constraints,
74}
75
76impl Body {
77 fn new_inner(inner: Inner) -> Self {
78 Self {
79 inner,
80 constraints: Constraints::default(),
81 }
82 }
83
84 pub fn new() -> Self {
86 Self::new_inner(Inner::Empty)
87 }
88
89 pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
91 let bytes = bytes.into();
92 if !bytes.is_empty() {
93 Self::new_inner(Inner::Bytes(bytes))
94 } else {
95 Self::new()
96 }
97 }
98
99 pub fn copy_from_slice(slice: impl AsRef<[u8]>) -> Self {
101 let slice = slice.as_ref();
102 if !slice.is_empty() {
103 Self::new_inner(Inner::Bytes(Bytes::copy_from_slice(slice)))
104 } else {
105 Self::new()
106 }
107 }
108
109 pub fn from_hyper(body: Incoming) -> Self {
111 Self::new_inner(Inner::Hyper(body))
112 }
113
114 pub fn from_sync_reader<R>(reader: R) -> Self
116 where
117 R: SyncRead + Send + Sync + 'static,
118 {
119 Self::new_inner(Inner::SyncReader(Box::new(reader)))
120 }
121
122 pub fn from_async_reader<R>(reader: R) -> Self
124 where
125 R: AsyncRead + Send + Sync + 'static,
126 {
127 Self::new_inner(Inner::AsyncReader(Box::pin(reader)))
128 }
129
130 pub fn from_async_bytes_streamer<S>(streamer: S) -> Self
133 where
134 S: AsyncStream<Item = io::Result<Bytes>> + Send + Sync + 'static,
135 {
136 Self::new_inner(Inner::AsyncBytesStreamer(Box::pin(streamer)))
137 }
138
139 #[cfg(feature = "json")]
141 #[cfg_attr(docsrs, doc(cfg(feature = "json")))]
142 pub fn serialize<S>(value: &S) -> Result<Self, serde_json::Error>
143 where
144 S: serde::Serialize + ?Sized,
145 {
146 serde_json::to_vec(value).map(|v| v.into())
147 }
148
149 pub fn is_empty(&self) -> bool {
152 matches!(self.inner, Inner::Empty)
155 }
156
157 pub fn len(&self) -> Option<usize> {
159 match &self.inner {
160 Inner::Empty => Some(0),
161 Inner::Bytes(b) => Some(b.len()),
162 _ => None,
163 }
164 }
165
166 pub fn set_size_limit(&mut self, size: Option<usize>) {
168 self.constraints.size = size;
169 }
170
171 pub fn set_timeout(&mut self, timeout: Option<Duration>) {
173 self.constraints.timeout = timeout;
174 }
175
176 pub fn take(&mut self) -> Self {
178 mem::take(self)
179 }
180
181 pub async fn into_bytes(self) -> io::Result<Bytes> {
183 match self.inner {
184 Inner::Empty => Ok(Bytes::new()),
185 Inner::Bytes(b) => {
186 if let Some(size_limit) = self.constraints.size {
187 if b.len() > size_limit {
188 return Err(size_limit_reached("Bytes to big"));
189 }
190 }
191 Ok(b)
192 }
193 Inner::Hyper(i) => {
194 async_bytes_streamer_into_bytes(
195 HyperBodyAsAsyncBytesStream::new(i),
196 self.constraints,
197 )
198 .await
199 }
200 Inner::SyncReader(r) => task::spawn_blocking(|| {
201 sync_reader_into_bytes(r, self.constraints)
202 })
203 .await
204 .map_err(join_error)?,
205 Inner::AsyncReader(r) => {
206 async_reader_into_bytes(r, self.constraints).await
207 }
208 Inner::AsyncBytesStreamer(s) => {
209 async_bytes_streamer_into_bytes(s, self.constraints).await
210 }
211 }
212 }
213
214 pub async fn into_string(self) -> io::Result<String> {
216 let bytes = self.into_bytes().await?;
217 String::from_utf8(bytes.into())
218 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
219 }
220
221 pub fn into_sync_reader(self) -> BodySyncReader {
223 BodySyncReader::new(self.inner, self.constraints)
224 }
225
226 pub fn into_async_reader(self) -> BodyAsyncReader {
228 BodyAsyncReader::new(self.inner, self.constraints)
229 }
230
231 pub fn into_async_bytes_streamer(self) -> BodyAsyncBytesStreamer {
234 BodyAsyncBytesStreamer::new(self.inner, self.constraints)
235 }
236
237 pub fn into_http_body(self) -> BodyHttp {
239 BodyHttp::new(self.inner, self.constraints)
240 }
241
242 #[cfg(feature = "json")]
244 #[cfg_attr(docsrs, doc(cfg(feature = "json")))]
245 pub async fn deserialize<D>(self) -> Result<D, serde_json::Error>
246 where
247 D: serde::de::DeserializeOwned + Send + 'static,
248 {
249 let reader = self.into_sync_reader();
250 if reader.needs_spawn_blocking() {
251 task::spawn_blocking(|| serde_json::from_reader(reader))
252 .await
253 .expect("deserializing panicked")
254 } else {
255 serde_json::from_reader(reader)
256 }
257 }
258}
259
260impl From<Bytes> for Body {
261 fn from(b: Bytes) -> Self {
262 Self::from_bytes(b)
263 }
264}
265
266impl From<Vec<u8>> for Body {
267 fn from(b: Vec<u8>) -> Self {
268 Self::from_bytes(b)
269 }
270}
271
272impl From<String> for Body {
273 fn from(s: String) -> Self {
274 Self::from_bytes(s)
275 }
276}
277
278impl From<&'static str> for Body {
279 fn from(s: &'static str) -> Self {
280 Self::from_bytes(Bytes::from_static(s.as_bytes()))
281 }
282}
283
284impl From<Incoming> for Body {
285 fn from(i: Incoming) -> Self {
286 Self::from_hyper(i)
287 }
288}
289
/// Builds the `io::Error` used when a size limit is exceeded.
///
/// The error has kind `UnexpectedEof` and carries `msg` as its message.
fn size_limit_reached(msg: &'static str) -> io::Error {
    let kind = io::ErrorKind::UnexpectedEof;
    io::Error::new(kind, msg)
}
293
/// Builds the `io::Error` used when a timeout is hit.
///
/// The error has kind `TimedOut` and carries `msg` as its message.
fn timed_out(msg: &'static str) -> io::Error {
    let kind = io::ErrorKind::TimedOut;
    io::Error::new(kind, msg)
}
297
298fn join_error(error: task::JoinError) -> io::Error {
299 io::Error::new(io::ErrorKind::Other, error)
300}
301
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only for types that are `Unpin`, `Send` and `Sync`.
    fn assert_unpin_send_sync<T: Unpin + Send + Sync>() {}

    /// `Body` must stay `Unpin + Send + Sync`; this is a compile-time check.
    #[test]
    fn test_body() {
        assert_unpin_send_sync::<Body>();
    }
}
317
#[cfg(all(test, feature = "json"))]
mod json_tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Enum payload used to exercise a non-trivial JSON shape.
    #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
    enum SomeEnum {
        Abc(String),
    }

    /// Struct that is serialized into a body and read back.
    #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
    struct Struct1 {
        some_data: String,
        some_enum: SomeEnum,
    }

    /// Serializing into a `Body` and deserializing it again yields the
    /// original value.
    #[tokio::test]
    async fn test_serde() {
        let original = Struct1 {
            some_data: String::from("test"),
            some_enum: SomeEnum::Abc(String::from("test2")),
        };

        let body = Body::serialize(&original).unwrap();
        let decoded = body.deserialize::<Struct1>().await.unwrap();

        assert_eq!(original, decoded);
    }
}