1use std::{
2 fmt,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::{Stream, StreamExt, TryStreamExt};
9use http_body::SizeHint;
10
11use crate::header::HeaderMap;
12
13#[derive(Debug)]
14pub enum BodyComponent {
15 Data(Bytes),
16 Trailers(HeaderMap),
17}
18
19pub enum Body {
21 Bytes(Vec<u8>),
22 Stream {
23 size_hint: Option<usize>,
24 stream: BodyStream,
25 },
26}
27
28pub type BodyStream =
29 Pin<Box<dyn Stream<Item = Result<BodyComponent, anyhow::Error>> + Send + Sync + 'static>>;
30
31pub struct BodyWrapper {
33 body: Body,
34 has_written_blob: bool,
35 bytes_streamed: usize,
36 pending_trailers: Option<HeaderMap>,
37 eof: bool,
38}
39
40impl From<Body> for BodyWrapper {
41 fn from(body: Body) -> Self {
42 Self {
43 body,
44 has_written_blob: false,
45 bytes_streamed: 0,
46 pending_trailers: None,
47 eof: false,
48 }
49 }
50}
51
52impl Into<Body> for BodyWrapper {
53 fn into(self) -> Body {
54 self.body
55 }
56}
57
58impl Body {
59 pub fn new() -> Self {
60 Self::default()
61 }
62
63 pub fn empty() -> Self {
64 Self::default()
65 }
66
67 pub fn bytes_and_trailers(bytes: Vec<u8>, trailers: HeaderMap) -> Self {
68 Body::Stream {
69 size_hint: Some(bytes.len()),
70 stream: Box::pin(futures::stream::iter([
71 Ok(BodyComponent::Data(bytes.into())),
72 Ok(BodyComponent::Trailers(trailers)),
73 ])),
74 }
75 }
76
77 pub fn trailers(trailers: HeaderMap) -> Self {
78 Body::Stream {
79 size_hint: Some(0),
80 stream: Box::pin(futures::stream::once(async move {
81 Ok(BodyComponent::Trailers(trailers))
82 })),
83 }
84 }
85
86 pub async fn collect(self) -> Result<Vec<u8>, anyhow::Error> {
87 match self {
88 Body::Bytes(x) => Ok(x),
89 Body::Stream {
90 size_hint,
91 mut stream,
92 } => {
93 let mut out = Vec::with_capacity(size_hint.unwrap_or_default());
94 while let Some(component) = stream.next().await.transpose()? {
95 match component {
96 BodyComponent::Data(data) => {
97 out.extend_from_slice(&data[..]);
98 }
99 BodyComponent::Trailers(_) => (),
100 }
101 }
102 Ok(out)
103 }
104 }
105 }
106
107 pub fn into_stream(
108 self,
109 ) -> Pin<Box<dyn Stream<Item = Result<BodyComponent, anyhow::Error>> + Send + Sync + 'static>>
110 {
111 match self {
112 Body::Bytes(bytes) => Box::pin(futures::stream::once(async move {
113 Ok(BodyComponent::Data(bytes.into()))
114 })),
115 Body::Stream {
116 size_hint: _,
117 stream,
118 } => stream,
119 }
120 }
121}
122
123impl Into<Body> for Vec<u8> {
124 fn into(self) -> Body {
125 Body::Bytes(self)
126 }
127}
128
129impl Into<Body> for String {
130 fn into(self) -> Body {
131 Body::Bytes(self.into_bytes())
132 }
133}
134
135impl Into<Body> for &str {
136 fn into(self) -> Body {
137 Body::Bytes(self.as_bytes().to_vec())
138 }
139}
140
141impl Into<Body> for () {
142 fn into(self) -> Body {
143 Body::Bytes(vec![])
144 }
145}
146
147impl From<BodyStream> for Body {
148 fn from(stream: BodyStream) -> Self {
149 Self::Stream {
150 size_hint: None,
151 stream,
152 }
153 }
154}
155
156impl From<Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send + Sync + 'static>>>
157 for Body
158{
159 fn from(
160 value: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send + Sync + 'static>>,
161 ) -> Self {
162 Self::Stream {
163 size_hint: None,
164 stream: Box::pin(value.map_ok(|x| BodyComponent::Data(x))),
165 }
166 }
167}
168
169impl fmt::Debug for Body {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171 match self {
172 Self::Bytes(arg0) => f.debug_tuple("Bytes").field(arg0).finish(),
173 Self::Stream { size_hint, .. } => f.debug_tuple("Stream").field(size_hint).finish(),
174 }
175 }
176}
177
178impl Default for Body {
179 fn default() -> Self {
180 Body::Bytes(vec![])
181 }
182}
183
184impl http_body::Body for BodyWrapper {
185 type Data = Bytes;
186
187 type Error = anyhow::Error;
188
189 fn poll_data(
190 mut self: Pin<&mut Self>,
191 cx: &mut Context<'_>,
192 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
193 match &mut self.body {
194 Body::Bytes(bytes) => {
195 let bytes = std::mem::take(bytes);
196 if self.has_written_blob {
197 return Poll::Ready(None);
198 }
199 self.eof = true;
200 self.has_written_blob = true;
201 Poll::Ready(Some(Ok(bytes.into())))
202 }
203 Body::Stream {
204 size_hint: _,
205 stream,
206 } => match stream.poll_next_unpin(cx) {
207 Poll::Pending => Poll::Pending,
208 Poll::Ready(None) => {
209 self.eof = true;
210 Poll::Ready(None)
211 }
212 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
213 Poll::Ready(Some(Ok(BodyComponent::Data(data)))) => {
214 self.bytes_streamed += data.len();
215 Poll::Ready(Some(Ok(data)))
216 }
217 Poll::Ready(Some(Ok(BodyComponent::Trailers(trailers)))) => {
218 self.pending_trailers = Some(trailers);
219 Poll::Ready(None)
220 }
221 },
222 }
223 }
224
225 fn poll_trailers(
226 mut self: Pin<&mut Self>,
227 _cx: &mut Context<'_>,
228 ) -> Poll<Result<Option<headers::HeaderMap>, Self::Error>> {
229 self.eof = true;
230 if let Some(pending_trailers) = self.pending_trailers.take() {
231 Poll::Ready(Ok(Some(pending_trailers.into())))
232 } else {
233 Poll::Ready(Ok(None))
234 }
235 }
236
237 fn is_end_stream(&self) -> bool {
238 self.eof
239 }
240
241 fn size_hint(&self) -> SizeHint {
242 if self.eof {
243 return SizeHint::with_exact(0);
244 }
245 match &self.body {
246 Body::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64),
247 Body::Stream {
248 size_hint,
249 stream: _,
250 } => size_hint
251 .map(|x| SizeHint::with_exact(x as u64))
252 .unwrap_or_default(),
253 }
254 }
255}