aws_smithy_types/byte_stream/
http_body_0_4_x.rs1use crate::body::SdkBody;
7use crate::byte_stream::ByteStream;
8use bytes::Bytes;
9
10impl ByteStream {
11 pub fn from_body_0_4<T, E>(body: T) -> Self
15 where
16 T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
17 E: Into<crate::body::Error> + 'static,
18 {
19 ByteStream::new(SdkBody::from_body_0_4(body))
20 }
21}
22
#[cfg(feature = "hyper-0-14-x")]
impl From<hyper_0_14::Body> for ByteStream {
    /// Converts a hyper 0.14.x body into a `ByteStream` by adapting it with
    /// [`SdkBody::from_body_0_4`] and wrapping the result.
    fn from(input: hyper_0_14::Body) -> Self {
        let sdk_body = SdkBody::from_body_0_4(input);
        Self::new(sdk_body)
    }
}
29
#[cfg(test)]
mod tests {
    /// Feeds three chunks through a hyper channel body and checks that collecting the
    /// wrapped body yields them concatenated in the order they were sent.
    #[cfg(feature = "hyper-0-14-x")]
    #[tokio::test]
    async fn read_from_channel_body() {
        // These imports are only needed by this feature-gated test, so they are scoped
        // here rather than at module level, where they would be unused whenever
        // `hyper-0-14-x` is disabled.
        use crate::body::SdkBody;
        use crate::byte_stream::Inner;
        use bytes::Bytes;

        let (mut sender, body) = hyper_0_14::Body::channel();
        let byte_stream = Inner::new(SdkBody::from_body_0_4(body));

        // The producer runs as its own task while `collect()` below drains the body;
        // `sender` is dropped when the task's block finishes.
        tokio::spawn(async move {
            for chunk in ["data 1", "data 2", "data 3"] {
                sender.send_data(Bytes::from(chunk)).await.unwrap();
            }
        });

        let collected = byte_stream.collect().await.expect("no errors").into_bytes();
        assert_eq!(collected, Bytes::from("data 1data 2data 3"));
    }

    /// Writes a temporary file, builds a path-based `ByteStream` from it, and checks the
    /// reported content length, that the body can be cloned repeatedly, and that each
    /// clone reads the file contents independently of the others.
    #[cfg(feature = "rt-tokio")]
    #[tokio::test]
    async fn path_based_bytestreams() -> Result<(), Box<dyn std::error::Error>> {
        use super::ByteStream;
        use bytes::Buf;
        use std::io::Write;
        use tempfile::NamedTempFile;

        // 10,000 lines of the form "Brian was here. Briefly. <i>\n"; the assertions
        // below expect these to total 298890 bytes.
        let mut file = NamedTempFile::new()?;
        for i in 0..10000 {
            writeln!(file, "Brian was here. Briefly. {i}")?;
        }

        let original = ByteStream::from_path(&file).await?.into_inner();
        // The content length is checked before any data has been read from the body.
        assert_eq!(original.content_length(), Some(298890));

        // Pull a single chunk from one clone so that it is left partially consumed.
        let mut partial = original
            .try_clone()
            .expect("retryable bodies are cloneable");
        let first_chunk = partial
            .next()
            .await
            .expect("should have some data")
            .expect("read should not fail");
        assert!(!first_chunk.is_empty());

        // Clones taken after the partial read must still yield the complete contents.
        let second = original
            .try_clone()
            .expect("retryable bodies are cloneable");
        let third = original
            .try_clone()
            .expect("retryable bodies are cloneable");
        let second = ByteStream::new(second).collect().await?.into_bytes();
        let third = ByteStream::new(third).collect().await?.into_bytes();
        assert_eq!(second, third);
        assert!(second.starts_with(b"Brian was here."));
        assert!(second.ends_with(b"9999\n"));
        assert_eq!(second.len(), 298890);

        // The partially consumed clone yields exactly the bytes that follow its first chunk.
        assert_eq!(
            ByteStream::new(partial).collect().await?.remaining(),
            298890 - first_chunk.len()
        );

        Ok(())
    }
}