stream_utils/
lib.rs

1#![warn(missing_docs)]
2#![crate_name = "stream_utils"]
3#![forbid(unsafe_code)]
4
5//! Extra Stream adaptors and functions.
6//!
7//! To Extend [`Stream`] with methods in this crate, import the [`StreamUtils`] trait:
8//!
9//! ```
10//! use stream_utils::StreamUtils;
11//! ```
12//!
13//! Now, new methods like [`copied_multi_stream`][`StreamUtils::copied_multi_stream`] are available
14//! on all streams.
15//!
16//! ```
17//! use futures_util::stream; // or futures::stream;
18//! use stream_utils::StreamUtils;
19//!
20//! let stream = stream::iter(0..3);
21//! let streams = stream.copied_multi_stream(4);
22//! ```
23
24use futures_util::Stream;
25
26mod copied_multi_stream;
27
28pub use crate::copied_multi_stream::*;
29
30/// A [`Stream`] blanket implementation trait that provides extra adaptors.
31///
32/// [`Stream`]: crate::Stream
33/// [futures]: https://docs.rs/futures
34/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
35pub trait StreamUtils: Stream {
36    /// Copies values from the inner stream into multiple new streams. Polls from inner stream one
37    /// value and waits till all new streams have pulled a copied value.
38    /// Note that the internal buffer only buffers one value from the inner stream.
39    /// Not pulling from all new streams in sequence will result in an endless loop
40    /// polling a [`Pending`] state. Essentially blocking.
41    ///
42    /// When the underlying stream terminates, all new streams which have allready pulled the last value will be [`Pending`].
43    /// When all new streams have pulled the last value, all streams will terminate.
44    ///
45    /// [`Pending`]: std::task::Poll#variant.Pending
46    #[inline(always)]
47    fn copied_multi_stream(self, i: usize) -> Vec<CopiedMultiStream<Self>>
48    where
49        Self: Sized,
50    {
51        copied_multi_stream(self, i)
52    }
53}
54
55impl<T: Stream> StreamUtils for T {}