1pub use bytes::Bytes;
4pub use http_body::{Body as HttpBody, Frame, SizeHint};
5
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::{Stream, TryStream};
10use http_body_util::{BodyExt, Empty, Full};
11
12use crate::BoxError;
13
14type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, BoxError>;
15
16fn boxed<B>(body: B) -> BoxBody
17where
18 B: HttpBody<Data = Bytes> + Send + 'static,
19 B::Error: Into<BoxError>,
20{
21 crate::util::__try_downcast(body).unwrap_or_else(|body| body.map_err(Into::into).boxed_unsync())
22}
23
24#[derive(Debug)]
26pub struct Body(BoxBody);
27
28impl Body {
29 pub fn new<B>(body: B) -> Self
31 where
32 B: HttpBody<Data = Bytes> + Send + 'static,
33 B::Error: Into<BoxError>,
34 {
35 crate::util::__try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
36 }
37
38 pub fn empty() -> Self {
40 Self::new(Empty::new())
41 }
42
43 pub fn from_data_stream<S>(stream: S) -> Self
45 where
46 S: TryStream + Send + 'static,
47 S::Ok: Into<Bytes>,
48 S::Error: Into<BoxError>,
49 {
50 Self::new(StreamBody { stream })
51 }
52
53 pub fn into_data_stream(self) -> BodyDataStream {
55 BodyDataStream { inner: self }
56 }
57
58 pub async fn to_bytes(self) -> Result<Bytes, BoxError> {
60 self.collect().await.map(|col| col.to_bytes())
61 }
62}
63
64impl Default for Body {
65 fn default() -> Self {
66 Self::empty()
67 }
68}
69
70impl From<()> for Body {
71 fn from(_: ()) -> Self {
72 Self::empty()
73 }
74}
75
76macro_rules! body_from_impl {
77 ($ty:ty) => {
78 impl From<$ty> for Body {
79 fn from(buf: $ty) -> Self {
80 Self::new(Full::from(buf))
81 }
82 }
83 };
84}
85
86body_from_impl!(&'static [u8]);
87body_from_impl!(std::borrow::Cow<'static, [u8]>);
88body_from_impl!(Vec<u8>);
89
90body_from_impl!(&'static str);
91body_from_impl!(std::borrow::Cow<'static, str>);
92body_from_impl!(String);
93
94body_from_impl!(Bytes);
95
96impl HttpBody for Body {
97 type Data = Bytes;
98 type Error = BoxError;
99
100 #[inline]
101 fn poll_frame(
102 mut self: Pin<&mut Self>,
103 cx: &mut Context<'_>,
104 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
105 Pin::new(&mut self.0).poll_frame(cx)
106 }
107
108 #[inline]
109 fn size_hint(&self) -> SizeHint {
110 self.0.size_hint()
111 }
112
113 #[inline]
114 fn is_end_stream(&self) -> bool {
115 self.0.is_end_stream()
116 }
117}
118
119#[derive(Debug)]
121pub struct BodyDataStream {
122 inner: Body,
123}
124
125impl Stream for BodyDataStream {
126 type Item = Result<Bytes, BoxError>;
127
128 #[inline]
129 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130 loop {
131 match futures_core::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
132 Some(frame) => match frame.into_data() {
133 Ok(data) => return Poll::Ready(Some(Ok(data))),
134 Err(_frame) => {}
135 },
136 None => return Poll::Ready(None),
137 }
138 }
139 }
140}
141
142impl HttpBody for BodyDataStream {
143 type Data = Bytes;
144 type Error = BoxError;
145
146 #[inline]
147 fn poll_frame(
148 mut self: Pin<&mut Self>,
149 cx: &mut Context<'_>,
150 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
151 Pin::new(&mut self.inner).poll_frame(cx)
152 }
153
154 #[inline]
155 fn is_end_stream(&self) -> bool {
156 self.inner.is_end_stream()
157 }
158
159 #[inline]
160 fn size_hint(&self) -> SizeHint {
161 self.inner.size_hint()
162 }
163}
164
165pin_project_lite::pin_project! {
166 struct StreamBody<S> {
167 #[pin]
168 stream: S,
169 }
170}
171
172impl<S> HttpBody for StreamBody<S>
173where
174 S: TryStream,
175 S::Ok: Into<Bytes>,
176 S::Error: Into<BoxError>,
177{
178 type Data = Bytes;
179 type Error = BoxError;
180
181 fn poll_frame(
182 self: Pin<&mut Self>,
183 cx: &mut Context<'_>,
184 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
185 let this = self.project();
186 match futures_core::ready!(this.stream.try_poll_next(cx)) {
187 Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
188 Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
189 None => Poll::Ready(None),
190 }
191 }
192}