clone_stream/clone.rs
1use std::{
2 pin::Pin,
3 sync::{Arc, RwLock},
4 task::{Context, Poll},
5};
6
7use futures::{Stream, stream::FusedStream};
8use log::trace;
9
10use crate::fork::Fork;
11
12/// A stream that implements `Clone` and returns cloned items from a base
13/// stream.
14///
15/// This is the main type provided by this crate. It wraps any [`Stream`] whose
16/// items implement [`Clone`], allowing the stream itself to be cloned. Each
17/// clone operates independently but shares the same underlying stream data.
18///
19/// # Examples
20///
21/// ```rust
22/// use clone_stream::ForkStream;
23/// use futures::{StreamExt, stream};
24///
25/// # #[tokio::main]
26/// # async fn main() {
27/// let stream = stream::iter(vec![1, 2, 3]);
28/// let clone_stream = stream.fork();
29///
30/// // Create multiple clones that can be used independently
31/// let mut clone1 = clone_stream.clone();
32/// let mut clone2 = clone_stream.clone();
33///
34/// // Each clone can be polled independently
35/// let item1 = clone1.next().await;
36/// let item2 = clone2.next().await;
37///
38/// println!("Clone1 got: {:?}, Clone2 got: {:?}", item1, item2);
39/// # }
40/// ```
41///
42/// # Performance
43///
44/// Items are cached internally until all clones have consumed them. The memory
45/// usage grows with the number of items that haven't been consumed by all
46/// clones yet.
47pub struct CloneStream<BaseStream>
48where
49 BaseStream: Stream<Item: Clone>,
50{
51 pub(crate) fork: Arc<RwLock<Fork<BaseStream>>>,
52 /// Unique identifier for this clone within the fork
53 pub id: usize,
54}
55
56impl<BaseStream> From<Fork<BaseStream>> for CloneStream<BaseStream>
57where
58 BaseStream: Stream<Item: Clone>,
59{
60 fn from(mut fork: Fork<BaseStream>) -> Self {
61 let id = fork
62 .clone_registry
63 .register()
64 .expect("Failed to register initial clone");
65
66 Self {
67 id,
68 fork: Arc::new(RwLock::new(fork)),
69 }
70 }
71}
72
73impl<BaseStream> Clone for CloneStream<BaseStream>
74where
75 BaseStream: Stream<Item: Clone>,
76{
77 /// Creates a new clone of this stream.
78 ///
79 /// # Panics
80 ///
81 /// Panics if the maximum number of clones has been exceeded for this
82 /// stream. The limit is set when creating the stream with
83 /// [`ForkStream::fork_with_limits`].
84 ///
85 /// [`ForkStream::fork_with_limits`]: crate::ForkStream::fork_with_limits
86 fn clone(&self) -> Self {
87 let mut fork = self.fork.write().expect("Fork lock poisoned during clone");
88 let clone_id = fork
89 .clone_registry
90 .register()
91 .expect("Failed to register clone - clone limit exceeded");
92 drop(fork);
93
94 Self {
95 fork: self.fork.clone(),
96 id: clone_id,
97 }
98 }
99}
100
101impl<BaseStream> Stream for CloneStream<BaseStream>
102where
103 BaseStream: Stream<Item: Clone>,
104{
105 type Item = BaseStream::Item;
106
107 fn poll_next(self: Pin<&mut Self>, current_task: &mut Context) -> Poll<Option<Self::Item>> {
108 trace!("Polling next item for clone {}.", self.id);
109 let waker = current_task.waker();
110 let mut fork = self
111 .fork
112 .write()
113 .expect("Fork lock poisoned during poll_next");
114 fork.poll_clone(self.id, waker)
115 }
116
117 fn size_hint(&self) -> (usize, Option<usize>) {
118 let fork = self
119 .fork
120 .read()
121 .expect("Fork lock poisoned during size_hint");
122 let (lower, upper) = fork.size_hint();
123 let n_cached = fork.remaining_queued_items(self.id);
124 (lower + n_cached, upper.map(|u| u + n_cached))
125 }
126}
127
128impl<BaseStream> FusedStream for CloneStream<BaseStream>
129where
130 BaseStream: FusedStream<Item: Clone>,
131{
132 /// Returns `true` if the stream is terminated.
133 ///
134 /// A clone stream is considered terminated when both:
135 /// 1. The underlying base stream is terminated
136 /// 2. This clone has no remaining queued items to consume
137 fn is_terminated(&self) -> bool {
138 let fork = self
139 .fork
140 .read()
141 .expect("Fork lock poisoned during is_terminated");
142 fork.is_terminated() && fork.remaining_queued_items(self.id) == 0
143 }
144}
145
146impl<BaseStream> Drop for CloneStream<BaseStream>
147where
148 BaseStream: Stream<Item: Clone>,
149{
150 fn drop(&mut self) {
151 if let Ok(mut fork) = self.fork.try_write() {
152 fork.unregister(self.id);
153 } else {
154 log::warn!(
155 "Failed to acquire lock during clone drop for clone {}",
156 self.id
157 );
158 }
159 }
160}
161
162impl<BaseStream> CloneStream<BaseStream>
163where
164 BaseStream: Stream<Item: Clone>,
165{
166 /// Returns the number of items currently queued for this clone.
167 ///
168 /// This represents items that have been produced by the base stream but not
169 /// yet consumed by this particular clone. Other clones may have
170 /// different queue lengths depending on their consumption patterns.
171 ///
172 /// # Panics
173 ///
174 /// Panics if the internal fork lock is poisoned. This should not happen
175 /// under normal circumstances.
176 ///
177 /// # Examples
178 ///
179 /// ```rust
180 /// use clone_stream::ForkStream;
181 /// use futures::stream;
182 ///
183 /// let stream = stream::iter(vec![1, 2, 3]);
184 /// let clone_stream = stream.fork();
185 /// assert_eq!(clone_stream.n_queued_items(), 0);
186 /// ```
187 #[must_use]
188 pub fn n_queued_items(&self) -> usize {
189 trace!("Getting the number of queued items for clone {}.", self.id);
190 self.fork
191 .read()
192 .expect("Fork lock poisoned during n_queued_items")
193 .remaining_queued_items(self.id)
194 }
195}