clone_stream/
clone.rs

1use std::{
2    pin::Pin,
3    sync::{Arc, RwLock},
4    task::{Context, Poll},
5};
6
7use futures::{Stream, stream::FusedStream};
8use log::trace;
9
10use crate::fork::Fork;
11
12/// A stream that implements `Clone` and returns cloned items from a base
13/// stream.
14///
15/// This is the main type provided by this crate. It wraps any [`Stream`] whose
16/// items implement [`Clone`], allowing the stream itself to be cloned. Each
17/// clone operates independently but shares the same underlying stream data.
18///
19/// # Examples
20///
21/// ```rust
22/// use clone_stream::ForkStream;
23/// use futures::{StreamExt, stream};
24///
25/// # #[tokio::main]
26/// # async fn main() {
27/// let stream = stream::iter(vec![1, 2, 3]);
28/// let clone_stream = stream.fork();
29///
30/// // Create multiple clones that can be used independently
31/// let mut clone1 = clone_stream.clone();
32/// let mut clone2 = clone_stream.clone();
33///
34/// // Each clone can be polled independently
35/// let item1 = clone1.next().await;
36/// let item2 = clone2.next().await;
37///
38/// println!("Clone1 got: {:?}, Clone2 got: {:?}", item1, item2);
39/// # }
40/// ```
41///
42/// # Performance
43///
44/// Items are cached internally until all clones have consumed them. The memory
45/// usage grows with the number of items that haven't been consumed by all
46/// clones yet.
47pub struct CloneStream<BaseStream>
48where
49    BaseStream: Stream<Item: Clone>,
50{
51    pub(crate) fork: Arc<RwLock<Fork<BaseStream>>>,
52    /// Unique identifier for this clone within the fork
53    pub id: usize,
54}
55
56impl<BaseStream> From<Fork<BaseStream>> for CloneStream<BaseStream>
57where
58    BaseStream: Stream<Item: Clone>,
59{
60    fn from(mut fork: Fork<BaseStream>) -> Self {
61        let id = fork
62            .clone_registry
63            .register()
64            .expect("Failed to register initial clone");
65
66        Self {
67            id,
68            fork: Arc::new(RwLock::new(fork)),
69        }
70    }
71}
72
73impl<BaseStream> Clone for CloneStream<BaseStream>
74where
75    BaseStream: Stream<Item: Clone>,
76{
77    /// Creates a new clone of this stream.
78    ///
79    /// # Panics
80    ///
81    /// Panics if the maximum number of clones has been exceeded for this
82    /// stream. The limit is set when creating the stream with
83    /// [`ForkStream::fork_with_limits`].
84    ///
85    /// [`ForkStream::fork_with_limits`]: crate::ForkStream::fork_with_limits
86    fn clone(&self) -> Self {
87        let mut fork = self.fork.write().expect("Fork lock poisoned during clone");
88        let clone_id = fork
89            .clone_registry
90            .register()
91            .expect("Failed to register clone - clone limit exceeded");
92        drop(fork);
93
94        Self {
95            fork: self.fork.clone(),
96            id: clone_id,
97        }
98    }
99}
100
101impl<BaseStream> Stream for CloneStream<BaseStream>
102where
103    BaseStream: Stream<Item: Clone>,
104{
105    type Item = BaseStream::Item;
106
107    fn poll_next(self: Pin<&mut Self>, current_task: &mut Context) -> Poll<Option<Self::Item>> {
108        trace!("Polling next item for clone {}.", self.id);
109        let waker = current_task.waker();
110        let mut fork = self
111            .fork
112            .write()
113            .expect("Fork lock poisoned during poll_next");
114        fork.poll_clone(self.id, waker)
115    }
116
117    fn size_hint(&self) -> (usize, Option<usize>) {
118        let fork = self
119            .fork
120            .read()
121            .expect("Fork lock poisoned during size_hint");
122        let (lower, upper) = fork.size_hint();
123        let n_cached = fork.remaining_queued_items(self.id);
124        (lower + n_cached, upper.map(|u| u + n_cached))
125    }
126}
127
128impl<BaseStream> FusedStream for CloneStream<BaseStream>
129where
130    BaseStream: FusedStream<Item: Clone>,
131{
132    /// Returns `true` if the stream is terminated.
133    ///
134    /// A clone stream is considered terminated when both:
135    /// 1. The underlying base stream is terminated
136    /// 2. This clone has no remaining queued items to consume
137    fn is_terminated(&self) -> bool {
138        let fork = self
139            .fork
140            .read()
141            .expect("Fork lock poisoned during is_terminated");
142        fork.is_terminated() && fork.remaining_queued_items(self.id) == 0
143    }
144}
145
146impl<BaseStream> Drop for CloneStream<BaseStream>
147where
148    BaseStream: Stream<Item: Clone>,
149{
150    fn drop(&mut self) {
151        if let Ok(mut fork) = self.fork.try_write() {
152            fork.unregister(self.id);
153        } else {
154            log::warn!(
155                "Failed to acquire lock during clone drop for clone {}",
156                self.id
157            );
158        }
159    }
160}
161
162impl<BaseStream> CloneStream<BaseStream>
163where
164    BaseStream: Stream<Item: Clone>,
165{
166    /// Returns the number of items currently queued for this clone.
167    ///
168    /// This represents items that have been produced by the base stream but not
169    /// yet consumed by this particular clone. Other clones may have
170    /// different queue lengths depending on their consumption patterns.
171    ///
172    /// # Panics
173    ///
174    /// Panics if the internal fork lock is poisoned. This should not happen
175    /// under normal circumstances.
176    ///
177    /// # Examples
178    ///
179    /// ```rust
180    /// use clone_stream::ForkStream;
181    /// use futures::stream;
182    ///
183    /// let stream = stream::iter(vec![1, 2, 3]);
184    /// let clone_stream = stream.fork();
185    /// assert_eq!(clone_stream.n_queued_items(), 0);
186    /// ```
187    #[must_use]
188    pub fn n_queued_items(&self) -> usize {
189        trace!("Getting the number of queued items for clone {}.", self.id);
190        self.fork
191            .read()
192            .expect("Fork lock poisoned during n_queued_items")
193            .remaining_queued_items(self.id)
194    }
195}