aws_smithy_types/body/
http_body_1_x.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Adapters to use http-body 1.0 bodies with SdkBody & ByteStream
7
8use std::pin::Pin;
9use std::task::{ready, Context, Poll};
10
11use bytes::Bytes;
12use http_body_util::BodyExt;
13use pin_project_lite::pin_project;
14
15use crate::body::{Error, SdkBody};
16
17impl SdkBody {
18    /// Construct an `SdkBody` from a type that implements [`http_body_1_0::Body<Data = Bytes>`](http_body_1_0::Body).
19    pub fn from_body_1_x<T, E>(body: T) -> Self
20    where
21        T: http_body_1_0::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
22        E: Into<Error> + 'static,
23    {
24        SdkBody::from_body_0_4_internal(Http1toHttp04::new(body.map_err(Into::into)))
25    }
26
27    pub(crate) fn poll_data_frame(
28        mut self: Pin<&mut Self>,
29        cx: &mut Context<'_>,
30    ) -> Poll<Option<Result<http_body_1_0::Frame<Bytes>, Error>>> {
31        match ready!(self.as_mut().poll_next(cx)) {
32            // if there's no more data, try to return trailers
33            None => match ready!(self.poll_next_trailers(cx)) {
34                Ok(Some(trailers)) => Poll::Ready(Some(Ok(http_body_1_0::Frame::trailers(
35                    convert_headers_0x_1x(trailers),
36                )))),
37                Ok(None) => Poll::Ready(None),
38                Err(e) => Poll::Ready(Some(Err(e))),
39            },
40            Some(result) => match result {
41                Err(err) => Poll::Ready(Some(Err(err))),
42                Ok(bytes) => Poll::Ready(Some(Ok(http_body_1_0::Frame::data(bytes)))),
43            },
44        }
45    }
46}
47
#[cfg(feature = "http-body-1-x")]
impl http_body_1_0::Body for SdkBody {
    type Data = Bytes;
    type Error = Error;

    /// Delegates to [`SdkBody::poll_data_frame`].
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<http_body_1_0::Frame<Self::Data>, Self::Error>>> {
        SdkBody::poll_data_frame(self, cx)
    }

    /// Delegates to the inherent `SdkBody::is_end_stream`.
    fn is_end_stream(&self) -> bool {
        SdkBody::is_end_stream(self)
    }

