dusks_reqwest/async_impl/
body.rs1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10use pin_project_lite::pin_project;
12#[cfg(feature = "stream")]
13use tokio::fs::File;
14use tokio::time::Sleep;
15#[cfg(feature = "stream")]
16use tokio_util::io::ReaderStream;
17
/// A body whose payload is held in the private `Inner` enum: either a fully
/// buffered chunk of bytes or a boxed streaming body.
18pub struct Body {
// The actual representation; not exposed outside this module.
20 inner: Inner,
21}
22
// Internal representation of a `Body`.
23enum Inner {
// A fully buffered payload. `try_clone` and `try_reuse` clone this value,
// which is what makes such a body re-sendable.
24 Reusable(Bytes),
// A type-erased streaming body yielding `Bytes` frames with a boxed error.
// `try_clone` / `try_reuse` return `None` for this variant.
25 Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
26}
27
// A body wrapper with a single overall deadline.
//
// Its `poll_frame` polls `timeout` before the wrapped body and yields a
// `TimedOut` body error once the timer has fired. The timer is never reset
// by this type.
28pin_project! {
29 pub(crate) struct TotalTimeoutBody<B> {
// The wrapped body (structurally pinned).
34 #[pin]
35 inner: B,
// The already-boxed deadline timer supplied by the caller.
36 timeout: Pin<Box<Sleep>>,
37 }
38}
39
// A body wrapper with a per-frame timeout.
//
// Its `poll_frame` lazily arms `sleep` with `timeout` when no timer is
// active, and clears the timer again every time the wrapped body produces a
// ready result, so each frame gets a fresh `timeout` window.
40pin_project! {
41 pub(crate) struct ReadTimeoutBody<B> {
// The wrapped body (structurally pinned).
42 #[pin]
43 inner: B,
// The currently armed timer, or `None` when no read is being timed.
44 #[pin]
45 sleep: Option<Sleep>,
// Duration used to arm `sleep`.
46 timeout: Duration,
47 }
48}
49
// Newtype adapter around a body `B`; the `futures_core::Stream` impl in this
// file turns the body's data frames into stream items.
50pub(crate) struct DataStream<B>(pub(crate) B);
52
53impl Body {
54 pub fn as_bytes(&self) -> Option<&[u8]> {
58 match &self.inner {
59 Inner::Reusable(bytes) => Some(bytes.as_ref()),
60 Inner::Streaming(..) => None,
61 }
62 }
63
64 #[cfg(feature = "stream")]
88 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
89 pub fn wrap_stream<S>(stream: S) -> Body
90 where
91 S: futures_core::stream::TryStream + Send + Sync + 'static,
92 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
93 Bytes: From<S::Ok>,
94 {
95 Body::stream(stream)
96 }
97
98 #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
99 pub(crate) fn stream<S>(stream: S) -> Body
100 where
101 S: futures_core::stream::TryStream + Send + Sync + 'static,
102 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
103 Bytes: From<S::Ok>,
104 {
105 use futures_util::TryStreamExt;
106 use http_body::Frame;
107 use http_body_util::StreamBody;
108
109 let body = http_body_util::BodyExt::boxed(StreamBody::new(
110 stream
111 .map_ok(|d| Frame::data(Bytes::from(d)))
112 .map_err(Into::into),
113 ));
114 Body {
115 inner: Inner::Streaming(body),
116 }
117 }
118
119 pub(crate) fn empty() -> Body {
131 Body::reusable(Bytes::new())
132 }
133
134 pub(crate) fn reusable(chunk: Bytes) -> Body {
135 Body {
136 inner: Inner::Reusable(chunk),
137 }
138 }
139
140 pub(crate) fn streaming<B>(inner: B) -> Body
142 where
143 B: HttpBody + Send + Sync + 'static,
144 B::Data: Into<Bytes>,
145 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
146 {
147 use http_body_util::BodyExt;
148
149 let boxed = inner
150 .map_frame(|f| f.map_data(Into::into))
151 .map_err(Into::into)
152 .boxed();
153
154 Body {
155 inner: Inner::Streaming(boxed),
156 }
157 }
158
159 pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
160 let reuse = match self.inner {
161 Inner::Reusable(ref chunk) => Some(chunk.clone()),
162 Inner::Streaming { .. } => None,
163 };
164
165 (reuse, self)
166 }
167
168 pub(crate) fn try_clone(&self) -> Option<Body> {
169 match self.inner {
170 Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
171 Inner::Streaming { .. } => None,
172 }
173 }
174
175 #[cfg(feature = "multipart")]
176 pub(crate) fn into_stream(self) -> DataStream<Body> {
177 DataStream(self)
178 }
179
180 #[cfg(feature = "multipart")]
181 pub(crate) fn content_length(&self) -> Option<u64> {
182 match self.inner {
183 Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
184 Inner::Streaming(ref body) => body.size_hint().exact(),
185 }
186 }
187}
188
189impl Default for Body {
190 #[inline]
191 fn default() -> Body {
192 Body::empty()
193 }
194}
195
196impl From<Bytes> for Body {
210 #[inline]
211 fn from(bytes: Bytes) -> Body {
212 Body::reusable(bytes)
213 }
214}
215
216impl From<Vec<u8>> for Body {
217 #[inline]
218 fn from(vec: Vec<u8>) -> Body {
219 Body::reusable(vec.into())
220 }
221}
222
223impl From<&'static [u8]> for Body {
224 #[inline]
225 fn from(s: &'static [u8]) -> Body {
226 Body::reusable(Bytes::from_static(s))
227 }
228}
229
230impl From<String> for Body {
231 #[inline]
232 fn from(s: String) -> Body {
233 Body::reusable(s.into())
234 }
235}
236
237impl From<&'static str> for Body {
238 #[inline]
239 fn from(s: &'static str) -> Body {
240 s.as_bytes().into()
241 }
242}
243
/// Builds a streaming body from a tokio `File` by reading it through a
/// `ReaderStream`. Only available with the `stream` feature.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    #[inline]
    fn from(file: File) -> Body {
        let chunks = ReaderStream::new(file);
        Self::wrap_stream(chunks)
    }
}
252
253impl fmt::Debug for Body {
254 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
255 f.debug_struct("Body").finish()
256 }
257}
258
259impl HttpBody for Body {
260 type Data = Bytes;
261 type Error = crate::Error;
262
263 fn poll_frame(
264 mut self: Pin<&mut Self>,
265 cx: &mut Context,
266 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
267 match self.inner {
268 Inner::Reusable(ref mut bytes) => {
269 let out = bytes.split_off(0);
270 if out.is_empty() {
271 Poll::Ready(None)
272 } else {
273 Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
274 }
275 }
276 Inner::Streaming(ref mut body) => Poll::Ready(
277 futures_core::ready!(Pin::new(body).poll_frame(cx))
278 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
279 ),
280 }
281 }
282
283 fn size_hint(&self) -> http_body::SizeHint {
284 match self.inner {
285 Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
286 Inner::Streaming(ref body) => body.size_hint(),
287 }
288 }
289
290 fn is_end_stream(&self) -> bool {
291 match self.inner {
292 Inner::Reusable(ref bytes) => bytes.is_empty(),
293 Inner::Streaming(ref body) => body.is_end_stream(),
294 }
295 }
296}
297
298pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
301 TotalTimeoutBody {
302 inner: body,
303 timeout,
304 }
305}
306
307pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
308 ReadTimeoutBody {
309 inner: body,
310 sleep: None,
311 timeout,
312 }
313}
314
315impl<B> hyper::body::Body for TotalTimeoutBody<B>
316where
317 B: hyper::body::Body,
318 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
319{
320 type Data = B::Data;
321 type Error = crate::Error;
322
323 fn poll_frame(
324 self: Pin<&mut Self>,
325 cx: &mut Context,
326 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
327 let this = self.project();
328 if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
329 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
330 }
331 Poll::Ready(
332 futures_core::ready!(this.inner.poll_frame(cx))
333 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
334 )
335 }
336
337 #[inline]
338 fn size_hint(&self) -> http_body::SizeHint {
339 self.inner.size_hint()
340 }
341
342 #[inline]
343 fn is_end_stream(&self) -> bool {
344 self.inner.is_end_stream()
345 }
346}
347
348impl<B> hyper::body::Body for ReadTimeoutBody<B>
349where
350 B: hyper::body::Body,
351 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
352{
353 type Data = B::Data;
354 type Error = crate::Error;
355
356 fn poll_frame(
357 self: Pin<&mut Self>,
358 cx: &mut Context,
359 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
360 let mut this = self.project();
361
362 let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
364 some
365 } else {
366 this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
367 this.sleep.as_mut().as_pin_mut().unwrap()
368 };
369
370 if let Poll::Ready(()) = sleep_pinned.poll(cx) {
372 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
373 }
374
375 let item = futures_core::ready!(this.inner.poll_frame(cx))
376 .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
377 this.sleep.set(None);
379 Poll::Ready(item)
380 }
381
382 #[inline]
383 fn size_hint(&self) -> http_body::SizeHint {
384 self.inner.size_hint()
385 }
386
387 #[inline]
388 fn is_end_stream(&self) -> bool {
389 self.inner.is_end_stream()
390 }
391}
392
// Type-erased body alias: a boxed body that yields `Bytes` data and reports
// failures as a boxed `std::error::Error + Send + Sync`. It is the return
// type of `boxed` and `response` in this file.
393pub(crate) type ResponseBody =
394 http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
395
396pub(crate) fn boxed<B>(body: B) -> ResponseBody
397where
398 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
399 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
400{
401 use http_body_util::BodyExt;
402
403 body.map_err(box_err).boxed()
404}
405
406pub(crate) fn response<B>(
407 body: B,
408 deadline: Option<Pin<Box<Sleep>>>,
409 read_timeout: Option<Duration>,
410) -> ResponseBody
411where
412 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
413 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
414{
415 use http_body_util::BodyExt;
416
417 match (deadline, read_timeout) {
418 (Some(total), Some(read)) => {
419 let body = with_read_timeout(body, read).map_err(box_err);
420 total_timeout(body, total).map_err(box_err).boxed()
421 }
422 (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
423 (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
424 (None, None) => body.map_err(box_err).boxed(),
425 }
426}
427
/// Converts any error that implements the required `Into` into a boxed
/// `std::error::Error + Send + Sync`.
///
/// It exists as a named function so it can be passed to `map_err`.
fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    Into::into(err)
}
434
435impl<B> futures_core::Stream for DataStream<B>
438where
439 B: HttpBody<Data = Bytes> + Unpin,
440{
441 type Item = Result<Bytes, B::Error>;
442
443 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
444 loop {
445 return match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) {
446 Some(Ok(frame)) => {
447 if let Ok(buf) = frame.into_data() {
449 Poll::Ready(Some(Ok(buf)))
450 } else {
451 continue;
452 }
453 }
454 Some(Err(err)) => Poll::Ready(Some(Err(err))),
455 None => Poll::Ready(None),
456 };
457 }
458 }
459}
460
#[cfg(test)]
mod tests {
    use http_body::Body as _;

    use super::Body;

    /// A body built from a static slice exposes exactly those bytes.
    #[test]
    fn test_as_bytes() {
        let payload: &'static [u8] = b"Test body";
        let body = Body::from(payload);
        assert_eq!(body.as_bytes(), Some(payload));
    }

    /// Checks end-of-stream and exact size hints for empty, buffered and
    /// wrapped bodies.
    #[test]
    fn body_exact_length() {
        // Empty buffered body: already finished, exactly zero bytes.
        let empty = Body::empty();
        assert!(empty.is_end_stream());
        assert_eq!(empty.size_hint().exact(), Some(0));

        // Non-empty buffered body: not finished, exact length known.
        let buffered = Body::reusable("abc".into());
        assert!(!buffered.is_end_stream());
        assert_eq!(buffered.size_hint().exact(), Some(3));

        // Wrapped as a streaming body: not finished, no exact length reported.
        let wrapped = Body::streaming(buffered);
        assert!(!wrapped.is_end_stream());
        assert_eq!(wrapped.size_hint().exact(), None);
    }
}