clone_stream/lib.rs
1//! # Clone streams with `clone-stream`
2//!
3//! Turn any [`Stream`] into a cloneable stream where each clone receives all
4//! items independently.
5//!
6//! The [`CloneStream`] struct implements [`Clone`] + [`Stream`], allowing you
7//! to create multiple independent consumers of the same data. The
8//! [`ForkStream`] trait provides the entry point for converting regular
9//! streams.
10//!
11//! # Quick Start
12//!
13//! ```rust
14//! use clone_stream::ForkStream;
15//! use futures::{StreamExt, stream};
16//!
17//! # #[tokio::main]
18//! # async fn main() {
19//! let stream = stream::iter(vec![1, 2, 3]).fork();
20//! let mut clone1 = stream.clone();
21//! let mut clone2 = stream.clone();
22//! // Both clones receive all items independently
23//! # }
24//! ```
25mod clone;
26mod error;
27mod fork;
28mod states;
29
30pub use clone::CloneStream;
31pub use error::{CloneStreamError, Result};
32use fork::Fork;
33pub use fork::ForkConfig;
34use futures::Stream;
35
36/// Extension trait to make any [`Stream`] cloneable.
37pub trait ForkStream: Stream<Item: Clone> + Sized {
38 /// Creates a cloneable version of this stream.
39 ///
40 /// ```rust
41 /// use clone_stream::ForkStream;
42 /// use futures::{StreamExt, stream};
43 ///
44 /// let stream = stream::iter(0..3).fork();
45 /// let mut clone = stream.clone();
46 /// ```
47 fn fork(self) -> CloneStream<Self> {
48 CloneStream::from(Fork::new(self))
49 }
50
51 /// Creates a cloneable stream with custom limits.
52 ///
53 /// # Arguments
54 /// * `max_queue_size` - Max items queued before panic
55 /// * `max_clone_count` - Max clones before panic
56 ///
57 /// # Panics
58 /// When limits are exceeded during operation.
59 ///
60 /// ```rust
61 /// use clone_stream::ForkStream;
62 /// use futures::stream;
63 ///
64 /// let stream = stream::iter(0..3).fork_with_limits(100, 5);
65 /// ```
66 fn fork_with_limits(self, max_queue_size: usize, max_clone_count: usize) -> CloneStream<Self> {
67 let config = ForkConfig {
68 max_clone_count,
69 max_queue_size,
70 };
71 CloneStream::from(Fork::with_config(self, config))
72 }
73}
74
75impl<BaseStream> ForkStream for BaseStream where BaseStream: Stream<Item: Clone> {}
76
77impl<BaseStream> From<BaseStream> for CloneStream<BaseStream>
78where
79 BaseStream: Stream<Item: Clone>,
80{
81 /// Converts a stream into a cloneable stream.
82 fn from(base_stream: BaseStream) -> CloneStream<BaseStream> {
83 CloneStream::from(Fork::new(base_stream))
84 }
85}