orx_concurrent_recursive_iter/
con_iter.rs

1use crate::{
2    ExactSize, Size, UnknownSize, chunk_puller::DynChunkPuller, dyn_seq_queue::DynSeqQueue,
3};
4use core::{marker::PhantomData, sync::atomic::Ordering};
5use orx_concurrent_iter::{ConcurrentIter, ExactSizeConcurrentIter};
6use orx_concurrent_queue::{ConcurrentQueue, DefaultConPinnedVec};
7use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
8use orx_split_vec::SplitVec;
9
// type aliases for exact and unknown sizes
11
12/// A [`ConcurrentRecursiveIterCore`] with [`UnknownSize`].
13pub type ConcurrentRecursiveIter<T, E, I, P = DefaultConPinnedVec<T>> =
14    ConcurrentRecursiveIterCore<UnknownSize, T, E, I, P>;
15
16/// A [`ConcurrentRecursiveIterCore`] with [`ExactSize`].
17pub type ConcurrentRecursiveIterExact<T, E, I, P = DefaultConPinnedVec<T>> =
18    ConcurrentRecursiveIterCore<ExactSize, T, E, I, P>;
19
20// core
21
22/// A recursive [`ConcurrentIter`] which:
23/// * naturally shrinks as we iterate,
24/// * but can also grow as it allows to add new items to the iterator, during iteration.
25///
26/// Growth of the iterator is expressed by the `extend: E` function with the signature `Fn(&T) -> I`,
27/// where `I: IntoIterator<Item = T>` with a known length.
28///
29/// In other words, for each element `e` pulled from the iterator, we call `extend(&e)` before
30/// returning it to the caller. All elements included in the iterator that `extend` returned
31/// are added to the end of the concurrent iterator, to be pulled later on.
32///
33/// *The recursive concurrent iterator internally uses a [`ConcurrentQueue`] which allows for both
34/// concurrent push / extend and pop / pull operations.*
35///
36/// # Example
37///
38/// The following example demonstrates a use case for the recursive concurrent iterator.
39/// Notice that the iterator is instantiated with:
40/// * a single element which is the root node,
41/// * and the extend method which defines how to extend the iterator from each node.
42///
43/// Including the root, there exist 177 nodes in the tree. We observe that all these
44/// nodes are concurrently added to the iterator, popped and processed.
45///
46/// ```
47/// use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
48/// use orx_concurrent_iter::ConcurrentIter;
49/// use std::sync::atomic::{AtomicUsize, Ordering};
50/// use rand::{Rng, SeedableRng};
51/// use rand_chacha::ChaCha8Rng;
52///
53/// struct Node {
54///     value: u64,
55///     children: Vec<Node>,
56/// }
57///
58/// impl Node {
59///     fn new(rng: &mut impl Rng, value: u64) -> Self {
60///         let num_children = match value {
61///             0 => 0,
62///             n => rng.random_range(0..(n as usize)),
63///         };
64///         let children = (0..num_children)
65///             .map(|i| Self::new(rng, i as u64))
66///             .collect();
67///         Self { value, children }
68///     }
69/// }
70///
71/// fn process(node_value: u64) {
72///     // fake computation
73///     std::thread::sleep(std::time::Duration::from_millis(node_value));
74/// }
75///
76/// // this defines how the iterator must extend:
77/// // each node drawn from the iterator adds its children to the end of the iterator
78/// fn extend<'a, 'b>(node: &'a &'b Node) -> &'b [Node] {
79///     &node.children
80/// }
81///
82/// // initiate iter with a single element, `root`
83/// // however, the iterator will `extend` on the fly as we keep drawing its elements
84/// let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
85/// let iter = ConcurrentRecursiveIter::new(extend, [&root]);
86///
87/// let num_threads = 8;
88/// let num_spawned = AtomicUsize::new(0);
89/// let num_processed_nodes = AtomicUsize::new(0);
90///
91/// std::thread::scope(|s| {
92///     let mut handles = vec![];
93///     for _ in 0..num_threads {
94///         handles.push(s.spawn(|| {
95///             // allow all threads to be spawned
96///             _ = num_spawned.fetch_add(1, Ordering::Relaxed);
97///             while num_spawned.load(Ordering::Relaxed) < num_threads {}
98///
99///             // `next` will first extend `iter` with children of `node,
100///             // and only then yield the `node`
101///             while let Some(node) = iter.next() {
102///                 process(node.value);
103///                 _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
104///             }
105///         }));
106///     }
107/// });
108///
109/// assert_eq!(num_processed_nodes.into_inner(), 177);
110/// ```
111pub struct ConcurrentRecursiveIterCore<S, T, E, I, P = DefaultConPinnedVec<T>>
112where
113    S: Size,
114    T: Send,
115    E: Fn(&T) -> I + Sync,
116    I: IntoIterator<Item = T>,
117    I::IntoIter: ExactSizeIterator,
118    P: ConcurrentPinnedVec<T>,
119    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
120{
121    queue: ConcurrentQueue<T, P>,
122    extend: E,
123    exact_len: S,
124    p: PhantomData<S>,
125}
126
127// new with unknown size
128
129impl<T, E, I, P> From<(E, ConcurrentQueue<T, P>)>
130    for ConcurrentRecursiveIterCore<UnknownSize, T, E, I, P>
131where
132    T: Send,
133    E: Fn(&T) -> I + Sync,
134    I: IntoIterator<Item = T>,
135    I::IntoIter: ExactSizeIterator,
136    P: ConcurrentPinnedVec<T>,
137    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
138{
139    fn from((extend, queue): (E, ConcurrentQueue<T, P>)) -> Self {
140        Self {
141            queue,
142            extend,
143            exact_len: UnknownSize,
144            p: PhantomData,
145        }
146    }
147}
148
149impl<T, E, I> ConcurrentRecursiveIterCore<UnknownSize, T, E, I, DefaultConPinnedVec<T>>
150where
151    T: Send,
152    E: Fn(&T) -> I + Sync,
153    I: IntoIterator<Item = T>,
154    I::IntoIter: ExactSizeIterator,
155{
156    /// Creates a new dynamic concurrent iterator:
157    ///
158    /// * The iterator will initially contain `initial_elements`.
159    /// * Before yielding each element, say `e`, to the caller, the elements returned
160    ///   by `extend(&e)` will be added to the concurrent iterator, to be yield later.
161    ///
162    /// This constructor uses a [`ConcurrentQueue`] with the default pinned concurrent
163    /// collection under the hood. In order to crate the iterator using a different queue
164    /// use the `From`/`Into` traits, as demonstrated below.
165    ///
166    /// # UnknownSize vs ExactSize
167    ///
168    /// Size refers to the total number of elements that will be returned by the iterator,
169    /// which is the total of initial elements and all elements created by the recursive
170    /// extend calls.
171    ///
172    /// Note that the iterator created with this method will have [`UnknownSize`].
173    /// In order to create a recursive iterator with a known exact length, you may use
174    /// [`new_exact`] function.
175    ///
176    /// Providing an `exact_len` impacts the following:
177    /// * When an exact length is provided, the recursive iterator implements
178    ///   [`ExactSizeConcurrentIter`] in addition to [`ConcurrentIter`].
179    ///   This enables the `len` method to access the number of remaining elements in a
180    ///   concurrent program. When this is not necessary, the exact length argument
181    ///   can simply be skipped.
182    /// * On the other hand, a known length is very useful for performance optimization
183    ///   when the recursive iterator is used as the input of a parallel iterator of the
184    ///   [orx_parallel](https://crates.io/crates/orx-parallel) crate.
185    ///
186    /// [`new_exact`]: ConcurrentRecursiveIterExact::new_exact
187    ///
188    /// # Examples
189    ///
190    /// The following is a simple example to demonstrate how the dynamic iterator works.
191    ///
192    /// ```
193    /// use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
194    /// use orx_concurrent_iter::ConcurrentIter;
195    ///
196    /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
197    /// let initial_elements = [1];
198    ///
199    /// let iter = ConcurrentRecursiveIter::new(extend, initial_elements);
200    /// let all: Vec<_> = iter.item_puller().collect();
201    ///
202    /// assert_eq!(all, [1, 2, 3, 4, 5]);
203    /// ```
204    ///
205    /// # Examples - From
206    ///
207    /// In the above example, the underlying pinned vector of the dynamic iterator created
208    /// with `new` is a [`SplitVec`] with a [`Doubling`] growth strategy.
209    ///
210    /// Alternatively, we can use a `SplitVec` with a [`Linear`] growth strategy, or a
211    /// pre-allocated [`FixedVec`] as the underlying storage. In order to do so, we can
212    /// use the `From` trait.
213    ///
214    /// ```
215    /// use orx_concurrent_recursive_iter::*;
216    /// use orx_concurrent_queue::ConcurrentQueue;
217    ///
218    /// let initial_elements = [1];
219    ///
220    /// // SplitVec with Linear growth
221    /// let queue = ConcurrentQueue::with_linear_growth(10, 4);
222    /// queue.extend(initial_elements);
223    /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
224    /// let iter = ConcurrentRecursiveIter::from((extend, queue));
225    ///
226    /// let all: Vec<_> = iter.item_puller().collect();
227    /// assert_eq!(all, [1, 2, 3, 4, 5]);
228    ///
229    /// // FixedVec with fixed capacity
230    /// let queue = ConcurrentQueue::with_fixed_capacity(5);
231    /// queue.extend(initial_elements);
232    /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
233    /// let iter = ConcurrentRecursiveIter::from((extend, queue));
234    ///
235    /// let all: Vec<_> = iter.item_puller().collect();
236    /// assert_eq!(all, [1, 2, 3, 4, 5]);
237    /// ```
238    ///
239    /// [`SplitVec`]: orx_split_vec::SplitVec
240    /// [`FixedVec`]: orx_fixed_vec::FixedVec
241    /// [`Doubling`]: orx_split_vec::Doubling
242    /// [`Linear`]: orx_split_vec::Linear
243    pub fn new(extend: E, initial_elements: impl IntoIterator<Item = T>) -> Self {
244        let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
245        vec.extend(initial_elements);
246        let queue = vec.into();
247        (extend, queue).into()
248    }
249}
250
251// new with exact size
252
253impl<T, E, I, P> From<(E, ConcurrentQueue<T, P>, usize)>
254    for ConcurrentRecursiveIterCore<ExactSize, T, E, I, P>
255where
256    T: Send,
257    E: Fn(&T) -> I + Sync,
258    I: IntoIterator<Item = T>,
259    I::IntoIter: ExactSizeIterator,
260    P: ConcurrentPinnedVec<T>,
261    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
262{
263    fn from((extend, queue, exact_len): (E, ConcurrentQueue<T, P>, usize)) -> Self {
264        Self {
265            queue,
266            extend,
267            exact_len: ExactSize(exact_len),
268            p: PhantomData,
269        }
270    }
271}
272
273impl<T, E, I> ConcurrentRecursiveIterCore<ExactSize, T, E, I, DefaultConPinnedVec<T>>
274where
275    T: Send,
276    E: Fn(&T) -> I + Sync,
277    I: IntoIterator<Item = T>,
278    I::IntoIter: ExactSizeIterator,
279{
280    /// Creates a new dynamic concurrent iterator:
281    ///
282    /// * The iterator will initially contain `initial_elements`.
283    /// * Before yielding each element, say `e`, to the caller, the elements returned
284    ///   by `extend(&e)` will be added to the concurrent iterator, to be yield later.
285    ///
286    /// This constructor uses a [`ConcurrentQueue`] with the default pinned concurrent
287    /// collection under the hood. In order to crate the iterator using a different queue
288    /// use the `From`/`Into` traits, as demonstrated below.
289    ///
290    /// # UnknownSize vs ExactSize
291    ///
292    /// Size refers to the total number of elements that will be returned by the iterator,
293    /// which is the total of initial elements and all elements created by the recursive
294    /// extend calls.
295    ///
296    /// Note that the iterator created with this method will have [`ExactSize`].
297    /// In order to create a recursive iterator with an unknown length, you may use
298    /// [`new`] function.
299    ///
300    /// Providing an `exact_len` impacts the following:
301    /// * When an exact length is provided, the recursive iterator implements
302    ///   [`ExactSizeConcurrentIter`] in addition to [`ConcurrentIter`].
303    ///   This enables the `len` method to access the number of remaining elements in a
304    ///   concurrent program. When this is not necessary, the exact length argument
305    ///   can simply be skipped.
306    /// * On the other hand, a known length is very useful for performance optimization
307    ///   when the recursive iterator is used as the input of a parallel iterator of the
308    ///   [orx_parallel](https://crates.io/crates/orx-parallel) crate.
309    ///
310    /// [`new`]: ConcurrentRecursiveIter::new
311    ///
312    /// # Examples
313    ///
314    /// The following is a simple example to demonstrate how the dynamic iterator works.
315    ///
316    /// ```
317    /// use orx_concurrent_recursive_iter::*;
318    ///
319    /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
320    /// let initial_elements = [1];
321    ///
322    /// let iter = ConcurrentRecursiveIterExact::new_exact(extend, initial_elements, 5);
323    /// assert_eq!(iter.len(), 5);
324    ///
325    /// assert_eq!(iter.next(), Some(1));
326    /// assert_eq!(iter.len(), 4);
327    ///
328    /// assert_eq!(iter.next(), Some(2));
329    /// assert_eq!(iter.len(), 3);
330    ///
331    /// let remaining: Vec<_> = iter.item_puller().collect();
332    /// assert_eq!(remaining, [3, 4, 5]);
333    /// assert_eq!(iter.len(), 0);
334    ///
335    /// assert_eq!(iter.next(), None);
336    /// assert_eq!(iter.len(), 0);
337    /// ```
338    ///
339    /// # Examples - From
340    ///
341    /// In the above example, the underlying pinned vector of the dynamic iterator created
342    /// with `new` is a [`SplitVec`] with a [`Doubling`] growth strategy.
343    ///
344    /// Alternatively, we can use a `SplitVec` with a [`Linear`] growth strategy, or a
345    /// pre-allocated [`FixedVec`] as the underlying storage. In order to do so, we can
346    /// use the `From` trait.
347    ///
348    /// Note that:
349    /// * `From<(Extend, ConcurrentQueue<T, P>)>` creates a recursive concurrent iter with
350    ///   [`UnknownSize`], while
351    /// * `From<(Extend, ConcurrentQueue<T, P>, usize)>` creates one with [`ExactSize`].
352    ///
353    /// ```
354    /// use orx_concurrent_recursive_iter::*;
355    /// use orx_concurrent_queue::ConcurrentQueue;
356    ///
357    /// let initial_elements = [1];
358    ///
359    /// // SplitVec with Linear growth
360    /// let queue = ConcurrentQueue::with_linear_growth(10, 4);
361    /// queue.extend(initial_elements);
362    /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
363    /// let iter = ConcurrentRecursiveIterExact::from((extend, queue, 5));
364    ///
365    /// let all: Vec<_> = iter.item_puller().collect();
366    /// assert_eq!(all, [1, 2, 3, 4, 5]);
367    ///
368    /// // FixedVec with fixed capacity
369    /// let queue = ConcurrentQueue::with_fixed_capacity(5);
370    /// queue.extend(initial_elements);
371    /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
372    /// let iter = ConcurrentRecursiveIterExact::from((extend, queue, 5));
373    ///
374    /// let all: Vec<_> = iter.item_puller().collect();
375    /// assert_eq!(all, [1, 2, 3, 4, 5]);
376    /// ```
377    ///
378    /// [`SplitVec`]: orx_split_vec::SplitVec
379    /// [`FixedVec`]: orx_fixed_vec::FixedVec
380    /// [`Doubling`]: orx_split_vec::Doubling
381    /// [`Linear`]: orx_split_vec::Linear
382    pub fn new_exact(
383        extend: E,
384        initial_elements: impl IntoIterator<Item = T>,
385        exact_len: usize,
386    ) -> Self {
387        let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
388        vec.extend(initial_elements);
389        let queue = vec.into();
390        (extend, queue, exact_len).into()
391    }
392}
393
394// con iter
395
396impl<S, T, E, I, P> ConcurrentIter for ConcurrentRecursiveIterCore<S, T, E, I, P>
397where
398    S: Size,
399    T: Send,
400    E: Fn(&T) -> I + Sync,
401    I: IntoIterator<Item = T>,
402    I::IntoIter: ExactSizeIterator,
403    P: ConcurrentPinnedVec<T>,
404    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
405{
406    type Item = T;
407
408    type SequentialIter = DynSeqQueue<T, P, E, I>;
409
410    type ChunkPuller<'i>
411        = DynChunkPuller<'i, T, E, I, P>
412    where
413        Self: 'i;
414
415    fn into_seq_iter(self) -> Self::SequentialIter {
416        // SAFETY: we destruct the queue and immediately convert it into a sequential
417        // queue together with `popped..written` valid range information.
418        let (vec, written, popped) = unsafe { self.queue.destruct() };
419        DynSeqQueue::new(vec, written, popped, self.extend)
420    }
421
422    fn skip_to_end(&self) {
423        let len = self.queue.num_write_reserved(Ordering::Acquire);
424        let _remaining_to_drop = self.queue.pull(len);
425    }
426
427    fn next(&self) -> Option<Self::Item> {
428        let n = self.queue.pop()?;
429        let children = (self.extend)(&n);
430        self.queue.extend(children);
431        Some(n)
432    }
433
434    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
435        let (idx, n) = self.queue.pop_with_idx()?;
436        let children = (self.extend)(&n);
437        self.queue.extend(children);
438        Some((idx, n))
439    }
440
441    fn size_hint(&self) -> (usize, Option<usize>) {
442        match self.exact_len.exact_len() {
443            Some(exact_len) => {
444                let popped = self.queue.num_popped(Ordering::Relaxed);
445                let remaining = exact_len - popped;
446                (remaining, Some(remaining))
447            }
448            None => {
449                let min = self.queue.len();
450                match min {
451                    0 => (0, Some(0)),
452                    n => (n, None),
453                }
454            }
455        }
456    }
457
458    fn is_completed_when_none_returned(&self) -> bool {
459        let popped = self.queue.num_popped(Ordering::Relaxed);
460        let write_reserved = self.queue.num_write_reserved(Ordering::Relaxed);
461        popped >= write_reserved
462    }
463
464    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
465        DynChunkPuller::new(&self.extend, &self.queue, chunk_size)
466    }
467}
468
469impl<T, E, I, P> ExactSizeConcurrentIter for ConcurrentRecursiveIterCore<ExactSize, T, E, I, P>
470where
471    T: Send,
472    E: Fn(&T) -> I + Sync,
473    I: IntoIterator<Item = T>,
474    I::IntoIter: ExactSizeIterator,
475    P: ConcurrentPinnedVec<T>,
476    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
477{
478    fn len(&self) -> usize {
479        self.exact_len.0 - self.queue.num_popped(Ordering::Relaxed)
480    }
481}