chuchi_core/body/
mod.rs

1mod sync_reader;
2use sync_reader::sync_reader_into_bytes;
3pub use sync_reader::BodySyncReader;
4
5mod async_reader;
6use async_reader::async_reader_into_bytes;
7pub use async_reader::BodyAsyncReader;
8
9mod async_bytes_streamer;
10use async_bytes_streamer::async_bytes_streamer_into_bytes;
11pub use async_bytes_streamer::BodyAsyncBytesStreamer;
12
13mod body_http;
14pub use body_http::BodyHttp;
15use body_http::HyperBodyAsAsyncBytesStream;
16
17use std::io::Read as SyncRead;
18use std::pin::Pin;
19use std::time::Duration;
20use std::{fmt, io, mem};
21
22use tokio::io::AsyncRead;
23use tokio::task;
24
25use futures_core::Stream as AsyncStream;
26
27use bytes::Bytes;
28use hyper::body::Incoming;
29
30type PinnedAsyncRead = Pin<Box<dyn AsyncRead + Send + Sync>>;
31type BoxedSyncRead = Box<dyn SyncRead + Send + Sync>;
32type PinnedAsyncBytesStream =
33	Pin<Box<dyn AsyncStream<Item = io::Result<Bytes>> + Send + Sync>>;
34
35enum Inner {
36	Empty,
37	// Bytes will never be empty
38	Bytes(Bytes),
39	Hyper(Incoming),
40	SyncReader(BoxedSyncRead),
41	AsyncReader(PinnedAsyncRead),
42	AsyncBytesStreamer(PinnedAsyncBytesStream),
43}
44
45impl fmt::Debug for Inner {
46	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
47		match self {
48			Self::Empty => f.write_str("Empty"),
49			Self::Bytes(b) => f.debug_tuple("Bytes").field(&b.len()).finish(),
50			Self::Hyper(_) => f.write_str("Hyper"),
51			Self::SyncReader(_) => f.write_str("SyncReader"),
52			Self::AsyncReader(_) => f.write_str("AsyncReader"),
53			Self::AsyncBytesStreamer(_) => f.write_str("AsyncBytesStreamer"),
54		}
55	}
56}
57
58impl Default for Inner {
59	fn default() -> Self {
60		Self::Empty
61	}
62}
63
/// Limits that are handed to the reader or stream a `Body` is converted
/// into.
///
/// The default has neither a timeout nor a size limit.
#[derive(Debug, Clone, Default)]
struct Constraints {
	/// Read timeout, `None` means no timeout.
	timeout: Option<Duration>,
	/// Maximum amount of bytes that may be read, `None` means no limit.
	size: Option<usize>,
}
69
70#[derive(Debug, Default)]
71pub struct Body {
72	inner: Inner,
73	constraints: Constraints,
74}
75
76impl Body {
77	fn new_inner(inner: Inner) -> Self {
78		Self {
79			inner,
80			constraints: Constraints::default(),
81		}
82	}
83
84	/// Creates a new empty `Body`.
85	pub fn new() -> Self {
86		Self::new_inner(Inner::Empty)
87	}
88
89	/// Creates a new `Body` from the given bytes.
90	pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
91		let bytes = bytes.into();
92		if !bytes.is_empty() {
93			Self::new_inner(Inner::Bytes(bytes))
94		} else {
95			Self::new()
96		}
97	}
98
99	/// Creates a new `Body` from the given bytes.
100	pub fn copy_from_slice(slice: impl AsRef<[u8]>) -> Self {
101		let slice = slice.as_ref();
102		if !slice.is_empty() {
103			Self::new_inner(Inner::Bytes(Bytes::copy_from_slice(slice)))
104		} else {
105			Self::new()
106		}
107	}
108
109	/// Creates a new Body from a `hyper::body::Incoming`.
110	pub fn from_hyper(body: Incoming) -> Self {
111		Self::new_inner(Inner::Hyper(body))
112	}
113
114	/// Creates a new Body from a `Read` implementation.
115	pub fn from_sync_reader<R>(reader: R) -> Self
116	where
117		R: SyncRead + Send + Sync + 'static,
118	{
119		Self::new_inner(Inner::SyncReader(Box::new(reader)))
120	}
121
122	/// Creates a new Body from an `AsyncRead` implementation.
123	pub fn from_async_reader<R>(reader: R) -> Self
124	where
125		R: AsyncRead + Send + Sync + 'static,
126	{
127		Self::new_inner(Inner::AsyncReader(Box::pin(reader)))
128	}
129
130	/// Creates a new Body from a `Stream<Item=io::Result<Bytes>>`
131	/// implementation.
132	pub fn from_async_bytes_streamer<S>(streamer: S) -> Self
133	where
134		S: AsyncStream<Item = io::Result<Bytes>> + Send + Sync + 'static,
135	{
136		Self::new_inner(Inner::AsyncBytesStreamer(Box::pin(streamer)))
137	}
138
139	/// Creates a new Body from a serializeable object.
140	#[cfg(feature = "json")]
141	#[cfg_attr(docsrs, doc(cfg(feature = "json")))]
142	pub fn serialize<S>(value: &S) -> Result<Self, serde_json::Error>
143	where
144		S: serde::Serialize + ?Sized,
145	{
146		serde_json::to_vec(value).map(|v| v.into())
147	}
148
149	/// Returns true if we know the body is empty, the body still might be empty
150	/// but we just don't know it yet
151	pub fn is_empty(&self) -> bool {
152		// we don't need to check the Inner::Bytes(b) since it will never
153		// be empty
154		matches!(self.inner, Inner::Empty)
155	}
156
157	/// Returns a length if it is already known.
158	pub fn len(&self) -> Option<usize> {
159		match &self.inner {
160			Inner::Empty => Some(0),
161			Inner::Bytes(b) => Some(b.len()),
162			_ => None,
163		}
164	}
165
166	/// Sets a read size limit.
167	pub fn set_size_limit(&mut self, size: Option<usize>) {
168		self.constraints.size = size;
169	}
170
171	/// Sets a read timeout, the timer starts counting after you call into_*
172	pub fn set_timeout(&mut self, timeout: Option<Duration>) {
173		self.constraints.timeout = timeout;
174	}
175
176	/// Takes the body and replaces it with an empty one.
177	pub fn take(&mut self) -> Self {
178		mem::take(self)
179	}
180
181	/// Converts the Body into Bytes.
182	pub async fn into_bytes(self) -> io::Result<Bytes> {
183		match self.inner {
184			Inner::Empty => Ok(Bytes::new()),
185			Inner::Bytes(b) => {
186				if let Some(size_limit) = self.constraints.size {
187					if b.len() > size_limit {
188						return Err(size_limit_reached("Bytes to big"));
189					}
190				}
191				Ok(b)
192			}
193			Inner::Hyper(i) => {
194				async_bytes_streamer_into_bytes(
195					HyperBodyAsAsyncBytesStream::new(i),
196					self.constraints,
197				)
198				.await
199			}
200			Inner::SyncReader(r) => task::spawn_blocking(|| {
201				sync_reader_into_bytes(r, self.constraints)
202			})
203			.await
204			.map_err(join_error)?,
205			Inner::AsyncReader(r) => {
206				async_reader_into_bytes(r, self.constraints).await
207			}
208			Inner::AsyncBytesStreamer(s) => {
209				async_bytes_streamer_into_bytes(s, self.constraints).await
210			}
211		}
212	}
213
214	/// Converts the Body into a string.
215	pub async fn into_string(self) -> io::Result<String> {
216		let bytes = self.into_bytes().await?;
217		String::from_utf8(bytes.into())
218			.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
219	}
220
221	/// Converts the Body into a type that implements `Read`.
222	pub fn into_sync_reader(self) -> BodySyncReader {
223		BodySyncReader::new(self.inner, self.constraints)
224	}
225
226	/// Converts the Body into a type that implements `AsyncRead`.
227	pub fn into_async_reader(self) -> BodyAsyncReader {
228		BodyAsyncReader::new(self.inner, self.constraints)
229	}
230
231	/// Converts the Body into a type that implements
232	/// `Stream<Item=io::Result<Bytes>>`.
233	pub fn into_async_bytes_streamer(self) -> BodyAsyncBytesStreamer {
234		BodyAsyncBytesStreamer::new(self.inner, self.constraints)
235	}
236
237	/// Converts the Body into a type that implements `hyper::body::Body`.
238	pub fn into_http_body(self) -> BodyHttp {
239		BodyHttp::new(self.inner, self.constraints)
240	}
241
242	/// Converts the Body into a deserializeable type.
243	#[cfg(feature = "json")]
244	#[cfg_attr(docsrs, doc(cfg(feature = "json")))]
245	pub async fn deserialize<D>(self) -> Result<D, serde_json::Error>
246	where
247		D: serde::de::DeserializeOwned + Send + 'static,
248	{
249		let reader = self.into_sync_reader();
250		if reader.needs_spawn_blocking() {
251			task::spawn_blocking(|| serde_json::from_reader(reader))
252				.await
253				.expect("deserializing panicked")
254		} else {
255			serde_json::from_reader(reader)
256		}
257	}
258}
259
260impl From<Bytes> for Body {
261	fn from(b: Bytes) -> Self {
262		Self::from_bytes(b)
263	}
264}
265
266impl From<Vec<u8>> for Body {
267	fn from(b: Vec<u8>) -> Self {
268		Self::from_bytes(b)
269	}
270}
271
272impl From<String> for Body {
273	fn from(s: String) -> Self {
274		Self::from_bytes(s)
275	}
276}
277
278impl From<&'static str> for Body {
279	fn from(s: &'static str) -> Self {
280		Self::from_bytes(Bytes::from_static(s.as_bytes()))
281	}
282}
283
284impl From<Incoming> for Body {
285	fn from(i: Incoming) -> Self {
286		Self::from_hyper(i)
287	}
288}
289
/// Builds the error that is returned when a body exceeds its size limit.
///
/// The error has the kind `io::ErrorKind::UnexpectedEof` and carries `msg`
/// as its message.
fn size_limit_reached(msg: &'static str) -> io::Error {
	let kind = io::ErrorKind::UnexpectedEof;
	io::Error::new(kind, msg)
}
293
/// Builds the error that is returned when a read timeout expires.
///
/// The error has the kind `io::ErrorKind::TimedOut` and carries `msg` as
/// its message.
fn timed_out(msg: &'static str) -> io::Error {
	let kind = io::ErrorKind::TimedOut;
	io::Error::new(kind, msg)
}
297
298fn join_error(error: task::JoinError) -> io::Error {
299	io::Error::new(io::ErrorKind::Other, error)
300}
301
#[cfg(test)]
mod tests {
	use super::*;

	/// Only compiles if `T` is `Unpin`, `Send` and `Sync`.
	fn assert_auto_traits<T: Unpin + Send + Sync>() {}

	/// Compile-time guarantee that `Body` stays `Unpin + Send + Sync`.
	#[test]
	fn test_body() {
		assert_auto_traits::<Body>();
	}
}
317
#[cfg(all(test, feature = "json"))]
mod json_tests {
	use super::*;
	use serde::{Deserialize, Serialize};

	#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
	enum SomeEnum {
		Abc(String),
	}

	#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
	struct Struct1 {
		some_data: String,
		some_enum: SomeEnum,
	}

	/// A value serialized into a `Body` must deserialize into an equal
	/// value.
	#[tokio::test]
	async fn test_serde() {
		let original = Struct1 {
			some_data: "test".to_string(),
			some_enum: SomeEnum::Abc("test2".to_string()),
		};

		let body = Body::serialize(&original).unwrap();
		let decoded = body.deserialize::<Struct1>().await.unwrap();

		assert_eq!(original, decoded);
	}
}
346}