Skip to main content

hyper_body_utils/
lib.rs

1#[cfg(feature = "http3")]
2use bytes::Buf;
3#[cfg(feature = "http3")]
4use futures::ready;
5
6use bytes::{Bytes};
7use futures::{stream, FutureExt, Stream, TryStreamExt};
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11#[cfg(feature = "http3")]
12use h3::client::RequestStream as ClientRequestStream;
13#[cfg(feature = "http3")]
14use h3::server::RequestStream as ServerRequestStream;
15#[cfg(feature = "http3")]
16use h3_quinn::RecvStream;
17
18pub use http_body_util::BodyExt;
19
20use http_body_util::{combinators::BoxBody, StreamBody};
21use hyper::body::{Body, Frame, Incoming};
22
23#[cfg(feature = "smol-rt")]
24use smol::fs::File;
25#[cfg(feature = "smol-rt")]
26use smol::io::AsyncReadExt;
27
28#[cfg(feature = "tokio-rt")]
29use tokio::fs::File;
30
31#[cfg(test)]
32mod tests;
33
34pub enum HttpBody {
35    Incoming(Incoming),
36    Stream(BoxBody<Bytes, std::io::Error>),
37    #[cfg(feature = "http3")]
38    QuicClientIncoming(ClientRequestStream<RecvStream, Bytes>),
39    #[cfg(feature = "http3")]
40    QuicServerIncoming(ServerRequestStream<RecvStream, Bytes>),
41}
42
43impl HttpBody {
44    pub fn from_incoming(incoming: Incoming) -> Self {
45        HttpBody::Incoming(incoming)
46    }
47
48    #[cfg(feature = "http3")]
49    pub fn from_quic_client(stream: ClientRequestStream<RecvStream, Bytes>) -> Self {
50        HttpBody::QuicClientIncoming(stream)
51    }
52
53    #[cfg(feature = "http3")]
54    pub fn from_quic_server(stream: ServerRequestStream<RecvStream, Bytes>) -> Self {
55        HttpBody::QuicServerIncoming(stream)
56    }
57
58    pub fn from_text(text: &str) -> Self {
59        Self::from_bytes(text.as_bytes())
60    }
61
62    pub fn from_file(file: File) -> Self {
63        #[cfg(feature = "tokio-rt")]
64        {
65            let content = tokio_util::io::ReaderStream::new(file).map_ok(Frame::data);
66            let body = StreamBody::new(content);
67            HttpBody::Stream(BodyExt::boxed(body))
68        }
69
70        #[cfg(feature = "smol-rt")]
71        {
72            let content = file
73                .bytes()
74                .map_ok(|data| Frame::data(Bytes::copy_from_slice(&[data])));
75            let body = StreamBody::new(content);
76            HttpBody::Stream(BodyExt::boxed(body))
77        }
78    }
79
80    pub fn from_bytes(bytes: &[u8]) -> Self {
81        #[cfg(feature = "tokio-rt")]
82        {
83            let all_bytes = Bytes::copy_from_slice(bytes);
84            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
85            let body = StreamBody::new(content);
86            HttpBody::Stream(BodyExt::boxed(body))
87        }
88
89        #[cfg(feature = "smol-rt")]
90        {
91            let all_bytes = Bytes::copy_from_slice(bytes);
92            let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
93            let body = StreamBody::new(content);
94            HttpBody::Stream(BodyExt::boxed(body))
95        }
96    }
97}
98
99impl Body for HttpBody {
100    type Data = Bytes;
101
102    type Error = std::io::Error;
103
104    fn poll_frame(
105        self: Pin<&mut Self>,
106        cx: &mut Context<'_>,
107    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
108        match self.get_mut() {
109            HttpBody::Incoming(incoming) => incoming
110                .frame()
111                .poll_unpin(cx)
112                .map_err(std::io::Error::other),
113            HttpBody::Stream(stream) => {
114                stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
115            }
116            #[cfg(feature = "http3")]
117            HttpBody::QuicClientIncoming(stream) => match ready!(stream.poll_recv_data(cx)) {
118                Ok(frame) => match frame {
119                    Some(mut frame) => Poll::Ready(Some(Ok(Frame::data(
120                        frame.copy_to_bytes(frame.remaining()),
121                    )))),
122                    None => {
123                        cx.waker().wake_by_ref();
124                        Poll::Ready(None)
125                    }
126                },
127                Err(e) => {
128                    println!("Error polling frame: {}", e);
129                    Poll::Ready(Some(Err(std::io::Error::other(e))))
130                }
131            },
132            #[cfg(feature = "http3")]
133            HttpBody::QuicServerIncoming(stream) => match ready!(stream.poll_recv_data(cx)) {
134                Ok(frame) => match frame {
135                    Some(mut frame) => Poll::Ready(Some(Ok(Frame::data(
136                        frame.copy_to_bytes(frame.remaining()),
137                    )))),
138                    None => {
139                        cx.waker().wake_by_ref();
140                        Poll::Ready(None)
141                    }
142                },
143                Err(e) => Poll::Ready(Some(Err(std::io::Error::other(e)))),
144            },
145        }
146    }
147}
148
149impl Stream for HttpBody {
150    type Item = Result<Frame<Bytes>, std::io::Error>;
151
152    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
153        self.poll_frame(cx)
154    }
155}
156
157/*
158pub struct FileStream {
159    file: OwnedMutexGuard<File>,
160}
161
162impl FileStream {
163    pub fn new(file: OwnedMutexGuard<File>) -> Self {
164        FileStream { file }
165    }
166}
167
168impl Stream for FileStream {
169    type Item = Result<Frame<Bytes>, std::io::Error>;
170
171    #[hotpath::measure]
172    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
173        let mut me = self.as_mut();
174        let mut buf = BytesMut::with_capacity(70);
175        let mut read_buf = Box::pin(me.file.read_buf(&mut buf));
176        match read_buf.poll_unpin(cx) {
177            Poll::Ready(value) => match value {
178                Ok(size) => {
179                    if size == 0 {
180                        Poll::Ready(None)
181                    } else {
182                        Poll::Ready(Some(Ok(Frame::data(buf.into()))))
183                    }
184                }
185                Err(e) => Poll::Ready(Some(Err(e))),
186            },
187            Poll::Pending => Poll::Pending,
188        }
189    }
190}
191*/