Skip to main content

hyper_body_utils/
lib.rs

1use bytes::Bytes;
2use futures::{FutureExt, Stream, TryStreamExt, stream};
3use http_body_util::{BodyExt, StreamBody, combinators::BoxBody};
4use hyper::body::{Body, Frame, Incoming};
5
6#[cfg(feature = "smol-rt")]
7use smol::fs::File;
8
9#[cfg(feature = "tokio-rt")]
10use tokio::fs::File;
11#[cfg(feature = "tokio-rt")]
12use tokio_util::io::ReaderStream;
13
14pub enum HttpBody {
15    Incoming(Incoming),
16    Stream(BoxBody<Bytes, std::io::Error>),
17}
18
19impl HttpBody {
20    pub fn from_incoming(incoming: Incoming) -> Self {
21        HttpBody::Incoming(incoming)
22    }
23
24    pub fn from_text(text: &str) -> Self {
25        Self::from_bytes(text.as_bytes())
26    }
27
28    pub fn from_file(file: File) -> Self {
29        #[cfg(feature = "tokio-rt")]
30        {
31            let content = ReaderStream::new(file).map_ok(Frame::data);
32            let body = StreamBody::new(content);
33            HttpBody::Stream(body.boxed())
34        }
35
36        #[cfg(feature = "smol-rt")]
37        {
38            // TODO: This is not right, I'm mapping all slices and placing them in memory
39            let content = file
40                .bytes()
41                .map_ok(|data| Frame::data(bytes::Bytes::copy_from_slice(&[data])));
42            let body = StreamBody::new(content);
43            HttpBody::Stream(body.boxed());
44        }
45    }
46
47    pub fn from_bytes(bytes: &[u8]) -> Self {
48        #[cfg(feature = "tokio-rt")]
49        {
50            let all_bytes = Bytes::copy_from_slice(bytes);
51            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
52            let body = StreamBody::new(content);
53            HttpBody::Stream(body.boxed())
54        }
55
56        #[cfg(feature = "smol-rt")]
57        {
58            let all_bytes = Bytes::copy_from_slice(request.as_ref().raw_body());
59            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
60            let body = StreamBody::new(content);
61            HttpBody::Stream(body.boxed())
62        }
63    }
64}
65
66impl Body for HttpBody {
67    type Data = Bytes;
68
69    type Error = std::io::Error;
70
71    fn poll_frame(
72        self: std::pin::Pin<&mut Self>,
73        cx: &mut std::task::Context<'_>,
74    ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
75        match self.get_mut() {
76            HttpBody::Incoming(incoming) => incoming
77                .frame()
78                .poll_unpin(cx)
79                .map_err(std::io::Error::other),
80            HttpBody::Stream(stream) => {
81                stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
82            }
83        }
84    }
85}
86
87impl Stream for HttpBody {
88    type Item = Result<Frame<Bytes>, std::io::Error>;
89
90    fn poll_next(
91        self: std::pin::Pin<&mut Self>,
92        cx: &mut std::task::Context<'_>,
93    ) -> std::task::Poll<Option<Self::Item>> {
94        self.poll_frame(cx)
95    }
96}