1use crate::error::{BoxError, Error};
2
3use bytes::Bytes;
4use futures_util::{Stream, TryStream};
5use http_body::{Body as _, Frame};
6use http_body_util::{combinators::UnsyncBoxBody, BodyExt, Empty, Full};
7use pin_project_lite::pin_project;
8use std::{
9 any::Any,
10 pin::Pin,
11 task::{Context, Poll},
12};
13use sync_wrapper::SyncWrapper;
14
/// Default byte count wrapped by [`BodyLimitedSize::new`]: 2 MiB.
///
/// Spelled as `2 * 1024 * 1024` so the unit is obvious at a glance. The
/// previous literal had the digits of one factor transposed (`1204`), which
/// produced 2_465_792 rather than the intended 2_097_152.
const BODY_LIMITED: usize = 2 * 1024 * 1024;
16
17#[derive(Debug, Default, Clone, Copy)]
18pub struct BodyLimitedSize(pub usize);
19
20impl BodyLimitedSize {
21 pub fn new() -> Self {
22 Self(BODY_LIMITED)
23 }
24}
25
26fn boxed<B>(body: B) -> UnsyncBoxBody<Bytes, Error>
27where
28 B: http_body::Body<Data = Bytes> + Send + 'static,
29 B::Error: Into<BoxError>,
30{
31 try_downcast(body).unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
32}
33
/// Attempts to reinterpret the owned value `k` as a `T` without any `unsafe`.
///
/// Returns `Ok(value)` when `K` and `T` are the same concrete type, and
/// otherwise gives the original value back as `Err(k)`.
///
/// The value is parked in an `Option` so it can be moved out through the
/// `&mut` reference that `Any::downcast_mut` yields.
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where
    T: 'static,
    K: Send + 'static,
{
    let mut slot = Some(k);
    let erased: &mut dyn Any = &mut slot;
    match erased.downcast_mut::<Option<T>>() {
        // `Option<K>` is `Option<T>`: move the value out as a `T`.
        Some(same_type) => Ok(same_type.take().unwrap()),
        // Different types: `slot` still owns the untouched input.
        None => Err(slot.unwrap()),
    }
}
46
47pub struct Body(UnsyncBoxBody<Bytes, Error>);
48
49impl Body {
50 pub fn new<B>(body: B) -> Self
51 where
52 B: http_body::Body<Data = Bytes> + Send + 'static,
53 B::Error: Into<BoxError>,
54 {
55 try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
56 }
57
58 pub fn empty() -> Self {
59 Self::new(Empty::new())
60 }
61
62 pub fn from_stream<S>(stream: S) -> Self
63 where
64 S: TryStream + Send + 'static,
65 S::Ok: Into<Bytes>,
66 S::Error: Into<BoxError>,
67 {
68 Body::new(StreamBody {
69 stream: SyncWrapper::new(stream),
70 })
71 }
72}
73
74impl http_body::Body for Body {
75 type Data = Bytes;
76 type Error = Error;
77
78 #[inline]
79 fn poll_frame(
80 mut self: Pin<&mut Self>,
81 cx: &mut Context<'_>,
82 ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
83 Pin::new(&mut self.0).poll_frame(cx)
84 }
85
86 #[inline]
87 fn size_hint(&self) -> http_body::SizeHint {
88 self.0.size_hint()
89 }
90
91 #[inline]
92 fn is_end_stream(&self) -> bool {
93 self.0.is_end_stream()
94 }
95}
96
97impl Stream for Body {
98 type Item = Result<Bytes, Error>;
99
100 #[inline]
101 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102 loop {
103 match futures_util::ready!(Pin::new(&mut self.0).poll_frame(cx)?) {
104 Some(frame) => match frame.into_data() {
105 Ok(data) => return Poll::Ready(Some(Ok(data))),
106 Err(_frame) => {}
107 },
108 None => return Poll::Ready(None),
109 }
110 }
111 }
112}
113
114macro_rules! body_from_impl {
115 ($ty:ty) => {
116 impl From<$ty> for Body {
117 fn from(value: $ty) -> Self {
118 Self::new(Full::from(value))
119 }
120 }
121 };
122}
123
124body_from_impl!(&'static str);
125body_from_impl!(String);
126body_from_impl!(Bytes);
127
128pin_project! {
129 struct StreamBody<S> {
130 #[pin]
131 stream: SyncWrapper<S>,
132 }
133}
134
135impl<S> http_body::Body for StreamBody<S>
136where
137 S: TryStream,
138 S::Ok: Into<Bytes>,
139 S::Error: Into<BoxError>,
140{
141 type Data = Bytes;
142 type Error = Error;
143
144 fn poll_frame(
145 self: Pin<&mut Self>,
146 cx: &mut Context<'_>,
147 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
148 let stream = self.project().stream.get_pin_mut();
149 match futures_util::ready!(stream.try_poll_next(cx)) {
150 Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
151 Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
152 None => Poll::Ready(None),
153 }
154 }
155}