1use std::pin::Pin;
9use std::task::{ready, Context, Poll};
10
11use bytes::Bytes;
12use http_body_util::BodyExt;
13use pin_project_lite::pin_project;
14
15use crate::body::{Error, SdkBody};
16
17impl SdkBody {
18 pub fn from_body_1_x<T, E>(body: T) -> Self
20 where
21 T: http_body_1_0::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
22 E: Into<Error> + 'static,
23 {
24 SdkBody::from_body_0_4_internal(Http1toHttp04::new(body.map_err(Into::into)))
25 }
26
27 pub(crate) fn poll_data_frame(
28 mut self: Pin<&mut Self>,
29 cx: &mut Context<'_>,
30 ) -> Poll<Option<Result<http_body_1_0::Frame<Bytes>, Error>>> {
31 match ready!(self.as_mut().poll_next(cx)) {
32 None => match ready!(self.poll_next_trailers(cx)) {
34 Ok(Some(trailers)) => Poll::Ready(Some(Ok(http_body_1_0::Frame::trailers(
35 convert_headers_0x_1x(trailers),
36 )))),
37 Ok(None) => Poll::Ready(None),
38 Err(e) => Poll::Ready(Some(Err(e))),
39 },
40 Some(result) => match result {
41 Err(err) => Poll::Ready(Some(Err(err))),
42 Ok(bytes) => Poll::Ready(Some(Ok(http_body_1_0::Frame::data(bytes)))),
43 },
44 }
45 }
46}
47
#[cfg(feature = "http-body-1-x")]
impl http_body_1_0::Body for SdkBody {
    type Data = Bytes;
    type Error = Error;

    /// Delegates to [`SdkBody::poll_data_frame`].
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<http_body_1_0::Frame<Self::Data>, Self::Error>>> {
        SdkBody::poll_data_frame(self, cx)
    }

    /// Delegates to the inherent `SdkBody::is_end_stream`.
    fn is_end_stream(&self) -> bool {
        // Inherent methods take priority over trait methods, so this is not a recursive call.
        SdkBody::is_end_stream(self)
    }

    /// Reports the bounds from `bounds_on_remaining_length` as an `http-body` 1.x size hint.
    fn size_hint(&self) -> http_body_1_0::SizeHint {
        let (lower_bound, upper_bound) = self.bounds_on_remaining_length();

        let mut hint = http_body_1_0::SizeHint::new();
        hint.set_lower(lower_bound);
        // The upper bound stays unset when it is unknown.
        if let Some(upper_bound) = upper_bound {
            hint.set_upper(upper_bound);
        }
        hint
    }
}
74
75pin_project! {
76 struct Http1toHttp04<B> {
77 #[pin]
78 inner: B,
79 trailers: Option<http_1x::HeaderMap>,
80 }
81}
82
83impl<B> Http1toHttp04<B> {
84 fn new(inner: B) -> Self {
85 Self {
86 inner,
87 trailers: None,
88 }
89 }
90}
91
92impl<B> http_body_0_4::Body for Http1toHttp04<B>
93where
94 B: http_body_1_0::Body,
95{
96 type Data = B::Data;
97 type Error = B::Error;
98
99 fn poll_data(
100 mut self: Pin<&mut Self>,
101 cx: &mut Context<'_>,
102 ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
103 loop {
104 let this = self.as_mut().project();
105 match ready!(this.inner.poll_frame(cx)) {
106 Some(Ok(frame)) => {
107 let frame = match frame.into_data() {
108 Ok(data) => return Poll::Ready(Some(Ok(data))),
109 Err(frame) => frame,
110 };
111 if let Ok(trailers) = frame.into_trailers() {
113 this.trailers.replace(trailers);
114 return Poll::Ready(None);
115 };
116 }
119 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
120 None => return Poll::Ready(None),
121 }
122 }
123 }
124
125 fn poll_trailers(
126 self: Pin<&mut Self>,
127 _cx: &mut Context<'_>,
128 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
129 let this = self.project();
132 match this.trailers.take() {
133 Some(headers) => Poll::Ready(Ok(Some(convert_headers_1x_0x(headers)))),
134 None => Poll::Ready(Ok(None)),
135 }
136 }
137
138 fn is_end_stream(&self) -> bool {
139 self.inner.is_end_stream()
140 }
141
142 fn size_hint(&self) -> http_body_0_4::SizeHint {
143 let mut size_hint = http_body_0_4::SizeHint::new();
144 let inner_hint = self.inner.size_hint();
145 if let Some(exact) = inner_hint.exact() {
146 size_hint.set_exact(exact);
147 } else {
148 size_hint.set_lower(inner_hint.lower());
149 if let Some(upper) = inner_hint.upper() {
150 size_hint.set_upper(upper);
151 }
152 }
153 size_hint
154 }
155}
156
157fn convert_headers_1x_0x(input: http_1x::HeaderMap) -> http::HeaderMap {
158 let mut map = http::HeaderMap::with_capacity(input.capacity());
159 let mut mem: Option<http_1x::HeaderName> = None;
160 for (k, v) in input.into_iter() {
161 let name = k.or_else(|| mem.clone()).unwrap();
162 map.append(
163 http::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
164 http::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
165 );
166 mem = Some(name);
167 }
168 map
169}
170
171fn convert_headers_0x_1x(input: http::HeaderMap) -> http_1x::HeaderMap {
172 let mut map = http_1x::HeaderMap::with_capacity(input.capacity());
173 let mut mem: Option<http::HeaderName> = None;
174 for (k, v) in input.into_iter() {
175 let name = k.or_else(|| mem.clone()).unwrap();
176 map.append(
177 http_1x::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
178 http_1x::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
179 );
180 mem = Some(name);
181 }
182 map
183}
184
#[cfg(test)]
mod test {
    use std::collections::VecDeque;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use bytes::Bytes;
    use http::header::{CONTENT_LENGTH as CL0, CONTENT_TYPE as CT0};
    use http_1x::header::{CONTENT_LENGTH as CL1, CONTENT_TYPE as CT1};
    use http_1x::{HeaderMap, HeaderName, HeaderValue};
    use http_body_1_0::Frame;
    use http_body_util::BodyExt;

