1use std::convert::Infallible;
34use std::fmt::Debug;
35use std::pin::Pin;
36use std::task::Context;
37use std::task::Poll;
38
39use anyhow::Result;
40use bytes::Bytes;
41use futures_util::Stream;
42use futures_util::TryStream;
43use futures_util::TryStreamExt;
44use http_body::Body;
45use http_body::Frame;
46use http_body::SizeHint;
47use http_body_util::BodyExt;
48use http_body_util::Empty;
49use http_body_util::Full;
50use http_body_util::StreamBody;
51
52use crate::types::BoxBody;
53use crate::types::BoxError;
54
55#[allow(dead_code)]
59enum BodyInner {
60 Full(Full<Bytes>),
61 Empty(Empty<Bytes>),
62 Incoming(hyper::body::Incoming),
64 Boxed(BoxBody),
65}
66
67pub struct TakoBody(BodyInner);
97
98impl std::fmt::Debug for TakoBody {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct("TakoBody").finish_non_exhaustive()
101 }
102}
103
104impl TakoBody {
105 #[inline]
110 pub fn new<B>(body: B) -> Self
111 where
112 B: Body<Data = Bytes> + Send + 'static,
113 B::Error: Into<BoxError>,
114 {
115 Self(BodyInner::Boxed(
116 body.map_err(std::convert::Into::into).boxed_unsync(),
117 ))
118 }
119
120 #[inline]
122 pub fn full(body: Full<Bytes>) -> Self {
123 Self(BodyInner::Full(body))
124 }
125
126 #[inline]
128 #[doc(hidden)]
129 pub fn incoming(body: hyper::body::Incoming) -> Self {
130 Self(BodyInner::Incoming(body))
131 }
132
133 #[inline]
135 pub fn from_stream<S, E>(stream: S) -> Self
136 where
137 S: Stream<Item = Result<Bytes, E>> + Send + 'static,
138 E: Into<BoxError> + Debug + 'static,
139 {
140 let stream = stream.map_err(Into::into).map_ok(http_body::Frame::data);
141 let body = StreamBody::new(stream).boxed_unsync();
142 Self(BodyInner::Boxed(body))
143 }
144
145 #[inline]
147 pub fn from_try_stream<S, E>(stream: S) -> Self
148 where
149 S: TryStream<Ok = Frame<Bytes>, Error = E> + Send + 'static,
150 E: Into<BoxError> + 'static,
151 {
152 let body = StreamBody::new(stream.map_err(Into::into)).boxed_unsync();
153 Self(BodyInner::Boxed(body))
154 }
155
156 #[inline]
158 #[must_use]
159 pub fn empty() -> Self {
160 Self(BodyInner::Empty(Empty::new()))
161 }
162}
163
164impl Default for TakoBody {
166 fn default() -> Self {
167 Self::empty()
168 }
169}
170
171impl From<()> for TakoBody {
172 fn from((): ()) -> Self {
173 Self::empty()
174 }
175}
176
177impl From<&str> for TakoBody {
178 fn from(buf: &str) -> Self {
179 Self::full(Full::from(Bytes::from(buf.to_owned())))
180 }
181}
182
183impl From<String> for TakoBody {
184 fn from(buf: String) -> Self {
185 Self::full(Full::from(Bytes::from(buf)))
186 }
187}
188
189impl From<Vec<u8>> for TakoBody {
190 fn from(buf: Vec<u8>) -> Self {
191 Self::full(Full::from(Bytes::from(buf)))
192 }
193}
194
195impl From<Bytes> for TakoBody {
196 fn from(buf: Bytes) -> Self {
197 Self::full(Full::from(buf))
198 }
199}
200
201#[inline]
203fn map_infallible_frame(
204 poll: Poll<Option<core::result::Result<Frame<Bytes>, Infallible>>>,
205) -> Poll<Option<core::result::Result<Frame<Bytes>, BoxError>>> {
206 poll.map(|opt| opt.map(|res| res.map_err(|e| match e {})))
207}
208
209#[inline]
211fn map_hyper_frame(
212 poll: Poll<Option<core::result::Result<Frame<Bytes>, hyper::Error>>>,
213) -> Poll<Option<core::result::Result<Frame<Bytes>, BoxError>>> {
214 poll.map(|opt| opt.map(|res| res.map_err(Into::into)))
215}
216
217impl Body for TakoBody {
218 type Data = Bytes;
219 type Error = BoxError;
220
221 #[inline]
222 fn poll_frame(
223 self: Pin<&mut Self>,
224 cx: &mut Context<'_>,
225 ) -> Poll<Option<core::result::Result<Frame<Self::Data>, Self::Error>>> {
226 match &mut self.get_mut().0 {
228 BodyInner::Full(body) => map_infallible_frame(Pin::new(body).poll_frame(cx)),
229 BodyInner::Empty(body) => map_infallible_frame(Pin::new(body).poll_frame(cx)),
230 BodyInner::Incoming(body) => map_hyper_frame(Pin::new(body).poll_frame(cx)),
231 BodyInner::Boxed(body) => Pin::new(body).poll_frame(cx),
232 }
233 }
234
235 #[inline]
236 fn size_hint(&self) -> SizeHint {
237 match &self.0 {
238 BodyInner::Full(body) => body.size_hint(),
239 BodyInner::Empty(body) => body.size_hint(),
240 BodyInner::Incoming(body) => body.size_hint(),
241 BodyInner::Boxed(body) => body.size_hint(),
242 }
243 }
244
245 #[inline]
246 fn is_end_stream(&self) -> bool {
247 match &self.0 {
248 BodyInner::Full(body) => body.is_end_stream(),
249 BodyInner::Empty(body) => body.is_end_stream(),
250 BodyInner::Incoming(body) => body.is_end_stream(),
251 BodyInner::Boxed(body) => body.is_end_stream(),
252 }
253 }
254}