aws_smithy_types/byte_stream/
http_body_0_4_x.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use crate::body::SdkBody;
7use crate::byte_stream::ByteStream;
8use bytes::Bytes;
9
10impl ByteStream {
11    /// Construct a `ByteStream` from a type that implements [`http_body_0_4::Body<Data = Bytes>`](http_body_0_4::Body).
12    ///
13    /// _Note: This is only available when the `http-body-0-4-x` feature is enabled._
14    pub fn from_body_0_4<T, E>(body: T) -> Self
15    where
16        T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
17        E: Into<crate::body::Error> + 'static,
18    {
19        ByteStream::new(SdkBody::from_body_0_4(body))
20    }
21}
22
23#[cfg(feature = "hyper-0-14-x")]
24impl From<hyper_0_14::Body> for ByteStream {
25    fn from(input: hyper_0_14::Body) -> Self {
26        ByteStream::new(SdkBody::from_body_0_4(input))
27    }
28}
29
30#[cfg(test)]
31mod tests {
32    use crate::body::SdkBody;
33    use crate::byte_stream::Inner;
34    use bytes::Bytes;
35
36    #[cfg(feature = "hyper-0-14-x")]
37    #[tokio::test]
38    async fn read_from_channel_body() {
39        let (mut sender, body) = hyper_0_14::Body::channel();
40        let byte_stream = Inner::new(SdkBody::from_body_0_4(body));
41        tokio::spawn(async move {
42            sender.send_data(Bytes::from("data 1")).await.unwrap();
43            sender.send_data(Bytes::from("data 2")).await.unwrap();
44            sender.send_data(Bytes::from("data 3")).await.unwrap();
45        });
46        assert_eq!(
47            byte_stream.collect().await.expect("no errors").into_bytes(),
48            Bytes::from("data 1data 2data 3")
49        );
50    }
51
52    #[cfg(feature = "rt-tokio")]
53    #[tokio::test]
54    async fn path_based_bytestreams() -> Result<(), Box<dyn std::error::Error>> {
55        use super::ByteStream;
56        use bytes::Buf;
57        use std::io::Write;
58        use tempfile::NamedTempFile;
59        let mut file = NamedTempFile::new()?;
60
61        for i in 0..10000 {
62            writeln!(file, "Brian was here. Briefly. {i}")?;
63        }
64        let body = ByteStream::from_path(&file).await?.into_inner();
65        // assert that a valid size hint is immediately ready
66        assert_eq!(body.content_length(), Some(298890));
67        let mut body1 = body.try_clone().expect("retryable bodies are cloneable");
68        // read a little bit from one of the clones
69        let some_data = body1
70            .next()
71            .await
72            .expect("should have some data")
73            .expect("read should not fail");
74        assert!(!some_data.is_empty());
75        // make some more clones
76        let body2 = body.try_clone().expect("retryable bodies are cloneable");
77        let body3 = body.try_clone().expect("retryable bodies are cloneable");
78        let body2 = ByteStream::new(body2).collect().await?.into_bytes();
79        let body3 = ByteStream::new(body3).collect().await?.into_bytes();
80        assert_eq!(body2, body3);
81        assert!(body2.starts_with(b"Brian was here."));
82        assert!(body2.ends_with(b"9999\n"));
83        assert_eq!(body2.len(), 298890);
84
85        assert_eq!(
86            ByteStream::new(body1).collect().await?.remaining(),
87            298890 - some_data.len()
88        );
89
90        Ok(())
91    }
92}