clone_stream/
lib.rs

1//! # Clone streams with `clone-stream`
2//!
3//! Turn any [`Stream`] into a cloneable stream where each clone receives all
4//! items independently.
5//!
6//! The [`CloneStream`] struct implements [`Clone`] + [`Stream`], allowing you
7//! to create multiple independent consumers of the same data. The
8//! [`ForkStream`] trait provides the entry point for converting regular
9//! streams.
10//!
11//! # Quick Start
12//!
13//! ```rust
14//! use clone_stream::ForkStream;
15//! use futures::{StreamExt, stream};
16//!
17//! # #[tokio::main]
18//! # async fn main() {
19//! let stream = stream::iter(vec![1, 2, 3]).fork();
20//! let mut clone1 = stream.clone();
21//! let mut clone2 = stream.clone();
22//! // Both clones receive all items independently
23//! # }
24//! ```
25pub mod clean_log;
26mod clone;
27mod error;
28mod fork;
29pub mod ring_queue;
30mod states;
31
32pub use clone::CloneStream;
33pub use error::{CloneStreamError, Result};
34use fork::Fork;
35pub use fork::ForkConfig;
36use futures::Stream;
37
38/// Extension trait to make any [`Stream`] cloneable.
39pub trait ForkStream: Stream<Item: Clone> + Sized {
40    /// Creates a cloneable version of this stream.
41    ///
42    /// ```rust
43    /// use clone_stream::ForkStream;
44    /// use futures::{StreamExt, stream};
45    ///
46    /// let stream = stream::iter(0..3).fork();
47    /// let mut clone = stream.clone();
48    /// ```
49    fn fork(self) -> CloneStream<Self> {
50        CloneStream::from(Fork::new(self))
51    }
52
53    /// Creates a cloneable stream with custom limits.
54    ///
55    /// # Arguments
56    /// * `max_queue_size` - Max items queued before panic
57    /// * `max_clone_count` - Max clones before panic
58    ///
59    /// # Panics
60    /// When limits are exceeded during operation.
61    ///
62    /// ```rust
63    /// use clone_stream::ForkStream;
64    /// use futures::stream;
65    ///
66    /// let stream = stream::iter(0..3).fork_with_limits(100, 5);
67    /// ```
68    fn fork_with_limits(self, max_queue_size: usize, max_clone_count: usize) -> CloneStream<Self> {
69        let config = ForkConfig {
70            max_clone_count,
71            max_queue_size,
72        };
73        CloneStream::from(Fork::with_config(self, config))
74    }
75}
76
77impl<BaseStream> ForkStream for BaseStream where BaseStream: Stream<Item: Clone> {}
78
79impl<BaseStream> From<BaseStream> for CloneStream<BaseStream>
80where
81    BaseStream: Stream<Item: Clone>,
82{
83    /// Converts a stream into a cloneable stream.
84    fn from(base_stream: BaseStream) -> CloneStream<BaseStream> {
85        CloneStream::from(Fork::new(base_stream))
86    }
87}