1use bytes::Bytes;
2use futures::{FutureExt, Stream, TryStreamExt, stream};
3use http_body_util::{BodyExt, StreamBody, combinators::BoxBody};
4use hyper::body::{Body, Frame, Incoming};
5
6#[cfg(feature = "smol-rt")]
7use smol::fs::File;
8
9#[cfg(feature = "tokio-rt")]
10use tokio::fs::File;
11#[cfg(feature = "tokio-rt")]
12use tokio_util::io::ReaderStream;
13
14pub enum HttpBody {
15 Incoming(Incoming),
16 Stream(BoxBody<Bytes, std::io::Error>),
17}
18
19impl HttpBody {
20 pub fn from_incoming(incoming: Incoming) -> Self {
21 HttpBody::Incoming(incoming)
22 }
23
24 pub fn from_text(text: &str) -> Self {
25 Self::from_bytes(text.as_bytes())
26 }
27
28 pub fn from_file(file: File) -> Self {
29 #[cfg(feature = "tokio-rt")]
30 {
31 let content = ReaderStream::new(file).map_ok(Frame::data);
32 let body = StreamBody::new(content);
33 HttpBody::Stream(body.boxed())
34 }
35
36 #[cfg(feature = "smol-rt")]
37 {
38 let content = file
40 .bytes()
41 .map_ok(|data| Frame::data(bytes::Bytes::copy_from_slice(&[data])));
42 let body = StreamBody::new(content);
43 HttpBody::Stream(body.boxed());
44 }
45 }
46
47 pub fn from_bytes(bytes: &[u8]) -> Self {
48 #[cfg(feature = "tokio-rt")]
49 {
50 let all_bytes = Bytes::copy_from_slice(bytes);
51 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
52 let body = StreamBody::new(content);
53 HttpBody::Stream(body.boxed())
54 }
55
56 #[cfg(feature = "smol-rt")]
57 {
58 let all_bytes = Bytes::copy_from_slice(request.as_ref().raw_body());
59 let content = stream::iter(vec![Ok(all_bytes)]).map_ok(Frame::data);
60 let body = StreamBody::new(content);
61 HttpBody::Stream(body.boxed())
62 }
63 }
64}
65
66impl Body for HttpBody {
67 type Data = Bytes;
68
69 type Error = std::io::Error;
70
71 fn poll_frame(
72 self: std::pin::Pin<&mut Self>,
73 cx: &mut std::task::Context<'_>,
74 ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
75 match self.get_mut() {
76 HttpBody::Incoming(incoming) => incoming
77 .frame()
78 .poll_unpin(cx)
79 .map_err(std::io::Error::other),
80 HttpBody::Stream(stream) => {
81 stream.frame().poll_unpin(cx).map_err(std::io::Error::other)
82 }
83 }
84 }
85}
86
87impl Stream for HttpBody {
88 type Item = Result<Frame<Bytes>, std::io::Error>;
89
90 fn poll_next(
91 self: std::pin::Pin<&mut Self>,
92 cx: &mut std::task::Context<'_>,
93 ) -> std::task::Poll<Option<Self::Item>> {
94 self.poll_frame(cx)
95 }
96}