mincat_core/
body.rs

1use crate::error::{BoxError, Error};
2
3use bytes::Bytes;
4use futures_util::{Stream, TryStream};
5use http_body::{Body as _, Frame};
6use http_body_util::{combinators::UnsyncBoxBody, BodyExt, Empty, Full};
7use pin_project_lite::pin_project;
8use std::{
9    any::Any,
10    pin::Pin,
11    task::{Context, Poll},
12};
13use sync_wrapper::SyncWrapper;
14
15const BODY_LIMITED: usize = 1024 * 1204 * 2;
16
17#[derive(Debug, Default, Clone, Copy)]
18pub struct BodyLimitedSize(pub usize);
19
20impl BodyLimitedSize {
21    pub fn new() -> Self {
22        Self(BODY_LIMITED)
23    }
24}
25
26fn boxed<B>(body: B) -> UnsyncBoxBody<Bytes, Error>
27where
28    B: http_body::Body<Data = Bytes> + Send + 'static,
29    B::Error: Into<BoxError>,
30{
31    try_downcast(body).unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
32}
33
34pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
35where
36    T: 'static,
37    K: Send + 'static,
38{
39    let mut k = Some(k);
40    if let Some(k) = <dyn Any>::downcast_mut::<Option<T>>(&mut k) {
41        Ok(k.take().unwrap())
42    } else {
43        Err(k.unwrap())
44    }
45}
46
47pub struct Body(UnsyncBoxBody<Bytes, Error>);
48
49impl Body {
50    pub fn new<B>(body: B) -> Self
51    where
52        B: http_body::Body<Data = Bytes> + Send + 'static,
53        B::Error: Into<BoxError>,
54    {
55        try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
56    }
57
58    pub fn empty() -> Self {
59        Self::new(Empty::new())
60    }
61
62    pub fn from_stream<S>(stream: S) -> Self
63    where
64        S: TryStream + Send + 'static,
65        S::Ok: Into<Bytes>,
66        S::Error: Into<BoxError>,
67    {
68        Body::new(StreamBody {
69            stream: SyncWrapper::new(stream),
70        })
71    }
72}
73
74impl http_body::Body for Body {
75    type Data = Bytes;
76    type Error = Error;
77
78    #[inline]
79    fn poll_frame(
80        mut self: Pin<&mut Self>,
81        cx: &mut Context<'_>,
82    ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
83        Pin::new(&mut self.0).poll_frame(cx)
84    }
85
86    #[inline]
87    fn size_hint(&self) -> http_body::SizeHint {
88        self.0.size_hint()
89    }
90
91    #[inline]
92    fn is_end_stream(&self) -> bool {
93        self.0.is_end_stream()
94    }
95}
96
97impl Stream for Body {
98    type Item = Result<Bytes, Error>;
99
100    #[inline]
101    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102        loop {
103            match futures_util::ready!(Pin::new(&mut self.0).poll_frame(cx)?) {
104                Some(frame) => match frame.into_data() {
105                    Ok(data) => return Poll::Ready(Some(Ok(data))),
106                    Err(_frame) => {}
107                },
108                None => return Poll::Ready(None),
109            }
110        }
111    }
112}
113
114macro_rules! body_from_impl {
115    ($ty:ty) => {
116        impl From<$ty> for Body {
117            fn from(value: $ty) -> Self {
118                Self::new(Full::from(value))
119            }
120        }
121    };
122}
123
124body_from_impl!(&'static str);
125body_from_impl!(String);
126body_from_impl!(Bytes);
127
128pin_project! {
129    struct StreamBody<S> {
130        #[pin]
131        stream: SyncWrapper<S>,
132    }
133}
134
135impl<S> http_body::Body for StreamBody<S>
136where
137    S: TryStream,
138    S::Ok: Into<Bytes>,
139    S::Error: Into<BoxError>,
140{
141    type Data = Bytes;
142    type Error = Error;
143
144    fn poll_frame(
145        self: Pin<&mut Self>,
146        cx: &mut Context<'_>,
147    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
148        let stream = self.project().stream.get_pin_mut();
149        match futures_util::ready!(stream.try_poll_next(cx)) {
150            Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
151            Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
152            None => Poll::Ready(None),
153        }
154    }
155}