1use crate::util::try_downcast;
4use crate::{BoxError, Error};
5use bytes::Bytes;
6use futures_util::stream::Stream;
7use futures_util::TryStream;
8use http_body::{Body as _, Frame};
9use http_body_util::BodyExt;
10use pin_project_lite::pin_project;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use sync_wrapper::SyncWrapper;
14
15type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
16
17fn boxed<B>(body: B) -> BoxBody
18where
19 B: http_body::Body<Data = Bytes> + Send + 'static,
20 B::Error: Into<BoxError>,
21{
22 try_downcast(body).unwrap_or_else(|body| body.map_err(|e| Error::new(e.into())).boxed_unsync())
23}
24
25#[derive(Debug)]
27pub struct Body(BoxBody);
28
29impl Body {
30 pub fn new<B>(body: B) -> Self
32 where
33 B: http_body::Body<Data = Bytes> + Send + 'static,
34 B::Error: Into<BoxError>,
35 {
36 try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
37 }
38
39 pub fn empty() -> Self {
41 Self::new(http_body_util::Empty::new())
42 }
43
44 pub fn from_stream<S>(stream: S) -> Self
48 where
49 S: TryStream + Send + 'static,
50 S::Ok: Into<Bytes>,
51 S::Error: Into<BoxError>,
52 {
53 Self::new(StreamBody {
54 stream: SyncWrapper::new(stream),
55 })
56 }
57
58 pub fn into_data_stream(self) -> BodyDataStream {
65 BodyDataStream { inner: self }
66 }
67}
68
69impl Default for Body {
70 fn default() -> Self {
71 Self::empty()
72 }
73}
74
75impl From<()> for Body {
76 fn from(_: ()) -> Self {
77 Self::empty()
78 }
79}
80
81macro_rules! body_from_impl {
82 ($ty:ty) => {
83 impl From<$ty> for Body {
84 fn from(buf: $ty) -> Self {
85 Self::new(http_body_util::Full::from(buf))
86 }
87 }
88 };
89}
90
91body_from_impl!(&'static [u8]);
92body_from_impl!(std::borrow::Cow<'static, [u8]>);
93body_from_impl!(Vec<u8>);
94
95body_from_impl!(&'static str);
96body_from_impl!(std::borrow::Cow<'static, str>);
97body_from_impl!(String);
98
99body_from_impl!(Bytes);
100
101impl http_body::Body for Body {
102 type Data = Bytes;
103 type Error = Error;
104
105 #[inline]
106 fn poll_frame(
107 mut self: Pin<&mut Self>,
108 cx: &mut Context<'_>,
109 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
110 Pin::new(&mut self.0).poll_frame(cx)
111 }
112
113 #[inline]
114 fn is_end_stream(&self) -> bool {
115 self.0.is_end_stream()
116 }
117
118 #[inline]
119 fn size_hint(&self) -> http_body::SizeHint {
120 self.0.size_hint()
121 }
122}
123
124#[derive(Debug)]
128pub struct BodyDataStream {
129 inner: Body,
130}
131
132impl Stream for BodyDataStream {
133 type Item = Result<Bytes, Error>;
134
135 #[inline]
136 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137 loop {
138 match futures_util::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
139 Some(frame) => match frame.into_data() {
140 Ok(data) => return Poll::Ready(Some(Ok(data))),
141 Err(_frame) => {}
142 },
143 None => return Poll::Ready(None),
144 }
145 }
146 }
147}
148
149impl http_body::Body for BodyDataStream {
150 type Data = Bytes;
151 type Error = Error;
152
153 #[inline]
154 fn poll_frame(
155 mut self: Pin<&mut Self>,
156 cx: &mut Context<'_>,
157 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
158 Pin::new(&mut self.inner).poll_frame(cx)
159 }
160
161 #[inline]
162 fn is_end_stream(&self) -> bool {
163 self.inner.is_end_stream()
164 }
165
166 #[inline]
167 fn size_hint(&self) -> http_body::SizeHint {
168 self.inner.size_hint()
169 }
170}
171
172pin_project! {
173 struct StreamBody<S> {
174 #[pin]
175 stream: SyncWrapper<S>,
176 }
177}
178
179impl<S> http_body::Body for StreamBody<S>
180where
181 S: TryStream,
182 S::Ok: Into<Bytes>,
183 S::Error: Into<BoxError>,
184{
185 type Data = Bytes;
186 type Error = Error;
187
188 fn poll_frame(
189 self: Pin<&mut Self>,
190 cx: &mut Context<'_>,
191 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
192 let stream = self.project().stream.get_pin_mut();
193 match futures_util::ready!(stream.try_poll_next(cx)) {
194 Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
195 Some(Err(e)) => Poll::Ready(Some(Err(Error::new(e.into())))),
196 None => Poll::Ready(None),
197 }
198 }
199}