xitca_http/
body.rs

1//! HTTP body types.
2//!
//! Body types are generic over the [Stream] trait, and any type produced by transforming a body
//! type must also implement said trait in order to be accepted as a body type xitca-http knows of.
5//!
//! When implementing a customized body type, please refer to [none_body_hint] and [exact_body_hint]
//! for the contract of inferring body size through the [Stream::size_hint] trait method.
8
9use core::{
10    convert::Infallible,
11    marker::PhantomData,
12    mem,
13    pin::Pin,
14    task::{Context, Poll},
15};
16
17use std::{borrow::Cow, error};
18
19use futures_core::stream::{LocalBoxStream, Stream};
20use pin_project_lite::pin_project;
21
22use super::{
23    bytes::{Buf, Bytes, BytesMut},
24    error::BodyError,
25};
26
/// Size hint used to mark a body as absent.
///
/// This is a crate level convention layered on top of [Stream::size_hint]: the lower bound is
/// `usize::MAX` while the upper bound is `Some(0)`. A body type reporting this hint MUST NOT be
/// polled/collected by anyone.
pub const fn none_body_hint() -> (usize, Option<usize>) {
    (usize::MAX, Some(0))
}

/// Constant form of [none_body_hint], which can also be used as a `match` pattern.
pub const NONE_BODY_HINT: (usize, Option<usize>) = none_body_hint();
34
/// Size hint used to mark a body whose length is known exactly.
///
/// This is a crate level convention layered on top of [Stream::size_hint]: both the lower and the
/// upper bound are set to `size`. A body type reporting this hint MUST be polled/collected for
/// exactly `size`.
pub const fn exact_body_hint(size: usize) -> (usize, Option<usize>) {
    let upper = Some(size);
    (size, upper)
}
40
/// A unified request body type for different http protocols.
/// This enables one service type to handle multiple http protocols.
///
/// The protocol specific variants only exist when the matching cargo feature is enabled.
#[derive(Default)]
pub enum RequestBody {
    /// Request body type of the `h1` module. Requires the `http1` feature.
    #[cfg(feature = "http1")]
    H1(super::h1::RequestBody),
    /// Request body type of the `h2` module. Requires the `http2` feature.
    #[cfg(feature = "http2")]
    H2(super::h2::RequestBody),
    /// Request body type of the `h3` module. Requires the `http3` feature.
    #[cfg(feature = "http3")]
    H3(super::h3::RequestBody),
    /// Type erased body. This is the variant produced when converting from [BoxBody].
    Unknown(BoxBody),
    /// No body. This is the default variant; polling it ends the stream right away.
    #[default]
    None,
}
55
56impl Stream for RequestBody {
57    type Item = Result<Bytes, BodyError>;
58
59    #[inline]
60    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61        match self.get_mut() {
62            #[cfg(feature = "http1")]
63            Self::H1(body) => Pin::new(body).poll_next(cx).map_err(Into::into),
64            #[cfg(feature = "http2")]
65            Self::H2(body) => Pin::new(body).poll_next(cx),
66            #[cfg(feature = "http3")]
67            Self::H3(body) => Pin::new(body).poll_next(cx),
68            Self::Unknown(body) => Pin::new(body).poll_next(cx),
69            Self::None => Poll::Ready(None),
70        }
71    }
72}
73
74impl<B> From<NoneBody<B>> for RequestBody {
75    fn from(_: NoneBody<B>) -> Self {
76        Self::None
77    }
78}
79
80impl From<Bytes> for RequestBody {
81    fn from(bytes: Bytes) -> Self {
82        Self::from(Once::new(bytes))
83    }
84}
85
86impl From<Once<Bytes>> for RequestBody {
87    fn from(once: Once<Bytes>) -> Self {
88        Self::from(BoxBody::new(once))
89    }
90}
91
92impl From<BoxBody> for RequestBody {
93    fn from(body: BoxBody) -> Self {
94        Self::Unknown(body)
95    }
96}
97
98macro_rules! req_bytes_impl {
99    ($ty: ty) => {
100        impl From<$ty> for RequestBody {
101            fn from(item: $ty) -> Self {
102                Self::from(Bytes::from(item))
103            }
104        }
105    };
106}
107
108req_bytes_impl!(&'static [u8]);
109req_bytes_impl!(Box<[u8]>);
110req_bytes_impl!(Vec<u8>);
111req_bytes_impl!(String);
112
/// Body type representing the absence of a body.
///
/// The `B` parameter is never stored; it only exists so the output type can be inferred to match
/// other body types used together with `NoneBody`.
pub struct NoneBody<B>(PhantomData<fn(B)>);

impl<B> Default for NoneBody<B> {
    /// Construct the marker value. No `B` is required or created.
    fn default() -> Self {
        NoneBody(PhantomData)
    }
}
122
123impl<B> Stream for NoneBody<B> {
124    type Item = Result<B, Infallible>;
125
126    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127        unreachable!("NoneBody must not be polled. See NoneBody for detail")
128    }
129
130    #[inline]
131    fn size_hint(&self) -> (usize, Option<usize>) {
132        none_body_hint()
133    }
134}
135
/// Full body type that can only be polled once with [Stream::poll_next].
///
/// The wrapped value is held as `Some(body)`; the default value holds nothing.
pub struct Once<B>(Option<B>);

