1use bytes::Bytes;
2use futures::{FutureExt, Stream, TryStreamExt, stream};
3use http_body_util::{BodyExt, StreamBody, combinators::BoxBody};
4use hyper::body::{Body, Frame, Incoming};
5
6#[cfg(feature = "smol-rt")]
7use smol::fs::File;
8#[cfg(feature = "smol-rt")]
9use smol::io::AsyncReadExt;
10
11#[cfg(feature = "tokio-rt")]
12use tokio::fs::File;
13#[cfg(feature = "tokio-rt")]
14use tokio_util::io::ReaderStream;
15
16pub enum HttpBody {
17 Incoming(Incoming),
18 Stream(BoxBody<Bytes, std::io::Error>),
19}
20
21impl HttpBody {
22 pub fn from_incoming(incoming: Incoming) -> Self {
23 HttpBody::Incoming(incoming)
24 }
25
26 pub fn from_text(text: &str) -> Self {
27 Self::from_bytes(text.as_bytes())
28 }
29
30 pub fn from_file(file: File) -> Self {
31 #[cfg(feature = "tokio-rt")]
32 {
33 let content = ReaderStream::new(file).map_ok(Frame::data);
34 let body = StreamBody::new(content);
35 HttpBody::Stream(body.boxed())
36 }
37
38 #[cfg(feature = "smol-rt")]
39 {
40 let content = file
42 .bytes()
43 .map_ok(|data| Frame::data(bytes::Bytes::copy_from_slice(&[data])));
44 let body = StreamBody::new(content);
45 HttpBody::Stream(body.boxed())
46 }
47 }
48
49 pub fn from_bytes(bytes: &[u8]) -> Self {
50 #[cfg(feature = "tokio-rt")]
51 {
52 let all_bytes = Bytes::copy_from_slice(bytes);
53 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
54 let body = StreamBody::new(content);
55 HttpBody::Stream(body.boxed())
56 }
57
58 #[cfg(feature = "smol-rt")]
59 {
60 let all_bytes = Bytes::copy_from_slice(bytes);
61 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
62 let body = StreamBody::new(content);
63 HttpBody::Stream(body.boxed())
64 }
65 }
66}
67
68impl Body for HttpBody {
69 type Data = Bytes;
70
71 type Error = std::io::Error;
72
73 fn poll_frame(
74 self: std::pin::Pin<&mut Self>,
75 cx: &mut std::task::Context<'_>,
76 ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
77 match self.get_mut() {
78 HttpBody::Incoming(incoming) => incoming
79 .frame()
80 .poll_unpin(cx)
81 .map_err(std::io::Error::other),
82 HttpBody::Stream(stream) => {
83 stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
84 }
85 }
86 }
87}
88
89impl Stream for HttpBody {
90 type Item = Result<Frame<Bytes>, std::io::Error>;
91
92 fn poll_next(
93 self: std::pin::Pin<&mut Self>,
94 cx: &mut std::task::Context<'_>,
95 ) -> std::task::Poll<Option<Self::Item>> {
96 self.poll_frame(cx)
97 }
98}