satex_core/
body.rs

1//! HTTP body utilities.
2
3use crate::util::try_downcast;
4use crate::{BoxError, Error};
5use bytes::Bytes;
6use futures_util::stream::Stream;
7use futures_util::TryStream;
8use http_body::{Body as _, Frame};
9use http_body_util::BodyExt;
10use pin_project_lite::pin_project;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use sync_wrapper::SyncWrapper;
14
15type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
16
17fn boxed<B>(body: B) -> BoxBody
18where
19    B: http_body::Body<Data = Bytes> + Send + 'static,
20    B::Error: Into<BoxError>,
21{
22    try_downcast(body).unwrap_or_else(|body| body.map_err(|e| Error::new(e.into())).boxed_unsync())
23}
24
25/// The body type used in axum requests and responses.
26#[derive(Debug)]
27pub struct Body(BoxBody);
28
29impl Body {
30    /// Create a new `Body` that wraps another [`http_body::Body`].
31    pub fn new<B>(body: B) -> Self
32    where
33        B: http_body::Body<Data = Bytes> + Send + 'static,
34        B::Error: Into<BoxError>,
35    {
36        try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
37    }
38
39    /// Create an empty body.
40    pub fn empty() -> Self {
41        Self::new(http_body_util::Empty::new())
42    }
43
44    /// Create a new `Body` from a [`Stream`].
45    ///
46    /// [`Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
47    pub fn from_stream<S>(stream: S) -> Self
48    where
49        S: TryStream + Send + 'static,
50        S::Ok: Into<Bytes>,
51        S::Error: Into<BoxError>,
52    {
53        Self::new(StreamBody {
54            stream: SyncWrapper::new(stream),
55        })
56    }
57
58    /// Convert the body into a [`Stream`] of data frames.
59    ///
60    /// Non-data frames (such as trailers) will be discarded. Use [`http_body_util::BodyStream`] if
61    /// you need a [`Stream`] of all frame types.
62    ///
63    /// [`http_body_util::BodyStream`]: https://docs.rs/http-body-util/latest/http_body_util/struct.BodyStream.html
64    pub fn into_data_stream(self) -> BodyDataStream {
65        BodyDataStream { inner: self }
66    }
67}
68
69impl Default for Body {
70    fn default() -> Self {
71        Self::empty()
72    }
73}
74
75impl From<()> for Body {
76    fn from(_: ()) -> Self {
77        Self::empty()
78    }
79}
80
81macro_rules! body_from_impl {
82    ($ty:ty) => {
83        impl From<$ty> for Body {
84            fn from(buf: $ty) -> Self {
85                Self::new(http_body_util::Full::from(buf))
86            }
87        }
88    };
89}
90
91body_from_impl!(&'static [u8]);
92body_from_impl!(std::borrow::Cow<'static, [u8]>);
93body_from_impl!(Vec<u8>);
94
95body_from_impl!(&'static str);
96body_from_impl!(std::borrow::Cow<'static, str>);
97body_from_impl!(String);
98
99body_from_impl!(Bytes);
100
101impl http_body::Body for Body {
102    type Data = Bytes;
103    type Error = Error;
104
105    #[inline]
106    fn poll_frame(
107        mut self: Pin<&mut Self>,
108        cx: &mut Context<'_>,
109    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
110        Pin::new(&mut self.0).poll_frame(cx)
111    }
112
113    #[inline]
114    fn is_end_stream(&self) -> bool {
115        self.0.is_end_stream()
116    }
117
118    #[inline]
119    fn size_hint(&self) -> http_body::SizeHint {
120        self.0.size_hint()
121    }
122}
123
124/// A stream of data frames.
125///
126/// Created with [`Body::into_data_stream`].
127#[derive(Debug)]
128pub struct BodyDataStream {
129    inner: Body,
130}
131
132impl Stream for BodyDataStream {
133    type Item = Result<Bytes, Error>;
134
135    #[inline]
136    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137        loop {
138            match futures_util::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
139                Some(frame) => match frame.into_data() {
140                    Ok(data) => return Poll::Ready(Some(Ok(data))),
141                    Err(_frame) => {}
142                },
143                None => return Poll::Ready(None),
144            }
145        }
146    }
147}
148
149impl http_body::Body for BodyDataStream {
150    type Data = Bytes;
151    type Error = Error;
152
153    #[inline]
154    fn poll_frame(
155        mut self: Pin<&mut Self>,
156        cx: &mut Context<'_>,
157    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
158        Pin::new(&mut self.inner).poll_frame(cx)
159    }
160
161    #[inline]
162    fn is_end_stream(&self) -> bool {
163        self.inner.is_end_stream()
164    }
165
166    #[inline]
167    fn size_hint(&self) -> http_body::SizeHint {
168        self.inner.size_hint()
169    }
170}
171
172pin_project! {
173    struct StreamBody<S> {
174        #[pin]
175        stream: SyncWrapper<S>,
176    }
177}
178
179impl<S> http_body::Body for StreamBody<S>
180where
181    S: TryStream,
182    S::Ok: Into<Bytes>,
183    S::Error: Into<BoxError>,
184{
185    type Data = Bytes;
186    type Error = Error;
187
188    fn poll_frame(
189        self: Pin<&mut Self>,
190        cx: &mut Context<'_>,
191    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
192        let stream = self.project().stream.get_pin_mut();
193        match futures_util::ready!(stream.try_poll_next(cx)) {
194            Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
195            Some(Err(e)) => Poll::Ready(Some(Err(Error::new(e.into())))),
196            None => Poll::Ready(None),
197        }
198    }
199}