    /// Translates the body's remaining-length bounds into an http-body 1.x size hint.
    fn size_hint(&self) -> http_body_1_0::SizeHint {
        let (lower_bound, upper_bound) = self.bounds_on_remaining_length();

        let mut size_hint = http_body_1_0::SizeHint::default();
        size_hint.set_lower(lower_bound);
        // the upper bound is only recorded when one is known
        if let Some(limit) = upper_bound {
            size_hint.set_upper(limit);
        }
        size_hint
    }
}
74
pin_project! {
    /// Wrapper around a body `B` that buffers trailers next to it.
    struct Http1toHttp04<B> {
        // The wrapped body; marked `#[pin]` so it can be polled through the pin projection.
        #[pin]
        inner: B,
        // Trailers held on behalf of the wrapped body; starts out as `None` in `new`.
        trailers: Option<http_1x::HeaderMap>,
    }
}
82
83impl<B> Http1toHttp04<B> {
84    fn new(inner: B) -> Self {
85        Self {
86            inner,
87            trailers: None,
88        }
89    }
90}
91
92impl<B> http_body_0_4::Body for Http1toHttp04<B>
93where
94    B: http_body_1_0::Body,
95{
96    type Data = B::Data;
97    type Error = B::Error;
98
99    fn poll_data(
100        mut self: Pin<&mut Self>,
101        cx: &mut Context<'_>,
102    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
103        loop {
104            let this = self.as_mut().project();
105            match ready!(this.inner.poll_frame(cx)) {
106                Some(Ok(frame)) => {
107                    let frame = match frame.into_data() {
108                        Ok(data) => return Poll::Ready(Some(Ok(data))),
109                        Err(frame) => frame,
110                    };
111                    // when we get a trailers frame, store the trailers for the next poll
112                    if let Ok(trailers) = frame.into_trailers() {
113                        this.trailers.replace(trailers);
114                        return Poll::Ready(None);
115                    };
116                    // if the frame type was unknown, discard it. the next one might be something
117                    // useful
118                }
119                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
120                None => return Poll::Ready(None),
121            }
122        }
123    }
124
125    fn poll_trailers(
126        self: Pin<&mut Self>,
127        _cx: &mut Context<'_>,
128    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
129        // all of the polling happens in poll_data, once we get to the trailers we've actually
130        // already read everything
131        let this = self.project();
132        match this.trailers.take() {
133            Some(headers) => Poll::Ready(Ok(Some(convert_headers_1x_0x(headers)))),
134            None => Poll::Ready(Ok(None)),
135        }
136    }
137
138    fn is_end_stream(&self) -> bool {
139        self.inner.is_end_stream()
140    }
141
142    fn size_hint(&self) -> http_body_0_4::SizeHint {
143        let mut size_hint = http_body_0_4::SizeHint::new();
144        let inner_hint = self.inner.size_hint();
145        if let Some(exact) = inner_hint.exact() {
146            size_hint.set_exact(exact);
147        } else {
148            size_hint.set_lower(inner_hint.lower());
149            if let Some(upper) = inner_hint.upper() {
150                size_hint.set_upper(upper);
151            }
152        }
153        size_hint
154    }
155}
156
157fn convert_headers_1x_0x(input: http_1x::HeaderMap) -> http::HeaderMap {
158    let mut map = http::HeaderMap::with_capacity(input.capacity());
159    let mut mem: Option<http_1x::HeaderName> = None;
160    for (k, v) in input.into_iter() {
161        let name = k.or_else(|| mem.clone()).unwrap();
162        map.append(
163            http::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
164            http::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
165        );
166        mem = Some(name);
167    }
168    map
169}
170
171fn convert_headers_0x_1x(input: http::HeaderMap) -> http_1x::HeaderMap {
172    let mut map = http_1x::HeaderMap::with_capacity(input.capacity());
173    let mut mem: Option<http::HeaderName> = None;
174    for (k, v) in input.into_iter() {
175        let name = k.or_else(|| mem.clone()).unwrap();
176        map.append(
177            http_1x::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"),
178            http_1x::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"),
179        );
180        mem = Some(name);
181    }
182    map
183}
184
#[cfg(test)]
mod test {
    use std::collections::VecDeque;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use bytes::Bytes;
    use http::header::{CONTENT_LENGTH as CL0, CONTENT_TYPE as CT0};
    use http_1x::header::{CONTENT_LENGTH as CL1, CONTENT_TYPE as CT1};
    use http_1x::{HeaderMap, HeaderName, HeaderValue};
    use http_body_1_0::Frame;
    use http_body_util::BodyExt;

    use crate::body::http_body_1_x::{convert_headers_1x_0x, Http1toHttp04};
    use crate::body::{Error, SdkBody};
    use crate::byte_stream::ByteStream;

    /// An http-body 1.x body that replays a fixed script, one chunk per poll.
    struct TestBody {
        chunks: VecDeque<Chunk>,
    }

