xitca_web/handler/types/
body.rs

1//! type extractor for request body stream.
2
3use core::{cmp, convert::Infallible, future::poll_fn, pin::pin};
4
5use crate::{
6    body::{BodyStream, BoxBody, ResponseBody},
7    bytes::{Bytes, BytesMut},
8    context::WebContext,
9    error::{BodyOverFlow, Error},
10    handler::{FromRequest, Responder},
11    http::{IntoResponse, WebResponse},
12};
13
14use super::header::{self, HeaderRef};
15
16pub struct Body<B>(pub B);
17
18impl<'a, 'r, C, B> FromRequest<'a, WebContext<'r, C, B>> for Body<B>
19where
20    B: BodyStream + Default,
21{
22    type Type<'b> = Body<B>;
23    type Error = Error;
24
25    #[inline]
26    async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
27        Ok(Body(ctx.take_body_ref()))
28    }
29}
30
31/// helper type for limiting body size.
32/// when LIMIT > 0 body size is limited to LIMIT in bytes.
33/// when LIMIT == 0 body size is unlimited.
34pub struct Limit<const LIMIT: usize>;
35
36macro_rules! from_bytes_impl {
37    ($type: ty, $original: tt) => {
38        impl<'a, 'r, C, B, const LIMIT: usize> FromRequest<'a, WebContext<'r, C, B>> for ($type, Limit<LIMIT>)
39        where
40            B: BodyStream + Default,
41        {
42            type Type<'b> = ($type, Limit<LIMIT>);
43            type Error = Error;
44
45            async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
46                let limit = HeaderRef::<'a, { header::CONTENT_LENGTH }>::from_request(ctx)
47                    .await
48                    .ok()
49                    .and_then(|header| header.to_str().ok().and_then(|s| s.parse().ok()))
50                    // when content length is 0 the http library should be producing an immediate
51                    // yielding streaming body which result in an empty body collection type.
52                    .map(|len| cmp::min(len, LIMIT))
53                    .unwrap_or_else(|| LIMIT);
54
55                let body = ctx.take_body_ref();
56
57                let mut body = pin!(body);
58
59                let mut buf = <$type>::new();
60
61                while let Some(chunk) = poll_fn(|cx| body.as_mut().poll_next(cx)).await {
62                    let chunk = chunk.map_err(Into::into)?;
63                    buf.extend_from_slice(chunk.as_ref());
64                    if limit > 0 && buf.len() > limit {
65                        return Err(Error::from(BodyOverFlow { limit }));
66                    }
67                }
68
69                Ok((buf, Limit))
70            }
71        }
72
73        from_bytes_impl!($type);
74    };
75    ($type: ty) => {
76        impl<'a, 'r, C, B> FromRequest<'a, WebContext<'r, C, B>> for $type
77        where
78            B: BodyStream + Default,
79        {
80            type Type<'b> = $type;
81            type Error = Error;
82
83            #[inline]
84            async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
85                <($type, Limit<0>)>::from_request(ctx)
86                    .await
87                    .map(|(bytes, _)| bytes)
88            }
89        }
90    };
91}
92
93from_bytes_impl!(BytesMut, _);
94from_bytes_impl!(Vec<u8>, _);
95
96impl<'a, 'r, C, B, const LIMIT: usize> FromRequest<'a, WebContext<'r, C, B>> for (Bytes, Limit<LIMIT>)
97where
98    B: BodyStream + Default,
99{
100    type Type<'b> = (Bytes, Limit<LIMIT>);
101    type Error = Error;
102
103    #[inline]
104    async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
105        <(BytesMut, Limit<LIMIT>)>::from_request(ctx)
106            .await
107            .map(|(bytes, limit)| (bytes.into(), limit))
108    }
109}
110
111from_bytes_impl!(Bytes);
112
113macro_rules! responder_impl {
114    ($type: ty) => {
115        impl<'r, C, B> Responder<WebContext<'r, C, B>> for $type {
116            type Response = WebResponse;
117            type Error = Infallible;
118
119            #[inline]
120            async fn respond(self, ctx: WebContext<'r, C, B>) -> Result<Self::Response, Self::Error> {
121                Ok(ctx.into_response(self))
122            }
123
124            #[inline]
125            fn map(self, res: Self::Response) -> Result<Self::Response, Self::Error> {
126                Ok(res.map(|_| self.into()))
127            }
128        }
129    };
130}
131
132responder_impl!(Bytes);
133responder_impl!(BytesMut);
134responder_impl!(Vec<u8>);
135
136impl<'r, C, B, ResB> Responder<WebContext<'r, C, B>> for ResponseBody<ResB> {
137    type Response = WebResponse<ResponseBody<ResB>>;
138    type Error = Error;
139
140    #[inline]
141    async fn respond(self, ctx: WebContext<'r, C, B>) -> Result<Self::Response, Self::Error> {
142        Ok(ctx.req.as_response(self))
143    }
144
145    #[inline]
146    fn map(self, res: Self::Response) -> Result<Self::Response, Self::Error> {
147        Ok(res.map(|_| self))
148    }
149}
150
151impl<'r, C, B> Responder<WebContext<'r, C, B>> for BoxBody {
152    type Response = WebResponse;
153    type Error = Error;
154
155    #[inline]
156    async fn respond(self, ctx: WebContext<'r, C, B>) -> Result<Self::Response, Self::Error> {
157        ResponseBody::stream(self).respond(ctx).await
158    }
159
160    #[inline]
161    fn map(self, res: Self::Response) -> Result<Self::Response, Self::Error> {
162        Responder::<WebContext<'r, C, B>>::map(ResponseBody::stream(self), res)
163    }
164}