Skip to main content

hyper_body_utils/
lib.rs

1#[cfg(feature = "http3")]
2use bytes::Buf;
3
4use bytes::Bytes;
5use futures::{stream, FutureExt, Stream, TryStreamExt};
6
7#[cfg(feature = "http3")]
8use h3::client::RequestStream as ClientRequestStream;
9#[cfg(feature = "http3")]
10use h3::server::RequestStream as ServerRequestStream;
11#[cfg(feature = "http3")]
12use h3_quinn::{SendStream, RecvStream};
13
14use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
15use hyper::body::{Body, Frame, Incoming};
16
17#[cfg(feature = "smol-rt")]
18use smol::fs::File;
19#[cfg(feature = "smol-rt")]
20use smol::io::AsyncReadExt;
21
22#[cfg(feature = "tokio-rt")]
23use tokio::fs::File;
24#[cfg(feature = "tokio-rt")]
25use tokio_util::io::ReaderStream;
26
27#[cfg(test)]
28mod tests;
29
30pub enum HttpBody {
31    Incoming(Incoming),
32    Stream(BoxBody<Bytes, std::io::Error>),
33    #[cfg(feature = "http3")]
34    QuicClientIncoming(ClientRequestStream<RecvStream, Bytes>),
35    #[cfg(feature = "http3")]
36    QuicServerIncoming(ServerRequestStream<RecvStream, Bytes>),
37}
38
39impl HttpBody {
40    pub fn from_incoming(incoming: Incoming) -> Self {
41        HttpBody::Incoming(incoming)
42    }
43
44    #[cfg(feature = "http3")]
45    pub fn from_quic_client(stream: ClientRequestStream<RecvStream, Bytes>) -> Self {
46        HttpBody::QuicClientIncoming(stream)
47    }
48
49    #[cfg(feature = "http3")]
50    pub fn from_quic_server(stream: ServerRequestStream<RecvStream, Bytes>) -> Self {
51        HttpBody::QuicServerIncoming(stream)
52    }
53
54    pub fn from_text(text: &str) -> Self {
55        Self::from_bytes(text.as_bytes())
56    }
57
58    pub fn from_file(file: File) -> Self {
59        #[cfg(feature = "tokio-rt")]
60        {
61            let content = ReaderStream::new(file).map_ok(Frame::data);
62            let body = StreamBody::new(content);
63            HttpBody::Stream(body.boxed())
64        }
65
66        #[cfg(feature = "smol-rt")]
67        {
68            // TODO: This is not right, I'm mapping all slices and placing them in memory
69            let content = file
70                .bytes()
71                .map_ok(|data| Frame::data(
72                  Bytes::copy_from_slice(&[data])));
73            let body = StreamBody::new(content);
74            HttpBody::Stream(body.boxed())
75        }
76    }
77
78    pub fn from_bytes(bytes: &[u8]) -> Self {
79        #[cfg(feature = "tokio-rt")]
80        {
81            let all_bytes = Bytes::copy_from_slice(bytes);
82            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
83            let body = StreamBody::new(content);
84            HttpBody::Stream(body.boxed())
85        }
86
87        #[cfg(feature = "smol-rt")]
88        {
89            let all_bytes = Bytes::copy_from_slice(bytes);
90            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
91            let body = StreamBody::new(content);
92            HttpBody::Stream(body.boxed())
93        }
94    }
95}
96
97impl Body for HttpBody {
98    type Data = Bytes;
99
100    type Error = std::io::Error;
101
102    fn poll_frame(
103        self: std::pin::Pin<&mut Self>,
104        cx: &mut std::task::Context<'_>,
105    ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
106        match self.get_mut() {
107            HttpBody::Incoming(incoming) => incoming
108                .frame()
109                .poll_unpin(cx)
110                .map_err(std::io::Error::other),
111            HttpBody::Stream(stream) => {
112                stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
113            }
114            #[cfg(feature = "http3")]
115            HttpBody::QuicClientIncoming(stream) => stream.poll_recv_data(cx).map(|value| match value {
116                Ok(Some(mut value)) => {
117                    Some(Ok(Frame::data(value.copy_to_bytes(value.remaining()))))
118                }
119                Ok(None) => None,
120                Err(e) => Some(Err(std::io::Error::other(e))),
121            }),
122            #[cfg(feature = "http3")]
123            HttpBody::QuicServerIncoming(stream) => stream.poll_recv_data(cx).map(|value| match value {
124                Ok(Some(mut value)) => {
125                    Some(Ok(Frame::data(value.copy_to_bytes(value.remaining()))))
126                }
127                Ok(None) => None,
128                Err(e) => Some(Err(std::io::Error::other(e))),
129            }),
130        }
131    }
132}
133
134impl Stream for HttpBody {
135    type Item = Result<Frame<Bytes>, std::io::Error>;
136
137    fn poll_next(
138        self: std::pin::Pin<&mut Self>,
139        cx: &mut std::task::Context<'_>,
140    ) -> std::task::Poll<Option<Self::Item>> {
141        self.poll_frame(cx)
142    }
143}