1#![allow(dead_code)]
2use std::fmt;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::Bytes;
7use futures_core::Stream;
8use http_body::Body as HttpBody;
9use pin_project_lite::pin_project;
10
11use super::*;
12
13pub struct Body {
15 inner: Inner,
16}
17
18pub(crate) struct ImplStream(Body);
20
21enum Inner {
22 Reusable(Bytes),
23 Streaming {
24 body: Pin<
25 Box<
26 dyn HttpBody<Data = Bytes, Error = Box<dyn std::error::Error + Send + Sync>>
27 + Send
28 + Sync,
29 >,
30 >,
31 },
32}
33
34pin_project! {
35 struct WrapStream<S> {
36 #[pin]
37 inner: S,
38 }
39}
40
41impl Body {
42 pub fn as_bytes(&self) -> Option<&[u8]> {
46 match &self.inner {
47 Inner::Reusable(bytes) => Some(bytes.as_ref()),
48 Inner::Streaming { .. } => None,
49 }
50 }
51
52 #[cfg(feature = "stream")]
76 pub fn wrap_stream<S>(stream: S) -> Body
77 where
78 S: futures_core::stream::TryStream + Send + Sync + 'static,
79 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
80 Bytes: From<S::Ok>,
81 {
82 Body::stream(stream)
83 }
84
85 pub(crate) fn stream<S>(stream: S) -> Body
86 where
87 S: futures_core::stream::TryStream + Send + Sync + 'static,
88 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
89 Bytes: From<S::Ok>,
90 {
91 use futures_util::TryStreamExt;
92
93 let body = Box::pin(WrapStream {
94 inner: stream.map_ok(Bytes::from).map_err(Into::into),
95 });
96 Body {
97 inner: Inner::Streaming { body },
98 }
99 }
100
101 pub(crate) fn empty() -> Body {
102 Body::reusable(Bytes::new())
103 }
104
105 pub(crate) fn reusable(chunk: Bytes) -> Body {
106 Body {
107 inner: Inner::Reusable(chunk),
108 }
109 }
110
111 pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
112 let reuse = match self.inner {
113 Inner::Reusable(ref chunk) => Some(chunk.clone()),
114 Inner::Streaming { .. } => None,
115 };
116
117 (reuse, self)
118 }
119
120 pub(crate) fn try_clone(&self) -> Option<Body> {
121 match self.inner {
122 Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
123 Inner::Streaming { .. } => None,
124 }
125 }
126
127 pub(crate) fn into_stream(self) -> ImplStream {
128 ImplStream(self)
129 }
130
131 #[cfg(feature = "multipart")]
132 pub(crate) fn content_length(&self) -> Option<u64> {
133 match self.inner {
134 Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
135 Inner::Streaming { ref body, .. } => body.size_hint().exact(),
136 }
137 }
138}
139
140impl From<Bytes> for Body {
141 #[inline]
142 fn from(bytes: Bytes) -> Body {
143 Body::reusable(bytes)
144 }
145}
146
147impl From<Vec<u8>> for Body {
148 #[inline]
149 fn from(vec: Vec<u8>) -> Body {
150 Body::reusable(vec.into())
151 }
152}
153
154impl From<&'static [u8]> for Body {
155 #[inline]
156 fn from(s: &'static [u8]) -> Body {
157 Body::reusable(Bytes::from_static(s))
158 }
159}
160
161impl From<String> for Body {
162 #[inline]
163 fn from(s: String) -> Body {
164 Body::reusable(s.into())
165 }
166}
167
168impl From<&'static str> for Body {
169 #[inline]
170 fn from(s: &'static str) -> Body {
171 s.as_bytes().into()
172 }
173}
174
175impl fmt::Debug for Body {
176 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177 f.debug_struct("Body").finish()
178 }
179}
180
181impl HttpBody for ImplStream {
184 type Data = Bytes;
185 type Error = Error;
186
187 fn poll_data(
188 mut self: Pin<&mut Self>,
189 cx: &mut Context,
190 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
191 let opt_try_chunk = match self.0.inner {
192 Inner::Streaming { ref mut body } => futures_core::ready!(Pin::new(body).poll_data(cx))
193 .map(|opt_chunk| {
194 opt_chunk
195 .map(Into::into)
196 .map_err(|e| Error::new(ErrorKind::Other, e))
197 }),
198 Inner::Reusable(ref mut bytes) => {
199 if bytes.is_empty() {
200 None
201 } else {
202 Some(Ok(std::mem::replace(bytes, Bytes::new())))
203 }
204 }
205 };
206
207 Poll::Ready(opt_try_chunk)
208 }
209
210 fn poll_trailers(
211 self: Pin<&mut Self>,
212 _cx: &mut Context,
213 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
214 Poll::Ready(Ok(None))
215 }
216
217 fn is_end_stream(&self) -> bool {
218 match self.0.inner {
219 Inner::Streaming { ref body, .. } => body.is_end_stream(),
220 Inner::Reusable(ref bytes) => bytes.is_empty(),
221 }
222 }
223
224 fn size_hint(&self) -> http_body::SizeHint {
225 match self.0.inner {
226 Inner::Streaming { ref body, .. } => body.size_hint(),
227 Inner::Reusable(ref bytes) => {
228 let mut hint = http_body::SizeHint::default();
229 hint.set_exact(bytes.len() as u64);
230 hint
231 }
232 }
233 }
234}
235
236impl Stream for ImplStream {
237 type Item = Result<Bytes, Error>;
238
239 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
240 self.poll_data(cx)
241 }
242}
243
244impl<S, D, E> HttpBody for WrapStream<S>
247where
248 S: Stream<Item = Result<D, E>>,
249 D: Into<Bytes>,
250 E: Into<Box<dyn std::error::Error + Send + Sync>>,
251{
252 type Data = Bytes;
253 type Error = E;
254
255 fn poll_data(
256 self: Pin<&mut Self>,
257 cx: &mut Context,
258 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
259 let item = futures_core::ready!(self.project().inner.poll_next(cx)?);
260
261 Poll::Ready(item.map(|val| Ok(val.into())))
262 }
263
264 fn poll_trailers(
265 self: Pin<&mut Self>,
266 _cx: &mut Context,
267 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
268 Poll::Ready(Ok(None))
269 }
270}
271
272#[cfg(test)]
273mod tests {
274 use super::Body;
275
276 #[test]
277 fn test_as_bytes() {
278 let test_data = b"Test body";
279 let body = Body::from(&test_data[..]);
280 assert_eq!(body.as_bytes(), Some(&test_data[..]));
281 }
282}