Skip to main content

hyper_body_utils/
lib.rs

1#[cfg(feature = "http3")]
2use bytes::Buf;
3
4use bytes::Bytes;
5use futures::{stream, FutureExt, Stream, TryStreamExt};
6
7#[cfg(feature = "http3")]
8use h3::client::RequestStream as ClientRequestStream;
9#[cfg(feature = "http3")]
10use h3::server::RequestStream as ServerRequestStream;
11#[cfg(feature = "http3")]
12use h3_quinn::{SendStream, RecvStream};
13
14use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
15use hyper::body::{Body, Frame, Incoming};
16
17#[cfg(feature = "smol-rt")]
18use smol::fs::File;
19#[cfg(feature = "smol-rt")]
20use smol::io::AsyncReadExt;
21
22#[cfg(feature = "tokio-rt")]
23use tokio::fs::File;
24#[cfg(feature = "tokio-rt")]
25use tokio_util::io::ReaderStream;
26
27pub enum HttpBody {
28    Incoming(Incoming),
29    Stream(BoxBody<Bytes, std::io::Error>),
30    #[cfg(feature = "http3")]
31    QuicClientIncoming(ClientRequestStream<RecvStream, Bytes>),
32    #[cfg(feature = "http3")]
33    QuicServerIncoming(ServerRequestStream<RecvStream, Bytes>),
34}
35
36impl HttpBody {
37    pub fn from_incoming(incoming: Incoming) -> Self {
38        HttpBody::Incoming(incoming)
39    }
40
41    #[cfg(feature = "http3")]
42    pub fn from_quic_client(stream: ClientRequestStream<RecvStream, Bytes>) -> Self {
43        HttpBody::QuicClientIncoming(stream)
44    }
45
46    #[cfg(feature = "http3")]
47    pub fn from_quic_server(stream: ServerRequestStream<RecvStream, Bytes>) -> Self {
48        HttpBody::QuicServerIncoming(stream)
49    }
50
51    pub fn from_text(text: &str) -> Self {
52        Self::from_bytes(text.as_bytes())
53    }
54
55    pub fn from_file(file: File) -> Self {
56        #[cfg(feature = "tokio-rt")]
57        {
58            let content = ReaderStream::new(file).map_ok(Frame::data);
59            let body = StreamBody::new(content);
60            HttpBody::Stream(body.boxed())
61        }
62
63        #[cfg(feature = "smol-rt")]
64        {
65            // TODO: This is not right, I'm mapping all slices and placing them in memory
66            let content = file
67                .bytes()
68                .map_ok(|data| Frame::data(bytes::Bytes::copy_from_slice(&[data])));
69            let body = StreamBody::new(content);
70            HttpBody::Stream(body.boxed())
71        }
72    }
73
74    pub fn from_bytes(bytes: &[u8]) -> Self {
75        #[cfg(feature = "tokio-rt")]
76        {
77            let all_bytes = Bytes::copy_from_slice(bytes);
78            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
79            let body = StreamBody::new(content);
80            HttpBody::Stream(body.boxed())
81        }
82
83        #[cfg(feature = "smol-rt")]
84        {
85            let all_bytes = Bytes::copy_from_slice(bytes);
86            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
87            let body = StreamBody::new(content);
88            HttpBody::Stream(body.boxed())
89        }
90    }
91}
92
93impl Body for HttpBody {
94    type Data = Bytes;
95
96    type Error = std::io::Error;
97
98    fn poll_frame(
99        self: std::pin::Pin<&mut Self>,
100        cx: &mut std::task::Context<'_>,
101    ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
102        match self.get_mut() {
103            HttpBody::Incoming(incoming) => incoming
104                .frame()
105                .poll_unpin(cx)
106                .map_err(std::io::Error::other),
107            HttpBody::Stream(stream) => {
108                stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
109            }
110            #[cfg(feature = "http3")]
111            HttpBody::QuicClientIncoming(stream) => stream.poll_recv_data(cx).map(|value| match value {
112                Ok(Some(mut value)) => {
113                    Some(Ok(Frame::data(value.copy_to_bytes(value.remaining()))))
114                }
115                Ok(None) => None,
116                Err(e) => Some(Err(std::io::Error::other(e))),
117            }),
118            #[cfg(feature = "http3")]
119            HttpBody::QuicServerIncoming(stream) => stream.poll_recv_data(cx).map(|value| match value {
120                Ok(Some(mut value)) => {
121                    Some(Ok(Frame::data(value.copy_to_bytes(value.remaining()))))
122                }
123                Ok(None) => None,
124                Err(e) => Some(Err(std::io::Error::other(e))),
125            }),
126        }
127    }
128}
129
130impl Stream for HttpBody {
131    type Item = Result<Frame<Bytes>, std::io::Error>;
132
133    fn poll_next(
134        self: std::pin::Pin<&mut Self>,
135        cx: &mut std::task::Context<'_>,
136    ) -> std::task::Poll<Option<Self::Item>> {
137        self.poll_frame(cx)
138    }
139}