1use std::{
6 convert::Infallible,
7 error::Error,
8 fmt,
9 future::Future,
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use bytes::Bytes;
15use faststr::FastStr;
16use futures_util::stream::Stream;
17use http_body::{Frame, SizeHint};
18use http_body_util::{BodyExt, Full, StreamBody, combinators::BoxBody};
19use hyper::body::Incoming;
20use linkedbytes::{LinkedBytes, Node};
21use pin_project::pin_project;
22#[cfg(feature = "json")]
23use serde::de::DeserializeOwned;
24
25use crate::error::BoxError;
26
27type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;
29
30#[pin_project]
32pub struct Body {
33 #[pin]
34 repr: BodyRepr,
35}
36
37#[pin_project(project = BodyProj)]
38enum BodyRepr {
39 Full(#[pin] Full<Bytes>),
41 Hyper(#[pin] Incoming),
46 Stream(#[pin] StreamBody<BoxStream<'static, Result<Frame<Bytes>, BoxError>>>),
48 Body(#[pin] BoxBody<Bytes, BoxError>),
50}
51
52impl Default for Body {
53 fn default() -> Self {
54 Body::empty()
55 }
56}
57
58impl Body {
59 pub fn empty() -> Self {
61 Self {
62 repr: BodyRepr::Full(Full::new(Bytes::new())),
63 }
64 }
65
66 pub fn from_incoming(incoming: Incoming) -> Self {
71 Self {
72 repr: BodyRepr::Hyper(incoming),
73 }
74 }
75
76 pub fn from_stream<S>(stream: S) -> Self
78 where
79 S: Stream<Item = Result<Frame<Bytes>, BoxError>> + Send + Sync + 'static,
80 {
81 Self {
82 repr: BodyRepr::Stream(StreamBody::new(Box::pin(stream))),
83 }
84 }
85
86 pub fn from_body<B>(body: B) -> Self
88 where
89 B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
90 B::Error: Into<BoxError>,
91 {
92 Self {
93 repr: BodyRepr::Body(BoxBody::new(body.map_err(Into::into))),
94 }
95 }
96}
97
98impl http_body::Body for Body {
99 type Data = Bytes;
100 type Error = BoxError;
101
102 fn poll_frame(
103 self: Pin<&mut Self>,
104 cx: &mut Context<'_>,
105 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
106 match self.project().repr.project() {
107 BodyProj::Full(full) => http_body::Body::poll_frame(full, cx).map_err(BoxError::from),
108 BodyProj::Hyper(incoming) => {
109 http_body::Body::poll_frame(incoming, cx).map_err(BoxError::from)
110 }
111 BodyProj::Stream(stream) => http_body::Body::poll_frame(stream, cx),
112 BodyProj::Body(body) => http_body::Body::poll_frame(body, cx),
113 }
114 }
115
116 fn is_end_stream(&self) -> bool {
117 match &self.repr {
118 BodyRepr::Full(full) => http_body::Body::is_end_stream(full),
119 BodyRepr::Hyper(incoming) => http_body::Body::is_end_stream(incoming),
120 BodyRepr::Stream(stream) => http_body::Body::is_end_stream(stream),
121 BodyRepr::Body(body) => http_body::Body::is_end_stream(body),
122 }
123 }
124
125 fn size_hint(&self) -> SizeHint {
126 match &self.repr {
127 BodyRepr::Full(full) => http_body::Body::size_hint(full),
128 BodyRepr::Hyper(incoming) => http_body::Body::size_hint(incoming),
129 BodyRepr::Stream(stream) => http_body::Body::size_hint(stream),
130 BodyRepr::Body(body) => http_body::Body::size_hint(body),
131 }
132 }
133}
134
135impl fmt::Debug for Body {
136 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137 match &self.repr {
138 BodyRepr::Full(_) => f.write_str("Body::Full"),
139 BodyRepr::Hyper(_) => f.write_str("Body::Hyper"),
140 BodyRepr::Stream(_) => f.write_str("Body::Stream"),
141 BodyRepr::Body(_) => f.write_str("Body::Body"),
142 }
143 }
144}
145
146mod sealed {
147 pub trait SealedBody
148 where
149 Self: http_body::Body + Sized + Send,
150 Self::Data: Send,
151 {
152 }
153
154 impl<T> SealedBody for T
155 where
156 T: http_body::Body + Send,
157 T::Data: Send,
158 {
159 }
160}
161
162pub trait BodyConversion: sealed::SealedBody
164where
165 <Self as http_body::Body>::Data: Send,
166{
167 fn into_bytes(self) -> impl Future<Output = Result<Bytes, BodyConvertError>> + Send {
169 async {
170 Ok(self
171 .collect()
172 .await
173 .map_err(|_| BodyConvertError::BodyCollectionError)?
174 .to_bytes())
175 }
176 }
177
178 fn into_vec(self) -> impl Future<Output = Result<Vec<u8>, BodyConvertError>> + Send {
180 async { Ok(self.into_bytes().await?.into()) }
181 }
182
183 fn into_string(self) -> impl Future<Output = Result<String, BodyConvertError>> + Send {
185 async {
186 let vec = self.into_vec().await?;
187
188 let _ =
190 simdutf8::basic::from_utf8(&vec).map_err(|_| BodyConvertError::StringUtf8Error)?;
191 Ok(unsafe { String::from_utf8_unchecked(vec) })
192 }
193 }
194
195 unsafe fn into_string_unchecked(
202 self,
203 ) -> impl Future<Output = Result<String, BodyConvertError>> + Send {
204 async {
205 let vec = self.into_vec().await?;
206
207 Ok(unsafe { String::from_utf8_unchecked(vec) })
208 }
209 }
210
211 fn into_faststr(self) -> impl Future<Output = Result<FastStr, BodyConvertError>> + Send {
213 async {
214 let bytes = self.into_bytes().await?;
215
216 let _ = simdutf8::basic::from_utf8(&bytes)
218 .map_err(|_| BodyConvertError::StringUtf8Error)?;
219 Ok(unsafe { FastStr::from_bytes_unchecked(bytes) })
220 }
221 }
222
223 unsafe fn into_faststr_unchecked(
230 self,
231 ) -> impl Future<Output = Result<FastStr, BodyConvertError>> + Send {
232 async {
233 let bytes = self.into_bytes().await?;
234
235 Ok(unsafe { FastStr::from_bytes_unchecked(bytes) })
236 }
237 }
238
239 #[cfg(feature = "json")]
241 fn into_json<T>(self) -> impl Future<Output = Result<T, BodyConvertError>> + Send
242 where
243 T: DeserializeOwned,
244 {
245 async {
246 let bytes = self.into_bytes().await?;
247 crate::utils::json::deserialize(&bytes).map_err(BodyConvertError::JsonDeserializeError)
248 }
249 }
250}
251
252impl<T> BodyConversion for T
253where
254 T: sealed::SealedBody,
255 <T as http_body::Body>::Data: Send,
256{
257}
258
259#[derive(Debug)]
261pub enum BodyConvertError {
262 BodyCollectionError,
264 StringUtf8Error,
266 #[cfg(feature = "json")]
268 JsonDeserializeError(crate::utils::json::Error),
269}
270
271impl fmt::Display for BodyConvertError {
272 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273 match self {
274 Self::BodyCollectionError => f.write_str("failed to collect body"),
275 Self::StringUtf8Error => f.write_str("body is not a valid string"),
276 #[cfg(feature = "json")]
277 Self::JsonDeserializeError(e) => write!(f, "failed to deserialize body: {e}"),
278 }
279 }
280}
281
282impl Error for BodyConvertError {
283 fn source(&self) -> Option<&(dyn Error + 'static)> {
284 match self {
285 #[cfg(feature = "json")]
286 Self::JsonDeserializeError(e) => Some(e),
287 _ => None,
288 }
289 }
290}
291
292impl From<()> for Body {
293 fn from(_: ()) -> Self {
294 Self::empty()
295 }
296}
297
298impl From<&'static str> for Body {
299 fn from(value: &'static str) -> Self {
300 Self {
301 repr: BodyRepr::Full(Full::new(Bytes::from_static(value.as_bytes()))),
302 }
303 }
304}
305
306impl From<Vec<u8>> for Body {
307 fn from(value: Vec<u8>) -> Self {
308 Self {
309 repr: BodyRepr::Full(Full::new(Bytes::from(value))),
310 }
311 }
312}
313
314impl From<Bytes> for Body {
315 fn from(value: Bytes) -> Self {
316 Self {
317 repr: BodyRepr::Full(Full::new(value)),
318 }
319 }
320}
321
322impl From<FastStr> for Body {
323 fn from(value: FastStr) -> Self {
324 Self {
325 repr: BodyRepr::Full(Full::new(value.into_bytes())),
326 }
327 }
328}
329
330impl From<String> for Body {
331 fn from(value: String) -> Self {
332 Self {
333 repr: BodyRepr::Full(Full::new(Bytes::from(value))),
334 }
335 }
336}
337
338struct LinkedBytesBody<I> {
339 inner: I,
340}
341
342impl<I> http_body::Body for LinkedBytesBody<I>
343where
344 I: Iterator<Item = Node> + Unpin,
345{
346 type Data = Bytes;
347 type Error = Infallible;
348
349 fn poll_frame(
350 self: Pin<&mut Self>,
351 _: &mut Context<'_>,
352 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
353 let this = self.get_mut();
354 let Some(node) = this.inner.next() else {
355 return Poll::Ready(None);
356 };
357 let bytes = match node {
358 Node::Bytes(bytes) => bytes,
359 Node::BytesMut(bytesmut) => bytesmut.freeze(),
360 Node::FastStr(faststr) => faststr.into_bytes(),
361 };
362 Poll::Ready(Some(Ok(Frame::data(bytes))))
363 }
364
365 fn is_end_stream(&self) -> bool {
366 false
367 }
368
369 fn size_hint(&self) -> SizeHint {
370 let (lower, upper) = self.inner.size_hint();
371 let mut size_hint = SizeHint::new();
372 size_hint.set_lower(lower as u64);
373 if let Some(upper) = upper {
374 size_hint.set_upper(upper as u64);
375 }
376 size_hint
377 }
378}
379
380impl From<LinkedBytes> for Body {
381 fn from(value: LinkedBytes) -> Self {
382 Body::from_body(LinkedBytesBody {
383 inner: value.into_iter_list(),
384 })
385 }
386}
387
388#[cfg(test)]
389mod tests {
390 use bytes::Bytes;
391 use faststr::FastStr;
392 use linkedbytes::LinkedBytes;
393
394 use super::Body;
395 use crate::body::BodyConversion;
396
397 #[tokio::test]
398 async fn test_from_linked_bytes() {
399 let mut bytes = LinkedBytes::new();
400 bytes.insert(Bytes::from_static(b"Hello, "));
401 bytes.insert_faststr(FastStr::new("world!"));
402 let body = Body::from(bytes);
403 assert_eq!(body.into_string().await.unwrap(), "Hello, world!");
404 }
405}