Skip to main content

hyper_body_utils/
lib.rs

1#[cfg(feature = "http3")]
2use bytes::Buf;
3#[cfg(feature = "http3")]
4use futures::ready;
5
6use bytes::{Bytes};
7use futures::{stream, FutureExt, Stream, TryStreamExt};
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11#[cfg(feature = "http3")]
12use h3::client::RequestStream as ClientRequestStream;
13#[cfg(feature = "http3")]
14use h3::server::RequestStream as ServerRequestStream;
15#[cfg(feature = "http3")]
16use h3_quinn::RecvStream;
17
18pub use http_body_util::BodyExt;
19
20use http_body_util::{combinators::BoxBody, StreamBody};
21use hyper::body::{Body, Frame, Incoming};
22
23#[cfg(feature = "smol-rt")]
24use smol::fs::File;
25
26#[cfg(feature = "tokio-rt")]
27use tokio::fs::File;
28
29#[cfg(test)]
30mod tests;
31
32pub enum HttpBody {
33    Incoming(Incoming),
34    Stream(BoxBody<Bytes, std::io::Error>),
35    #[cfg(feature = "http3")]
36    QuicClientIncoming(ClientRequestStream<RecvStream, Bytes>),
37    #[cfg(feature = "http3")]
38    QuicServerIncoming(ServerRequestStream<RecvStream, Bytes>),
39}
40
41impl HttpBody {
42    pub fn from_incoming(incoming: Incoming) -> Self {
43        HttpBody::Incoming(incoming)
44    }
45
46    #[cfg(feature = "http3")]
47    pub fn from_quic_client(stream: ClientRequestStream<RecvStream, Bytes>) -> Self {
48        HttpBody::QuicClientIncoming(stream)
49    }
50
51    #[cfg(feature = "http3")]
52    pub fn from_quic_server(stream: ServerRequestStream<RecvStream, Bytes>) -> Self {
53        HttpBody::QuicServerIncoming(stream)
54    }
55
56    pub fn from_text(text: &str) -> Self {
57        Self::from_bytes(text.as_bytes())
58    }
59
60    pub fn from_file(file: File) -> Self {
61        #[cfg(feature = "tokio-rt")]
62        {
63            let content = tokio_util::io::ReaderStream::new(file).map_ok(Frame::data);
64            let body = StreamBody::new(content);
65            HttpBody::Stream(BodyExt::boxed(body))
66        }
67
68        #[cfg(feature = "smol-rt")]
69        {
70            let content = file
71                .bytes()
72                .map_ok(|data| Frame::data(Bytes::copy_from_slice(&[data])));
73            let body = StreamBody::new(content);
74            HttpBody::Stream(BodyExt::boxed(body))
75        }
76    }
77
78    pub fn from_bytes(bytes: &[u8]) -> Self {
79        #[cfg(feature = "tokio-rt")]
80        {
81            let all_bytes = Bytes::copy_from_slice(bytes);
82            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
83            let body = StreamBody::new(content);
84            HttpBody::Stream(BodyExt::boxed(body))
85        }
86
87        #[cfg(feature = "smol-rt")]
88        {
89            let all_bytes = Bytes::copy_from_slice(bytes);
90            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
91            let body = StreamBody::new(content);
92            HttpBody::Stream(BodyExt::boxed(body))
93        }
94    }
95}
96
97impl Body for HttpBody {
98    type Data = Bytes;
99
100    type Error = std::io::Error;
101
102    fn poll_frame(
103        self: Pin<&mut Self>,
104        cx: &mut Context<'_>,
105    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
106        match self.get_mut() {
107            HttpBody::Incoming(incoming) => incoming
108                .frame()
109                .poll_unpin(cx)
110                .map_err(std::io::Error::other),
111            HttpBody::Stream(stream) => {
112                stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
113            }
114            #[cfg(feature = "http3")]
115            HttpBody::QuicClientIncoming(stream) => match ready!(stream.poll_recv_data(cx)) {
116                Ok(frame) => match frame {
117                    Some(mut frame) => Poll::Ready(Some(Ok(Frame::data(
118                        frame.copy_to_bytes(frame.remaining()),
119                    )))),
120                    None => {
121                        cx.waker().wake_by_ref();
122                        Poll::Ready(None)
123                    }
124                },
125                Err(e) => {
126                    println!("Error polling frame: {}", e);
127                    Poll::Ready(Some(Err(std::io::Error::other(e))))
128                }
129            },
130            #[cfg(feature = "http3")]
131            HttpBody::QuicServerIncoming(stream) => match ready!(stream.poll_recv_data(cx)) {
132                Ok(frame) => match frame {
133                    Some(mut frame) => Poll::Ready(Some(Ok(Frame::data(
134                        frame.copy_to_bytes(frame.remaining()),
135                    )))),
136                    None => {
137                        cx.waker().wake_by_ref();
138                        Poll::Ready(None)
139                    }
140                },
141                Err(e) => Poll::Ready(Some(Err(std::io::Error::other(e)))),
142            },
143        }
144    }
145}
146
147impl Stream for HttpBody {
148    type Item = Result<Frame<Bytes>, std::io::Error>;
149
150    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
151        self.poll_frame(cx)
152    }
153}
154
155/*
156pub struct FileStream {
157    file: OwnedMutexGuard<File>,
158}
159
160impl FileStream {
161    pub fn new(file: OwnedMutexGuard<File>) -> Self {
162        FileStream { file }
163    }
164}
165
166impl Stream for FileStream {
167    type Item = Result<Frame<Bytes>, std::io::Error>;
168
169    #[hotpath::measure]
170    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
171        let mut me = self.as_mut();
172        let mut buf = BytesMut::with_capacity(70);
173        let mut read_buf = Box::pin(me.file.read_buf(&mut buf));
174        match read_buf.poll_unpin(cx) {
175            Poll::Ready(value) => match value {
176                Ok(size) => {
177                    if size == 0 {
178                        Poll::Ready(None)
179                    } else {
180                        Poll::Ready(Some(Ok(Frame::data(buf.into()))))
181                    }
182                }
183                Err(e) => Poll::Ready(Some(Err(e))),
184            },
185            Poll::Pending => Poll::Pending,
186        }
187    }
188}
189*/