    /// One scripted poll result of [`TestBody`].
    enum Chunk {
        Data(&'static str),
        Error(&'static str),
        Trailers(HeaderMap),
    }

    impl http_body_1_0::Body for TestBody {
        type Data = Bytes;
        type Error = Error;

        fn poll_frame(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
            let frame = match self.chunks.pop_front() {
                None => return Poll::Ready(None),
                Some(Chunk::Error(message)) => return Poll::Ready(Some(Err(message.into()))),
                Some(Chunk::Data(text)) => Frame::data(Bytes::from_static(text.as_bytes())),
                Some(Chunk::Trailers(headers)) => Frame::trailers(headers),
            };
            Poll::Ready(Some(Ok(frame)))
        }
    }

    /// Trailers used by the tests: two values for `x-test` and one for `y-test`.
    fn trailers() -> HeaderMap {
        let mut map = HeaderMap::new();
        for (name, value) in [
            ("x-test", "x-test-value"),
            ("x-test", "x-test-value-2"),
            ("y-test", "y-test-value-2"),
        ] {
            map.append(
                HeaderName::from_static(name),
                HeaderValue::from_static(value),
            );
        }
        map
    }

    /// A body yielding the data chunks "123", "456", "789", optionally followed by `last`.
    fn scripted_body(last: Option<Chunk>) -> TestBody {
        let chunks = vec!["123", "456", "789"]
            .into_iter()
            .map(Chunk::Data)
            .chain(last)
            .collect();
        TestBody { chunks }
    }

    /// The scripted data chunks followed by a trailers frame.
    fn body_with_trailers() -> TestBody {
        scripted_body(Some(Chunk::Trailers(trailers())))
    }

    #[tokio::test]
    async fn test_body_with_trailers() {
        let stream = ByteStream::new(SdkBody::from_body_1_x(body_with_trailers()));
        assert_eq!(stream.collect().await.unwrap().to_vec(), b"123456789");
    }

    #[tokio::test]
    async fn test_read_trailers() {
        let mut body = SdkBody::from_body_1_x(body_with_trailers());
        // drain all of the data before asking for the trailers
        while http_body_0_4::Body::data(&mut body).await.is_some() {}
        let received = http_body_0_4::Body::trailers(&mut body).await.unwrap();
        assert_eq!(received, Some(convert_headers_1x_0x(trailers())));
    }

    #[tokio::test]
    async fn test_read_trailers_as_1x() {
        let body = SdkBody::from_body_1_x(body_with_trailers());

        let collected = BodyExt::collect(body).await.expect("should succeed");
        assert_eq!(collected.trailers(), Some(&trailers()));
        assert_eq!(collected.to_bytes().as_ref(), b"123456789");
    }

    #[tokio::test]
    async fn test_trailers_04x_to_1x() {
        let adapter = Http1toHttp04::new(body_with_trailers());
        let body = SdkBody::from_body_0_4(adapter);

        let collected = BodyExt::collect(body).await.expect("should succeed");
        assert_eq!(collected.trailers(), Some(&trailers()));
        assert_eq!(collected.to_bytes().as_ref(), b"123456789");
    }

    #[tokio::test]
    async fn test_errors() {
        let failing = scripted_body(Some(Chunk::Error("errors!")));

        let stream = ByteStream::new(SdkBody::from_body_1_x(failing));
        stream.collect().await.expect_err("body returned an error");
    }

    #[tokio::test]
    async fn test_no_trailers() {
        let data_only = scripted_body(None);

        let stream = ByteStream::new(SdkBody::from_body_1_x(data_only));
        assert_eq!(stream.collect().await.unwrap().to_vec(), b"123456789");
    }

    #[test]
    fn test_convert_headers() {
        let mut http1_headermap = http_1x::HeaderMap::new();
        let mut expect = http::HeaderMap::new();

        // a multi-valued header must keep all of its values
        for value in ["a", "b", "c"] {
            http1_headermap.append(CT1, HeaderValue::from_static(value));
            expect.append(CT0, http::HeaderValue::from_static(value));
        }

        http1_headermap.insert(CL1, HeaderValue::from_static("1234"));
        expect.insert(CL0, http::HeaderValue::from_static("1234"));

        assert_eq!(convert_headers_1x_0x(http1_headermap), expect);
    }

    #[test]
    fn sdkbody_debug_dyn() {
        let body = SdkBody::from_body_1_x(body_with_trailers());
        let rendered = format!("{:?}", body);
        assert!(rendered.contains("BoxBody"));
    }
}