clone_stream/lib.rs
1//! # Clone streams with `clone-stream`
2//!
3//! Lazy single-threaded stream cloning: items are only cloned when a consumer
4//! actually polls for them.
5//!
6//! The [`CloneStream`] struct implements [`Clone`] + [`Stream`], allowing you
7//! to create multiple concurrent consumers. The [`ForkStream`] trait provides
8//! the entry point via the [`fork()`](ForkStream::fork) method.
9//!
10//! # How It Works
11//!
12//! Unlike broadcast channels that eagerly clone every item for every
13//! subscriber, this crate clones **on-demand**. Items are delivered only to
14//! clones actively polling when items arrive (poll-time semantics).
15//!
16//! # Important: Single-Threaded Runtime Required
17//!
18//! This crate requires single-threaded async runtimes (`current_thread` flavor
19//! or `#[tokio::test]`). The lazy semantics depend on cooperative scheduling;
20//! multi-threaded runtimes cause race conditions.
21pub mod clean_log;
22mod clone;
23mod error;
24mod fork;
25mod registry;
26pub mod ring_queue;
27mod states;
28
29pub use clone::CloneStream;
30pub use error::{CloneStreamError, Result};
31use fork::Fork;
32pub use fork::ForkConfig;
33use futures::Stream;
34
35/// Extension trait to make any [`Stream`] cloneable.
36pub trait ForkStream: Stream<Item: Clone> + Sized {
37 /// Creates a cloneable version of this stream.
38 ///
39 /// ```rust
40 /// use clone_stream::ForkStream;
41 /// use futures::{StreamExt, stream};
42 ///
43 /// let stream = stream::iter(0..3).fork();
44 /// let mut clone = stream.clone();
45 /// ```
46 fn fork(self) -> CloneStream<Self> {
47 CloneStream::from(Fork::new(self))
48 }
49
50 /// Creates a cloneable stream with custom limits.
51 ///
52 /// # Arguments
53 /// * `max_queue_size` - Max items queued before panic
54 /// * `max_clone_count` - Max clones before panic
55 ///
56 /// # Panics
57 /// When limits are exceeded during operation.
58 ///
59 /// ```rust
60 /// use clone_stream::ForkStream;
61 /// use futures::stream;
62 ///
63 /// let stream = stream::iter(0..3).fork_with_limits(100, 5);
64 /// ```
65 fn fork_with_limits(self, max_queue_size: usize, max_clone_count: usize) -> CloneStream<Self> {
66 let config = ForkConfig {
67 max_clone_count,
68 max_queue_size,
69 };
70 CloneStream::from(Fork::with_config(self, config))
71 }
72}
73
74impl<BaseStream> ForkStream for BaseStream where BaseStream: Stream<Item: Clone> {}
75
76impl<BaseStream> From<BaseStream> for CloneStream<BaseStream>
77where
78 BaseStream: Stream<Item: Clone>,
79{
80 /// Converts a stream into a cloneable stream.
81 fn from(base_stream: BaseStream) -> Self {
82 Self::from(Fork::new(base_stream))
83 }
84}