axol_http/
body.rs

1use std::{
2    fmt,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::{Stream, StreamExt, TryStreamExt};
9use http_body::SizeHint;
10
11use crate::header::HeaderMap;
12
13#[derive(Debug)]
14pub enum BodyComponent {
15    Data(Bytes),
16    Trailers(HeaderMap),
17}
18
19//TODO: docs
20pub enum Body {
21    Bytes(Vec<u8>),
22    Stream {
23        size_hint: Option<usize>,
24        stream: BodyStream,
25    },
26}
27
28pub type BodyStream =
29    Pin<Box<dyn Stream<Item = Result<BodyComponent, anyhow::Error>> + Send + Sync + 'static>>;
30
31/// Wrapper over `Body` that implements `http_body::Body`
32pub struct BodyWrapper {
33    body: Body,
34    has_written_blob: bool,
35    bytes_streamed: usize,
36    pending_trailers: Option<HeaderMap>,
37    eof: bool,
38}
39
40impl From<Body> for BodyWrapper {
41    fn from(body: Body) -> Self {
42        Self {
43            body,
44            has_written_blob: false,
45            bytes_streamed: 0,
46            pending_trailers: None,
47            eof: false,
48        }
49    }
50}
51
52impl Into<Body> for BodyWrapper {
53    fn into(self) -> Body {
54        self.body
55    }
56}
57
58impl Body {
59    pub fn new() -> Self {
60        Self::default()
61    }
62
63    pub fn empty() -> Self {
64        Self::default()
65    }
66
67    pub fn bytes_and_trailers(bytes: Vec<u8>, trailers: HeaderMap) -> Self {
68        Body::Stream {
69            size_hint: Some(bytes.len()),
70            stream: Box::pin(futures::stream::iter([
71                Ok(BodyComponent::Data(bytes.into())),
72                Ok(BodyComponent::Trailers(trailers)),
73            ])),
74        }
75    }
76
77    pub fn trailers(trailers: HeaderMap) -> Self {
78        Body::Stream {
79            size_hint: Some(0),
80            stream: Box::pin(futures::stream::once(async move {
81                Ok(BodyComponent::Trailers(trailers))
82            })),
83        }
84    }
85
86    pub async fn collect(self) -> Result<Vec<u8>, anyhow::Error> {
87        match self {
88            Body::Bytes(x) => Ok(x),
89            Body::Stream {
90                size_hint,
91                mut stream,
92            } => {
93                let mut out = Vec::with_capacity(size_hint.unwrap_or_default());
94                while let Some(component) = stream.next().await.transpose()? {
95                    match component {
96                        BodyComponent::Data(data) => {
97                            out.extend_from_slice(&data[..]);
98                        }
99                        BodyComponent::Trailers(_) => (),
100                    }
101                }
102                Ok(out)
103            }
104        }
105    }
106
107    pub fn into_stream(
108        self,
109    ) -> Pin<Box<dyn Stream<Item = Result<BodyComponent, anyhow::Error>> + Send + Sync + 'static>>
110    {
111        match self {
112            Body::Bytes(bytes) => Box::pin(futures::stream::once(async move {
113                Ok(BodyComponent::Data(bytes.into()))
114            })),
115            Body::Stream {
116                size_hint: _,
117                stream,
118            } => stream,
119        }
120    }
121}
122
123impl Into<Body> for Vec<u8> {
124    fn into(self) -> Body {
125        Body::Bytes(self)
126    }
127}
128
129impl Into<Body> for String {
130    fn into(self) -> Body {
131        Body::Bytes(self.into_bytes())
132    }
133}
134
135impl Into<Body> for &str {
136    fn into(self) -> Body {
137        Body::Bytes(self.as_bytes().to_vec())
138    }
139}
140
141impl Into<Body> for () {
142    fn into(self) -> Body {
143        Body::Bytes(vec![])
144    }
145}
146
147impl From<BodyStream> for Body {
148    fn from(stream: BodyStream) -> Self {
149        Self::Stream {
150            size_hint: None,
151            stream,
152        }
153    }
154}
155
156impl From<Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send + Sync + 'static>>>
157    for Body
158{
159    fn from(
160        value: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send + Sync + 'static>>,
161    ) -> Self {
162        Self::Stream {
163            size_hint: None,
164            stream: Box::pin(value.map_ok(|x| BodyComponent::Data(x))),
165        }
166    }
167}
168
169impl fmt::Debug for Body {
170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171        match self {
172            Self::Bytes(arg0) => f.debug_tuple("Bytes").field(arg0).finish(),
173            Self::Stream { size_hint, .. } => f.debug_tuple("Stream").field(size_hint).finish(),
174        }
175    }
176}
177
178impl Default for Body {
179    fn default() -> Self {
180        Body::Bytes(vec![])
181    }
182}
183
184impl http_body::Body for BodyWrapper {
185    type Data = Bytes;
186
187    type Error = anyhow::Error;
188
189    fn poll_data(
190        mut self: Pin<&mut Self>,
191        cx: &mut Context<'_>,
192    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
193        match &mut self.body {
194            Body::Bytes(bytes) => {
195                let bytes = std::mem::take(bytes);
196                if self.has_written_blob {
197                    return Poll::Ready(None);
198                }
199                self.eof = true;
200                self.has_written_blob = true;
201                Poll::Ready(Some(Ok(bytes.into())))
202            }
203            Body::Stream {
204                size_hint: _,
205                stream,
206            } => match stream.poll_next_unpin(cx) {
207                Poll::Pending => Poll::Pending,
208                Poll::Ready(None) => {
209                    self.eof = true;
210                    Poll::Ready(None)
211                }
212                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
213                Poll::Ready(Some(Ok(BodyComponent::Data(data)))) => {
214                    self.bytes_streamed += data.len();
215                    Poll::Ready(Some(Ok(data)))
216                }
217                Poll::Ready(Some(Ok(BodyComponent::Trailers(trailers)))) => {
218                    self.pending_trailers = Some(trailers);
219                    Poll::Ready(None)
220                }
221            },
222        }
223    }
224
225    fn poll_trailers(
226        mut self: Pin<&mut Self>,
227        _cx: &mut Context<'_>,
228    ) -> Poll<Result<Option<headers::HeaderMap>, Self::Error>> {
229        self.eof = true;
230        if let Some(pending_trailers) = self.pending_trailers.take() {
231            Poll::Ready(Ok(Some(pending_trailers.into())))
232        } else {
233            Poll::Ready(Ok(None))
234        }
235    }
236
237    fn is_end_stream(&self) -> bool {
238        self.eof
239    }
240
241    fn size_hint(&self) -> SizeHint {
242        if self.eof {
243            return SizeHint::with_exact(0);
244        }
245        match &self.body {
246            Body::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64),
247            Body::Stream {
248                size_hint,
249                stream: _,
250            } => size_hint
251                .map(|x| SizeHint::with_exact(x as u64))
252                .unwrap_or_default(),
253        }
254    }
255}