clone_stream/
lib.rs

1//! # Clone streams with `clone-stream`
2//!
3//! Turn any [`Stream`] into a cloneable stream where each clone receives all
4//! items independently.
5//!
6//! The [`CloneStream`] struct implements [`Clone`] + [`Stream`], allowing you
7//! to create multiple independent consumers of the same data. The
8//! [`ForkStream`] trait provides the entry point for converting regular
9//! streams.
10//!
11//! # Quick Start
12//!
13//! ```rust
14//! use clone_stream::ForkStream;
15//! use futures::{StreamExt, stream};
16//!
17//! # #[tokio::main]
18//! # async fn main() {
19//! let stream = stream::iter(vec![1, 2, 3]).fork();
20//! let mut clone1 = stream.clone();
21//! let mut clone2 = stream.clone();
22//! // Both clones receive all items independently
23//! # }
24//! ```
25mod clone;
26mod error;
27mod fork;
28mod states;
29
30pub use clone::CloneStream;
31pub use error::{CloneStreamError, Result};
32use fork::Fork;
33pub use fork::ForkConfig;
34use futures::Stream;
35
36/// Extension trait to make any [`Stream`] cloneable.
37pub trait ForkStream: Stream<Item: Clone> + Sized {
38    /// Creates a cloneable version of this stream.
39    ///
40    /// ```rust
41    /// use clone_stream::ForkStream;
42    /// use futures::{StreamExt, stream};
43    ///
44    /// let stream = stream::iter(0..3).fork();
45    /// let mut clone = stream.clone();
46    /// ```
47    fn fork(self) -> CloneStream<Self> {
48        CloneStream::from(Fork::new(self))
49    }
50
51    /// Creates a cloneable stream with custom limits.
52    ///
53    /// # Arguments
54    /// * `max_queue_size` - Max items queued before panic
55    /// * `max_clone_count` - Max clones before panic
56    ///
57    /// # Panics
58    /// When limits are exceeded during operation.
59    ///
60    /// ```rust
61    /// use clone_stream::ForkStream;
62    /// use futures::stream;
63    ///
64    /// let stream = stream::iter(0..3).fork_with_limits(100, 5);
65    /// ```
66    fn fork_with_limits(self, max_queue_size: usize, max_clone_count: usize) -> CloneStream<Self> {
67        let config = ForkConfig {
68            max_clone_count,
69            max_queue_size,
70        };
71        CloneStream::from(Fork::with_config(self, config))
72    }
73}
74
75impl<BaseStream> ForkStream for BaseStream where BaseStream: Stream<Item: Clone> {}
76
77impl<BaseStream> From<BaseStream> for CloneStream<BaseStream>
78where
79    BaseStream: Stream<Item: Clone>,
80{
81    /// Converts a stream into a cloneable stream.
82    fn from(base_stream: BaseStream) -> CloneStream<BaseStream> {
83        CloneStream::from(Fork::new(base_stream))
84    }
85}