    use crate::body::http_body_1_x::{convert_headers_1x_0x, Http1toHttp04};
    use crate::body::{Error, SdkBody};
    use crate::byte_stream::ByteStream;

    /// A scripted `http-body` 1.x body that replays a fixed queue of chunks.
    struct TestBody {
        chunks: VecDeque<Chunk>,
    }

    /// One scripted step of a [`TestBody`].
    enum Chunk {
        Data(&'static str),
        Error(&'static str),
        Trailers(HeaderMap),
    }

    impl http_body_1_0::Body for TestBody {
        type Data = Bytes;
        type Error = Error;

        fn poll_frame(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
            // An empty queue means the body is finished; every other step is always ready.
            let step: Result<Frame<Bytes>, Error> = match self.chunks.pop_front() {
                None => return Poll::Ready(None),
                Some(Chunk::Error(message)) => Err(message.into()),
                Some(Chunk::Data(text)) => Ok(Frame::data(Bytes::from_static(text.as_bytes()))),
                Some(Chunk::Trailers(headers)) => Ok(Frame::trailers(headers)),
            };
            Poll::Ready(Some(step))
        }
    }

    /// Trailers fixture: two values for `x-test` and one value for `y-test`.
    fn trailers() -> HeaderMap {
        let x_test = HeaderName::from_static("x-test");
        let y_test = HeaderName::from_static("y-test");

        let mut headers = HeaderMap::new();
        headers.insert(x_test.clone(), HeaderValue::from_static("x-test-value"));
        headers.append(x_test, HeaderValue::from_static("x-test-value-2"));
        headers.append(y_test, HeaderValue::from_static("y-test-value-2"));
        headers
    }

    /// Builds a body yielding "123", "456" and "789", followed by `last` when it is given.
    fn digits_body(last: Option<Chunk>) -> TestBody {
        let mut chunks = VecDeque::from(vec![
            Chunk::Data("123"),
            Chunk::Data("456"),
            Chunk::Data("789"),
        ]);
        chunks.extend(last);
        TestBody { chunks }
    }

    /// Builds the three-chunk body terminated by the [`trailers`] fixture.
    fn digits_body_with_trailers() -> TestBody {
        digits_body(Some(Chunk::Trailers(trailers())))
    }

    #[tokio::test]
    async fn test_body_with_trailers() {
        let stream = ByteStream::new(SdkBody::from_body_1_x(digits_body_with_trailers()));
        assert_eq!(stream.collect().await.unwrap().to_vec(), b"123456789");
    }

    #[tokio::test]
    async fn test_read_trailers() {
        let mut body = SdkBody::from_body_1_x(digits_body_with_trailers());
        // Drain all of the data first; the trailers are only available afterwards.
        while http_body_0_4::Body::data(&mut body).await.is_some() {}
        assert_eq!(
            http_body_0_4::Body::trailers(&mut body).await.unwrap(),
            Some(convert_headers_1x_0x(trailers()))
        );
    }

    #[tokio::test]
    async fn test_read_trailers_as_1x() {
        let body = SdkBody::from_body_1_x(digits_body_with_trailers());

        let collected = BodyExt::collect(body).await.expect("should succeed");
        assert_eq!(collected.trailers(), Some(&trailers()));
        assert_eq!(collected.to_bytes().as_ref(), b"123456789");
    }

    #[tokio::test]
    async fn test_trailers_04x_to_1x() {
        let adapter = Http1toHttp04::new(digits_body_with_trailers());
        let body = SdkBody::from_body_0_4(adapter);

        let collected = BodyExt::collect(body).await.expect("should succeed");
        assert_eq!(collected.trailers(), Some(&trailers()));
        assert_eq!(collected.to_bytes().as_ref(), b"123456789");
    }

    #[tokio::test]
    async fn test_errors() {
        let failing = digits_body(Some(Chunk::Error("errors!")));

        let stream = ByteStream::new(SdkBody::from_body_1_x(failing));
        stream.collect().await.expect_err("body returned an error");
    }

    #[tokio::test]
    async fn test_no_trailers() {
        let stream = ByteStream::new(SdkBody::from_body_1_x(digits_body(None)));
        assert_eq!(stream.collect().await.unwrap().to_vec(), b"123456789");
    }

    #[test]
    fn test_convert_headers() {
        let mut http1_headermap = http_1x::HeaderMap::new();
        let mut expect = http::HeaderMap::new();

        // The same multi-valued header, built in both `http` versions.
        for value in ["a", "b", "c"] {
            http1_headermap.append(CT1, HeaderValue::from_static(value));
            expect.append(CT0, http::HeaderValue::from_static(value));
        }

        http1_headermap.insert(CL1, HeaderValue::from_static("1234"));
        expect.insert(CL0, http::HeaderValue::from_static("1234"));

        assert_eq!(convert_headers_1x_0x(http1_headermap), expect);
    }

    #[test]
    fn sdkbody_debug_dyn() {
        let body = SdkBody::from_body_1_x(digits_body_with_trailers());
        let rendered = format!("{body:?}");
        assert!(rendered.contains("BoxBody"));
    }
}