orx_concurrent_recursive_iter/
con_iter.rs

1use crate::{chunk_puller::DynChunkPuller, dyn_seq_queue::DynSeqQueue, queue::Queue};
2use core::sync::atomic::Ordering;
3use orx_concurrent_iter::ConcurrentIter;
4use orx_concurrent_queue::{ConcurrentQueue, DefaultConPinnedVec};
5use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
6use orx_split_vec::SplitVec;
7
/// A recursive [`ConcurrentIter`] which:
/// * naturally shrinks as we iterate,
/// * but can also grow as it allows to add new items to the iterator, during iteration.
///
/// Growth of the iterator is expressed by the `extend: E` function with signature `E: Fn(&T, &Queue<T, P>)`.
///
/// [`Queue`] here is a wrapper around the backing queue of elements which exposes only two methods:
/// [`push`] and [`extend`]. Having access to growth methods of the queue, we can add elements to the iterator
/// while we are processing.
///
/// Importantly note that extension happens before yielding the next element.
///
/// In other words, for each element `e` pulled from the iterator, we call `extend(&e, &queue)` before
/// returning it to the caller.
///
/// *The recursive concurrent iterator internally uses a [`ConcurrentQueue`] which allows for both
/// concurrent push / extend and pop / pull operations.*
///
/// [`push`]: Queue::push
/// [`extend`]: Queue::extend
///
/// # Example
///
/// The following example demonstrates a use case for the recursive concurrent iterator.
/// Notice that the iterator is instantiated with:
/// * a single element which is the root node,
/// * and the extend method which defines how to extend the iterator from each node.
///
/// Including the root, there exist 177 nodes in the tree. We observe that all these
/// nodes are concurrently added to the iterator, popped and processed.
///
/// ```
/// use orx_concurrent_recursive_iter::*;
/// use orx_concurrent_iter::ConcurrentIter;
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use rand::{Rng, SeedableRng};
/// use rand_chacha::ChaCha8Rng;
///
/// struct Node {
///     value: u64,
///     children: Vec<Node>,
/// }
///
/// impl Node {
///     fn new(rng: &mut impl Rng, value: u64) -> Self {
///         let num_children = match value {
///             0 => 0,
///             n => rng.random_range(0..(n as usize)),
///         };
///         let children = (0..num_children)
///             .map(|i| Self::new(rng, i as u64))
///             .collect();
///         Self { value, children }
///     }
/// }
///
/// fn process(node_value: u64) {
///     // fake computation
///     std::thread::sleep(std::time::Duration::from_millis(node_value));
/// }
///
/// // this defines how the iterator must extend:
/// // each node drawn from the iterator adds its children to the end of the iterator
/// fn extend<'a, 'b>(node: &'a &'b Node, queue: &Queue<&'b Node>) {
///     queue.extend(&node.children);
/// }
///
/// // initiate iter with a single element, `root`
/// // however, the iterator will `extend` on the fly as we keep drawing its elements
/// let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
/// let iter = ConcurrentRecursiveIter::new([&root], extend);
///
/// let num_threads = 8;
/// let num_spawned = AtomicUsize::new(0);
/// let num_processed_nodes = AtomicUsize::new(0);
///
/// std::thread::scope(|s| {
///     let mut handles = vec![];
///     for _ in 0..num_threads {
///         handles.push(s.spawn(|| {
///             // allow all threads to be spawned
///             _ = num_spawned.fetch_add(1, Ordering::Relaxed);
///             while num_spawned.load(Ordering::Relaxed) < num_threads {}
///
///             // `next` will first extend `iter` with children of `node`,
///             // and only then yield the `node`
///             while let Some(node) = iter.next() {
///                 process(node.value);
///                 _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
///             }
///         }));
///     }
/// });
///
/// assert_eq!(num_processed_nodes.into_inner(), 177);
/// ```
pub struct ConcurrentRecursiveIter<T, E, P = DefaultConPinnedVec<T>>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
    E: Fn(&T, &Queue<T, P>) + Sync,
{
    /// Backing queue: elements are popped from it to be yielded, and `extend` pushes
    /// new elements to it through the [`Queue`] wrapper.
    queue: ConcurrentQueue<T, P>,
    /// Function called with each popped element and the queue, before the element is yielded.
    extend: E,
    /// Total number of elements the iterator will yield, when provided by the caller;
    /// `None` when the total is unknown. Only used to compute `size_hint`.
    exact_len: Option<usize>,
}
115
116impl<T, E, P> From<(ConcurrentQueue<T, P>, E)> for ConcurrentRecursiveIter<T, E, P>
117where
118    T: Send,
119    P: ConcurrentPinnedVec<T>,
120    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
121    E: Fn(&T, &Queue<T, P>) + Sync,
122{
123    fn from((queue, extend): (ConcurrentQueue<T, P>, E)) -> Self {
124        Self {
125            queue,
126            extend,
127            exact_len: None,
128        }
129    }
130}
131
132impl<T, E, P> From<(ConcurrentQueue<T, P>, E, usize)> for ConcurrentRecursiveIter<T, E, P>
133where
134    T: Send,
135    P: ConcurrentPinnedVec<T>,
136    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
137    E: Fn(&T, &Queue<T, P>) + Sync,
138{
139    fn from((queue, extend, exact_len): (ConcurrentQueue<T, P>, E, usize)) -> Self {
140        Self {
141            queue,
142            extend,
143            exact_len: Some(exact_len),
144        }
145    }
146}
147
148impl<T, E> ConcurrentRecursiveIter<T, E, DefaultConPinnedVec<T>>
149where
150    T: Send,
151    E: Fn(&T, &Queue<T, DefaultConPinnedVec<T>>) + Sync,
152{
153    /// Creates a new dynamic concurrent iterator:
154    ///
155    /// * The iterator will initially contain `initial_elements`.
156    /// * Before yielding each element, say `e`, to the caller, the elements returned
157    ///   by `extend(&e, &queue)` will called to create elements on the fly.
158    ///
159    /// This constructor uses a [`ConcurrentQueue`] with the default pinned concurrent
160    /// collection under the hood. In order to create the iterator using a different queue
161    /// use the `From`/`Into` traits, as demonstrated below.
162    ///
163    /// # UnknownSize vs ExactSize
164    ///
165    /// Size refers to the total number of elements that will be returned by the iterator,
166    /// which is the total of initial elements and all elements created by the recursive
167    /// extend calls.
168    ///
169    /// Note that the iterator created with this method will have an unknown size.
170    /// In order to create a recursive iterator with a known exact length, you may use
171    /// [`new_exact`] function.
172    ///
173    /// Providing an `exact_len` impacts the following:
174    /// * When the exact length is provided, `try_get_len` method can provide the number of remaining
175    ///   elements. When this is not necessary, the exact length argument can simply be skipped.
176    /// * On the other hand, a known length is very useful for performance optimization
177    ///   when the recursive iterator is used as the input of a parallel iterator of the
178    ///   [orx_parallel](https://crates.io/crates/orx-parallel) crate.
179    ///
180    /// [`new_exact`]: ConcurrentRecursiveIter::new_exact
181    ///
182    /// # Examples
183    ///
184    /// The following is a simple example to demonstrate how the dynamic iterator works.
185    ///
186    /// ```
187    /// use orx_concurrent_recursive_iter::{ConcurrentRecursiveIter, Queue};
188    /// use orx_concurrent_iter::ConcurrentIter;
189    ///
190    /// let extend = |x: &usize, queue: &Queue<usize>| {
191    ///     if *x < 5 {
192    ///         queue.push(x + 1);
193    ///     }
194    /// };
195    ///
196    /// let initial_elements = [1];
197    ///
198    /// let iter = ConcurrentRecursiveIter::new(initial_elements, extend);
199    /// let all: Vec<_> = iter.item_puller().collect();
200    ///
201    /// assert_eq!(all, [1, 2, 3, 4, 5]);
202    /// ```
203    ///
204    /// # Examples - From
205    ///
206    /// In the above example, the underlying pinned vector of the dynamic iterator created
207    /// with `new` is a [`SplitVec`] with a [`Doubling`] growth strategy.
208    ///
209    /// Alternatively, we can use a `SplitVec` with a [`Linear`] growth strategy, or a
210    /// pre-allocated [`FixedVec`] as the underlying storage. In order to do so, we can
211    /// use the `From` trait.
212    ///
213    /// ```
214    /// use orx_concurrent_recursive_iter::*;
215    /// use orx_concurrent_queue::ConcurrentQueue;
216    /// use orx_pinned_vec::IntoConcurrentPinnedVec;
217    /// use orx_split_vec::{SplitVec, Linear};
218    /// use orx_fixed_vec::FixedVec;
219    ///
220    /// let initial_elements = [1];
221    /// fn extend<P>(x: &usize, queue: &Queue<usize, P::ConPinnedVec>)
222    /// where
223    ///     P: IntoConcurrentPinnedVec<usize>,
224    /// {
225    ///     if *x < 5 {
226    ///         queue.push(x + 1);
227    ///     }
228    /// }
229    ///
230    /// // SplitVec with Linear growth
231    /// let queue = ConcurrentQueue::with_linear_growth(10, 4);
232    /// queue.extend(initial_elements);
233    /// let iter = ConcurrentRecursiveIter::from((queue, extend::<SplitVec<_, Linear>>));
234    ///
235    /// let all: Vec<_> = iter.item_puller().collect();
236    /// assert_eq!(all, [1, 2, 3, 4, 5]);
237    ///
238    /// // FixedVec with fixed capacity
239    /// let queue = ConcurrentQueue::with_fixed_capacity(5);
240    /// queue.extend(initial_elements);
241    /// let iter = ConcurrentRecursiveIter::from((queue, extend::<FixedVec<_>>));
242    ///
243    /// let all: Vec<_> = iter.item_puller().collect();
244    /// assert_eq!(all, [1, 2, 3, 4, 5]);
245    /// ```
246    ///
247    /// [`SplitVec`]: orx_split_vec::SplitVec
248    /// [`FixedVec`]: orx_fixed_vec::FixedVec
249    /// [`Doubling`]: orx_split_vec::Doubling
250    /// [`Linear`]: orx_split_vec::Linear
251    pub fn new(initial_elements: impl IntoIterator<Item = T>, extend: E) -> Self {
252        let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
253        vec.extend(initial_elements);
254        let queue = vec.into();
255        (queue, extend).into()
256    }
257
258    /// Creates a new dynamic concurrent iterator:
259    ///
260    /// * The iterator will initially contain `initial_elements`.
261    /// * Before yielding each element, say `e`, to the caller, the elements returned
262    ///   by `extend(&e, &queue)` will called to create elements on the fly.
263    ///
264    /// This constructor uses a [`ConcurrentQueue`] with the default pinned concurrent
265    /// collection under the hood. In order to create the iterator using a different queue
266    /// use the `From`/`Into` traits, as demonstrated below.
267    ///
268    /// # UnknownSize vs ExactSize
269    ///
270    /// Size refers to the total number of elements that will be returned by the iterator,
271    /// which is the total of initial elements and all elements created by the recursive
272    /// extend calls.
273    ///
274    /// Note that the iterator created with this method will have an unknown size.
275    /// In order to create a recursive iterator with a known exact length, you may use
276    /// [`new_exact`] function.
277    ///
278    /// Providing an `exact_len` impacts the following:
279    /// * When the exact length is provided, `try_get_len` method can provide the number of remaining
280    ///   elements. When this is not necessary, the exact length argument can simply be skipped.
281    /// * On the other hand, a known length is very useful for performance optimization
282    ///   when the recursive iterator is used as the input of a parallel iterator of the
283    ///   [orx_parallel](https://crates.io/crates/orx-parallel) crate.
284    ///
285    /// [`new_exact`]: ConcurrentRecursiveIter::new_exact
286    ///
287    /// # Examples
288    ///
289    /// The following is a simple example to demonstrate how the dynamic iterator works.
290    ///
291    /// ```
292    /// use orx_concurrent_recursive_iter::{ConcurrentRecursiveIter, Queue};
293    /// use orx_concurrent_iter::ConcurrentIter;
294    ///
295    /// let extend = |x: &usize, queue: &Queue<usize>| {
296    ///     if *x < 5 {
297    ///         queue.push(x + 1);
298    ///     }
299    /// };
300    ///
301    /// let initial_elements = [1];
302    ///
303    /// let iter = ConcurrentRecursiveIter::new(initial_elements, extend);
304    /// let all: Vec<_> = iter.item_puller().collect();
305    ///
306    /// assert_eq!(all, [1, 2, 3, 4, 5]);
307    /// ```
308    ///
309    /// # Examples - From
310    ///
311    /// In the above example, the underlying pinned vector of the dynamic iterator created
312    /// with `new` is a [`SplitVec`] with a [`Doubling`] growth strategy.
313    ///
314    /// Alternatively, we can use a `SplitVec` with a [`Linear`] growth strategy, or a
315    /// pre-allocated [`FixedVec`] as the underlying storage. In order to do so, we can
316    /// use the `From` trait.
317    ///
318    /// ```
319    /// use orx_concurrent_recursive_iter::*;
320    /// use orx_concurrent_queue::ConcurrentQueue;
321    /// use orx_pinned_vec::IntoConcurrentPinnedVec;
322    /// use orx_split_vec::{SplitVec, Linear};
323    /// use orx_fixed_vec::FixedVec;
324    ///
325    /// let initial_elements = [1];
326    /// fn extend<P>(x: &usize, queue: &Queue<usize, P::ConPinnedVec>)
327    /// where
328    ///     P: IntoConcurrentPinnedVec<usize>,
329    /// {
330    ///     if *x < 5 {
331    ///         queue.push(x + 1);
332    ///     }
333    /// }
334    ///
335    /// // SplitVec with Linear growth
336    /// let queue = ConcurrentQueue::with_linear_growth(10, 4);
337    /// queue.extend(initial_elements);
338    /// let iter = ConcurrentRecursiveIter::from((queue, extend::<SplitVec<_, Linear>>));
339    ///
340    /// let all: Vec<_> = iter.item_puller().collect();
341    /// assert_eq!(all, [1, 2, 3, 4, 5]);
342    ///
343    /// // FixedVec with fixed capacity
344    /// let queue = ConcurrentQueue::with_fixed_capacity(5);
345    /// queue.extend(initial_elements);
346    /// let iter = ConcurrentRecursiveIter::from((queue, extend::<FixedVec<_>>));
347    ///
348    /// let all: Vec<_> = iter.item_puller().collect();
349    /// assert_eq!(all, [1, 2, 3, 4, 5]);
350    /// ```
351    ///
352    /// [`SplitVec`]: orx_split_vec::SplitVec
353    /// [`FixedVec`]: orx_fixed_vec::FixedVec
354    /// [`Doubling`]: orx_split_vec::Doubling
355    /// [`Linear`]: orx_split_vec::Linear
356    pub fn new_exact(
357        initial_elements: impl IntoIterator<Item = T>,
358        extend: E,
359        exact_len: usize,
360    ) -> Self {
361        let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
362        vec.extend(initial_elements);
363        let queue = vec.into();
364        (queue, extend, exact_len).into()
365    }
366}
367
368impl<T, E, P> ConcurrentIter for ConcurrentRecursiveIter<T, E, P>
369where
370    T: Send,
371    P: ConcurrentPinnedVec<T>,
372    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
373    E: Fn(&T, &Queue<T, P>) + Sync,
374{
375    type Item = T;
376
377    type SequentialIter = DynSeqQueue<T, P, E>;
378
379    type ChunkPuller<'i>
380        = DynChunkPuller<'i, T, E, P>
381    where
382        Self: 'i;
383
384    fn into_seq_iter(self) -> Self::SequentialIter {
385        DynSeqQueue::new(self.queue, self.extend)
386    }
387
388    fn skip_to_end(&self) {
389        let len = self.queue.num_write_reserved(Ordering::Acquire);
390        let _remaining_to_drop = self.queue.pull(len);
391    }
392
393    fn next(&self) -> Option<Self::Item> {
394        let n = self.queue.pop()?;
395        (self.extend)(&n, &Queue::from(&self.queue));
396        Some(n)
397    }
398
399    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
400        let (idx, n) = self.queue.pop_with_idx()?;
401        (self.extend)(&n, &Queue::from(&self.queue));
402        Some((idx, n))
403    }
404
405    fn size_hint(&self) -> (usize, Option<usize>) {
406        match self.exact_len {
407            Some(exact_len) => {
408                let popped = self.queue.num_popped(Ordering::Relaxed);
409                let remaining = exact_len - popped;
410                (remaining, Some(remaining))
411            }
412            None => match self.queue.len() {
413                0 => (0, Some(0)),
414                n => (n, None),
415            },
416        }
417    }
418
419    fn is_completed_when_none_returned(&self) -> bool {
420        let popped = self.queue.num_popped(Ordering::Relaxed);
421        let write_reserved = self.queue.num_write_reserved(Ordering::Relaxed);
422        popped >= write_reserved
423    }
424
425    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
426        DynChunkPuller::new(&self.extend, &self.queue, chunk_size)
427    }
428}