1use crate::dep::{
4 http_body::{self, Body as _, Frame},
5 http_body_util::{self, BodyExt},
6};
7use bytes::Bytes;
8use futures_core::TryStream;
9use futures_lite::stream::Stream;
10use pin_project_lite::pin_project;
11use rama_error::{BoxError, OpaqueError};
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use sync_wrapper::SyncWrapper;
15
16type BoxBody = http_body_util::combinators::BoxBody<Bytes, BoxError>;
17
18fn boxed<B>(body: B) -> BoxBody
19where
20 B: http_body::Body<Data = Bytes, Error: Into<BoxError>> + Send + Sync + 'static,
21{
22 try_downcast(body).unwrap_or_else(|body| body.map_err(Into::into).boxed())
23}
24
/// Attempts to turn `k` into a value of type `T` without copying it.
///
/// Returns `Ok` holding the very same value when `K` and `T` are the same
/// type; otherwise `k` is handed back untouched inside `Err`.
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where
    T: 'static,
    K: Send + 'static,
{
    // `Any` only offers by-reference downcasts, so the value is parked in an
    // `Option` and moved out through whichever view of the slot is valid.
    let mut slot = Some(k);
    let erased: &mut dyn std::any::Any = &mut slot;
    match erased.downcast_mut::<Option<T>>() {
        // The slot was filled just above, so neither `unwrap` can fail.
        Some(typed) => Ok(typed.take().unwrap()),
        None => Err(slot.unwrap()),
    }
}
37
38#[derive(Debug)]
40pub struct Body(BoxBody);
41
42impl Body {
43 pub fn new<B>(body: B) -> Self
45 where
46 B: http_body::Body<Data = Bytes, Error: Into<BoxError>> + Send + Sync + 'static,
47 {
48 try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
49 }
50
51 pub fn with_limit<B>(body: B, limit: usize) -> Self
53 where
54 B: http_body::Body<Data = Bytes, Error: Into<BoxError>> + Send + Sync + 'static,
55 {
56 Self::new(crate::dep::http_body_util::Limited::new(body, limit))
57 }
58
59 pub fn empty() -> Self {
61 Self::new(http_body_util::Empty::new())
62 }
63
64 pub fn from_stream<S>(stream: S) -> Self
68 where
69 S: TryStream<Ok: Into<Bytes>, Error: Into<BoxError>> + Send + 'static,
70 {
71 Self::new(StreamBody {
72 stream: SyncWrapper::new(stream),
73 })
74 }
75
76 pub fn limited(self, limit: usize) -> Self {
78 Self::new(crate::dep::http_body_util::Limited::new(self.0, limit))
79 }
80
81 pub fn into_data_stream(self) -> BodyDataStream {
88 BodyDataStream { inner: self }
89 }
90}
91
92impl Default for Body {
93 fn default() -> Self {
94 Self::empty()
95 }
96}
97
98impl From<()> for Body {
99 fn from(_: ()) -> Self {
100 Self::empty()
101 }
102}
103
// Generates `impl From<$source> for Body` by first converting the value into
// an `http_body_util::Full` and then handing that to `Body::new`.
macro_rules! body_from_impl {
    ($source:ty) => {
        impl From<$source> for Body {
            fn from(payload: $source) -> Self {
                let full = http_body_util::Full::<Bytes>::from(payload);
                Self::new(full)
            }
        }
    };
}
113
114body_from_impl!(&'static [u8]);
115body_from_impl!(std::borrow::Cow<'static, [u8]>);
116body_from_impl!(Vec<u8>);
117
118body_from_impl!(&'static str);
119body_from_impl!(std::borrow::Cow<'static, str>);
120body_from_impl!(String);
121
122body_from_impl!(Bytes);
123
124impl http_body::Body for Body {
125 type Data = Bytes;
126 type Error = OpaqueError;
127
128 #[inline]
129 fn poll_frame(
130 mut self: Pin<&mut Self>,
131 cx: &mut Context<'_>,
132 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
133 Pin::new(&mut self.0)
134 .poll_frame(cx)
135 .map_err(OpaqueError::from_boxed)
136 }
137
138 #[inline]
139 fn size_hint(&self) -> http_body::SizeHint {
140 self.0.size_hint()
141 }
142
143 #[inline]
144 fn is_end_stream(&self) -> bool {
145 self.0.is_end_stream()
146 }
147}
148
149#[derive(Debug)]
153pub struct BodyDataStream {
154 inner: Body,
155}
156
157impl Stream for BodyDataStream {
158 type Item = Result<Bytes, BoxError>;
159
160 #[inline]
161 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
162 loop {
163 match futures_lite::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
164 Some(frame) => match frame.into_data() {
165 Ok(data) => return Poll::Ready(Some(Ok(data))),
166 Err(_frame) => {}
167 },
168 None => return Poll::Ready(None),
169 }
170 }
171 }
172}
173
174impl http_body::Body for BodyDataStream {
175 type Data = Bytes;
176 type Error = BoxError;
177
178 #[inline]
179 fn poll_frame(
180 mut self: Pin<&mut Self>,
181 cx: &mut Context<'_>,
182 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
183 Pin::new(&mut self.inner).poll_frame(cx).map_err(Into::into)
184 }
185
186 #[inline]
187 fn is_end_stream(&self) -> bool {
188 self.inner.is_end_stream()
189 }
190
191 #[inline]
192 fn size_hint(&self) -> http_body::SizeHint {
193 self.inner.size_hint()
194 }
195}
196
197pin_project! {
198 struct StreamBody<S> {
199 #[pin]
200 stream: SyncWrapper<S>,
201 }
202}
203
204impl<S> http_body::Body for StreamBody<S>
205where
206 S: TryStream<Ok: Into<Bytes>, Error: Into<BoxError>>,
207{
208 type Data = Bytes;
209 type Error = BoxError;
210
211 fn poll_frame(
212 self: Pin<&mut Self>,
213 cx: &mut Context<'_>,
214 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
215 let stream = self.project().stream.get_pin_mut();
216 match futures_lite::ready!(stream.try_poll_next(cx)) {
217 Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
218 Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
219 None => Poll::Ready(None),
220 }
221 }
222}
223
#[test]
fn test_try_downcast() {
    // Mismatched types: the input is handed back unchanged on the `Err` side.
    let mismatched: Result<i32, u32> = try_downcast(5_u32);
    assert_eq!(mismatched, Err(5_u32));

    // Identical types: the value comes out on the `Ok` side.
    let identical: Result<i32, i32> = try_downcast(5_i32);
    assert_eq!(identical, Ok(5_i32));
}