rama_http_types/
body.rs

1//! HTTP body utilities.
2
3use crate::dep::{
4    http_body::{self, Body as _, Frame},
5    http_body_util::{self, BodyExt},
6};
7use bytes::Bytes;
8use futures_core::TryStream;
9use futures_lite::stream::Stream;
10use pin_project_lite::pin_project;
11use rama_error::{BoxError, OpaqueError};
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use sync_wrapper::SyncWrapper;
15
16type BoxBody = http_body_util::combinators::BoxBody<Bytes, BoxError>;
17
18fn boxed<B>(body: B) -> BoxBody
19where
20    B: http_body::Body<Data = Bytes, Error: Into<BoxError>> + Send + Sync + 'static,
21{
22    try_downcast(body).unwrap_or_else(|body| body.map_err(Into::into).boxed())
23}
24
25pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
26where
27    T: 'static,
28    K: Send + 'static,
29{
30    let mut k = Some(k);
31    if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
32        Ok(k.take().unwrap())
33    } else {
34        Err(k.unwrap())
35    }
36}
37
38/// The body type used in rama requests and responses.
39#[derive(Debug)]
40pub struct Body(BoxBody);
41
42impl Body {
43    /// Create a new `Body` that wraps another [`http_body::Body`].
44    pub fn new<B>(body: B) -> Self
45    where
46        B: http_body::Body<Data = Bytes, Error: Into<BoxError>> + Send + Sync + 'static,
47    {
48        try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
49    }
50
51    /// Create a new `Body` with a maximum size limit.
52    pub fn with_limit<B>(body: B, limit: usize) -> Self
53    where
54        B: http_body::Body<Data = Bytes, Error: Into<BoxError>> + Send + Sync + 'static,
55    {
56        Self::new(crate::dep::http_body_util::Limited::new(body, limit))
57    }
58
59    /// Create an empty body.
60    pub fn empty() -> Self {
61        Self::new(http_body_util::Empty::new())
62    }
63
64    /// Create a new `Body` from a [`Stream`].
65    ///
66    /// [`Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
67    pub fn from_stream<S>(stream: S) -> Self
68    where
69        S: TryStream<Ok: Into<Bytes>, Error: Into<BoxError>> + Send + 'static,
70    {
71        Self::new(StreamBody {
72            stream: SyncWrapper::new(stream),
73        })
74    }
75
76    /// Create a new [`Body`] from a [`Stream`] with a maximum size limit.
77    pub fn limited(self, limit: usize) -> Self {
78        Self::new(crate::dep::http_body_util::Limited::new(self.0, limit))
79    }
80
81    /// Convert the body into a [`Stream`] of data frames.
82    ///
83    /// Non-data frames (such as trailers) will be discarded. Use [`http_body_util::BodyStream`] if
84    /// you need a [`Stream`] of all frame types.
85    ///
86    /// [`http_body_util::BodyStream`]: https://docs.rs/http-body-util/latest/http_body_util/struct.BodyStream.html
87    pub fn into_data_stream(self) -> BodyDataStream {
88        BodyDataStream { inner: self }
89    }
90}
91
92impl Default for Body {
93    fn default() -> Self {
94        Self::empty()
95    }
96}
97
98impl From<()> for Body {
99    fn from(_: ()) -> Self {
100        Self::empty()
101    }
102}
103
104macro_rules! body_from_impl {
105    ($ty:ty) => {
106        impl From<$ty> for Body {
107            fn from(buf: $ty) -> Self {
108                Self::new(http_body_util::Full::from(buf))
109            }
110        }
111    };
112}
113
114body_from_impl!(&'static [u8]);
115body_from_impl!(std::borrow::Cow<'static, [u8]>);
116body_from_impl!(Vec<u8>);
117
118body_from_impl!(&'static str);
119body_from_impl!(std::borrow::Cow<'static, str>);
120body_from_impl!(String);
121
122body_from_impl!(Bytes);
123
124impl http_body::Body for Body {
125    type Data = Bytes;
126    type Error = OpaqueError;
127
128    #[inline]
129    fn poll_frame(
130        mut self: Pin<&mut Self>,
131        cx: &mut Context<'_>,
132    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
133        Pin::new(&mut self.0)
134            .poll_frame(cx)
135            .map_err(OpaqueError::from_boxed)
136    }
137
138    #[inline]
139    fn size_hint(&self) -> http_body::SizeHint {
140        self.0.size_hint()
141    }
142
143    #[inline]
144    fn is_end_stream(&self) -> bool {
145        self.0.is_end_stream()
146    }
147}
148
149/// A stream of data frames.
150///
151/// Created with [`Body::into_data_stream`].
152#[derive(Debug)]
153pub struct BodyDataStream {
154    inner: Body,
155}
156
157impl Stream for BodyDataStream {
158    type Item = Result<Bytes, BoxError>;
159
160    #[inline]
161    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
162        loop {
163            match futures_lite::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
164                Some(frame) => match frame.into_data() {
165                    Ok(data) => return Poll::Ready(Some(Ok(data))),
166                    Err(_frame) => {}
167                },
168                None => return Poll::Ready(None),
169            }
170        }
171    }
172}
173
174impl http_body::Body for BodyDataStream {
175    type Data = Bytes;
176    type Error = BoxError;
177
178    #[inline]
179    fn poll_frame(
180        mut self: Pin<&mut Self>,
181        cx: &mut Context<'_>,
182    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
183        Pin::new(&mut self.inner).poll_frame(cx).map_err(Into::into)
184    }
185
186    #[inline]
187    fn is_end_stream(&self) -> bool {
188        self.inner.is_end_stream()
189    }
190
191    #[inline]
192    fn size_hint(&self) -> http_body::SizeHint {
193        self.inner.size_hint()
194    }
195}
196
197pin_project! {
198    struct StreamBody<S> {
199        #[pin]
200        stream: SyncWrapper<S>,
201    }
202}
203
204impl<S> http_body::Body for StreamBody<S>
205where
206    S: TryStream<Ok: Into<Bytes>, Error: Into<BoxError>>,
207{
208    type Data = Bytes;
209    type Error = BoxError;
210
211    fn poll_frame(
212        self: Pin<&mut Self>,
213        cx: &mut Context<'_>,
214    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
215        let stream = self.project().stream.get_pin_mut();
216        match futures_lite::ready!(stream.try_poll_next(cx)) {
217            Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
218            Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
219            None => Poll::Ready(None),
220        }
221    }
222}
223
224#[test]
225fn test_try_downcast() {
226    assert_eq!(try_downcast::<i32, _>(5_u32), Err(5_u32));
227    assert_eq!(try_downcast::<i32, _>(5_i32), Ok(5_i32));
228}