1#[cfg(feature = "http3")]
2use bytes::Buf;
3
4use bytes::Bytes;
5use futures::{stream, FutureExt, Stream, TryStreamExt};
6
7#[cfg(feature = "http3")]
8use h3::client::RequestStream as ClientRequestStream;
9#[cfg(feature = "http3")]
10use h3::server::RequestStream as ServerRequestStream;
11#[cfg(feature = "http3")]
12use h3_quinn::{SendStream, RecvStream};
13
14use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
15use hyper::body::{Body, Frame, Incoming};
16
17#[cfg(feature = "smol-rt")]
18use smol::fs::File;
19#[cfg(feature = "smol-rt")]
20use smol::io::AsyncReadExt;
21
22#[cfg(feature = "tokio-rt")]
23use tokio::fs::File;
24#[cfg(feature = "tokio-rt")]
25use tokio_util::io::ReaderStream;
26
/// A unified HTTP message body.
///
/// Each variant wraps a different body source so that the rest of the code
/// can treat them uniformly through the `Body` and `Stream` implementations
/// on this type.
pub enum HttpBody {
    /// A body received by hyper (`hyper::body::Incoming`).
    Incoming(Incoming),
    /// A locally produced, boxed body whose data chunks are `Bytes` and whose
    /// error type is `std::io::Error`.
    Stream(BoxBody<Bytes, std::io::Error>),
    /// The request stream of an HTTP/3 client (`h3::client::RequestStream`)
    /// over a QUIC receive stream. Only available with the `http3` feature.
    #[cfg(feature = "http3")]
    QuicClientIncoming(ClientRequestStream<RecvStream, Bytes>),
    /// The request stream of an HTTP/3 server (`h3::server::RequestStream`)
    /// over a QUIC receive stream. Only available with the `http3` feature.
    #[cfg(feature = "http3")]
    QuicServerIncoming(ServerRequestStream<RecvStream, Bytes>),
}
35
36impl HttpBody {
37 pub fn from_incoming(incoming: Incoming) -> Self {
38 HttpBody::Incoming(incoming)
39 }
40
41 #[cfg(feature = "http3")]
42 pub fn from_quic_client(stream: ClientRequestStream<RecvStream, Bytes>) -> Self {
43 HttpBody::QuicClientIncoming(stream)
44 }
45
46 #[cfg(feature = "http3")]
47 pub fn from_quic_server(stream: ServerRequestStream<RecvStream, Bytes>) -> Self {
48 HttpBody::QuicServerIncoming(stream)
49 }
50
51 pub fn from_text(text: &str) -> Self {
52 Self::from_bytes(text.as_bytes())
53 }
54
55 pub fn from_file(file: File) -> Self {
56 #[cfg(feature = "tokio-rt")]
57 {
58 let content = ReaderStream::new(file).map_ok(Frame::data);
59 let body = StreamBody::new(content);
60 HttpBody::Stream(body.boxed())
61 }
62
63 #[cfg(feature = "smol-rt")]
64 {
65 let content = file
67 .bytes()
68 .map_ok(|data| Frame::data(bytes::Bytes::copy_from_slice(&[data])));
69 let body = StreamBody::new(content);
70 HttpBody::Stream(body.boxed())
71 }
72 }
73
74 pub fn from_bytes(bytes: &[u8]) -> Self {
75 #[cfg(feature = "tokio-rt")]
76 {
77 let all_bytes = Bytes::copy_from_slice(bytes);
78 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
79 let body = StreamBody::new(content);
80 HttpBody::Stream(body.boxed())
81 }
82
83 #[cfg(feature = "smol-rt")]
84 {
85 let all_bytes = Bytes::copy_from_slice(bytes);
86 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
87 let body = StreamBody::new(content);
88 HttpBody::Stream(body.boxed())
89 }
90 }
91}
92
93impl Body for HttpBody {
94 type Data = Bytes;
95
96 type Error = std::io::Error;
97
98 fn poll_frame(
99 self: std::pin::Pin<&mut Self>,
100 cx: &mut std::task::Context<'_>,
101 ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
102 match self.get_mut() {
103 HttpBody::Incoming(incoming) => incoming
104 .frame()
105 .poll_unpin(cx)
106 .map_err(std::io::Error::other),
107 HttpBody::Stream(stream) => {
108 stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
109 }
110 #[cfg(feature = "http3")]
111 HttpBody::QuicClientIncoming(stream) => stream.poll_recv_data(cx).map(|value| match value {
112 Ok(Some(mut value)) => {
113 Some(Ok(Frame::data(value.copy_to_bytes(value.remaining()))))
114 }
115 Ok(None) => None,
116 Err(e) => Some(Err(std::io::Error::other(e))),
117 }),
118 #[cfg(feature = "http3")]
119 HttpBody::QuicServerIncoming(stream) => stream.poll_recv_data(cx).map(|value| match value {
120 Ok(Some(mut value)) => {
121 Some(Ok(Frame::data(value.copy_to_bytes(value.remaining()))))
122 }
123 Ok(None) => None,
124 Err(e) => Some(Err(std::io::Error::other(e))),
125 }),
126 }
127 }
128}
129
130impl Stream for HttpBody {
131 type Item = Result<Frame<Bytes>, std::io::Error>;
132
133 fn poll_next(
134 self: std::pin::Pin<&mut Self>,
135 cx: &mut std::task::Context<'_>,
136 ) -> std::task::Poll<Option<Self::Item>> {
137 self.poll_frame(cx)
138 }
139}