1#[cfg(feature = "http3")]
2use bytes::Buf;
3#[cfg(feature = "http3")]
4use futures::ready;
5
6use bytes::{Bytes};
7use futures::{stream, FutureExt, Stream, TryStreamExt};
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11#[cfg(feature = "http3")]
12use h3::client::RequestStream as ClientRequestStream;
13#[cfg(feature = "http3")]
14use h3::server::RequestStream as ServerRequestStream;
15#[cfg(feature = "http3")]
16use h3_quinn::RecvStream;
17
18pub use http_body_util::BodyExt;
19
20use http_body_util::{combinators::BoxBody, StreamBody};
21use hyper::body::{Body, Frame, Incoming};
22
23#[cfg(feature = "smol-rt")]
24use smol::fs::File;
25
26#[cfg(feature = "tokio-rt")]
27use tokio::fs::File;
28
29#[cfg(test)]
30mod tests;
31
32pub enum HttpBody {
33 Incoming(Incoming),
34 Stream(BoxBody<Bytes, std::io::Error>),
35 #[cfg(feature = "http3")]
36 QuicClientIncoming(ClientRequestStream<RecvStream, Bytes>),
37 #[cfg(feature = "http3")]
38 QuicServerIncoming(ServerRequestStream<RecvStream, Bytes>),
39}
40
41impl HttpBody {
42 pub fn from_incoming(incoming: Incoming) -> Self {
43 HttpBody::Incoming(incoming)
44 }
45
46 #[cfg(feature = "http3")]
47 pub fn from_quic_client(stream: ClientRequestStream<RecvStream, Bytes>) -> Self {
48 HttpBody::QuicClientIncoming(stream)
49 }
50
51 #[cfg(feature = "http3")]
52 pub fn from_quic_server(stream: ServerRequestStream<RecvStream, Bytes>) -> Self {
53 HttpBody::QuicServerIncoming(stream)
54 }
55
56 pub fn from_text(text: &str) -> Self {
57 Self::from_bytes(text.as_bytes())
58 }
59
60 pub fn from_file(file: File) -> Self {
61 #[cfg(feature = "tokio-rt")]
62 {
63 let content = tokio_util::io::ReaderStream::new(file).map_ok(Frame::data);
64 let body = StreamBody::new(content);
65 HttpBody::Stream(BodyExt::boxed(body))
66 }
67
68 #[cfg(feature = "smol-rt")]
69 {
70 let content = file
71 .bytes()
72 .map_ok(|data| Frame::data(Bytes::copy_from_slice(&[data])));
73 let body = StreamBody::new(content);
74 HttpBody::Stream(BodyExt::boxed(body))
75 }
76 }
77
78 pub fn from_bytes(bytes: &[u8]) -> Self {
79 #[cfg(feature = "tokio-rt")]
80 {
81 let all_bytes = Bytes::copy_from_slice(bytes);
82 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
83 let body = StreamBody::new(content);
84 HttpBody::Stream(BodyExt::boxed(body))
85 }
86
87 #[cfg(feature = "smol-rt")]
88 {
89 let all_bytes = Bytes::copy_from_slice(bytes);
90 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
91 let body = StreamBody::new(content);
92 HttpBody::Stream(BodyExt::boxed(body))
93 }
94 }
95}
96
97impl Body for HttpBody {
98 type Data = Bytes;
99
100 type Error = std::io::Error;
101
102 fn poll_frame(
103 self: Pin<&mut Self>,
104 cx: &mut Context<'_>,
105 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
106 match self.get_mut() {
107 HttpBody::Incoming(incoming) => incoming
108 .frame()
109 .poll_unpin(cx)
110 .map_err(std::io::Error::other),
111 HttpBody::Stream(stream) => {
112 stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
113 }
114 #[cfg(feature = "http3")]
115 HttpBody::QuicClientIncoming(stream) => match ready!(stream.poll_recv_data(cx)) {
116 Ok(frame) => match frame {
117 Some(mut frame) => Poll::Ready(Some(Ok(Frame::data(
118 frame.copy_to_bytes(frame.remaining()),
119 )))),
120 None => {
121 cx.waker().wake_by_ref();
122 Poll::Ready(None)
123 }
124 },
125 Err(e) => {
126 println!("Error polling frame: {}", e);
127 Poll::Ready(Some(Err(std::io::Error::other(e))))
128 }
129 },
130 #[cfg(feature = "http3")]
131 HttpBody::QuicServerIncoming(stream) => match ready!(stream.poll_recv_data(cx)) {
132 Ok(frame) => match frame {
133 Some(mut frame) => Poll::Ready(Some(Ok(Frame::data(
134 frame.copy_to_bytes(frame.remaining()),
135 )))),
136 None => {
137 cx.waker().wake_by_ref();
138 Poll::Ready(None)
139 }
140 },
141 Err(e) => Poll::Ready(Some(Err(std::io::Error::other(e)))),
142 },
143 }
144 }
145}
146
147impl Stream for HttpBody {
148 type Item = Result<Frame<Bytes>, std::io::Error>;
149
150 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
151 self.poll_frame(cx)
152 }
153}
154
155