// orx_concurrent_recursive_iter/con_iter.rs

use crate::{chunk_puller::DynChunkPuller, dyn_seq_queue::DynSeqQueue};
use core::sync::atomic::Ordering;
use orx_concurrent_iter::ConcurrentIter;
use orx_concurrent_queue::{ConcurrentQueue, DefaultConPinnedVec};
use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
use orx_split_vec::SplitVec;
7
8/// A recursive [`ConcurrentIter`] which:
9/// * naturally shrinks as we iterate,
10/// * but can also grow as it allows to add new items to the iterator, during iteration.
11///
12/// Growth of the iterator is expressed by the `extend: E` function with the signature `Fn(&T) -> I`,
13/// where `I: IntoIterator<Item = T>` with a known length.
14///
15/// In other words, for each element `e` pulled from the iterator, we call `extend(&e)` before
16/// returning it to the caller. All elements included in the iterator that `extend` returned
17/// are added to the end of the concurrent iterator, to be pulled later on.
18///
19/// *The recursive concurrent iterator internally uses a [`ConcurrentQueue`] which allows for both
20/// concurrent push / extend and pop / pull operations.*
21///
22/// # Example
23///
24/// The following example demonstrates a use case for the recursive concurrent iterator.
25/// Notice that the iterator is instantiated with:
26/// * a single element which is the root node,
27/// * and the extend method which defines how to extend the iterator from each node.
28///
29/// Including the root, there exist 177 nodes in the tree. We observe that all these
30/// nodes are concurrently added to the iterator, popped and processed.
31///
32/// ```
33/// use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
34/// use orx_concurrent_iter::ConcurrentIter;
35/// use std::sync::atomic::{AtomicUsize, Ordering};
36/// use rand::{Rng, SeedableRng};
37/// use rand_chacha::ChaCha8Rng;
38///
39/// struct Node {
40///     value: u64,
41///     children: Vec<Node>,
42/// }
43///
44/// impl Node {
45///     fn new(rng: &mut impl Rng, value: u64) -> Self {
46///         let num_children = match value {
47///             0 => 0,
48///             n => rng.random_range(0..(n as usize)),
49///         };
50///         let children = (0..num_children)
51///             .map(|i| Self::new(rng, i as u64))
52///             .collect();
53///         Self { value, children }
54///     }
55/// }
56///
57/// fn process(node_value: u64) {
58///     // fake computation
59///     std::thread::sleep(std::time::Duration::from_millis(node_value));
60/// }
61///
62/// // this defines how the iterator must extend:
63/// // each node drawn from the iterator adds its children to the end of the iterator
64/// fn extend<'a, 'b>(node: &'a &'b Node) -> &'b [Node] {
65///     &node.children
66/// }
67///
68/// // initiate iter with a single element, `root`
69/// // however, the iterator will `extend` on the fly as we keep drawing its elements
70/// let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
71/// let iter = ConcurrentRecursiveIter::new(extend, [&root]);
72///
73/// let num_threads = 8;
74/// let num_spawned = AtomicUsize::new(0);
75/// let num_processed_nodes = AtomicUsize::new(0);
76///
77/// std::thread::scope(|s| {
78///     let mut handles = vec![];
79///     for _ in 0..num_threads {
80///         handles.push(s.spawn(|| {
81///             // allow all threads to be spawned
82///             _ = num_spawned.fetch_add(1, Ordering::Relaxed);
83///             while num_spawned.load(Ordering::Relaxed) < num_threads {}
84///
85///             // `next` will first extend `iter` with children of `node,
86///             // and only then yield the `node`
87///             while let Some(node) = iter.next() {
88///                 process(node.value);
89///                 _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
90///             }
91///         }));
92///     }
93/// });
94///
95/// assert_eq!(num_processed_nodes.into_inner(), 177);
96/// ```
97pub struct ConcurrentRecursiveIter<T, E, I, P = DefaultConPinnedVec<T>>
98where
99    T: Send,
100    E: Fn(&T) -> I + Sync,
101    I: IntoIterator<Item = T>,
102    I::IntoIter: ExactSizeIterator,
103    P: ConcurrentPinnedVec<T>,
104    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
105{
106    queue: ConcurrentQueue<T, P>,
107    extend: E,
108}
109
110impl<T, E, I, P> From<(E, ConcurrentQueue<T, P>)> for ConcurrentRecursiveIter<T, E, I, P>
111where
112    T: Send,
113    E: Fn(&T) -> I + Sync,
114    I: IntoIterator<Item = T>,
115    I::IntoIter: ExactSizeIterator,
116    P: ConcurrentPinnedVec<T>,
117    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
118{
119    fn from((extend, queue): (E, ConcurrentQueue<T, P>)) -> Self {
120        Self { queue, extend }
121    }
122}
123
124impl<T, E, I> ConcurrentRecursiveIter<T, E, I, DefaultConPinnedVec<T>>
125where
126    T: Send,
127    E: Fn(&T) -> I + Sync,
128    I: IntoIterator<Item = T>,
129    I::IntoIter: ExactSizeIterator,
130{
131    /// Creates a new dynamic concurrent iterator:
132    ///
133    /// * The iterator will initially contain `initial_elements`.
134    /// * Before yielding each element, say `e`, to the caller, the elements returned
135    ///   by `extend(&e)` will be added to the concurrent iterator, to be yield later.
136    ///
137    /// This constructor uses a [`ConcurrentQueue`] with the default pinned concurrent
138    /// collection under the hood. In order to crate the iterator using a different queue
139    /// use the `From`/`Into` traits, as demonstrated below.
140    ///
141    /// # Examples
142    ///
143    /// The following is a simple example to demonstrate how the dynamic iterator works.
144    ///
145    /// ```
146    /// use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
147    /// use orx_concurrent_iter::ConcurrentIter;
148    ///
149    /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
150    /// let initial_elements = [1];
151    ///
152    /// let iter = ConcurrentRecursiveIter::new(extend, initial_elements);
153    /// let all: Vec<_> = iter.item_puller().collect();
154    ///
155    /// assert_eq!(all, [1, 2, 3, 4, 5]);
156    /// ```
157    ///
158    /// ```
159    /// use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
160    /// use orx_concurrent_iter::ConcurrentIter;
161    ///
162    /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
163    /// let initial_elements = [1];
164    ///
165    /// let iter = ConcurrentRecursiveIter::new(extend, initial_elements);
166    /// let all: Vec<_> = iter.item_puller().collect();
167    ///
168    /// assert_eq!(all, [1, 2, 3, 4, 5]);
169    /// ```
170    ///
171    /// # Examples - From
172    ///
173    /// In the above example, the underlying pinned vector of the dynamic iterator created
174    /// with `new` is a [`SplitVec`] with a [`Doubling`] growth strategy.
175    ///
176    /// Alternatively, we can use a `SplitVec` with a [`Linear`] growth strategy, or a
177    /// pre-allocated [`FixedVec`] as the underlying storage. In order to do so, we can
178    /// use the `From` trait.
179    ///
180    /// [`SplitVec`]: orx_split_vec::SplitVec
181    /// [`FixedVec`]: orx_fixed_vec::FixedVec
182    /// [`Doubling`]: orx_split_vec::Doubling
183    /// [`Linear`]: orx_split_vec::Linear
184    pub fn new(extend: E, initial_elements: impl IntoIterator<Item = T>) -> Self {
185        let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
186        vec.extend(initial_elements);
187        let queue = vec.into();
188        Self { queue, extend }
189    }
190}
191
192impl<T, E, I, P> ConcurrentIter for ConcurrentRecursiveIter<T, E, I, P>
193where
194    T: Send,
195    E: Fn(&T) -> I + Sync,
196    I: IntoIterator<Item = T>,
197    I::IntoIter: ExactSizeIterator,
198    P: ConcurrentPinnedVec<T>,
199    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
200{
201    type Item = T;
202
203    type SequentialIter = DynSeqQueue<T, P, E, I>;
204
205    type ChunkPuller<'i>
206        = DynChunkPuller<'i, T, E, I, P>
207    where
208        Self: 'i;
209
210    fn into_seq_iter(self) -> Self::SequentialIter {
211        // SAFETY: we destruct the queue and immediately convert it into a sequential
212        // queue together with `popped..written` valid range information.
213        let (vec, written, popped) = unsafe { self.queue.destruct() };
214        DynSeqQueue::new(vec, written, popped, self.extend)
215    }
216
217    fn skip_to_end(&self) {
218        let len = self.queue.num_write_reserved(Ordering::Acquire);
219        let _remaining_to_drop = self.queue.pull(len);
220    }
221
222    fn next(&self) -> Option<Self::Item> {
223        let n = self.queue.pop()?;
224        let children = (self.extend)(&n);
225        self.queue.extend(children);
226        Some(n)
227    }
228
229    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
230        let (idx, n) = self.queue.pop_with_idx()?;
231        let children = (self.extend)(&n);
232        self.queue.extend(children);
233        Some((idx, n))
234    }
235
236    fn size_hint(&self) -> (usize, Option<usize>) {
237        let min = self.queue.len();
238        match min {
239            0 => (0, Some(0)),
240            n => (n, None),
241        }
242    }
243
244    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
245        DynChunkPuller::new(&self.extend, &self.queue, chunk_size)
246    }
247}