wasi_net/reqwest/
body.rs

1#![allow(dead_code)]
2use std::fmt;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::Bytes;
7use futures_core::Stream;
8use http_body::Body as HttpBody;
9use pin_project_lite::pin_project;
10
11use super::*;
12
13/// An asynchronous request body.
14pub struct Body {
15    inner: Inner,
16}
17
18// The `Stream` trait isn't stable, so the impl isn't public.
19pub(crate) struct ImplStream(Body);
20
21enum Inner {
22    Reusable(Bytes),
23    Streaming {
24        body: Pin<
25            Box<
26                dyn HttpBody<Data = Bytes, Error = Box<dyn std::error::Error + Send + Sync>>
27                    + Send
28                    + Sync,
29            >,
30        >,
31    },
32}
33
34pin_project! {
35    struct WrapStream<S> {
36        #[pin]
37        inner: S,
38    }
39}
40
41impl Body {
42    /// Returns a reference to the internal data of the `Body`.
43    ///
44    /// `None` is returned, if the underlying data is a stream.
45    pub fn as_bytes(&self) -> Option<&[u8]> {
46        match &self.inner {
47            Inner::Reusable(bytes) => Some(bytes.as_ref()),
48            Inner::Streaming { .. } => None,
49        }
50    }
51
52    /// Wrap a futures `Stream` in a box inside `Body`.
53    ///
54    /// # Example
55    ///
56    /// ```
57    /// # use reqwest::Body;
58    /// # use futures_util;
59    /// # fn main() {
60    /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
61    ///     Ok("hello"),
62    ///     Ok(" "),
63    ///     Ok("world"),
64    /// ];
65    ///
66    /// let stream = futures_util::stream::iter(chunks);
67    ///
68    /// let body = Body::wrap_stream(stream);
69    /// # }
70    /// ```
71    ///
72    /// # Optional
73    ///
74    /// This requires the `stream` feature to be enabled.
75    #[cfg(feature = "stream")]
76    pub fn wrap_stream<S>(stream: S) -> Body
77    where
78        S: futures_core::stream::TryStream + Send + Sync + 'static,
79        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
80        Bytes: From<S::Ok>,
81    {
82        Body::stream(stream)
83    }
84
85    pub(crate) fn stream<S>(stream: S) -> Body
86    where
87        S: futures_core::stream::TryStream + Send + Sync + 'static,
88        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
89        Bytes: From<S::Ok>,
90    {
91        use futures_util::TryStreamExt;
92
93        let body = Box::pin(WrapStream {
94            inner: stream.map_ok(Bytes::from).map_err(Into::into),
95        });
96        Body {
97            inner: Inner::Streaming { body },
98        }
99    }
100
101    pub(crate) fn empty() -> Body {
102        Body::reusable(Bytes::new())
103    }
104
105    pub(crate) fn reusable(chunk: Bytes) -> Body {
106        Body {
107            inner: Inner::Reusable(chunk),
108        }
109    }
110
111    pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
112        let reuse = match self.inner {
113            Inner::Reusable(ref chunk) => Some(chunk.clone()),
114            Inner::Streaming { .. } => None,
115        };
116
117        (reuse, self)
118    }
119
120    pub(crate) fn try_clone(&self) -> Option<Body> {
121        match self.inner {
122            Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
123            Inner::Streaming { .. } => None,
124        }
125    }
126
127    pub(crate) fn into_stream(self) -> ImplStream {
128        ImplStream(self)
129    }
130
131    #[cfg(feature = "multipart")]
132    pub(crate) fn content_length(&self) -> Option<u64> {
133        match self.inner {
134            Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
135            Inner::Streaming { ref body, .. } => body.size_hint().exact(),
136        }
137    }
138}
139
140impl From<Bytes> for Body {
141    #[inline]
142    fn from(bytes: Bytes) -> Body {
143        Body::reusable(bytes)
144    }
145}
146
147impl From<Vec<u8>> for Body {
148    #[inline]
149    fn from(vec: Vec<u8>) -> Body {
150        Body::reusable(vec.into())
151    }
152}
153
154impl From<&'static [u8]> for Body {
155    #[inline]
156    fn from(s: &'static [u8]) -> Body {
157        Body::reusable(Bytes::from_static(s))
158    }
159}
160
161impl From<String> for Body {
162    #[inline]
163    fn from(s: String) -> Body {
164        Body::reusable(s.into())
165    }
166}
167
168impl From<&'static str> for Body {
169    #[inline]
170    fn from(s: &'static str) -> Body {
171        s.as_bytes().into()
172    }
173}
174
175impl fmt::Debug for Body {
176    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177        f.debug_struct("Body").finish()
178    }
179}
180
181// ===== impl ImplStream =====
182
183impl HttpBody for ImplStream {
184    type Data = Bytes;
185    type Error = Error;
186
187    fn poll_data(
188        mut self: Pin<&mut Self>,
189        cx: &mut Context,
190    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
191        let opt_try_chunk = match self.0.inner {
192            Inner::Streaming { ref mut body } => futures_core::ready!(Pin::new(body).poll_data(cx))
193                .map(|opt_chunk| {
194                    opt_chunk
195                        .map(Into::into)
196                        .map_err(|e| Error::new(ErrorKind::Other, e))
197                }),
198            Inner::Reusable(ref mut bytes) => {
199                if bytes.is_empty() {
200                    None
201                } else {
202                    Some(Ok(std::mem::replace(bytes, Bytes::new())))
203                }
204            }
205        };
206
207        Poll::Ready(opt_try_chunk)
208    }
209
210    fn poll_trailers(
211        self: Pin<&mut Self>,
212        _cx: &mut Context,
213    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
214        Poll::Ready(Ok(None))
215    }
216
217    fn is_end_stream(&self) -> bool {
218        match self.0.inner {
219            Inner::Streaming { ref body, .. } => body.is_end_stream(),
220            Inner::Reusable(ref bytes) => bytes.is_empty(),
221        }
222    }
223
224    fn size_hint(&self) -> http_body::SizeHint {
225        match self.0.inner {
226            Inner::Streaming { ref body, .. } => body.size_hint(),
227            Inner::Reusable(ref bytes) => {
228                let mut hint = http_body::SizeHint::default();
229                hint.set_exact(bytes.len() as u64);
230                hint
231            }
232        }
233    }
234}
235
236impl Stream for ImplStream {
237    type Item = Result<Bytes, Error>;
238
239    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
240        self.poll_data(cx)
241    }
242}
243
244// ===== impl WrapStream =====
245
246impl<S, D, E> HttpBody for WrapStream<S>
247where
248    S: Stream<Item = Result<D, E>>,
249    D: Into<Bytes>,
250    E: Into<Box<dyn std::error::Error + Send + Sync>>,
251{
252    type Data = Bytes;
253    type Error = E;
254
255    fn poll_data(
256        self: Pin<&mut Self>,
257        cx: &mut Context,
258    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
259        let item = futures_core::ready!(self.project().inner.poll_next(cx)?);
260
261        Poll::Ready(item.map(|val| Ok(val.into())))
262    }
263
264    fn poll_trailers(
265        self: Pin<&mut Self>,
266        _cx: &mut Context,
267    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
268        Poll::Ready(Ok(None))
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::Body;

    /// A body built from a static byte slice exposes the same bytes through
    /// `as_bytes`.
    #[test]
    fn test_as_bytes() {
        let expected: &'static [u8] = b"Test body";
        let body = Body::from(expected);
        assert_eq!(body.as_bytes(), Some(expected));
    }
}