clone_stream/
clone.rs

1use std::{
2    pin::Pin,
3    sync::{Arc, RwLock},
4    task::{Context, Poll},
5};
6
7use futures::{Stream, stream::FusedStream};
8use log::trace;
9
10use crate::fork::Fork;
11
12/// A stream that implements `Clone` and returns cloned items from a base
13/// stream.
14///
15/// This is the main type provided by this crate. It wraps any [`Stream`] whose
16/// items implement [`Clone`], allowing the stream itself to be cloned. Each
17/// clone operates independently but shares the same underlying stream data.
18///
19/// # Examples
20///
21/// ```rust
22/// use clone_stream::ForkStream;
23/// use futures::{StreamExt, stream};
24///
25/// # #[tokio::main]
26/// # async fn main() {
27/// let stream = stream::iter(vec![1, 2, 3]);
28/// let clone_stream = stream.fork();
29///
30/// // Create multiple clones that can be used independently
31/// let mut clone1 = clone_stream.clone();
32/// let mut clone2 = clone_stream.clone();
33///
34/// // Each clone can be polled independently
35/// let item1 = clone1.next().await;
36/// let item2 = clone2.next().await;
37///
38/// println!("Clone1 got: {:?}, Clone2 got: {:?}", item1, item2);
39/// # }
40/// ```
41///
42/// # Performance
43///
44/// Items are cached internally until all clones have consumed them. The memory
45/// usage grows with the number of items that haven't been consumed by all
46/// clones yet.
47pub struct CloneStream<BaseStream>
48where
49    BaseStream: Stream<Item: Clone>,
50{
51    pub(crate) fork: Arc<RwLock<Fork<BaseStream>>>,
52    /// Unique identifier for this clone within the fork
53    pub id: usize,
54}
55
56impl<BaseStream> From<Fork<BaseStream>> for CloneStream<BaseStream>
57where
58    BaseStream: Stream<Item: Clone>,
59{
60    fn from(mut fork: Fork<BaseStream>) -> Self {
61        let id = fork.register().expect("Failed to register initial clone");
62
63        Self {
64            id,
65            fork: Arc::new(RwLock::new(fork)),
66        }
67    }
68}
69
70impl<BaseStream> Clone for CloneStream<BaseStream>
71where
72    BaseStream: Stream<Item: Clone>,
73{
74    /// Creates a new clone of this stream.
75    ///
76    /// # Panics
77    ///
78    /// Panics if the maximum number of clones has been exceeded for this
79    /// stream. The limit is set when creating the stream with
80    /// [`ForkStream::fork_with_limits`].
81    ///
82    /// [`ForkStream::fork_with_limits`]: crate::ForkStream::fork_with_limits
83    fn clone(&self) -> Self {
84        let mut fork = self.fork.write().expect("Fork lock poisoned during clone");
85        let clone_id = fork
86            .register()
87            .expect("Failed to register clone - clone limit exceeded");
88        drop(fork);
89
90        Self {
91            fork: self.fork.clone(),
92            id: clone_id,
93        }
94    }
95}
96
97impl<BaseStream> Stream for CloneStream<BaseStream>
98where
99    BaseStream: Stream<Item: Clone>,
100{
101    type Item = BaseStream::Item;
102
103    fn poll_next(self: Pin<&mut Self>, current_task: &mut Context) -> Poll<Option<Self::Item>> {
104        let waker = current_task.waker();
105        let mut fork = self
106            .fork
107            .write()
108            .expect("Fork lock poisoned during poll_next");
109        fork.poll_clone(self.id, waker)
110    }
111
112    fn size_hint(&self) -> (usize, Option<usize>) {
113        let fork = self
114            .fork
115            .read()
116            .expect("Fork lock poisoned during size_hint");
117        let (lower, upper) = fork.size_hint();
118        let n_cached = fork.remaining_queued_items(self.id);
119        (lower + n_cached, upper.map(|u| u + n_cached))
120    }
121}
122
123impl<BaseStream> FusedStream for CloneStream<BaseStream>
124where
125    BaseStream: FusedStream<Item: Clone>,
126{
127    /// Returns `true` if the stream is terminated.
128    ///
129    /// A clone stream is considered terminated when both:
130    /// 1. The underlying base stream is terminated
131    /// 2. This clone has no remaining queued items to consume
132    fn is_terminated(&self) -> bool {
133        let fork = self
134            .fork
135            .read()
136            .expect("Fork lock poisoned during is_terminated");
137        fork.is_terminated() && fork.remaining_queued_items(self.id) == 0
138    }
139}
140
141impl<BaseStream> Drop for CloneStream<BaseStream>
142where
143    BaseStream: Stream<Item: Clone>,
144{
145    fn drop(&mut self) {
146        if let Ok(mut fork) = self.fork.try_write() {
147            fork.unregister(self.id);
148        } else {
149            log::warn!(
150                "Failed to acquire lock during clone drop for clone {}",
151                self.id
152            );
153        }
154    }
155}
156
157impl<BaseStream> CloneStream<BaseStream>
158where
159    BaseStream: Stream<Item: Clone>,
160{
161    /// Returns the number of items currently queued for this clone.
162    ///
163    /// This represents items that have been produced by the base stream but not
164    /// yet consumed by this particular clone. Other clones may have
165    /// different queue lengths depending on their consumption patterns.
166    ///
167    /// # Panics
168    ///
169    /// Panics if the internal fork lock is poisoned. This should not happen
170    /// under normal circumstances.
171    ///
172    /// # Examples
173    ///
174    /// ```rust
175    /// use clone_stream::ForkStream;
176    /// use futures::stream;
177    ///
178    /// let stream = stream::iter(vec![1, 2, 3]);
179    /// let clone_stream = stream.fork();
180    /// assert_eq!(clone_stream.n_queued_items(), 0);
181    /// ```
182    #[must_use]
183    pub fn n_queued_items(&self) -> usize {
184        trace!("Getting the number of queued items for clone {}.", self.id);
185        self.fork
186            .read()
187            .expect("Fork lock poisoned during n_queued_items")
188            .remaining_queued_items(self.id)
189    }
190}