1use core::{
10 convert::Infallible,
11 marker::PhantomData,
12 mem,
13 pin::Pin,
14 task::{Context, Poll},
15};
16
17use std::{borrow::Cow, error};
18
19use futures_core::stream::{LocalBoxStream, Stream};
20use pin_project_lite::pin_project;
21
22use super::{
23 bytes::{Buf, Bytes, BytesMut},
24 error::BodyError,
25};
26
27pub const fn none_body_hint() -> (usize, Option<usize>) {
30 NONE_BODY_HINT
31}
32
33pub const NONE_BODY_HINT: (usize, Option<usize>) = (usize::MAX, Some(0));
34
35pub const fn exact_body_hint(size: usize) -> (usize, Option<usize>) {
38 (size, Some(size))
39}
40
41#[derive(Default)]
44pub enum RequestBody {
45 #[cfg(feature = "http1")]
46 H1(super::h1::RequestBody),
47 #[cfg(feature = "http2")]
48 H2(super::h2::RequestBody),
49 #[cfg(feature = "http3")]
50 H3(super::h3::RequestBody),
51 Unknown(BoxBody),
52 #[default]
53 None,
54}
55
56impl Stream for RequestBody {
57 type Item = Result<Bytes, BodyError>;
58
59 #[inline]
60 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61 match self.get_mut() {
62 #[cfg(feature = "http1")]
63 Self::H1(body) => Pin::new(body).poll_next(cx).map_err(Into::into),
64 #[cfg(feature = "http2")]
65 Self::H2(body) => Pin::new(body).poll_next(cx),
66 #[cfg(feature = "http3")]
67 Self::H3(body) => Pin::new(body).poll_next(cx),
68 Self::Unknown(body) => Pin::new(body).poll_next(cx),
69 Self::None => Poll::Ready(None),
70 }
71 }
72}
73
74impl<B> From<NoneBody<B>> for RequestBody {
75 fn from(_: NoneBody<B>) -> Self {
76 Self::None
77 }
78}
79
80impl From<Bytes> for RequestBody {
81 fn from(bytes: Bytes) -> Self {
82 Self::from(Once::new(bytes))
83 }
84}
85
86impl From<Once<Bytes>> for RequestBody {
87 fn from(once: Once<Bytes>) -> Self {
88 Self::from(BoxBody::new(once))
89 }
90}
91
92impl From<BoxBody> for RequestBody {
93 fn from(body: BoxBody) -> Self {
94 Self::Unknown(body)
95 }
96}
97
98macro_rules! req_bytes_impl {
99 ($ty: ty) => {
100 impl From<$ty> for RequestBody {
101 fn from(item: $ty) -> Self {
102 Self::from(Bytes::from(item))
103 }
104 }
105 };
106}
107
108req_bytes_impl!(&'static [u8]);
109req_bytes_impl!(Box<[u8]>);
110req_bytes_impl!(Vec<u8>);
111req_bytes_impl!(String);
112
113pub struct NoneBody<B>(PhantomData<fn(B)>);
116
117impl<B> Default for NoneBody<B> {
118 fn default() -> Self {
119 Self(PhantomData)
120 }
121}
122
123impl<B> Stream for NoneBody<B> {
124 type Item = Result<B, Infallible>;
125
126 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127 unreachable!("NoneBody must not be polled. See NoneBody for detail")
128 }
129
130 #[inline]
131 fn size_hint(&self) -> (usize, Option<usize>) {
132 none_body_hint()
133 }
134}
135
136#[derive(Default)]
138pub struct Once<B>(Option<B>);
139
140impl<B> Once<B>
141where
142 B: Buf + Unpin,
143{
144 #[inline]
145 pub const fn new(body: B) -> Self {
146 Self(Some(body))
147 }
148}
149
150impl<B> From<B> for Once<B>
151where
152 B: Buf + Unpin,
153{
154 fn from(b: B) -> Self {
155 Self::new(b)
156 }
157}
158
159impl<B> Stream for Once<B>
160where
161 B: Buf + Unpin,
162{
163 type Item = Result<B, Infallible>;
164
165 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
166 Poll::Ready(mem::replace(self.get_mut(), Self(None)).0.map(Ok))
167 }
168
169 fn size_hint(&self) -> (usize, Option<usize>) {
171 self.0
172 .as_ref()
173 .map(|b| exact_body_hint(b.remaining()))
174 .expect("Once must check size_hint before it got polled")
175 }
176}
177
178pin_project! {
179 pub struct Either<L, R> {
180 #[pin]
181 inner: EitherInner<L, R>
182 }
183}
184
185pin_project! {
186 #[project = EitherProj]
187 enum EitherInner<L, R> {
188 L {
189 #[pin]
190 inner: L
191 },
192 R {
193 #[pin]
194 inner: R
195 }
196 }
197}
198
199impl<L, R> Either<L, R> {
200 #[inline]
201 pub const fn left(inner: L) -> Self {
202 Self {
203 inner: EitherInner::L { inner },
204 }
205 }
206
207 #[inline]
208 pub const fn right(inner: R) -> Self {
209 Self {
210 inner: EitherInner::R { inner },
211 }
212 }
213}
214
215impl<L, R, T, E, E2> Stream for Either<L, R>
216where
217 L: Stream<Item = Result<T, E>>,
218 R: Stream<Item = Result<T, E2>>,
219 E2: From<E>,
220{
221 type Item = Result<T, E2>;
222
223 #[inline]
224 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
225 match self.project().inner.project() {
226 EitherProj::L { inner } => inner.poll_next(cx).map(|res| res.map(|res| res.map_err(Into::into))),
227 EitherProj::R { inner } => inner.poll_next(cx),
228 }
229 }
230
231 #[inline]
232 fn size_hint(&self) -> (usize, Option<usize>) {
233 match self.inner {
234 EitherInner::L { ref inner } => inner.size_hint(),
235 EitherInner::R { ref inner } => inner.size_hint(),
236 }
237 }
238}
239
240pub struct BoxBody(LocalBoxStream<'static, Result<Bytes, BodyError>>);
242
243impl Default for BoxBody {
244 fn default() -> Self {
245 Self::new(NoneBody::<Bytes>::default())
246 }
247}
248
249impl BoxBody {
250 #[inline]
251 pub fn new<B, T, E>(body: B) -> Self
252 where
253 B: Stream<Item = Result<T, E>> + 'static,
254 T: Into<Bytes>,
255 E: Into<BodyError>,
256 {
257 pin_project! {
258 struct MapStream<B> {
259 #[pin]
260 body: B
261 }
262 }
263
264 impl<B, T, E> Stream for MapStream<B>
265 where
266 B: Stream<Item = Result<T, E>>,
267 T: Into<Bytes>,
268 E: Into<BodyError>,
269 {
270 type Item = Result<Bytes, BodyError>;
271
272 #[inline]
273 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
274 self.project().body.poll_next(cx).map_ok(Into::into).map_err(Into::into)
275 }
276
277 #[inline]
278 fn size_hint(&self) -> (usize, Option<usize>) {
279 self.body.size_hint()
280 }
281 }
282
283 Self(Box::pin(MapStream { body }))
284 }
285}
286
287impl Stream for BoxBody {
288 type Item = Result<Bytes, BodyError>;
289
290 #[inline]
291 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
292 self.get_mut().0.as_mut().poll_next(cx)
293 }
294
295 #[inline]
296 fn size_hint(&self) -> (usize, Option<usize>) {
297 self.0.size_hint()
298 }
299}
300
301pin_project! {
302 pub struct ResponseBody<B = BoxBody> {
305 #[pin]
306 inner: ResponseBodyInner<B>
307 }
308}
309
310pin_project! {
311 #[project = ResponseBodyProj]
312 #[project_replace = ResponseBodyProjReplace]
313 enum ResponseBodyInner<B> {
314 None,
315 Bytes {
316 bytes: Bytes,
317 },
318 Stream {
319 #[pin]
320 stream: B,
321 },
322 }
323}
324
325impl<B> Default for ResponseBody<B> {
326 fn default() -> Self {
327 Self::none()
328 }
329}
330
331impl ResponseBody {
332 #[inline]
334 pub fn box_stream<B, T, E>(stream: B) -> Self
335 where
336 B: Stream<Item = Result<T, E>> + 'static,
337 T: Into<Bytes>,
338 E: Into<BodyError>,
339 {
340 Self::stream(BoxBody::new(stream))
341 }
342}
343
344impl<B> ResponseBody<B> {
345 #[inline]
349 pub const fn none() -> Self {
350 Self {
351 inner: ResponseBodyInner::None,
352 }
353 }
354
355 #[inline]
359 pub const fn empty() -> Self {
360 Self {
361 inner: ResponseBodyInner::Bytes { bytes: Bytes::new() },
362 }
363 }
364
365 #[inline]
367 pub const fn stream(stream: B) -> Self {
368 Self {
369 inner: ResponseBodyInner::Stream { stream },
370 }
371 }
372
373 #[inline]
375 pub fn bytes<B2>(bytes: B2) -> Self
376 where
377 Bytes: From<B2>,
378 {
379 Self {
380 inner: ResponseBodyInner::Bytes {
381 bytes: Bytes::from(bytes),
382 },
383 }
384 }
385
386 #[inline]
388 pub fn into_boxed<T, E>(self) -> ResponseBody
389 where
390 B: Stream<Item = Result<T, E>> + 'static,
391 T: Into<Bytes>,
392 E: error::Error + Send + Sync + 'static,
393 {
394 match self.inner {
395 ResponseBodyInner::None => ResponseBody::none(),
396 ResponseBodyInner::Bytes { bytes } => ResponseBody::bytes(bytes),
397 ResponseBodyInner::Stream { stream } => ResponseBody::box_stream(stream),
398 }
399 }
400}
401
402impl<B, E> Stream for ResponseBody<B>
403where
404 B: Stream<Item = Result<Bytes, E>>,
405{
406 type Item = Result<Bytes, E>;
407
408 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
409 let mut inner = self.project().inner;
410 match inner.as_mut().project() {
411 ResponseBodyProj::None => Poll::Ready(None),
412 ResponseBodyProj::Bytes { .. } => match inner.project_replace(ResponseBodyInner::None) {
413 ResponseBodyProjReplace::Bytes { bytes } => Poll::Ready(Some(Ok(bytes))),
414 _ => unreachable!(),
415 },
416 ResponseBodyProj::Stream { stream } => stream.poll_next(cx),
417 }
418 }
419
420 fn size_hint(&self) -> (usize, Option<usize>) {
421 match self.inner {
422 ResponseBodyInner::None => none_body_hint(),
423 ResponseBodyInner::Bytes { ref bytes } => exact_body_hint(bytes.len()),
424 ResponseBodyInner::Stream { ref stream } => stream.size_hint(),
425 }
426 }
427}
428
429impl<B> From<NoneBody<B>> for ResponseBody {
430 fn from(_: NoneBody<B>) -> Self {
431 ResponseBody::none()
432 }
433}
434
435impl<B> From<Once<B>> for ResponseBody
436where
437 B: Into<Bytes>,
438{
439 fn from(once: Once<B>) -> Self {
440 ResponseBody::bytes(once.0.map(Into::into).unwrap_or_default())
441 }
442}
443
444impl From<BoxBody> for ResponseBody {
445 fn from(stream: BoxBody) -> Self {
446 Self::stream(stream)
447 }
448}
449
450macro_rules! res_bytes_impl {
451 ($ty: ty) => {
452 impl<B> From<$ty> for ResponseBody<B> {
453 fn from(item: $ty) -> Self {
454 Self::bytes(item)
455 }
456 }
457 };
458}
459
460res_bytes_impl!(Bytes);
461res_bytes_impl!(BytesMut);
462res_bytes_impl!(&'static [u8]);
463res_bytes_impl!(&'static str);
464res_bytes_impl!(Box<[u8]>);
465res_bytes_impl!(Vec<u8>);
466res_bytes_impl!(String);
467
468impl<B> From<Box<str>> for ResponseBody<B> {
469 fn from(str: Box<str>) -> Self {
470 Self::from(Box::<[u8]>::from(str))
471 }
472}
473
474impl<B> From<Cow<'static, str>> for ResponseBody<B> {
475 fn from(str: Cow<'static, str>) -> Self {
476 match str {
477 Cow::Owned(str) => Self::from(str),
478 Cow::Borrowed(str) => Self::from(str),
479 }
480 }
481}
482
483#[derive(Copy, Clone, Debug, Eq, PartialEq)]
485pub enum BodySize {
486 None,
490 Sized(usize),
494 Stream,
498}
499
500impl BodySize {
501 #[inline]
502 pub fn from_stream<S>(stream: &S) -> Self
503 where
504 S: Stream,
505 {
506 match stream.size_hint() {
507 NONE_BODY_HINT => Self::None,
508 (_, Some(size)) => Self::Sized(size),
509 (_, None) => Self::Stream,
510 }
511 }
512}
513
514#[cfg(test)]
515mod test {
516 use super::*;
517
518 #[test]
519 fn stream_body_size_hint() {
520 let body = BoxBody::new(Once::new(Bytes::new()));
521 assert_eq!(BodySize::from_stream(&body), BodySize::Sized(0));
522
523 let body = BoxBody::new(NoneBody::<Bytes>::default());
524 assert_eq!(BodySize::from_stream(&body), BodySize::None);
525 }
526}