Skip to main content

hyper_body_utils/
lib.rs

1use bytes::Bytes;
2use futures::{FutureExt, Stream, TryStreamExt, stream};
3use http_body_util::{BodyExt, StreamBody, combinators::BoxBody};
4use hyper::body::{Body, Frame, Incoming};
5
6#[cfg(feature = "smol-rt")]
7use smol::fs::File;
8#[cfg(feature = "smol-rt")]
9use smol::io::AsyncReadExt;
10
11#[cfg(feature = "tokio-rt")]
12use tokio::fs::File;
13#[cfg(feature = "tokio-rt")]
14use tokio_util::io::ReaderStream;
15
16pub enum HttpBody {
17    Incoming(Incoming),
18    Stream(BoxBody<Bytes, std::io::Error>),
19}
20
21impl HttpBody {
22    pub fn from_incoming(incoming: Incoming) -> Self {
23        HttpBody::Incoming(incoming)
24    }
25
26    pub fn from_text(text: &str) -> Self {
27        Self::from_bytes(text.as_bytes())
28    }
29
30    pub fn from_file(file: File) -> Self {
31        #[cfg(feature = "tokio-rt")]
32        {
33            let content = ReaderStream::new(file).map_ok(Frame::data);
34            let body = StreamBody::new(content);
35            HttpBody::Stream(body.boxed())
36        }
37
38        #[cfg(feature = "smol-rt")]
39        {
40            // TODO: This is not right, I'm mapping all slices and placing them in memory
41            let content = file
42                .bytes()
43                .map_ok(|data| Frame::data(bytes::Bytes::copy_from_slice(&[data])));
44            let body = StreamBody::new(content);
45            HttpBody::Stream(body.boxed())
46        }
47    }
48
49    pub fn from_bytes(bytes: &[u8]) -> Self {
50        #[cfg(feature = "tokio-rt")]
51        {
52            let all_bytes = Bytes::copy_from_slice(bytes);
53            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
54            let body = StreamBody::new(content);
55            HttpBody::Stream(body.boxed())
56        }
57
58        #[cfg(feature = "smol-rt")]
59        {
60            let all_bytes = Bytes::copy_from_slice(bytes);
61            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
62            let body = StreamBody::new(content);
63            HttpBody::Stream(body.boxed())
64        }
65    }
66}
67
68impl Body for HttpBody {
69    type Data = Bytes;
70
71    type Error = std::io::Error;
72
73    fn poll_frame(
74        self: std::pin::Pin<&mut Self>,
75        cx: &mut std::task::Context<'_>,
76    ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
77        match self.get_mut() {
78            HttpBody::Incoming(incoming) => incoming
79                .frame()
80                .poll_unpin(cx)
81                .map_err(std::io::Error::other),
82            HttpBody::Stream(stream) => {
83                stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
84            }
85        }
86    }
87}
88
89impl Stream for HttpBody {
90    type Item = Result<Frame<Bytes>, std::io::Error>;
91
92    fn poll_next(
93        self: std::pin::Pin<&mut Self>,
94        cx: &mut std::task::Context<'_>,
95    ) -> std::task::Poll<Option<Self::Item>> {
96        self.poll_frame(cx)
97    }
98}