1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use futures_util::{Stream, TryStream, TryStreamExt};
8use http_body_util::{BodyExt, BodyStream, Full, StreamBody, combinators::UnsyncBoxBody};
9use hyper::body::{Frame, Incoming, SizeHint};
10use sync_wrapper::SyncWrapper;
11
12use crate::{BoxError, Error, HttpBody, Result};
13
/// Coarse marker describing the condition of a body.
///
/// Nothing in this part of the file reads or writes this type; the variant
/// notes below are inferred from the variant names only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BodyState {
    /// Presumably: the body is present and has not been consumed yet.
    Normal,
    /// Presumably: there is no body content.
    Empty,
    /// Presumably: the body has already been taken or consumed.
    Used,
}
24
25#[derive(Debug, Default)]
30pub enum Body<D = Bytes> {
31 #[default]
33 Empty,
34 Full(Full<D>),
36 Boxed(SyncWrapper<UnsyncBoxBody<D, Error>>),
38 Incoming(Incoming),
40}
41
42impl Body {
43 #[must_use]
45 pub const fn empty() -> Self {
46 Self::Empty
47 }
48
49 #[allow(clippy::missing_panics_doc)]
51 pub fn wrap<B>(body: B) -> Self
52 where
53 B: HttpBody + Send + 'static,
54 B::Data: Into<Bytes>,
55 B::Error: Into<BoxError>,
56 {
57 let mut body = Some(body);
59 <dyn std::any::Any>::downcast_mut::<Option<UnsyncBoxBody<Bytes, Error>>>(&mut body)
60 .and_then(Option::take)
61 .unwrap_or_else(|| {
62 body.unwrap()
63 .map_frame(|frame| frame.map_data(Into::into))
64 .map_err(Error::boxed)
65 .boxed_unsync()
66 })
67 .into()
68 }
69
70 pub fn from_stream<S>(stream: S) -> Self
72 where
73 S: TryStream + Send + 'static,
74 S::Ok: Into<Bytes>,
75 S::Error: Into<BoxError>,
76 {
77 StreamBody::new(
78 stream
79 .map_ok(Into::into)
80 .map_ok(Frame::data)
81 .map_err(Error::boxed),
82 )
83 .boxed_unsync()
84 .into()
85 }
86
87 pub fn into_stream(self) -> BodyStream<Self> {
89 BodyStream::new(self)
90 }
91}
92
93impl HttpBody for Body {
94 type Data = Bytes;
95 type Error = Error;
96
97 #[inline]
98 fn poll_frame(
99 self: Pin<&mut Self>,
100 cx: &mut Context<'_>,
101 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
102 match self.get_mut() {
103 Self::Empty => Poll::Ready(None),
104 Self::Full(inner) => Pin::new(inner).poll_frame(cx).map_err(Into::into),
105 Self::Boxed(inner) => Pin::new(inner).get_pin_mut().poll_frame(cx),
106 Self::Incoming(inner) => Pin::new(inner).poll_frame(cx).map_err(Into::into),
107 }
108 }
109
110 #[inline]
111 fn is_end_stream(&self) -> bool {
112 match self {
113 Self::Empty => true,
114 Self::Boxed(_) => false,
115 Self::Full(inner) => inner.is_end_stream(),
116 Self::Incoming(inner) => inner.is_end_stream(),
117 }
118 }
119
120 #[inline]
121 fn size_hint(&self) -> SizeHint {
122 match self {
123 Self::Empty => SizeHint::with_exact(0),
124 Self::Full(inner) => inner.size_hint(),
125 Self::Boxed(_) => SizeHint::default(),
126 Self::Incoming(inner) => inner.size_hint(),
127 }
128 }
129}
130
131impl Stream for Body {
132 type Item = Result<Bytes, std::io::Error>;
133
134 #[inline]
135 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136 match match self.get_mut() {
137 Self::Empty => return Poll::Ready(None),
138 Self::Full(inner) => Pin::new(inner)
139 .poll_frame(cx)
140 .map_err(std::io::Error::other)?,
141 Self::Boxed(inner) => Pin::new(inner)
142 .get_pin_mut()
143 .poll_frame(cx)
144 .map_err(std::io::Error::other)?,
145 Self::Incoming(inner) => Pin::new(inner)
146 .poll_frame(cx)
147 .map_err(std::io::Error::other)?,
148 } {
149 Poll::Pending => Poll::Pending,
150 Poll::Ready(None) => Poll::Ready(None),
151 Poll::Ready(Some(frame)) => Poll::Ready(frame.into_data().map(Ok).ok()),
152 }
153 }
154
155 #[inline]
156 fn size_hint(&self) -> (usize, Option<usize>) {
157 let sh = match self {
158 Self::Empty => return (0, Some(0)),
159 Self::Full(inner) => inner.size_hint(),
160 Self::Boxed(_) => return (0, None),
161 Self::Incoming(inner) => inner.size_hint(),
162 };
163 (
164 usize::try_from(sh.lower()).unwrap_or(usize::MAX),
165 sh.upper().map(|v| usize::try_from(v).unwrap_or(usize::MAX)),
166 )
167 }
168}
169
170impl From<()> for Body {
171 fn from((): ()) -> Self {
172 Self::Empty
173 }
174}
175
176impl<D> From<Full<D>> for Body<D> {
177 fn from(value: Full<D>) -> Self {
178 Self::Full(value)
179 }
180}
181
182impl<D> From<UnsyncBoxBody<D, Error>> for Body<D> {
183 fn from(value: UnsyncBoxBody<D, Error>) -> Self {
184 Self::Boxed(SyncWrapper::new(value))
185 }
186}