Skip to main content

hyper_body_utils/
lib.rs

1#[cfg(feature = "http3")]
2use bytes::Buf;
3
4use bytes::Bytes;
5use futures::{ready, stream, FutureExt, Stream, TryStreamExt};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9#[cfg(feature = "http3")]
10use h3::client::RequestStream as ClientRequestStream;
11#[cfg(feature = "http3")]
12use h3::server::RequestStream as ServerRequestStream;
13#[cfg(feature = "http3")]
14use h3_quinn::{RecvStream};
15
16pub use http_body_util::BodyExt;
17
18use http_body_util::{combinators::BoxBody, StreamBody};
19use hyper::body::{Body, Frame, Incoming};
20
21#[cfg(feature = "smol-rt")]
22use smol::fs::File;
23#[cfg(feature = "smol-rt")]
24use smol::io::AsyncReadExt;
25
26#[cfg(feature = "tokio-rt")]
27use tokio::fs::File;
28#[cfg(feature = "tokio-rt")]
29use tokio_util::io::ReaderStream;
30
31#[cfg(test)]
32mod tests;
33
34pub enum HttpBody {
35    Incoming(Incoming),
36    Stream(BoxBody<Bytes, std::io::Error>),
37    #[cfg(feature = "http3")]
38    QuicClientIncoming(ClientRequestStream<RecvStream, Bytes>),
39    #[cfg(feature = "http3")]
40    QuicServerIncoming(ServerRequestStream<RecvStream, Bytes>),
41}
42
43impl HttpBody {
44    pub fn from_incoming(incoming: Incoming) -> Self {
45        HttpBody::Incoming(incoming)
46    }
47
48    #[cfg(feature = "http3")]
49    pub fn from_quic_client(stream: ClientRequestStream<RecvStream, Bytes>) -> Self {
50        HttpBody::QuicClientIncoming(stream)
51    }
52
53    #[cfg(feature = "http3")]
54    pub fn from_quic_server(stream: ServerRequestStream<RecvStream, Bytes>) -> Self {
55        HttpBody::QuicServerIncoming(stream)
56    }
57
58    pub fn from_text(text: &str) -> Self {
59        Self::from_bytes(text.as_bytes())
60    }
61
62    pub fn from_file(file: File) -> Self {
63        #[cfg(feature = "tokio-rt")]
64        {
65            let content = ReaderStream::new(file).map_ok(Frame::data);
66            let body = StreamBody::new(content);
67            HttpBody::Stream(body.boxed())
68        }
69
70        #[cfg(feature = "smol-rt")]
71        {
72            let content = file
73                .bytes()
74                .map_ok(|data| Frame::data(Bytes::copy_from_slice(&[data])));
75            let body = StreamBody::new(content);
76            HttpBody::Stream(body.boxed())
77        }
78    }
79
80    pub fn from_bytes(bytes: &[u8]) -> Self {
81        #[cfg(feature = "tokio-rt")]
82        {
83            let all_bytes = Bytes::copy_from_slice(bytes);
84            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
85            let body = StreamBody::new(content);
86            HttpBody::Stream(body.boxed())
87        }
88
89        #[cfg(feature = "smol-rt")]
90        {
91            let all_bytes = Bytes::copy_from_slice(bytes);
92            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
93            let body = StreamBody::new(content);
94            HttpBody::Stream(body.boxed())
95        }
96    }
97}
98
99impl Body for HttpBody {
100    type Data = Bytes;
101
102    type Error = std::io::Error;
103
104    fn poll_frame(
105        self: Pin<&mut Self>,
106        cx: &mut Context<'_>,
107    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
108        match self.get_mut() {
109            HttpBody::Incoming(incoming) => incoming
110                .frame()
111                .poll_unpin(cx)
112                .map_err(std::io::Error::other),
113            HttpBody::Stream(stream) => {
114                stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
115            }
116            #[cfg(feature = "http3")]
117            HttpBody::QuicClientIncoming(stream) => match ready!(stream.poll_recv_data(cx)) {
118                Ok(frame) => match frame {
119                    Some(mut frame) => Poll::Ready(Some(Ok(Frame::data(
120                        frame.copy_to_bytes(frame.remaining()),
121                    )))),
122                    None => {
123                        cx.waker().wake_by_ref();
124                        Poll::Ready(None)
125                    }
126                },
127                Err(e) => {
128                    println!("Error polling frame: {}", e);
129                    Poll::Ready(Some(Err(std::io::Error::other(e))))
130                },
131            },
132            #[cfg(feature = "http3")]
133            HttpBody::QuicServerIncoming(stream) => match ready!(stream.poll_recv_data(cx)) {
134                Ok(frame) => match frame {
135                    Some(mut frame) => Poll::Ready(Some(Ok(Frame::data(
136                        frame.copy_to_bytes(frame.remaining()),
137                    )))),
138                    None => {
139                        cx.waker().wake_by_ref();
140                        Poll::Ready(None)
141                    }
142                },
143                Err(e) => Poll::Ready(Some(Err(std::io::Error::other(e)))),
144            },
145        }
146    }
147}
148
149impl Stream for HttpBody {
150    type Item = Result<Frame<Bytes>, std::io::Error>;
151
152    fn poll_next(
153        self: Pin<&mut Self>,
154        cx: &mut Context<'_>,
155    ) -> Poll<Option<Self::Item>> {
156        self.poll_frame(cx)
157    }
158}