1use core::{cmp, convert::Infallible, future::poll_fn, pin::pin};
4
5use crate::{
6 body::{BodyStream, BoxBody, ResponseBody},
7 bytes::{Bytes, BytesMut},
8 context::WebContext,
9 error::{BodyOverFlow, Error},
10 handler::{FromRequest, Responder},
11 http::{IntoResponse, WebResponse},
12};
13
14use super::header::{self, HeaderRef};
15
16pub struct Body<B>(pub B);
17
18impl<'a, 'r, C, B> FromRequest<'a, WebContext<'r, C, B>> for Body<B>
19where
20 B: BodyStream + Default,
21{
22 type Type<'b> = Body<B>;
23 type Error = Error;
24
25 #[inline]
26 async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
27 Ok(Body(ctx.take_body_ref()))
28 }
29}
30
31pub struct Limit<const LIMIT: usize>;
35
36macro_rules! from_bytes_impl {
37 ($type: ty, $original: tt) => {
38 impl<'a, 'r, C, B, const LIMIT: usize> FromRequest<'a, WebContext<'r, C, B>> for ($type, Limit<LIMIT>)
39 where
40 B: BodyStream + Default,
41 {
42 type Type<'b> = ($type, Limit<LIMIT>);
43 type Error = Error;
44
45 async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
46 let limit = HeaderRef::<'a, { header::CONTENT_LENGTH }>::from_request(ctx)
47 .await
48 .ok()
49 .and_then(|header| header.to_str().ok().and_then(|s| s.parse().ok()))
50 .map(|len| cmp::min(len, LIMIT))
53 .unwrap_or_else(|| LIMIT);
54
55 let body = ctx.take_body_ref();
56
57 let mut body = pin!(body);
58
59 let mut buf = <$type>::new();
60
61 while let Some(chunk) = poll_fn(|cx| body.as_mut().poll_next(cx)).await {
62 let chunk = chunk.map_err(Into::into)?;
63 buf.extend_from_slice(chunk.as_ref());
64 if limit > 0 && buf.len() > limit {
65 return Err(Error::from(BodyOverFlow { limit }));
66 }
67 }
68
69 Ok((buf, Limit))
70 }
71 }
72
73 from_bytes_impl!($type);
74 };
75 ($type: ty) => {
76 impl<'a, 'r, C, B> FromRequest<'a, WebContext<'r, C, B>> for $type
77 where
78 B: BodyStream + Default,
79 {
80 type Type<'b> = $type;
81 type Error = Error;
82
83 #[inline]
84 async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
85 <($type, Limit<0>)>::from_request(ctx)
86 .await
87 .map(|(bytes, _)| bytes)
88 }
89 }
90 };
91}
92
93from_bytes_impl!(BytesMut, _);
94from_bytes_impl!(Vec<u8>, _);
95
96impl<'a, 'r, C, B, const LIMIT: usize> FromRequest<'a, WebContext<'r, C, B>> for (Bytes, Limit<LIMIT>)
97where
98 B: BodyStream + Default,
99{
100 type Type<'b> = (Bytes, Limit<LIMIT>);
101 type Error = Error;
102
103 #[inline]
104 async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
105 <(BytesMut, Limit<LIMIT>)>::from_request(ctx)
106 .await
107 .map(|(bytes, limit)| (bytes.into(), limit))
108 }
109}
110
111from_bytes_impl!(Bytes);
112
113macro_rules! responder_impl {
114 ($type: ty) => {
115 impl<'r, C, B> Responder<WebContext<'r, C, B>> for $type {
116 type Response = WebResponse;
117 type Error = Infallible;
118
119 #[inline]
120 async fn respond(self, ctx: WebContext<'r, C, B>) -> Result<Self::Response, Self::Error> {
121 Ok(ctx.into_response(self))
122 }
123
124 #[inline]
125 fn map(self, res: Self::Response) -> Result<Self::Response, Self::Error> {
126 Ok(res.map(|_| self.into()))
127 }
128 }
129 };
130}
131
132responder_impl!(Bytes);
133responder_impl!(BytesMut);
134responder_impl!(Vec<u8>);
135
136impl<'r, C, B, ResB> Responder<WebContext<'r, C, B>> for ResponseBody<ResB> {
137 type Response = WebResponse<ResponseBody<ResB>>;
138 type Error = Error;
139
140 #[inline]
141 async fn respond(self, ctx: WebContext<'r, C, B>) -> Result<Self::Response, Self::Error> {
142 Ok(ctx.req.as_response(self))
143 }
144
145 #[inline]
146 fn map(self, res: Self::Response) -> Result<Self::Response, Self::Error> {
147 Ok(res.map(|_| self))
148 }
149}
150
151impl<'r, C, B> Responder<WebContext<'r, C, B>> for BoxBody {
152 type Response = WebResponse;
153 type Error = Error;
154
155 #[inline]
156 async fn respond(self, ctx: WebContext<'r, C, B>) -> Result<Self::Response, Self::Error> {
157 ResponseBody::stream(self).respond(ctx).await
158 }
159
160 #[inline]
161 fn map(self, res: Self::Response) -> Result<Self::Response, Self::Error> {
162 Responder::<WebContext<'r, C, B>>::map(ResponseBody::stream(self), res)
163 }
164}