clone_stream/clone.rs
1use std::{
2 pin::Pin,
3 sync::{Arc, RwLock},
4 task::{Context, Poll},
5};
6
7use futures::{Stream, stream::FusedStream};
8use log::trace;
9
10use crate::fork::Fork;
11
12/// A stream that implements `Clone` and returns cloned items from a base
13/// stream.
14///
15/// This is the main type provided by this crate. It wraps any [`Stream`] whose
16/// items implement [`Clone`], allowing the stream itself to be cloned. Each
17/// clone operates independently but shares the same underlying stream data.
18///
19/// # Examples
20///
21/// ```rust
22/// use clone_stream::ForkStream;
23/// use futures::{StreamExt, stream};
24///
25/// # #[tokio::main]
26/// # async fn main() {
27/// let stream = stream::iter(vec![1, 2, 3]);
28/// let clone_stream = stream.fork();
29///
30/// // Create multiple clones that can be used independently
31/// let mut clone1 = clone_stream.clone();
32/// let mut clone2 = clone_stream.clone();
33///
34/// // Each clone can be polled independently
35/// let item1 = clone1.next().await;
36/// let item2 = clone2.next().await;
37///
38/// println!("Clone1 got: {:?}, Clone2 got: {:?}", item1, item2);
39/// # }
40/// ```
41///
42/// # Performance
43///
44/// Items are cached internally until all clones have consumed them. The memory
45/// usage grows with the number of items that haven't been consumed by all
46/// clones yet.
47pub struct CloneStream<BaseStream>
48where
49 BaseStream: Stream<Item: Clone>,
50{
51 pub(crate) fork: Arc<RwLock<Fork<BaseStream>>>,
52 /// Unique identifier for this clone within the fork
53 pub id: usize,
54}
55
56impl<BaseStream> From<Fork<BaseStream>> for CloneStream<BaseStream>
57where
58 BaseStream: Stream<Item: Clone>,
59{
60 fn from(mut fork: Fork<BaseStream>) -> Self {
61 let id = fork.register().expect("Failed to register initial clone");
62
63 Self {
64 id,
65 fork: Arc::new(RwLock::new(fork)),
66 }
67 }
68}
69
70impl<BaseStream> Clone for CloneStream<BaseStream>
71where
72 BaseStream: Stream<Item: Clone>,
73{
74 /// Creates a new clone of this stream.
75 ///
76 /// # Panics
77 ///
78 /// Panics if the maximum number of clones has been exceeded for this
79 /// stream. The limit is set when creating the stream with
80 /// [`ForkStream::fork_with_limits`].
81 ///
82 /// [`ForkStream::fork_with_limits`]: crate::ForkStream::fork_with_limits
83 fn clone(&self) -> Self {
84 let mut fork = self.fork.write().expect("Fork lock poisoned during clone");
85 let clone_id = fork
86 .register()
87 .expect("Failed to register clone - clone limit exceeded");
88 drop(fork);
89
90 Self {
91 fork: self.fork.clone(),
92 id: clone_id,
93 }
94 }
95}
96
97impl<BaseStream> Stream for CloneStream<BaseStream>
98where
99 BaseStream: Stream<Item: Clone>,
100{
101 type Item = BaseStream::Item;
102
103 fn poll_next(self: Pin<&mut Self>, current_task: &mut Context) -> Poll<Option<Self::Item>> {
104 let waker = current_task.waker();
105 let mut fork = self
106 .fork
107 .write()
108 .expect("Fork lock poisoned during poll_next");
109 fork.poll_clone(self.id, waker)
110 }
111
112 fn size_hint(&self) -> (usize, Option<usize>) {
113 let fork = self
114 .fork
115 .read()
116 .expect("Fork lock poisoned during size_hint");
117 let (lower, upper) = fork.size_hint();
118 let n_cached = fork.remaining_queued_items(self.id);
119 (lower + n_cached, upper.map(|u| u + n_cached))
120 }
121}
122
123impl<BaseStream> FusedStream for CloneStream<BaseStream>
124where
125 BaseStream: FusedStream<Item: Clone>,
126{
127 /// Returns `true` if the stream is terminated.
128 ///
129 /// A clone stream is considered terminated when both:
130 /// 1. The underlying base stream is terminated
131 /// 2. This clone has no remaining queued items to consume
132 fn is_terminated(&self) -> bool {
133 let fork = self
134 .fork
135 .read()
136 .expect("Fork lock poisoned during is_terminated");
137 fork.is_terminated() && fork.remaining_queued_items(self.id) == 0
138 }
139}
140
141impl<BaseStream> Drop for CloneStream<BaseStream>
142where
143 BaseStream: Stream<Item: Clone>,
144{
145 fn drop(&mut self) {
146 if let Ok(mut fork) = self.fork.try_write() {
147 fork.unregister(self.id);
148 } else {
149 log::warn!(
150 "Failed to acquire lock during clone drop for clone {}",
151 self.id
152 );
153 }
154 }
155}
156
157impl<BaseStream> CloneStream<BaseStream>
158where
159 BaseStream: Stream<Item: Clone>,
160{
161 /// Returns the number of items currently queued for this clone.
162 ///
163 /// This represents items that have been produced by the base stream but not
164 /// yet consumed by this particular clone. Other clones may have
165 /// different queue lengths depending on their consumption patterns.
166 ///
167 /// # Panics
168 ///
169 /// Panics if the internal fork lock is poisoned. This should not happen
170 /// under normal circumstances.
171 ///
172 /// # Examples
173 ///
174 /// ```rust
175 /// use clone_stream::ForkStream;
176 /// use futures::stream;
177 ///
178 /// let stream = stream::iter(vec![1, 2, 3]);
179 /// let clone_stream = stream.fork();
180 /// assert_eq!(clone_stream.n_queued_items(), 0);
181 /// ```
182 #[must_use]
183 pub fn n_queued_items(&self) -> usize {
184 trace!("Getting the number of queued items for clone {}.", self.id);
185 self.fork
186 .read()
187 .expect("Fork lock poisoned during n_queued_items")
188 .remaining_queued_items(self.id)
189 }
190}