// Implemented by hand instead of derived: `#[derive(Default)]` would add a `B: Default` bound even
// though the default state is `None` and never needs a `B` value.
impl<B> Default for Once<B> {
    fn default() -> Self {
        Self(None)
    }
}
139
140impl<B> Once<B>
141where
142    B: Buf + Unpin,
143{
144    #[inline]
145    pub const fn new(body: B) -> Self {
146        Self(Some(body))
147    }
148}
149
150impl<B> From<B> for Once<B>
151where
152    B: Buf + Unpin,
153{
154    fn from(b: B) -> Self {
155        Self::new(b)
156    }
157}
158
159impl<B> Stream for Once<B>
160where
161    B: Buf + Unpin,
162{
163    type Item = Result<B, Infallible>;
164
165    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
166        Poll::Ready(mem::replace(self.get_mut(), Self(None)).0.map(Ok))
167    }
168
169    // use the length of buffer as both lower bound and upper bound.
170    fn size_hint(&self) -> (usize, Option<usize>) {
171        self.0
172            .as_ref()
173            .map(|b| exact_body_hint(b.remaining()))
174            .expect("Once must check size_hint before it got polled")
175    }
176}
177
pin_project! {
    /// Body type that holds one of two possible body types, `L` or `R`.
    ///
    /// Values are built with `Either::left` and `Either::right`.
    pub struct Either<L, R> {
        // the actual left/right state lives in a private enum so the variants are not public.
        #[pin]
        inner: EitherInner<L, R>
    }
}
184
pin_project! {
    // Private state of `Either`. `EitherProj` is the name of the generated pinned projection type
    // that is matched on when polling.
    #[project = EitherProj]
    enum EitherInner<L, R> {
        // holds the left hand body type.
        L {
            #[pin]
            inner: L
        },
        // holds the right hand body type.
        R {
            #[pin]
            inner: R
        }
    }
}
198
199impl<L, R> Either<L, R> {
200    #[inline]
201    pub const fn left(inner: L) -> Self {
202        Self {
203            inner: EitherInner::L { inner },
204        }
205    }
206
207    #[inline]
208    pub const fn right(inner: R) -> Self {
209        Self {
210            inner: EitherInner::R { inner },
211        }
212    }
213}
214
215impl<L, R, T, E, E2> Stream for Either<L, R>
216where
217    L: Stream<Item = Result<T, E>>,
218    R: Stream<Item = Result<T, E2>>,
219    E2: From<E>,
220{
221    type Item = Result<T, E2>;
222
223    #[inline]
224    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
225        match self.project().inner.project() {
226            EitherProj::L { inner } => inner.poll_next(cx).map(|res| res.map(|res| res.map_err(Into::into))),
227            EitherProj::R { inner } => inner.poll_next(cx),
228        }
229    }
230
231    #[inline]
232    fn size_hint(&self) -> (usize, Option<usize>) {
233        match self.inner {
234            EitherInner::L { ref inner } => inner.size_hint(),
235            EitherInner::R { ref inner } => inner.size_hint(),
236        }
237    }
238}
239
240/// type erased stream body.
241pub struct BoxBody(LocalBoxStream<'static, Result<Bytes, BodyError>>);
242
243impl Default for BoxBody {
244    fn default() -> Self {
245        Self::new(NoneBody::<Bytes>::default())
246    }
247}
248
249impl BoxBody {
250    #[inline]
251    pub fn new<B, T, E>(body: B) -> Self
252    where
253        B: Stream<Item = Result<T, E>> + 'static,
254        T: Into<Bytes>,
255        E: Into<BodyError>,
256    {
257        pin_project! {
258            struct MapStream<B> {
259                #[pin]
260                body: B
261            }
262        }
263
264        impl<B, T, E> Stream for MapStream<B>
265        where
266            B: Stream<Item = Result<T, E>>,
267            T: Into<Bytes>,
268            E: Into<BodyError>,
269        {
270            type Item = Result<Bytes, BodyError>;
271
272            #[inline]
273            fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
274                self.project().body.poll_next(cx).map_ok(Into::into).map_err(Into::into)
275            }
276
277            #[inline]
278            fn size_hint(&self) -> (usize, Option<usize>) {
279                self.body.size_hint()
280            }
281        }
282
283        Self(Box::pin(MapStream { body }))
284    }
285}
286
287impl Stream for BoxBody {
288    type Item = Result<Bytes, BodyError>;
289
290    #[inline]
291    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
292        self.get_mut().0.as_mut().poll_next(cx)
293    }
294
295    #[inline]
296    fn size_hint(&self) -> (usize, Option<usize>) {
297        self.0.size_hint()
298    }
299}
300
pin_project! {
    /// A unified response body type.
    ///
    /// The generic type is for a custom pinned response body (a type implementing
    /// [Stream](futures_core::Stream)). It defaults to [BoxBody].
    pub struct ResponseBody<B = BoxBody> {
        // the none/bytes/stream state is kept in a private enum.
        #[pin]
        inner: ResponseBodyInner<B>
    }
}
309
pin_project! {
    // Private state of `ResponseBody`. `ResponseBodyProj` and `ResponseBodyProjReplace` name the
    // generated projection types used when polling.
    #[project = ResponseBodyProj]
    #[project_replace = ResponseBodyProjReplace]
    enum ResponseBodyInner<B> {
        // no body is attached.
        None,
        // a body that is fully available as one buffer.
        Bytes {
            bytes: Bytes,
        },
        // a body produced by polling the wrapped stream type.
        Stream {
            #[pin]
            stream: B,
        },
    }
}
324
325impl<B> Default for ResponseBody<B> {
326    fn default() -> Self {
327        Self::none()
328    }
329}
330
331impl ResponseBody {
332    /// Construct a new Stream variant of ResponseBody with default type as [BoxBody]
333    #[inline]
334    pub fn box_stream<B, T, E>(stream: B) -> Self
335    where
336        B: Stream<Item = Result<T, E>> + 'static,
337        T: Into<Bytes>,
338        E: Into<BodyError>,
339    {
340        Self::stream(BoxBody::new(stream))
341    }
342}
343
344impl<B> ResponseBody<B> {
345    /// indicate no body is attached to response.
346    /// `content-length` and `transfer-encoding` headers would not be added to
347    /// response when [BodySize] is used for inferring response body type.
348    #[inline]
349    pub const fn none() -> Self {
350        Self {
351            inner: ResponseBodyInner::None,
352        }
353    }
354
355    /// indicate empty body is attached to response.
356    /// `content-length: 0` header would be added to response when [BodySize] is
357    /// used for inferring response body type.
358    #[inline]
359    pub const fn empty() -> Self {
360        Self {
361            inner: ResponseBodyInner::Bytes { bytes: Bytes::new() },
362        }
363    }
364
365    /// Construct a new Stream variant of ResponseBody
366    #[inline]
367    pub const fn stream(stream: B) -> Self {
368        Self {
369            inner: ResponseBodyInner::Stream { stream },
370        }
371    }
372
373    /// Construct a new Bytes variant of ResponseBody
374    #[inline]
375    pub fn bytes<B2>(bytes: B2) -> Self
376    where
377        Bytes: From<B2>,
378    {
379        Self {
380            inner: ResponseBodyInner::Bytes {
381                bytes: Bytes::from(bytes),
382            },
383        }
384    }
385
386    /// erase generic body type by boxing the variant.
387    #[inline]
388    pub fn into_boxed<T, E>(self) -> ResponseBody
389    where
390        B: Stream<Item = Result<T, E>> + 'static,
391        T: Into<Bytes>,
392        E: error::Error + Send + Sync + 'static,
393    {
394        match self.inner {
395            ResponseBodyInner::None => ResponseBody::none(),
396            ResponseBodyInner::Bytes { bytes } => ResponseBody::bytes(bytes),
397            ResponseBodyInner::Stream { stream } => ResponseBody::box_stream(stream),
398        }
399    }
400}
401
402impl<B, E> Stream for ResponseBody<B>
403where
404    B: Stream<Item = Result<Bytes, E>>,
405{
406    type Item = Result<Bytes, E>;
407
408    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
409        let mut inner = self.project().inner;
410        match inner.as_mut().project() {
411            ResponseBodyProj::None => Poll::Ready(None),
412            ResponseBodyProj::Bytes { .. } => match inner.project_replace(ResponseBodyInner::None) {
413                ResponseBodyProjReplace::Bytes { bytes } => Poll::Ready(Some(Ok(bytes))),
414                _ => unreachable!(),
415            },
416            ResponseBodyProj::Stream { stream } => stream.poll_next(cx),
417        }
418    }
419
420    fn size_hint(&self) -> (usize, Option<usize>) {
421        match self.inner {
422            ResponseBodyInner::None => none_body_hint(),
423            ResponseBodyInner::Bytes { ref bytes } => exact_body_hint(bytes.len()),
424            ResponseBodyInner::Stream { ref stream } => stream.size_hint(),
425        }
426    }
427}
428
429impl<B> From<NoneBody<B>> for ResponseBody {
430    fn from(_: NoneBody<B>) -> Self {
431        ResponseBody::none()
432    }
433}
434
435impl<B> From<Once<B>> for ResponseBody
436where
437    B: Into<Bytes>,
438{
439    fn from(once: Once<B>) -> Self {
440        ResponseBody::bytes(once.0.map(Into::into).unwrap_or_default())
441    }
442}
443
444impl From<BoxBody> for ResponseBody {
445    fn from(stream: BoxBody) -> Self {
446        Self::stream(stream)
447    }
448}
449
450macro_rules! res_bytes_impl {
451    ($ty: ty) => {
452        impl<B> From<$ty> for ResponseBody<B> {
453            fn from(item: $ty) -> Self {
454                Self::bytes(item)
455            }
456        }
457    };
458}
459
460res_bytes_impl!(Bytes);
461res_bytes_impl!(BytesMut);
462res_bytes_impl!(&'static [u8]);
463res_bytes_impl!(&'static str);
464res_bytes_impl!(Box<[u8]>);
465res_bytes_impl!(Vec<u8>);
466res_bytes_impl!(String);
467
468impl<B> From<Box<str>> for ResponseBody<B> {
469    fn from(str: Box<str>) -> Self {
470        Self::from(Box::<[u8]>::from(str))
471    }
472}
473
474impl<B> From<Cow<'static, str>> for ResponseBody<B> {
475    fn from(str: Cow<'static, str>) -> Self {
476        match str {
477            Cow::Owned(str) => Self::from(str),
478            Cow::Borrowed(str) => Self::from(str),
479        }
480    }
481}
482
/// Body size hint.
///
/// See `BodySize::from_stream` for how a [Stream::size_hint] value maps to each variant.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum BodySize {
    /// Absence of body can be assumed from method or status code.
    ///
    /// Will skip writing Content-Length header.
    ///
    /// Produced when the size hint equals [NONE_BODY_HINT].
    None,
    /// Known size body.
    ///
    /// Will write `Content-Length: N` header.
    ///
    /// Produced when the size hint has an upper bound; that upper bound is the size.
    Sized(usize),
    /// Unknown size body.
    ///
    /// Will not write Content-Length header. Can be used with chunked Transfer-Encoding.
    ///
    /// Produced when the size hint has no upper bound.
    Stream,
}
499
500impl BodySize {
501    #[inline]
502    pub fn from_stream<S>(stream: &S) -> Self
503    where
504        S: Stream,
505    {
506        match stream.size_hint() {
507            NONE_BODY_HINT => Self::None,
508            (_, Some(size)) => Self::Sized(size),
509            (_, None) => Self::Stream,
510        }
511    }
512}
513
#[cfg(test)]
mod test {
    use super::*;

    // boxing a body must keep the size hint of the wrapped body type intact.
    #[test]
    fn stream_body_size_hint() {
        let empty = BoxBody::new(Once::new(Bytes::new()));
        let empty_size = BodySize::from_stream(&empty);
        assert_eq!(empty_size, BodySize::Sized(0));

        let none = BoxBody::new(NoneBody::<Bytes>::default());
        let none_size = BodySize::from_stream(&none);
        assert_eq!(none_size, BodySize::None);
    }
}