clone_stream/
lib.rs

1//! # Clone streams with `clone-stream`
2//!
3//! Turn any [`Stream`] into a cloneable stream where each clone receives all
4//! items independently.
5//!
6//! The [`CloneStream`] struct implements [`Clone`] + [`Stream`], allowing you
7//! to create multiple independent consumers of the same data. The
8//! [`ForkStream`] trait provides the entry point for converting regular
9//! streams.
10//!
11//! # Quick Start
12//!
13//! ```rust
14//! use clone_stream::ForkStream;
15//! use futures::{StreamExt, stream};
16//!
17//! # #[tokio::main]
18//! # async fn main() {
19//! let stream = stream::iter(vec![1, 2, 3]).fork();
20//! let mut clone1 = stream.clone();
21//! let mut clone2 = stream.clone();
22//! // Both clones receive all items independently
23//! # }
24//! ```
25pub mod clean_log;
26mod clone;
27mod error;
28mod fork;
29mod registry;
30pub mod ring_queue;
31mod states;
32
33pub use clone::CloneStream;
34pub use error::{CloneStreamError, Result};
35use fork::Fork;
36pub use fork::ForkConfig;
37use futures::Stream;
38
39/// Extension trait to make any [`Stream`] cloneable.
40pub trait ForkStream: Stream<Item: Clone> + Sized {
41    /// Creates a cloneable version of this stream.
42    ///
43    /// ```rust
44    /// use clone_stream::ForkStream;
45    /// use futures::{StreamExt, stream};
46    ///
47    /// let stream = stream::iter(0..3).fork();
48    /// let mut clone = stream.clone();
49    /// ```
50    fn fork(self) -> CloneStream<Self> {
51        CloneStream::from(Fork::new(self))
52    }
53
54    /// Creates a cloneable stream with custom limits.
55    ///
56    /// # Arguments
57    /// * `max_queue_size` - Max items queued before panic
58    /// * `max_clone_count` - Max clones before panic
59    ///
60    /// # Panics
61    /// When limits are exceeded during operation.
62    ///
63    /// ```rust
64    /// use clone_stream::ForkStream;
65    /// use futures::stream;
66    ///
67    /// let stream = stream::iter(0..3).fork_with_limits(100, 5);
68    /// ```
69    fn fork_with_limits(self, max_queue_size: usize, max_clone_count: usize) -> CloneStream<Self> {
70        let config = ForkConfig {
71            max_clone_count,
72            max_queue_size,
73        };
74        CloneStream::from(Fork::with_config(self, config))
75    }
76}
77
78impl<BaseStream> ForkStream for BaseStream where BaseStream: Stream<Item: Clone> {}
79
80impl<BaseStream> From<BaseStream> for CloneStream<BaseStream>
81where
82    BaseStream: Stream<Item: Clone>,
83{
84    /// Converts a stream into a cloneable stream.
85    fn from(base_stream: BaseStream) -> CloneStream<BaseStream> {
86        CloneStream::from(Fork::new(base_stream))
87    }
88}