boluo_core/
body.rs

1//! HTTP 主体。
2
3pub use bytes::Bytes;
4pub use http_body::{Body as HttpBody, Frame, SizeHint};
5
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures_core::{Stream, TryStream};
10use http_body_util::{BodyExt, Empty, Full};
11
12use crate::BoxError;
13
14type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, BoxError>;
15
16fn boxed<B>(body: B) -> BoxBody
17where
18    B: HttpBody<Data = Bytes> + Send + 'static,
19    B::Error: Into<BoxError>,
20{
21    crate::util::__try_downcast(body).unwrap_or_else(|body| body.map_err(Into::into).boxed_unsync())
22}
23
24/// 请求和响应的主体类型。
25#[derive(Debug)]
26pub struct Body(BoxBody);
27
28impl Body {
29    /// 创建一个新的 [`Body`],内部包装给定的 [`http_body::Body`] 对象。
30    pub fn new<B>(body: B) -> Self
31    where
32        B: HttpBody<Data = Bytes> + Send + 'static,
33        B::Error: Into<BoxError>,
34    {
35        crate::util::__try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
36    }
37
38    /// 创建一个空的 [`Body`]。
39    pub fn empty() -> Self {
40        Self::new(Empty::new())
41    }
42
43    /// 从 [`Stream`] 中创建一个新的 [`Body`]。
44    pub fn from_data_stream<S>(stream: S) -> Self
45    where
46        S: TryStream + Send + 'static,
47        S::Ok: Into<Bytes>,
48        S::Error: Into<BoxError>,
49    {
50        Self::new(StreamBody { stream })
51    }
52
53    /// 将 [`Body`] 的数据帧转换为 [`Stream`],非数据帧的部分将被丢弃。
54    pub fn into_data_stream(self) -> BodyDataStream {
55        BodyDataStream { inner: self }
56    }
57
58    /// 消耗此 [`Body`] 对象,将其所有数据收集并合并为单个 [`Bytes`] 缓冲区。
59    pub async fn to_bytes(self) -> Result<Bytes, BoxError> {
60        self.collect().await.map(|col| col.to_bytes())
61    }
62}
63
64impl Default for Body {
65    fn default() -> Self {
66        Self::empty()
67    }
68}
69
70impl From<()> for Body {
71    fn from(_: ()) -> Self {
72        Self::empty()
73    }
74}
75
76macro_rules! body_from_impl {
77    ($ty:ty) => {
78        impl From<$ty> for Body {
79            fn from(buf: $ty) -> Self {
80                Self::new(Full::from(buf))
81            }
82        }
83    };
84}
85
86body_from_impl!(&'static [u8]);
87body_from_impl!(std::borrow::Cow<'static, [u8]>);
88body_from_impl!(Vec<u8>);
89
90body_from_impl!(&'static str);
91body_from_impl!(std::borrow::Cow<'static, str>);
92body_from_impl!(String);
93
94body_from_impl!(Bytes);
95
96impl HttpBody for Body {
97    type Data = Bytes;
98    type Error = BoxError;
99
100    #[inline]
101    fn poll_frame(
102        mut self: Pin<&mut Self>,
103        cx: &mut Context<'_>,
104    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
105        Pin::new(&mut self.0).poll_frame(cx)
106    }
107
108    #[inline]
109    fn size_hint(&self) -> SizeHint {
110        self.0.size_hint()
111    }
112
113    #[inline]
114    fn is_end_stream(&self) -> bool {
115        self.0.is_end_stream()
116    }
117}
118
119/// [`Body`]的数据帧流。
120#[derive(Debug)]
121pub struct BodyDataStream {
122    inner: Body,
123}
124
125impl Stream for BodyDataStream {
126    type Item = Result<Bytes, BoxError>;
127
128    #[inline]
129    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130        loop {
131            match futures_core::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
132                Some(frame) => match frame.into_data() {
133                    Ok(data) => return Poll::Ready(Some(Ok(data))),
134                    Err(_frame) => {}
135                },
136                None => return Poll::Ready(None),
137            }
138        }
139    }
140}
141
142impl HttpBody for BodyDataStream {
143    type Data = Bytes;
144    type Error = BoxError;
145
146    #[inline]
147    fn poll_frame(
148        mut self: Pin<&mut Self>,
149        cx: &mut Context<'_>,
150    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
151        Pin::new(&mut self.inner).poll_frame(cx)
152    }
153
154    #[inline]
155    fn is_end_stream(&self) -> bool {
156        self.inner.is_end_stream()
157    }
158
159    #[inline]
160    fn size_hint(&self) -> SizeHint {
161        self.inner.size_hint()
162    }
163}
164
165pin_project_lite::pin_project! {
166    struct StreamBody<S> {
167        #[pin]
168        stream: S,
169    }
170}
171
172impl<S> HttpBody for StreamBody<S>
173where
174    S: TryStream,
175    S::Ok: Into<Bytes>,
176    S::Error: Into<BoxError>,
177{
178    type Data = Bytes;
179    type Error = BoxError;
180
181    fn poll_frame(
182        self: Pin<&mut Self>,
183        cx: &mut Context<'_>,
184    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
185        let this = self.project();
186        match futures_core::ready!(this.stream.try_poll_next(cx)) {
187            Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
188            Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
189            None => Poll::Ready(None),
190        }
191    }
192}