clone_stream/lib.rs
1//! # Clone streams with `clone-stream`
2//!
3//! Turn any [`Stream`] into a cloneable stream where each clone receives all
4//! items independently.
5//!
6//! The [`CloneStream`] struct implements [`Clone`] + [`Stream`], allowing you
7//! to create multiple independent consumers of the same data. The
8//! [`ForkStream`] trait provides the entry point for converting regular
9//! streams.
10//!
11//! # Quick Start
12//!
13//! ```rust
14//! use clone_stream::ForkStream;
15//! use futures::{StreamExt, stream};
16//!
17//! # #[tokio::main]
18//! # async fn main() {
19//! let stream = stream::iter(vec![1, 2, 3]).fork();
20//! let mut clone1 = stream.clone();
21//! let mut clone2 = stream.clone();
22//! // Both clones receive all items independently
23//! # }
24//! ```
25pub mod clean_log;
26mod clone;
27mod error;
28mod fork;
29mod registry;
30pub mod ring_queue;
31mod states;
32
33pub use clone::CloneStream;
34pub use error::{CloneStreamError, Result};
35use fork::Fork;
36pub use fork::ForkConfig;
37use futures::Stream;
38
39/// Extension trait to make any [`Stream`] cloneable.
40pub trait ForkStream: Stream<Item: Clone> + Sized {
41 /// Creates a cloneable version of this stream.
42 ///
43 /// ```rust
44 /// use clone_stream::ForkStream;
45 /// use futures::{StreamExt, stream};
46 ///
47 /// let stream = stream::iter(0..3).fork();
48 /// let mut clone = stream.clone();
49 /// ```
50 fn fork(self) -> CloneStream<Self> {
51 CloneStream::from(Fork::new(self))
52 }
53
54 /// Creates a cloneable stream with custom limits.
55 ///
56 /// # Arguments
57 /// * `max_queue_size` - Max items queued before panic
58 /// * `max_clone_count` - Max clones before panic
59 ///
60 /// # Panics
61 /// When limits are exceeded during operation.
62 ///
63 /// ```rust
64 /// use clone_stream::ForkStream;
65 /// use futures::stream;
66 ///
67 /// let stream = stream::iter(0..3).fork_with_limits(100, 5);
68 /// ```
69 fn fork_with_limits(self, max_queue_size: usize, max_clone_count: usize) -> CloneStream<Self> {
70 let config = ForkConfig {
71 max_clone_count,
72 max_queue_size,
73 };
74 CloneStream::from(Fork::with_config(self, config))
75 }
76}
77
78impl<BaseStream> ForkStream for BaseStream where BaseStream: Stream<Item: Clone> {}
79
80impl<BaseStream> From<BaseStream> for CloneStream<BaseStream>
81where
82 BaseStream: Stream<Item: Clone>,
83{
84 /// Converts a stream into a cloneable stream.
85 fn from(base_stream: BaseStream) -> CloneStream<BaseStream> {
86 CloneStream::from(Fork::new(base_stream))
87 }
88}