1#[cfg(feature = "http3")]
2use bytes::Buf;
3#[cfg(feature = "http3")]
4use futures::ready;
5
6use bytes::{Bytes};
7use futures::{stream, FutureExt, Stream, TryStreamExt};
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11#[cfg(feature = "http3")]
12use h3::client::RequestStream as ClientRequestStream;
13#[cfg(feature = "http3")]
14use h3::server::RequestStream as ServerRequestStream;
15#[cfg(feature = "http3")]
16use h3_quinn::RecvStream;
17
18pub use http_body_util::BodyExt;
19
20use http_body_util::{combinators::BoxBody, StreamBody};
21use hyper::body::{Body, Frame, Incoming};
22
23#[cfg(feature = "smol-rt")]
24use smol::fs::File;
25#[cfg(feature = "smol-rt")]
26use smol::io::AsyncReadExt;
27
28#[cfg(feature = "tokio-rt")]
29use tokio::fs::File;
30
31#[cfg(test)]
32mod tests;
33
34pub enum HttpBody {
35 Incoming(Incoming),
36 Stream(BoxBody<Bytes, std::io::Error>),
37 #[cfg(feature = "http3")]
38 QuicClientIncoming(ClientRequestStream<RecvStream, Bytes>),
39 #[cfg(feature = "http3")]
40 QuicServerIncoming(ServerRequestStream<RecvStream, Bytes>),
41}
42
43impl HttpBody {
44 pub fn from_incoming(incoming: Incoming) -> Self {
45 HttpBody::Incoming(incoming)
46 }
47
48 #[cfg(feature = "http3")]
49 pub fn from_quic_client(stream: ClientRequestStream<RecvStream, Bytes>) -> Self {
50 HttpBody::QuicClientIncoming(stream)
51 }
52
53 #[cfg(feature = "http3")]
54 pub fn from_quic_server(stream: ServerRequestStream<RecvStream, Bytes>) -> Self {
55 HttpBody::QuicServerIncoming(stream)
56 }
57
58 pub fn from_text(text: &str) -> Self {
59 Self::from_bytes(text.as_bytes())
60 }
61
62 pub fn from_file(file: File) -> Self {
63 #[cfg(feature = "tokio-rt")]
64 {
65 let content = tokio_util::io::ReaderStream::new(file).map_ok(Frame::data);
66 let body = StreamBody::new(content);
67 HttpBody::Stream(BodyExt::boxed(body))
68 }
69
70 #[cfg(feature = "smol-rt")]
71 {
72 let content = file
73 .bytes()
74 .map_ok(|data| Frame::data(Bytes::copy_from_slice(&[data])));
75 let body = StreamBody::new(content);
76 HttpBody::Stream(BodyExt::boxed(body))
77 }
78 }
79
80 pub fn from_bytes(bytes: &[u8]) -> Self {
81 #[cfg(feature = "tokio-rt")]
82 {
83 let all_bytes = Bytes::copy_from_slice(bytes);
84 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
85 let body = StreamBody::new(content);
86 HttpBody::Stream(BodyExt::boxed(body))
87 }
88
89 #[cfg(feature = "smol-rt")]
90 {
91 let all_bytes = Bytes::copy_from_slice(bytes);
92 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
93 let body = StreamBody::new(content);
94 HttpBody::Stream(BodyExt::boxed(body))
95 }
96 }
97}
98
99impl Body for HttpBody {
100 type Data = Bytes;
101
102 type Error = std::io::Error;
103
104 fn poll_frame(
105 self: Pin<&mut Self>,
106 cx: &mut Context<'_>,
107 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
108 match self.get_mut() {
109 HttpBody::Incoming(incoming) => incoming
110 .frame()
111 .poll_unpin(cx)
112 .map_err(std::io::Error::other),
113 HttpBody::Stream(stream) => {
114 stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
115 }
116 #[cfg(feature = "http3")]
117 HttpBody::QuicClientIncoming(stream) => match ready!(stream.poll_recv_data(cx)) {
118 Ok(frame) => match frame {
119 Some(mut frame) => Poll::Ready(Some(Ok(Frame::data(
120 frame.copy_to_bytes(frame.remaining()),
121 )))),
122 None => {
123 cx.waker().wake_by_ref();
124 Poll::Ready(None)
125 }
126 },
127 Err(e) => {
128 println!("Error polling frame: {}", e);
129 Poll::Ready(Some(Err(std::io::Error::other(e))))
130 }
131 },
132 #[cfg(feature = "http3")]
133 HttpBody::QuicServerIncoming(stream) => match ready!(stream.poll_recv_data(cx)) {
134 Ok(frame) => match frame {
135 Some(mut frame) => Poll::Ready(Some(Ok(Frame::data(
136 frame.copy_to_bytes(frame.remaining()),
137 )))),
138 None => {
139 cx.waker().wake_by_ref();
140 Poll::Ready(None)
141 }
142 },
143 Err(e) => Poll::Ready(Some(Err(std::io::Error::other(e)))),
144 },
145 }
146 }
147}
148
149impl Stream for HttpBody {
150 type Item = Result<Frame<Bytes>, std::io::Error>;
151
152 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
153 self.poll_frame(cx)
154 }
155}
156
157