Skip to main content

aws_smithy_http_server/
body.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! HTTP body utilities.
7//!
8//! This module provides body handling utilities for HTTP 1.x using the
9//! `http-body` and `http-body-util` crates.
10
11use crate::error::{BoxError, Error};
12use bytes::Bytes;
13
14// Used in the codegen in trait bounds.
15#[doc(hidden)]
16pub use http_body::Body as HttpBody;
17
18// ============================================================================
19// BoxBody - Type-Erased Body
20// ============================================================================
21
/// The primary body type returned by the generated `smithy-rs` service.
///
/// This is a type-erased body: an alias for `UnsyncBoxBody` from `http-body-util` whose data
/// chunks are [`Bytes`] and whose error type is this crate's [`Error`].
/// It is `Send` but not `Sync`, making it suitable for most HTTP handlers.
/// Use [`BoxBodySync`] when `Sync` is required as well.
pub type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
27
/// A thread-safe body type for operations that require `Sync`.
///
/// This is an alias for `BoxBody` from `http-body-util` (not this module's [`BoxBody`] alias)
/// whose data chunks are [`Bytes`] and whose error type is this crate's [`Error`].
///
/// This is used specifically for event streaming operations and lambda handlers
/// that need thread safety guarantees.
pub type BoxBodySync = http_body_util::combinators::BoxBody<Bytes, Error>;
33
34// ============================================================================
35// Body Construction Functions
36// ============================================================================
37
38// `boxed` is used in the codegen of the implementation of the operation `Handler` trait.
39/// Convert an HTTP body implementing [`http_body::Body`] into a [`BoxBody`].
40pub fn boxed<B>(body: B) -> BoxBody
41where
42    B: http_body::Body<Data = Bytes> + Send + 'static,
43    B::Error: Into<BoxError>,
44{
45    use http_body_util::BodyExt;
46
47    try_downcast(body).unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
48}
49
50/// Convert an HTTP body implementing [`http_body::Body`] into a [`BoxBodySync`].
51pub fn boxed_sync<B>(body: B) -> BoxBodySync
52where
53    B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
54    B::Error: Into<BoxError>,
55{
56    use http_body_util::BodyExt;
57    body.map_err(Error::new).boxed()
58}
59
/// Attempt to reinterpret `k` as a value of type `T`.
///
/// Returns `Ok` with the value when `K` and `T` are the same type, and otherwise gives the
/// original value back in `Err`.
#[doc(hidden)]
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where
    T: 'static,
    K: Send + 'static,
{
    // `Any` only hands out references, so park the value in an `Option` and move it out
    // through `take` once the type check has succeeded.
    let mut slot = Some(k);
    let erased: &mut dyn std::any::Any = &mut slot;
    match erased.downcast_mut::<Option<T>>() {
        Some(same_type) => Ok(same_type.take().unwrap()),
        None => Err(slot.unwrap()),
    }
}
73
74/// Create an empty body.
75pub fn empty() -> BoxBody {
76    boxed(http_body_util::Empty::<Bytes>::new())
77}
78
79/// Create an empty sync body.
80pub fn empty_sync() -> BoxBodySync {
81    boxed_sync(http_body_util::Empty::<Bytes>::new())
82}
83
84/// Convert bytes or similar types into a [`BoxBody`].
85///
86/// This simplifies codegen a little bit.
87#[doc(hidden)]
88pub fn to_boxed<B>(body: B) -> BoxBody
89where
90    B: Into<Bytes>,
91{
92    boxed(http_body_util::Full::new(body.into()))
93}
94
95/// Convert bytes or similar types into a [`BoxBodySync`].
96///
97/// This simplifies codegen a little bit.
98#[doc(hidden)]
99pub fn to_boxed_sync<B>(body: B) -> BoxBodySync
100where
101    B: Into<Bytes>,
102{
103    boxed_sync(http_body_util::Full::new(body.into()))
104}
105
106/// Create a body from bytes.
107pub fn from_bytes(bytes: Bytes) -> BoxBody {
108    boxed(http_body_util::Full::new(bytes))
109}
110
111// ============================================================================
112// Stream Wrapping for Event Streaming
113// ============================================================================
114
115/// Wrap a stream of byte chunks into a BoxBody.
116///
117/// This is used for event streaming support. The stream should produce `Result<O, E>`
118/// where `O` can be converted into `Bytes` and `E` can be converted into an error.
119///
120/// In hyper 0.x, `Body::wrap_stream` was available directly on the body type.
121/// In hyper 1.x, the `stream` feature was removed, and the official approach is to use
122/// `http_body_util::StreamBody` to convert streams into bodies, which is what this
123/// function provides as a convenient wrapper.
124///
125/// For scenarios requiring `Sync` (e.g., lambda handlers), use [`wrap_stream_sync`] instead.
126pub fn wrap_stream<S, O, E>(stream: S) -> BoxBody
127where
128    S: futures_util::Stream<Item = Result<O, E>> + Send + 'static,
129    O: Into<Bytes> + 'static,
130    E: Into<BoxError> + 'static,
131{
132    use futures_util::TryStreamExt;
133    use http_body_util::StreamBody;
134
135    // Convert the stream of Result<O, E> into a stream of Result<Frame<Bytes>, Error>
136    let frame_stream = stream
137        .map_ok(|chunk| http_body::Frame::data(chunk.into()))
138        .map_err(|e| Error::new(e.into()));
139
140    boxed(StreamBody::new(frame_stream))
141}
142
143/// Wrap a stream of byte chunks into a BoxBodySync.
144///
145/// This is the thread-safe variant of [`wrap_stream`], used for event streaming operations
146/// that require `Sync` bounds, such as lambda handlers.
147///
148/// The stream should produce `Result<O, E>` where `O` can be converted into `Bytes` and
149/// `E` can be converted into an error.
150pub fn wrap_stream_sync<S, O, E>(stream: S) -> BoxBodySync
151where
152    S: futures_util::Stream<Item = Result<O, E>> + Send + Sync + 'static,
153    O: Into<Bytes> + 'static,
154    E: Into<BoxError> + 'static,
155{
156    use futures_util::TryStreamExt;
157    use http_body_util::StreamBody;
158
159    // Convert the stream of Result<O, E> into a stream of Result<Frame<Bytes>, Error>
160    let frame_stream = stream
161        .map_ok(|chunk| http_body::Frame::data(chunk.into()))
162        .map_err(|e| Error::new(e.into()));
163
164    boxed_sync(StreamBody::new(frame_stream))
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::stream;
    use std::collections::VecDeque;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Collect all bytes from a body (test utility).
    ///
    /// Uses `http_body_util::BodyExt::collect()` to read every body chunk into a single
    /// `Bytes` buffer; a body error is converted with `Error::new`.
    async fn collect_bytes<B>(body: B) -> Result<Bytes, Error>
    where
        B: HttpBody,
        B::Error: Into<BoxError>,
    {
        use http_body_util::BodyExt;

        let collected = body.collect().await.map_err(Error::new)?;
        Ok(collected.to_bytes())
    }

    /// Hand-written `futures_util::Stream` that yields its queued items front to back and is
    /// always ready.
    ///
    /// Shared by the tests that need a stream type not provided by `futures_util` itself.
    /// Backed by a `VecDeque` so that taking the next item is a cheap `pop_front`.
    struct QueueStream {
        items: VecDeque<Result<Bytes, std::io::Error>>,
    }

    impl QueueStream {
        fn new(items: Vec<Result<Bytes, std::io::Error>>) -> Self {
            Self { items: items.into() }
        }
    }

    impl futures_util::Stream for QueueStream {
        type Item = Result<Bytes, std::io::Error>;

        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // `pop_front` returns `None` once the queue is drained, which ends the stream.
            Poll::Ready(self.items.pop_front())
        }
    }

    // Compile-time checks to ensure Send/Sync bounds are correct.
    // Following the pattern used by hyper and axum.
    fn _assert_send<T: Send>() {}
    fn _assert_sync<T: Sync>() {}

    fn _assert_send_sync_bounds() {
        // BoxBodySync must be both Send and Sync
        _assert_send::<BoxBodySync>();
        _assert_sync::<BoxBodySync>();

        // BoxBody must be Send (but is intentionally NOT Sync - it's UnsyncBoxBody)
        _assert_send::<BoxBody>();
    }

    /// Compile-time check that the type of the referenced value is `Sync`.
    fn assert_value_is_sync<T: Sync>(_: &T) {}

    #[tokio::test]
    async fn test_empty_body() {
        let body = empty();
        let bytes = collect_bytes(body).await.unwrap();
        assert!(bytes.is_empty());
    }

    #[tokio::test]
    async fn test_from_bytes() {
        let data = Bytes::from("hello world");
        let body = from_bytes(data.clone());
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, data);
    }

    #[tokio::test]
    async fn test_to_boxed_string() {
        let s = "hello world";
        let body = to_boxed(s);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from(s));
    }

    #[tokio::test]
    async fn test_to_boxed_vec() {
        let vec = vec![1u8, 2, 3, 4, 5];
        let body = to_boxed(vec.clone());
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected.as_ref(), vec.as_slice());
    }

    #[tokio::test]
    async fn test_boxed() {
        use http_body_util::Full;

        let full_body = Full::new(Bytes::from("test data"));
        let boxed_body: BoxBody = boxed(full_body);
        let collected = collect_bytes(boxed_body).await.unwrap();
        assert_eq!(collected, Bytes::from("test data"));
    }

    #[tokio::test]
    async fn test_boxed_sync() {
        use http_body_util::Full;

        let full_body = Full::new(Bytes::from("sync test"));
        let boxed_body: BoxBodySync = boxed_sync(full_body);
        let collected = collect_bytes(boxed_body).await.unwrap();
        assert_eq!(collected, Bytes::from("sync test"));
    }

    #[tokio::test]
    async fn test_wrap_stream_single_chunk() {
        let data = Bytes::from("single chunk");
        let stream = stream::iter(vec![Ok::<_, std::io::Error>(data.clone())]);

        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, data);
    }

    #[tokio::test]
    async fn test_wrap_stream_multiple_chunks() {
        let chunks = vec![
            Ok::<_, std::io::Error>(Bytes::from("chunk1")),
            Ok(Bytes::from("chunk2")),
            Ok(Bytes::from("chunk3")),
        ];
        let expected = Bytes::from("chunk1chunk2chunk3");

        let body = wrap_stream(stream::iter(chunks));
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, expected);
    }

    /// A stream whose only item is a zero-length chunk produces a body with no data.
    #[tokio::test]
    async fn test_wrap_stream_empty() {
        let stream = stream::iter(vec![Ok::<_, std::io::Error>(Bytes::new())]);

        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert!(collected.is_empty());
    }

    /// A stream that ends without yielding any item also produces a body with no data.
    #[tokio::test]
    async fn test_wrap_stream_no_items() {
        let stream = stream::iter(Vec::<Result<Bytes, std::io::Error>>::new());

        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert!(collected.is_empty());
    }

    #[tokio::test]
    async fn test_wrap_stream_error() {
        let chunks = vec![
            Ok::<_, std::io::Error>(Bytes::from("chunk1")),
            Err(std::io::Error::other("test error")),
        ];

        let body = wrap_stream(stream::iter(chunks));
        let result = collect_bytes(body).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_wrap_stream_various_types() {
        // Test that Into<Bytes> works for various types

        // Test with &str
        let chunks = vec![Ok::<_, std::io::Error>("string slice"), Ok("another string")];
        let body = wrap_stream(stream::iter(chunks));
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("string sliceanother string"));

        // Test with String
        let chunks = vec![
            Ok::<_, std::io::Error>(String::from("owned ")),
            Ok(String::from("strings")),
        ];
        let body = wrap_stream(stream::iter(chunks));
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("owned strings"));

        // Test with Vec<u8>
        let chunks = vec![
            Ok::<_, std::io::Error>(vec![72u8, 101, 108, 108, 111]), // "Hello"
            Ok(vec![32u8, 87, 111, 114, 108, 100]),                  // " World"
        ];
        let body = wrap_stream(stream::iter(chunks));
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("Hello World"));

        // Test with &[u8]
        let chunks = vec![
            Ok::<_, std::io::Error>(&[98u8, 121, 116, 101] as &[u8]), // "byte"
            Ok(&[115u8, 33] as &[u8]),                                // "s!"
        ];
        let body = wrap_stream(stream::iter(chunks));
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("bytes!"));

        // Test with custom struct implementing Into<Bytes>
        struct CustomChunk {
            data: String,
        }

        impl From<CustomChunk> for Bytes {
            fn from(chunk: CustomChunk) -> Bytes {
                Bytes::from(chunk.data)
            }
        }

        let chunks = vec![
            Ok::<_, std::io::Error>(CustomChunk { data: "custom ".into() }),
            Ok(CustomChunk { data: "struct".into() }),
        ];
        let body = wrap_stream(stream::iter(chunks));
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("custom struct"));
    }

    #[tokio::test]
    async fn test_wrap_stream_custom_stream_type() {
        // `QueueStream` is a stream type implemented by hand in this module.
        let stream = QueueStream::new(vec![Ok(Bytes::from("custom ")), Ok(Bytes::from("stream"))]);

        let body = wrap_stream(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("custom stream"));
    }

    #[tokio::test]
    async fn test_wrap_stream_custom_error_type() {
        // Custom error type that implements Into<BoxError>
        #[derive(Debug, Clone)]
        struct CustomError {
            message: String,
        }

        impl std::fmt::Display for CustomError {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "CustomError: {}", self.message)
            }
        }

        impl std::error::Error for CustomError {}

        // Test successful case with custom error type
        let chunks = vec![
            Ok::<_, CustomError>(Bytes::from("custom ")),
            Ok(Bytes::from("error type")),
        ];
        let body = wrap_stream(stream::iter(chunks));
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, Bytes::from("custom error type"));

        // Test error case with custom error type
        let chunks = vec![
            Ok::<_, CustomError>(Bytes::from("data")),
            Err(CustomError {
                message: "custom error".into(),
            }),
        ];
        let body = wrap_stream(stream::iter(chunks));
        let result = collect_bytes(body).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_wrap_stream_incremental_consumption() {
        use http_body_util::BodyExt;

        let stream = QueueStream::new(vec![
            Ok(Bytes::from("chunk1")),
            Ok(Bytes::from("chunk2")),
            Ok(Bytes::from("chunk3")),
        ]);

        let mut body = wrap_stream(stream);

        // Each stream item must come out as its own data frame, in order.
        for expected in ["chunk1", "chunk2", "chunk3"] {
            let frame = body.frame().await.unwrap().unwrap();
            assert!(frame.is_data());
            assert_eq!(frame.into_data().unwrap(), Bytes::from(expected));
        }

        // Once the stream is drained the body reports its end.
        assert!(body.frame().await.is_none());
    }

    #[tokio::test]
    async fn test_wrap_stream_sync_single_chunk() {
        let data = Bytes::from("sync single chunk");
        let stream = stream::iter(vec![Ok::<_, std::io::Error>(data.clone())]);

        let body = wrap_stream_sync(stream);
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, data);
    }

    #[tokio::test]
    async fn test_wrap_stream_sync_multiple_chunks() {
        let chunks = vec![
            Ok::<_, std::io::Error>(Bytes::from("sync1")),
            Ok(Bytes::from("sync2")),
            Ok(Bytes::from("sync3")),
        ];
        let expected = Bytes::from("sync1sync2sync3");

        let body = wrap_stream_sync(stream::iter(chunks));
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, expected);
    }

    #[tokio::test]
    async fn test_empty_sync_body() {
        let body = empty_sync();
        let bytes = collect_bytes(body).await.unwrap();
        assert!(bytes.is_empty());
    }

    #[tokio::test]
    async fn test_to_boxed_sync() {
        let data = Bytes::from("sync boxed data");
        let body = to_boxed_sync(data.clone());
        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, data);
    }

    #[tokio::test]
    async fn test_wrap_stream_sync_produces_sync_body() {
        let data = Bytes::from("test sync");
        let stream = stream::iter(vec![Ok::<_, std::io::Error>(data.clone())]);

        let body = wrap_stream_sync(stream);

        // Compile-time check: ensure the body is Sync
        assert_value_is_sync(&body);

        let collected = collect_bytes(body).await.unwrap();
        assert_eq!(collected, data);
    }

    #[test]
    fn test_empty_sync_is_sync() {
        let body = empty_sync();
        assert_value_is_sync(&body);
    }

    #[test]
    fn test_boxed_sync_is_sync() {
        use http_body_util::Full;

        let body = boxed_sync(Full::new(Bytes::from("test")));
        assert_value_is_sync(&body);
    }
}