clone_stream/
lib.rs

1//! # Clone streams with `clone-stream`
2//!
3//! Lazy single-threaded stream cloning: items are only cloned when a consumer
4//! actually polls for them.
5//!
6//! The [`CloneStream`] struct implements [`Clone`] + [`Stream`], allowing you
7//! to create multiple concurrent consumers. The [`ForkStream`] trait provides
8//! the entry point via the [`fork()`](ForkStream::fork) method.
9//!
10//! # How It Works
11//!
12//! Unlike broadcast channels that eagerly clone every item for every
13//! subscriber, this crate clones **on-demand**. Items are delivered only to
14//! clones actively polling when items arrive (poll-time semantics).
15//!
16//! # Important: Single-Threaded Runtime Required
17//!
18//! This crate requires single-threaded async runtimes (`current_thread` flavor
19//! or `#[tokio::test]`). The lazy semantics depend on cooperative scheduling;
20//! multi-threaded runtimes cause race conditions.
21pub mod clean_log;
22mod clone;
23mod error;
24mod fork;
25mod registry;
26pub mod ring_queue;
27mod states;
28
29pub use clone::CloneStream;
30pub use error::{CloneStreamError, Result};
31use fork::Fork;
32pub use fork::ForkConfig;
33use futures::Stream;
34
35/// Extension trait to make any [`Stream`] cloneable.
36pub trait ForkStream: Stream<Item: Clone> + Sized {
37    /// Creates a cloneable version of this stream.
38    ///
39    /// ```rust
40    /// use clone_stream::ForkStream;
41    /// use futures::{StreamExt, stream};
42    ///
43    /// let stream = stream::iter(0..3).fork();
44    /// let mut clone = stream.clone();
45    /// ```
46    fn fork(self) -> CloneStream<Self> {
47        CloneStream::from(Fork::new(self))
48    }
49
50    /// Creates a cloneable stream with custom limits.
51    ///
52    /// # Arguments
53    /// * `max_queue_size` - Max items queued before panic
54    /// * `max_clone_count` - Max clones before panic
55    ///
56    /// # Panics
57    /// When limits are exceeded during operation.
58    ///
59    /// ```rust
60    /// use clone_stream::ForkStream;
61    /// use futures::stream;
62    ///
63    /// let stream = stream::iter(0..3).fork_with_limits(100, 5);
64    /// ```
65    fn fork_with_limits(self, max_queue_size: usize, max_clone_count: usize) -> CloneStream<Self> {
66        let config = ForkConfig {
67            max_clone_count,
68            max_queue_size,
69        };
70        CloneStream::from(Fork::with_config(self, config))
71    }
72}
73
74impl<BaseStream> ForkStream for BaseStream where BaseStream: Stream<Item: Clone> {}
75
76impl<BaseStream> From<BaseStream> for CloneStream<BaseStream>
77where
78    BaseStream: Stream<Item: Clone>,
79{
80    /// Converts a stream into a cloneable stream.
81    fn from(base_stream: BaseStream) -> Self {
82        Self::from(Fork::new(base_stream))
83    }
84}