1#[cfg(feature = "http3")]
2use bytes::Buf;
3
4use bytes::Bytes;
5use futures::{stream, FutureExt, Stream, TryStreamExt};
6
7#[cfg(feature = "http3")]
8use h3::client::RequestStream as ClientRequestStream;
9#[cfg(feature = "http3")]
10use h3::server::RequestStream as ServerRequestStream;
11#[cfg(feature = "http3")]
12use h3_quinn::{SendStream, RecvStream};
13
14use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
15use hyper::body::{Body, Frame, Incoming};
16
17#[cfg(feature = "smol-rt")]
18use smol::fs::File;
19#[cfg(feature = "smol-rt")]
20use smol::io::AsyncReadExt;
21
22#[cfg(feature = "tokio-rt")]
23use tokio::fs::File;
24#[cfg(feature = "tokio-rt")]
25use tokio_util::io::ReaderStream;
26
27#[cfg(test)]
28mod tests;
29
30pub enum HttpBody {
31 Incoming(Incoming),
32 Stream(BoxBody<Bytes, std::io::Error>),
33 #[cfg(feature = "http3")]
34 QuicClientIncoming(ClientRequestStream<RecvStream, Bytes>),
35 #[cfg(feature = "http3")]
36 QuicServerIncoming(ServerRequestStream<RecvStream, Bytes>),
37}
38
39impl HttpBody {
40 pub fn from_incoming(incoming: Incoming) -> Self {
41 HttpBody::Incoming(incoming)
42 }
43
44 #[cfg(feature = "http3")]
45 pub fn from_quic_client(stream: ClientRequestStream<RecvStream, Bytes>) -> Self {
46 HttpBody::QuicClientIncoming(stream)
47 }
48
49 #[cfg(feature = "http3")]
50 pub fn from_quic_server(stream: ServerRequestStream<RecvStream, Bytes>) -> Self {
51 HttpBody::QuicServerIncoming(stream)
52 }
53
54 pub fn from_text(text: &str) -> Self {
55 Self::from_bytes(text.as_bytes())
56 }
57
58 pub fn from_file(file: File) -> Self {
59 #[cfg(feature = "tokio-rt")]
60 {
61 let content = ReaderStream::new(file).map_ok(Frame::data);
62 let body = StreamBody::new(content);
63 HttpBody::Stream(body.boxed())
64 }
65
66 #[cfg(feature = "smol-rt")]
67 {
68 let content = file
70 .bytes()
71 .map_ok(|data| Frame::data(
72 Bytes::copy_from_slice(&[data])));
73 let body = StreamBody::new(content);
74 HttpBody::Stream(body.boxed())
75 }
76 }
77
78 pub fn from_bytes(bytes: &[u8]) -> Self {
79 #[cfg(feature = "tokio-rt")]
80 {
81 let all_bytes = Bytes::copy_from_slice(bytes);
82 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
83 let body = StreamBody::new(content);
84 HttpBody::Stream(body.boxed())
85 }
86
87 #[cfg(feature = "smol-rt")]
88 {
89 let all_bytes = Bytes::copy_from_slice(bytes);
90 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
91 let body = StreamBody::new(content);
92 HttpBody::Stream(body.boxed())
93 }
94 }
95}
96
97impl Body for HttpBody {
98 type Data = Bytes;
99
100 type Error = std::io::Error;
101
102 fn poll_frame(
103 self: std::pin::Pin<&mut Self>,
104 cx: &mut std::task::Context<'_>,
105 ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
106 match self.get_mut() {
107 HttpBody::Incoming(incoming) => incoming
108 .frame()
109 .poll_unpin(cx)
110 .map_err(std::io::Error::other),
111 HttpBody::Stream(stream) => {
112 stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
113 }
114 #[cfg(feature = "http3")]
115 HttpBody::QuicClientIncoming(stream) => stream.poll_recv_data(cx).map(|value| match value {
116 Ok(Some(mut value)) => {
117 Some(Ok(Frame::data(value.copy_to_bytes(value.remaining()))))
118 }
119 Ok(None) => None,
120 Err(e) => Some(Err(std::io::Error::other(e))),
121 }),
122 #[cfg(feature = "http3")]
123 HttpBody::QuicServerIncoming(stream) => stream.poll_recv_data(cx).map(|value| match value {
124 Ok(Some(mut value)) => {
125 Some(Ok(Frame::data(value.copy_to_bytes(value.remaining()))))
126 }
127 Ok(None) => None,
128 Err(e) => Some(Err(std::io::Error::other(e))),
129 }),
130 }
131 }
132}
133
134impl Stream for HttpBody {
135 type Item = Result<Frame<Bytes>, std::io::Error>;
136
137 fn poll_next(
138 self: std::pin::Pin<&mut Self>,
139 cx: &mut std::task::Context<'_>,
140 ) -> std::task::Poll<Option<Self::Item>> {
141 self.poll_frame(cx)
142 }
143}