client_util/
body.rs

1use bytes::Bytes;
2#[cfg(feature = "stream")]
3use futures_core::Stream;
4use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
5pub type Body = BoxBody<Bytes, crate::error::BoxError>;
6
7/// Create a new full body.
8pub fn full<B>(body: B) -> Full<Bytes>
9where
10    B: Into<Bytes>,
11{
12    let body = body.into();
13    Full::new(body)
14}
15
16pub fn boxed_full<B>(body: B) -> Body
17where
18    B: Into<Bytes>,
19{
20    full(body).map_err(crate::util::never).boxed()
21}
22
23/// Create a new empty body.
24pub fn empty() -> Empty<Bytes> {
25    Empty::new()
26}
27
28pub fn boxed_empty() -> Body {
29    empty().map_err(crate::util::never).boxed()
30}
31
32#[cfg(feature = "stream")]
33#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
34pub fn stream<S>(stream: S) -> http_body_util::StreamBody<S>
35where
36    S: Stream<Item = Result<http_body::Frame<Bytes>, crate::error::BoxError>> + Send + 'static,
37{
38    http_body_util::StreamBody::new(stream)
39}
40
41pub fn boxed_stream<S>(s: S) -> Body
42where
43    S: Stream<Item = Result<http_body::Frame<Bytes>, crate::error::BoxError>>
44        + Send
45        + Sync
46        + 'static,
47{
48    stream(s).map_err(crate::util::never).boxed()
49}
50
51#[cfg(feature = "io-tokio")]
52#[cfg_attr(docsrs, doc(cfg(feature = "io-tokio")))]
53pub fn tokio_async_read<R>(reader: R) -> Body
54where
55    R: tokio::io::AsyncRead + Send + Sync + 'static,
56{
57    use futures_util::TryStreamExt;
58    use http_body::Frame;
59    use http_body_util::BodyExt;
60
61    let stream = tokio_util::io::ReaderStream::new(reader);
62    let body = http_body_util::StreamBody::new(
63        stream
64            .map_ok(Frame::data)
65            .map_err(|e| Box::new(e) as crate::error::BoxError),
66    );
67    body.boxed()
68}