Skip to main content

async_compression_issue_150_workaround/stream/
mod.rs

1//! Types which operate over [`Stream`](futures_core::stream::Stream)`<Item =
2//! `[`io::Result`](std::io::Result)`<`[`Bytes`](bytes_05::Bytes)`>>` streams, both encoders and
3//! decoders for various formats.
4//!
5//! The `Stream` is treated as a single byte-stream to be compressed/decompressed; each item is a
6//! chunk of data from this byte-stream. There is no guaranteed one-to-one relationship
7//! between chunks of data from the underlying stream and the resulting compressed/decompressed
8//! stream: the encoders and decoders will buffer the incoming data and choose their own boundaries
9//! at which to yield a new item.
10//!
11//! # Deprecation Migration
12//!
13//! This feature and module were deprecated because they choose one point in a large solution
14//! space of "stream of byte chunks" to represent an IO data stream, and the conversion between
15//! these solutions and standard IO data streams like `futures::io::AsyncBufRead` /
16//! `tokio::io::AsyncBufRead` should be zero-cost.
17//!
18//! ```rust
19//! use bytes_05::Bytes;
20//! use futures::{stream::Stream, TryStreamExt};
21//! use std::io::Result;
22//!
23//! /// For code that looks like this, choose one of the options below to replace it
24//! fn from(
25//!     input: impl Stream<Item = Result<bytes_05::Bytes>>,
26//! ) -> impl Stream<Item = Result<bytes_05::Bytes>> {
27//!     #[allow(deprecated)]
28//!     async_compression::stream::GzipEncoder::new(input)
29//! }
30//!
31//! /// Direct replacement with `tokio` v0.2 and `bytes` v0.5 using `tokio-util` v0.3
32//! fn tokio_02_bytes_05(
33//!     input: impl Stream<Item = Result<bytes_05::Bytes>>,
34//! ) -> impl Stream<Item = Result<bytes_05::Bytes>> {
35//!     tokio_util_03::codec::FramedRead::new(
36//!         async_compression::tokio_02::bufread::GzipEncoder::new(
37//!             tokio_02::io::stream_reader(input),
38//!         ),
39//!         tokio_util_03::codec::BytesCodec::new(),
40//!     ).map_ok(|bytes| bytes.freeze())
41//! }
42//!
43//! /// Upgrade replacement with `tokio` v0.3 and `bytes` v0.5 using `tokio-util` v0.4
44//! fn tokio_03_bytes_05(
45//!     input: impl Stream<Item = Result<bytes_05::Bytes>>,
46//! ) -> impl Stream<Item = Result<bytes_05::Bytes>> {
47//!     tokio_util_04::io::ReaderStream::new(
48//!         async_compression::tokio_03::bufread::GzipEncoder::new(
49//!             tokio_util_04::io::StreamReader::new(input),
50//!         ),
51//!     )
52//! }
53//!
54//! /// Upgrade replacement with `tokio` v0.3 and `bytes` v0.6 using `tokio-util` v0.5
55//! fn tokio_03_bytes_06(
56//!     input: impl Stream<Item = Result<bytes_06::Bytes>>,
57//! ) -> impl Stream<Item = Result<bytes_06::Bytes>> {
58//!     tokio_util_05::io::ReaderStream::new(
59//!         async_compression::tokio_03::bufread::GzipEncoder::new(
60//!             tokio_util_05::io::StreamReader::new(input),
61//!         ),
62//!     )
63//! }
64//!
65//! /// Upgrade replacement with `tokio` v1.0 and `bytes` v1.0 using `tokio-util` v0.6
66//! fn tokio_bytes(
67//!     input: impl Stream<Item = Result<bytes::Bytes>>,
68//! ) -> impl Stream<Item = Result<bytes::Bytes>> {
69//!     tokio_util_06::io::ReaderStream::new(
70//!         async_compression::tokio::bufread::GzipEncoder::new(
71//!             tokio_util_06::io::StreamReader::new(input),
72//!         ),
73//!     )
74//! }
75//!
76//! /// What if you didn't want anything to do with `bytes`, but just a `Vec<u8>` instead?
77//! fn futures_vec(
78//!     input: impl Stream<Item = Result<Vec<u8>>> + Unpin,
79//! ) -> impl Stream<Item = Result<Vec<u8>>> {
80//!     use futures::io::AsyncReadExt;
81//!
82//!     futures::stream::try_unfold(
83//!         async_compression::futures::bufread::GzipEncoder::new(input.into_async_read()),
84//!         |mut encoder| async move {
85//!             let mut chunk = vec![0; 8 * 1024];
86//!             let len = encoder.read(&mut chunk).await?;
87//!             if len == 0 {
88//!                 Ok(None)
89//!             } else {
90//!                 chunk.truncate(len);
91//!                 Ok(Some((chunk, encoder)))
92//!             }
93//!         })
94//! }
95//! #
96//! # futures::executor::block_on(async {
97//! #     let data = || futures::stream::iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]);
98//! #     let expected: Vec<Vec<u8>> = from(data().map_ok(bytes_05::Bytes::from))
99//! #         .map_ok(|bytes| bytes.as_ref().into())
100//! #         .try_collect()
101//! #         .await?;
102//! #
103//! #     assert_eq!(
104//! #         expected,
105//! #         tokio_02_bytes_05(data().map_ok(bytes_05::Bytes::from))
106//! #             .map_ok(|bytes| bytes.as_ref().into())
107//! #             .try_collect::<Vec<Vec<u8>>>()
108//! #             .await?,
109//! #     );
110//! #     assert_eq!(
111//! #         expected,
112//! #         tokio_03_bytes_05(data().map_ok(bytes_05::Bytes::from))
113//! #             .map_ok(|bytes| bytes.as_ref().into())
114//! #             .try_collect::<Vec<Vec<u8>>>()
115//! #             .await?,
116//! #     );
117//! #     assert_eq!(
118//! #         expected,
119//! #         tokio_03_bytes_06(data().map_ok(bytes_06::Bytes::from))
120//! #             .map_ok(|bytes| bytes.as_ref().into())
121//! #             .try_collect::<Vec<Vec<u8>>>()
122//! #             .await?,
123//! #     );
124//! #     assert_eq!(
125//! #         expected,
126//! #         tokio_bytes(data().map_ok(bytes::Bytes::from))
127//! #             .map_ok(|bytes| bytes.as_ref().into())
128//! #             .try_collect::<Vec<Vec<u8>>>()
129//! #             .await?,
130//! #     );
131//! #     assert_eq!(
132//! #         expected,
133//! #         futures_vec(data())
134//! #             .try_collect::<Vec<Vec<u8>>>()
135//! #             .await?
136//! #     );
137//! #     Ok::<_, std::io::Error>(())
138//! # })?; Ok::<_, std::io::Error>(())
139//! ```
140
// Inner attribute: marks this whole module as deprecated since 0.3.8, so uses of it produce a
// deprecation warning carrying the note below.
// NOTE(review): the note spells the path as `async-compression::stream` (hyphenated crate name);
// the Rust path form would be `async_compression::stream` -- confirm which spelling is intended.
141#![deprecated(
142    since = "0.3.8",
143    note = "See `async-compression::stream` docs for migration"
144)]
145
// `#[macro_use]` keeps the macros defined in the `macros` submodule in scope for the rest of this
// module after this declaration.
// NOTE(review): presumably this is where `algos!` (invoked below) comes from -- confirm.
146#[macro_use]
147mod macros;
// Private submodule; only the two names re-exported below are visible outside this module.
148mod generic;
149
// Crate-internal re-export of the `Decoder` and `Encoder` items from `generic`.
150pub(crate) use self::generic::{Decoder, Encoder};
151
// Macro invocation parameterised with the module name `stream` and a generic parameter `S`.
// NOTE(review): presumably expands to the per-format encoder/decoder types (e.g. the
// `GzipEncoder` used in the module docs above) -- verify against the macro definition.
152algos!(stream<S>);