dusks_reqwest/async_impl/
body.rs

1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10//use sync_wrapper::SyncWrapper;
11use pin_project_lite::pin_project;
12#[cfg(feature = "stream")]
13use tokio::fs::File;
14use tokio::time::Sleep;
15#[cfg(feature = "stream")]
16use tokio_util::io::ReaderStream;
17
/// An asynchronous request body.
///
/// The payload lives in the private `inner` field, which is either a single
/// in-memory buffer or a boxed streaming body.
pub struct Body {
    inner: Inner,
}
22
/// Storage backing a `Body`.
enum Inner {
    /// A complete in-memory buffer. Because `Bytes` can be cloned, this is the
    /// variant for which `Body::try_clone` and `Body::try_reuse` return data.
    Reusable(Bytes),
    /// A type-erased body producing `Bytes` data frames, with a boxed error type.
    Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
}
27
pin_project! {
    /// A body with a total timeout.
    ///
    /// The timeout does not reset upon each chunk, but rather requires the whole
    /// body be streamed before the deadline is reached.
    pub(crate) struct TotalTimeoutBody<B> {
        // The wrapped body; structurally pinned so it can be polled in place.
        #[pin]
        inner: B,
        // Deadline timer. It is already boxed and pinned, so it needs no
        // `#[pin]` projection; once it completes, polling yields a timeout error.
        timeout: Pin<Box<Sleep>>,
    }
}
39
pin_project! {
    /// A body with a per-read timeout.
    ///
    /// Unlike `TotalTimeoutBody`, the timer is cleared every time the inner
    /// body yields a ready result and is re-armed with `timeout` on the next poll.
    pub(crate) struct ReadTimeoutBody<B> {
        // The wrapped body; structurally pinned so it can be polled in place.
        #[pin]
        inner: B,
        // The currently running timer, or `None` when no read is being timed.
        #[pin]
        sleep: Option<Sleep>,
        // Duration used each time a new `Sleep` is started.
        timeout: Duration,
    }
}
49
/// Converts any `impl Body` into an `impl Stream` of just its DATA frames.
///
/// Frames that do not carry data are skipped by the `Stream` implementation.
pub(crate) struct DataStream<B>(pub(crate) B);
52
53impl Body {
54    /// Returns a reference to the internal data of the `Body`.
55    ///
56    /// `None` is returned, if the underlying data is a stream.
57    pub fn as_bytes(&self) -> Option<&[u8]> {
58        match &self.inner {
59            Inner::Reusable(bytes) => Some(bytes.as_ref()),
60            Inner::Streaming(..) => None,
61        }
62    }
63
64    /// Wrap a futures `Stream` in a box inside `Body`.
65    ///
66    /// # Example
67    ///
68    /// ```
69    /// # use reqwest::Body;
70    /// # use futures_util;
71    /// # fn main() {
72    /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
73    ///     Ok("hello"),
74    ///     Ok(" "),
75    ///     Ok("world"),
76    /// ];
77    ///
78    /// let stream = futures_util::stream::iter(chunks);
79    ///
80    /// let body = Body::wrap_stream(stream);
81    /// # }
82    /// ```
83    ///
84    /// # Optional
85    ///
86    /// This requires the `stream` feature to be enabled.
87    #[cfg(feature = "stream")]
88    #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
89    pub fn wrap_stream<S>(stream: S) -> Body
90    where
91        S: futures_core::stream::TryStream + Send + Sync + 'static,
92        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
93        Bytes: From<S::Ok>,
94    {
95        Body::stream(stream)
96    }
97
98    #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
99    pub(crate) fn stream<S>(stream: S) -> Body
100    where
101        S: futures_core::stream::TryStream + Send + Sync + 'static,
102        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
103        Bytes: From<S::Ok>,
104    {
105        use futures_util::TryStreamExt;
106        use http_body::Frame;
107        use http_body_util::StreamBody;
108
109        let body = http_body_util::BodyExt::boxed(StreamBody::new(
110            stream
111                .map_ok(|d| Frame::data(Bytes::from(d)))
112                .map_err(Into::into),
113        ));
114        Body {
115            inner: Inner::Streaming(body),
116        }
117    }
118
119    /*
120    #[cfg(feature = "blocking")]
121    pub(crate) fn wrap(body: hyper::Body) -> Body {
122        Body {
123            inner: Inner::Streaming {
124                body: Box::pin(WrapHyper(body)),
125            },
126        }
127    }
128    */
129
130    pub(crate) fn empty() -> Body {
131        Body::reusable(Bytes::new())
132    }
133
134    pub(crate) fn reusable(chunk: Bytes) -> Body {
135        Body {
136            inner: Inner::Reusable(chunk),
137        }
138    }
139
140    // pub?
141    pub(crate) fn streaming<B>(inner: B) -> Body
142    where
143        B: HttpBody + Send + Sync + 'static,
144        B::Data: Into<Bytes>,
145        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
146    {
147        use http_body_util::BodyExt;
148
149        let boxed = inner
150            .map_frame(|f| f.map_data(Into::into))
151            .map_err(Into::into)
152            .boxed();
153
154        Body {
155            inner: Inner::Streaming(boxed),
156        }
157    }
158
159    pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
160        let reuse = match self.inner {
161            Inner::Reusable(ref chunk) => Some(chunk.clone()),
162            Inner::Streaming { .. } => None,
163        };
164
165        (reuse, self)
166    }
167
168    pub(crate) fn try_clone(&self) -> Option<Body> {
169        match self.inner {
170            Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
171            Inner::Streaming { .. } => None,
172        }
173    }
174
175    #[cfg(feature = "multipart")]
176    pub(crate) fn into_stream(self) -> DataStream<Body> {
177        DataStream(self)
178    }
179
180    #[cfg(feature = "multipart")]
181    pub(crate) fn content_length(&self) -> Option<u64> {
182        match self.inner {
183            Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
184            Inner::Streaming(ref body) => body.size_hint().exact(),
185        }
186    }
187}
188
189impl Default for Body {
190    #[inline]
191    fn default() -> Body {
192        Body::empty()
193    }
194}
195
196/*
197impl From<hyper::Body> for Body {
198    #[inline]
199    fn from(body: hyper::Body) -> Body {
200        Self {
201            inner: Inner::Streaming {
202                body: Box::pin(WrapHyper(body)),
203            },
204        }
205    }
206}
207*/
208
209impl From<Bytes> for Body {
210    #[inline]
211    fn from(bytes: Bytes) -> Body {
212        Body::reusable(bytes)
213    }
214}
215
216impl From<Vec<u8>> for Body {
217    #[inline]
218    fn from(vec: Vec<u8>) -> Body {
219        Body::reusable(vec.into())
220    }
221}
222
223impl From<&'static [u8]> for Body {
224    #[inline]
225    fn from(s: &'static [u8]) -> Body {
226        Body::reusable(Bytes::from_static(s))
227    }
228}
229
230impl From<String> for Body {
231    #[inline]
232    fn from(s: String) -> Body {
233        Body::reusable(s.into())
234    }
235}
236
237impl From<&'static str> for Body {
238    #[inline]
239    fn from(s: &'static str) -> Body {
240        s.as_bytes().into()
241    }
242}
243
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    /// Streams the file's contents as the body by wrapping it in a `ReaderStream`.
    #[inline]
    fn from(file: File) -> Self {
        let chunks = ReaderStream::new(file);
        Self::wrap_stream(chunks)
    }
}
252
253impl fmt::Debug for Body {
254    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
255        f.debug_struct("Body").finish()
256    }
257}
258
259impl HttpBody for Body {
260    type Data = Bytes;
261    type Error = crate::Error;
262
263    fn poll_frame(
264        mut self: Pin<&mut Self>,
265        cx: &mut Context,
266    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
267        match self.inner {
268            Inner::Reusable(ref mut bytes) => {
269                let out = bytes.split_off(0);
270                if out.is_empty() {
271                    Poll::Ready(None)
272                } else {
273                    Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
274                }
275            }
276            Inner::Streaming(ref mut body) => Poll::Ready(
277                futures_core::ready!(Pin::new(body).poll_frame(cx))
278                    .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
279            ),
280        }
281    }
282
283    fn size_hint(&self) -> http_body::SizeHint {
284        match self.inner {
285            Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
286            Inner::Streaming(ref body) => body.size_hint(),
287        }
288    }
289
290    fn is_end_stream(&self) -> bool {
291        match self.inner {
292            Inner::Reusable(ref bytes) => bytes.is_empty(),
293            Inner::Streaming(ref body) => body.is_end_stream(),
294        }
295    }
296}
297
298// ===== impl TotalTimeoutBody =====
299
300pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
301    TotalTimeoutBody {
302        inner: body,
303        timeout,
304    }
305}
306
307pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
308    ReadTimeoutBody {
309        inner: body,
310        sleep: None,
311        timeout,
312    }
313}
314
315impl<B> hyper::body::Body for TotalTimeoutBody<B>
316where
317    B: hyper::body::Body,
318    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
319{
320    type Data = B::Data;
321    type Error = crate::Error;
322
323    fn poll_frame(
324        self: Pin<&mut Self>,
325        cx: &mut Context,
326    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
327        let this = self.project();
328        if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
329            return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
330        }
331        Poll::Ready(
332            futures_core::ready!(this.inner.poll_frame(cx))
333                .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
334        )
335    }
336
337    #[inline]
338    fn size_hint(&self) -> http_body::SizeHint {
339        self.inner.size_hint()
340    }
341
342    #[inline]
343    fn is_end_stream(&self) -> bool {
344        self.inner.is_end_stream()
345    }
346}
347
348impl<B> hyper::body::Body for ReadTimeoutBody<B>
349where
350    B: hyper::body::Body,
351    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
352{
353    type Data = B::Data;
354    type Error = crate::Error;
355
356    fn poll_frame(
357        self: Pin<&mut Self>,
358        cx: &mut Context,
359    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
360        let mut this = self.project();
361
362        // Start the `Sleep` if not active.
363        let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
364            some
365        } else {
366            this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
367            this.sleep.as_mut().as_pin_mut().unwrap()
368        };
369
370        // Error if the timeout has expired.
371        if let Poll::Ready(()) = sleep_pinned.poll(cx) {
372            return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
373        }
374
375        let item = futures_core::ready!(this.inner.poll_frame(cx))
376            .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
377        // a ready frame means timeout is reset
378        this.sleep.set(None);
379        Poll::Ready(item)
380    }
381
382    #[inline]
383    fn size_hint(&self) -> http_body::SizeHint {
384        self.inner.size_hint()
385    }
386
387    #[inline]
388    fn is_end_stream(&self) -> bool {
389        self.inner.is_end_stream()
390    }
391}
392
393pub(crate) type ResponseBody =
394    http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
395
396pub(crate) fn boxed<B>(body: B) -> ResponseBody
397where
398    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
399    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
400{
401    use http_body_util::BodyExt;
402
403    body.map_err(box_err).boxed()
404}
405
406pub(crate) fn response<B>(
407    body: B,
408    deadline: Option<Pin<Box<Sleep>>>,
409    read_timeout: Option<Duration>,
410) -> ResponseBody
411where
412    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
413    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
414{
415    use http_body_util::BodyExt;
416
417    match (deadline, read_timeout) {
418        (Some(total), Some(read)) => {
419            let body = with_read_timeout(body, read).map_err(box_err);
420            total_timeout(body, total).map_err(box_err).boxed()
421        }
422        (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
423        (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
424        (None, None) => body.map_err(box_err).boxed(),
425    }
426}
427
/// Converts any boxable error into `Box<dyn Error + Send + Sync>`.
///
/// Being a named function (rather than a closure), it can be passed directly
/// to `map_err`.
fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let boxed: Box<dyn std::error::Error + Send + Sync> = Into::into(err);
    boxed
}
434
435// ===== impl DataStream =====
436
437impl<B> futures_core::Stream for DataStream<B>
438where
439    B: HttpBody<Data = Bytes> + Unpin,
440{
441    type Item = Result<Bytes, B::Error>;
442
443    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
444        loop {
445            return match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) {
446                Some(Ok(frame)) => {
447                    // skip non-data frames
448                    if let Ok(buf) = frame.into_data() {
449                        Poll::Ready(Some(Ok(buf)))
450                    } else {
451                        continue;
452                    }
453                }
454                Some(Err(err)) => Poll::Ready(Some(Err(err))),
455                None => Poll::Ready(None),
456            };
457        }
458    }
459}
460
#[cfg(test)]
mod tests {
    use http_body::Body as _;

    use super::Body;

    /// A body built from a static slice exposes exactly those bytes.
    #[test]
    fn test_as_bytes() {
        let payload = b"Test body";
        let body = Body::from(&payload[..]);
        assert_eq!(body.as_bytes(), Some(&payload[..]));
    }

    /// Checks `is_end_stream` and the exact size hint for each kind of body.
    #[test]
    fn body_exact_length() {
        // Empty buffered body: already finished, exact length zero.
        let empty = Body::empty();
        assert!(empty.is_end_stream());
        assert_eq!(empty.size_hint().exact(), Some(0));

        // Non-empty buffered body: not finished, exact length known.
        let buffered = Body::reusable("abc".into());
        assert!(!buffered.is_end_stream());
        assert_eq!(buffered.size_hint().exact(), Some(3));

        // Re-wrapped through `Body::streaming`: no exact length is reported.
        let streamed = Body::streaming(buffered);
        assert!(!streamed.is_end_stream());
        assert_eq!(streamed.size_hint().exact(), None);
    }
}