orx_concurrent_recursive_iter/con_iter.rs
1use crate::{chunk_puller::DynChunkPuller, dyn_seq_queue::DynSeqQueue, queue::Queue};
2use core::sync::atomic::Ordering;
3use orx_concurrent_iter::ConcurrentIter;
4use orx_concurrent_queue::{ConcurrentQueue, DefaultConPinnedVec};
5use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
6use orx_split_vec::SplitVec;
7
8/// A recursive [`ConcurrentIter`] which:
9/// * naturally shrinks as we iterate,
10/// * but can also grow as it allows to add new items to the iterator, during iteration.
11///
12/// Growth of the iterator is expressed by the `extend: E` function with signature `E: Fn(&T, &Queue<T, P>)`.
13///
14/// [`Queue`] here is a wrapper around the the backing queue of elements which exposes only two methods:
15/// [`push`] and [`extend`]. Having access to growth methods of the queue, we can add elements to the iterator
16/// while we are processing.
17///
18/// Importantly note that extension happens before yielding the next element.
19///
20/// In other words, for each element `e` pulled from the iterator, we call `extend(&e, &queue)` before
21/// returning it to the caller.
22///
23/// *The recursive concurrent iterator internally uses a [`ConcurrentQueue`] which allows for both
24/// concurrent push / extend and pop / pull operations.*
25///
26/// [`push`]: Queue::push
27/// [`extend`]: Queue::extend
28///
29/// # Example
30///
31/// The following example demonstrates a use case for the recursive concurrent iterator.
32/// Notice that the iterator is instantiated with:
33/// * a single element which is the root node,
34/// * and the extend method which defines how to extend the iterator from each node.
35///
36/// Including the root, there exist 177 nodes in the tree. We observe that all these
37/// nodes are concurrently added to the iterator, popped and processed.
38///
39/// ```
40/// use orx_concurrent_recursive_iter::*;
41/// use orx_concurrent_iter::ConcurrentIter;
42/// use std::sync::atomic::{AtomicUsize, Ordering};
43/// use rand::{Rng, SeedableRng};
44/// use rand_chacha::ChaCha8Rng;
45///
46/// struct Node {
47/// value: u64,
48/// children: Vec<Node>,
49/// }
50///
51/// impl Node {
52/// fn new(rng: &mut impl Rng, value: u64) -> Self {
53/// let num_children = match value {
54/// 0 => 0,
55/// n => rng.random_range(0..(n as usize)),
56/// };
57/// let children = (0..num_children)
58/// .map(|i| Self::new(rng, i as u64))
59/// .collect();
60/// Self { value, children }
61/// }
62/// }
63///
64/// fn process(node_value: u64) {
65/// // fake computation
66/// std::thread::sleep(std::time::Duration::from_millis(node_value));
67/// }
68///
69/// // this defines how the iterator must extend:
70/// // each node drawn from the iterator adds its children to the end of the iterator
71/// fn extend<'a, 'b>(node: &'a &'b Node, queue: &Queue<&'b Node>) {
72/// queue.extend(&node.children);
73/// }
74///
75/// // initiate iter with a single element, `root`
76/// // however, the iterator will `extend` on the fly as we keep drawing its elements
77/// let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
78/// let iter = ConcurrentRecursiveIter::new([&root], extend);
79///
80/// let num_threads = 8;
81/// let num_spawned = AtomicUsize::new(0);
82/// let num_processed_nodes = AtomicUsize::new(0);
83///
84/// std::thread::scope(|s| {
85/// let mut handles = vec![];
86/// for _ in 0..num_threads {
87/// handles.push(s.spawn(|| {
88/// // allow all threads to be spawned
89/// _ = num_spawned.fetch_add(1, Ordering::Relaxed);
90/// while num_spawned.load(Ordering::Relaxed) < num_threads {}
91///
92/// // `next` will first extend `iter` with children of `node,
93/// // and only then yield the `node`
94/// while let Some(node) = iter.next() {
95/// process(node.value);
96/// _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
97/// }
98/// }));
99/// }
100/// });
101///
102/// assert_eq!(num_processed_nodes.into_inner(), 177);
103/// ```
104pub struct ConcurrentRecursiveIter<T, E, P = DefaultConPinnedVec<T>>
105where
106 T: Send,
107 P: ConcurrentPinnedVec<T>,
108 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
109 E: Fn(&T, &Queue<T, P>) + Sync,
110{
111 queue: ConcurrentQueue<T, P>,
112 extend: E,
113 exact_len: Option<usize>,
114}
115
116impl<T, E, P> From<(ConcurrentQueue<T, P>, E)> for ConcurrentRecursiveIter<T, E, P>
117where
118 T: Send,
119 P: ConcurrentPinnedVec<T>,
120 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
121 E: Fn(&T, &Queue<T, P>) + Sync,
122{
123 fn from((queue, extend): (ConcurrentQueue<T, P>, E)) -> Self {
124 Self {
125 queue,
126 extend,
127 exact_len: None,
128 }
129 }
130}
131
132impl<T, E, P> From<(ConcurrentQueue<T, P>, E, usize)> for ConcurrentRecursiveIter<T, E, P>
133where
134 T: Send,
135 P: ConcurrentPinnedVec<T>,
136 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
137 E: Fn(&T, &Queue<T, P>) + Sync,
138{
139 fn from((queue, extend, exact_len): (ConcurrentQueue<T, P>, E, usize)) -> Self {
140 Self {
141 queue,
142 extend,
143 exact_len: Some(exact_len),
144 }
145 }
146}
147
148impl<T, E> ConcurrentRecursiveIter<T, E, DefaultConPinnedVec<T>>
149where
150 T: Send,
151 E: Fn(&T, &Queue<T, DefaultConPinnedVec<T>>) + Sync,
152{
153 /// Creates a new dynamic concurrent iterator:
154 ///
155 /// * The iterator will initially contain `initial_elements`.
156 /// * Before yielding each element, say `e`, to the caller, the elements returned
157 /// by `extend(&e, &queue)` will called to create elements on the fly.
158 ///
159 /// This constructor uses a [`ConcurrentQueue`] with the default pinned concurrent
160 /// collection under the hood. In order to create the iterator using a different queue
161 /// use the `From`/`Into` traits, as demonstrated below.
162 ///
163 /// # UnknownSize vs ExactSize
164 ///
165 /// Size refers to the total number of elements that will be returned by the iterator,
166 /// which is the total of initial elements and all elements created by the recursive
167 /// extend calls.
168 ///
169 /// Note that the iterator created with this method will have an unknown size.
170 /// In order to create a recursive iterator with a known exact length, you may use
171 /// [`new_exact`] function.
172 ///
173 /// Providing an `exact_len` impacts the following:
174 /// * When the exact length is provided, `try_get_len` method can provide the number of remaining
175 /// elements. When this is not necessary, the exact length argument can simply be skipped.
176 /// * On the other hand, a known length is very useful for performance optimization
177 /// when the recursive iterator is used as the input of a parallel iterator of the
178 /// [orx_parallel](https://crates.io/crates/orx-parallel) crate.
179 ///
180 /// [`new_exact`]: ConcurrentRecursiveIter::new_exact
181 ///
182 /// # Examples
183 ///
184 /// The following is a simple example to demonstrate how the dynamic iterator works.
185 ///
186 /// ```
187 /// use orx_concurrent_recursive_iter::{ConcurrentRecursiveIter, Queue};
188 /// use orx_concurrent_iter::ConcurrentIter;
189 ///
190 /// let extend = |x: &usize, queue: &Queue<usize>| {
191 /// if *x < 5 {
192 /// queue.push(x + 1);
193 /// }
194 /// };
195 ///
196 /// let initial_elements = [1];
197 ///
198 /// let iter = ConcurrentRecursiveIter::new(initial_elements, extend);
199 /// let all: Vec<_> = iter.item_puller().collect();
200 ///
201 /// assert_eq!(all, [1, 2, 3, 4, 5]);
202 /// ```
203 ///
204 /// # Examples - From
205 ///
206 /// In the above example, the underlying pinned vector of the dynamic iterator created
207 /// with `new` is a [`SplitVec`] with a [`Doubling`] growth strategy.
208 ///
209 /// Alternatively, we can use a `SplitVec` with a [`Linear`] growth strategy, or a
210 /// pre-allocated [`FixedVec`] as the underlying storage. In order to do so, we can
211 /// use the `From` trait.
212 ///
213 /// ```
214 /// use orx_concurrent_recursive_iter::*;
215 /// use orx_concurrent_queue::ConcurrentQueue;
216 /// use orx_pinned_vec::IntoConcurrentPinnedVec;
217 /// use orx_split_vec::{SplitVec, Linear};
218 /// use orx_fixed_vec::FixedVec;
219 ///
220 /// let initial_elements = [1];
221 /// fn extend<P>(x: &usize, queue: &Queue<usize, P::ConPinnedVec>)
222 /// where
223 /// P: IntoConcurrentPinnedVec<usize>,
224 /// {
225 /// if *x < 5 {
226 /// queue.push(x + 1);
227 /// }
228 /// }
229 ///
230 /// // SplitVec with Linear growth
231 /// let queue = ConcurrentQueue::with_linear_growth(10, 4);
232 /// queue.extend(initial_elements);
233 /// let iter = ConcurrentRecursiveIter::from((queue, extend::<SplitVec<_, Linear>>));
234 ///
235 /// let all: Vec<_> = iter.item_puller().collect();
236 /// assert_eq!(all, [1, 2, 3, 4, 5]);
237 ///
238 /// // FixedVec with fixed capacity
239 /// let queue = ConcurrentQueue::with_fixed_capacity(5);
240 /// queue.extend(initial_elements);
241 /// let iter = ConcurrentRecursiveIter::from((queue, extend::<FixedVec<_>>));
242 ///
243 /// let all: Vec<_> = iter.item_puller().collect();
244 /// assert_eq!(all, [1, 2, 3, 4, 5]);
245 /// ```
246 ///
247 /// [`SplitVec`]: orx_split_vec::SplitVec
248 /// [`FixedVec`]: orx_fixed_vec::FixedVec
249 /// [`Doubling`]: orx_split_vec::Doubling
250 /// [`Linear`]: orx_split_vec::Linear
251 pub fn new(initial_elements: impl IntoIterator<Item = T>, extend: E) -> Self {
252 let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
253 vec.extend(initial_elements);
254 let queue = vec.into();
255 (queue, extend).into()
256 }
257
258 /// Creates a new dynamic concurrent iterator:
259 ///
260 /// * The iterator will initially contain `initial_elements`.
261 /// * Before yielding each element, say `e`, to the caller, the elements returned
262 /// by `extend(&e, &queue)` will called to create elements on the fly.
263 ///
264 /// This constructor uses a [`ConcurrentQueue`] with the default pinned concurrent
265 /// collection under the hood. In order to create the iterator using a different queue
266 /// use the `From`/`Into` traits, as demonstrated below.
267 ///
268 /// # UnknownSize vs ExactSize
269 ///
270 /// Size refers to the total number of elements that will be returned by the iterator,
271 /// which is the total of initial elements and all elements created by the recursive
272 /// extend calls.
273 ///
274 /// Note that the iterator created with this method will have an unknown size.
275 /// In order to create a recursive iterator with a known exact length, you may use
276 /// [`new_exact`] function.
277 ///
278 /// Providing an `exact_len` impacts the following:
279 /// * When the exact length is provided, `try_get_len` method can provide the number of remaining
280 /// elements. When this is not necessary, the exact length argument can simply be skipped.
281 /// * On the other hand, a known length is very useful for performance optimization
282 /// when the recursive iterator is used as the input of a parallel iterator of the
283 /// [orx_parallel](https://crates.io/crates/orx-parallel) crate.
284 ///
285 /// [`new_exact`]: ConcurrentRecursiveIter::new_exact
286 ///
287 /// # Examples
288 ///
289 /// The following is a simple example to demonstrate how the dynamic iterator works.
290 ///
291 /// ```
292 /// use orx_concurrent_recursive_iter::{ConcurrentRecursiveIter, Queue};
293 /// use orx_concurrent_iter::ConcurrentIter;
294 ///
295 /// let extend = |x: &usize, queue: &Queue<usize>| {
296 /// if *x < 5 {
297 /// queue.push(x + 1);
298 /// }
299 /// };
300 ///
301 /// let initial_elements = [1];
302 ///
303 /// let iter = ConcurrentRecursiveIter::new(initial_elements, extend);
304 /// let all: Vec<_> = iter.item_puller().collect();
305 ///
306 /// assert_eq!(all, [1, 2, 3, 4, 5]);
307 /// ```
308 ///
309 /// # Examples - From
310 ///
311 /// In the above example, the underlying pinned vector of the dynamic iterator created
312 /// with `new` is a [`SplitVec`] with a [`Doubling`] growth strategy.
313 ///
314 /// Alternatively, we can use a `SplitVec` with a [`Linear`] growth strategy, or a
315 /// pre-allocated [`FixedVec`] as the underlying storage. In order to do so, we can
316 /// use the `From` trait.
317 ///
318 /// ```
319 /// use orx_concurrent_recursive_iter::*;
320 /// use orx_concurrent_queue::ConcurrentQueue;
321 /// use orx_pinned_vec::IntoConcurrentPinnedVec;
322 /// use orx_split_vec::{SplitVec, Linear};
323 /// use orx_fixed_vec::FixedVec;
324 ///
325 /// let initial_elements = [1];
326 /// fn extend<P>(x: &usize, queue: &Queue<usize, P::ConPinnedVec>)
327 /// where
328 /// P: IntoConcurrentPinnedVec<usize>,
329 /// {
330 /// if *x < 5 {
331 /// queue.push(x + 1);
332 /// }
333 /// }
334 ///
335 /// // SplitVec with Linear growth
336 /// let queue = ConcurrentQueue::with_linear_growth(10, 4);
337 /// queue.extend(initial_elements);
338 /// let iter = ConcurrentRecursiveIter::from((queue, extend::<SplitVec<_, Linear>>));
339 ///
340 /// let all: Vec<_> = iter.item_puller().collect();
341 /// assert_eq!(all, [1, 2, 3, 4, 5]);
342 ///
343 /// // FixedVec with fixed capacity
344 /// let queue = ConcurrentQueue::with_fixed_capacity(5);
345 /// queue.extend(initial_elements);
346 /// let iter = ConcurrentRecursiveIter::from((queue, extend::<FixedVec<_>>));
347 ///
348 /// let all: Vec<_> = iter.item_puller().collect();
349 /// assert_eq!(all, [1, 2, 3, 4, 5]);
350 /// ```
351 ///
352 /// [`SplitVec`]: orx_split_vec::SplitVec
353 /// [`FixedVec`]: orx_fixed_vec::FixedVec
354 /// [`Doubling`]: orx_split_vec::Doubling
355 /// [`Linear`]: orx_split_vec::Linear
356 pub fn new_exact(
357 initial_elements: impl IntoIterator<Item = T>,
358 extend: E,
359 exact_len: usize,
360 ) -> Self {
361 let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
362 vec.extend(initial_elements);
363 let queue = vec.into();
364 (queue, extend, exact_len).into()
365 }
366}
367
368impl<T, E, P> ConcurrentIter for ConcurrentRecursiveIter<T, E, P>
369where
370 T: Send,
371 P: ConcurrentPinnedVec<T>,
372 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
373 E: Fn(&T, &Queue<T, P>) + Sync,
374{
375 type Item = T;
376
377 type SequentialIter = DynSeqQueue<T, P, E>;
378
379 type ChunkPuller<'i>
380 = DynChunkPuller<'i, T, E, P>
381 where
382 Self: 'i;
383
384 fn into_seq_iter(self) -> Self::SequentialIter {
385 DynSeqQueue::new(self.queue, self.extend)
386 }
387
388 fn skip_to_end(&self) {
389 let len = self.queue.num_write_reserved(Ordering::Acquire);
390 let _remaining_to_drop = self.queue.pull(len);
391 }
392
393 fn next(&self) -> Option<Self::Item> {
394 let n = self.queue.pop()?;
395 (self.extend)(&n, &Queue::from(&self.queue));
396 Some(n)
397 }
398
399 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
400 let (idx, n) = self.queue.pop_with_idx()?;
401 (self.extend)(&n, &Queue::from(&self.queue));
402 Some((idx, n))
403 }
404
405 fn size_hint(&self) -> (usize, Option<usize>) {
406 match self.exact_len {
407 Some(exact_len) => {
408 let popped = self.queue.num_popped(Ordering::Relaxed);
409 let remaining = exact_len - popped;
410 (remaining, Some(remaining))
411 }
412 None => match self.queue.len() {
413 0 => (0, Some(0)),
414 n => (n, None),
415 },
416 }
417 }
418
419 fn is_completed_when_none_returned(&self) -> bool {
420 let popped = self.queue.num_popped(Ordering::Relaxed);
421 let write_reserved = self.queue.num_write_reserved(Ordering::Relaxed);
422 popped >= write_reserved
423 }
424
425 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
426 DynChunkPuller::new(&self.extend, &self.queue, chunk_size)
427 }
428}