orx_concurrent_recursive_iter/con_iter.rs
1use crate::{chunk_puller::DynChunkPuller, dyn_seq_queue::DynSeqQueue};
2use core::sync::atomic::Ordering;
3use orx_concurrent_iter::ConcurrentIter;
4use orx_concurrent_queue::{ConcurrentQueue, DefaultConPinnedVec};
5use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
6use orx_split_vec::SplitVec;
7
8/// A recursive [`ConcurrentIter`] which:
9/// * naturally shrinks as we iterate,
10/// * but can also grow as it allows to add new items to the iterator, during iteration.
11///
12/// Growth of the iterator is expressed by the `extend: E` function with the signature `Fn(&T) -> I`,
13/// where `I: IntoIterator<Item = T>` with a known length.
14///
15/// In other words, for each element `e` pulled from the iterator, we call `extend(&e)` before
16/// returning it to the caller. All elements included in the iterator that `extend` returned
17/// are added to the end of the concurrent iterator, to be pulled later on.
18///
19/// *The recursive concurrent iterator internally uses a [`ConcurrentQueue`] which allows for both
20/// concurrent push / extend and pop / pull operations.*
21///
22/// # Example
23///
24/// The following example demonstrates a use case for the recursive concurrent iterator.
25/// Notice that the iterator is instantiated with:
26/// * a single element which is the root node,
27/// * and the extend method which defines how to extend the iterator from each node.
28///
29/// Including the root, there exist 177 nodes in the tree. We observe that all these
30/// nodes are concurrently added to the iterator, popped and processed.
31///
32/// ```
33/// use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
34/// use orx_concurrent_iter::ConcurrentIter;
35/// use std::sync::atomic::{AtomicUsize, Ordering};
36/// use rand::{Rng, SeedableRng};
37/// use rand_chacha::ChaCha8Rng;
38///
39/// struct Node {
40/// value: u64,
41/// children: Vec<Node>,
42/// }
43///
44/// impl Node {
45/// fn new(rng: &mut impl Rng, value: u64) -> Self {
46/// let num_children = match value {
47/// 0 => 0,
48/// n => rng.random_range(0..(n as usize)),
49/// };
50/// let children = (0..num_children)
51/// .map(|i| Self::new(rng, i as u64))
52/// .collect();
53/// Self { value, children }
54/// }
55/// }
56///
57/// fn process(node_value: u64) {
58/// // fake computation
59/// std::thread::sleep(std::time::Duration::from_millis(node_value));
60/// }
61///
62/// // this defines how the iterator must extend:
63/// // each node drawn from the iterator adds its children to the end of the iterator
64/// fn extend<'a, 'b>(node: &'a &'b Node) -> &'b [Node] {
65/// &node.children
66/// }
67///
68/// // initiate iter with a single element, `root`
69/// // however, the iterator will `extend` on the fly as we keep drawing its elements
70/// let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
71/// let iter = ConcurrentRecursiveIter::new(extend, [&root]);
72///
73/// let num_threads = 8;
74/// let num_spawned = AtomicUsize::new(0);
75/// let num_processed_nodes = AtomicUsize::new(0);
76///
77/// std::thread::scope(|s| {
78/// let mut handles = vec![];
79/// for _ in 0..num_threads {
80/// handles.push(s.spawn(|| {
81/// // allow all threads to be spawned
82/// _ = num_spawned.fetch_add(1, Ordering::Relaxed);
83/// while num_spawned.load(Ordering::Relaxed) < num_threads {}
84///
85/// // `next` will first extend `iter` with children of `node,
86/// // and only then yield the `node`
87/// while let Some(node) = iter.next() {
88/// process(node.value);
89/// _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
90/// }
91/// }));
92/// }
93/// });
94///
95/// assert_eq!(num_processed_nodes.into_inner(), 177);
96/// ```
97pub struct ConcurrentRecursiveIter<T, E, I, P = DefaultConPinnedVec<T>>
98where
99 T: Send,
100 E: Fn(&T) -> I + Sync,
101 I: IntoIterator<Item = T>,
102 I::IntoIter: ExactSizeIterator,
103 P: ConcurrentPinnedVec<T>,
104 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
105{
106 queue: ConcurrentQueue<T, P>,
107 extend: E,
108}
109
110impl<T, E, I, P> From<(E, ConcurrentQueue<T, P>)> for ConcurrentRecursiveIter<T, E, I, P>
111where
112 T: Send,
113 E: Fn(&T) -> I + Sync,
114 I: IntoIterator<Item = T>,
115 I::IntoIter: ExactSizeIterator,
116 P: ConcurrentPinnedVec<T>,
117 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
118{
119 fn from((extend, queue): (E, ConcurrentQueue<T, P>)) -> Self {
120 Self { queue, extend }
121 }
122}
123
124impl<T, E, I> ConcurrentRecursiveIter<T, E, I, DefaultConPinnedVec<T>>
125where
126 T: Send,
127 E: Fn(&T) -> I + Sync,
128 I: IntoIterator<Item = T>,
129 I::IntoIter: ExactSizeIterator,
130{
131 /// Creates a new dynamic concurrent iterator:
132 ///
133 /// * The iterator will initially contain `initial_elements`.
134 /// * Before yielding each element, say `e`, to the caller, the elements returned
135 /// by `extend(&e)` will be added to the concurrent iterator, to be yield later.
136 ///
137 /// This constructor uses a [`ConcurrentQueue`] with the default pinned concurrent
138 /// collection under the hood. In order to crate the iterator using a different queue
139 /// use the `From`/`Into` traits, as demonstrated below.
140 ///
141 /// # Examples
142 ///
143 /// The following is a simple example to demonstrate how the dynamic iterator works.
144 ///
145 /// ```
146 /// use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
147 /// use orx_concurrent_iter::ConcurrentIter;
148 ///
149 /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
150 /// let initial_elements = [1];
151 ///
152 /// let iter = ConcurrentRecursiveIter::new(extend, initial_elements);
153 /// let all: Vec<_> = iter.item_puller().collect();
154 ///
155 /// assert_eq!(all, [1, 2, 3, 4, 5]);
156 /// ```
157 ///
158 /// ```
159 /// use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
160 /// use orx_concurrent_iter::ConcurrentIter;
161 ///
162 /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
163 /// let initial_elements = [1];
164 ///
165 /// let iter = ConcurrentRecursiveIter::new(extend, initial_elements);
166 /// let all: Vec<_> = iter.item_puller().collect();
167 ///
168 /// assert_eq!(all, [1, 2, 3, 4, 5]);
169 /// ```
170 ///
171 /// # Examples - From
172 ///
173 /// In the above example, the underlying pinned vector of the dynamic iterator created
174 /// with `new` is a [`SplitVec`] with a [`Doubling`] growth strategy.
175 ///
176 /// Alternatively, we can use a `SplitVec` with a [`Linear`] growth strategy, or a
177 /// pre-allocated [`FixedVec`] as the underlying storage. In order to do so, we can
178 /// use the `From` trait.
179 ///
180 /// [`SplitVec`]: orx_split_vec::SplitVec
181 /// [`FixedVec`]: orx_fixed_vec::FixedVec
182 /// [`Doubling`]: orx_split_vec::Doubling
183 /// [`Linear`]: orx_split_vec::Linear
184 pub fn new(extend: E, initial_elements: impl IntoIterator<Item = T>) -> Self {
185 let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
186 vec.extend(initial_elements);
187 let queue = vec.into();
188 Self { queue, extend }
189 }
190}
191
192impl<T, E, I, P> ConcurrentIter for ConcurrentRecursiveIter<T, E, I, P>
193where
194 T: Send,
195 E: Fn(&T) -> I + Sync,
196 I: IntoIterator<Item = T>,
197 I::IntoIter: ExactSizeIterator,
198 P: ConcurrentPinnedVec<T>,
199 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
200{
201 type Item = T;
202
203 type SequentialIter = DynSeqQueue<T, P, E, I>;
204
205 type ChunkPuller<'i>
206 = DynChunkPuller<'i, T, E, I, P>
207 where
208 Self: 'i;
209
210 fn into_seq_iter(self) -> Self::SequentialIter {
211 // SAFETY: we destruct the queue and immediately convert it into a sequential
212 // queue together with `popped..written` valid range information.
213 let (vec, written, popped) = unsafe { self.queue.destruct() };
214 DynSeqQueue::new(vec, written, popped, self.extend)
215 }
216
217 fn skip_to_end(&self) {
218 let len = self.queue.num_write_reserved(Ordering::Acquire);
219 let _remaining_to_drop = self.queue.pull(len);
220 }
221
222 fn next(&self) -> Option<Self::Item> {
223 let n = self.queue.pop()?;
224 let children = (self.extend)(&n);
225 self.queue.extend(children);
226 Some(n)
227 }
228
229 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
230 let (idx, n) = self.queue.pop_with_idx()?;
231 let children = (self.extend)(&n);
232 self.queue.extend(children);
233 Some((idx, n))
234 }
235
236 fn size_hint(&self) -> (usize, Option<usize>) {
237 let min = self.queue.len();
238 match min {
239 0 => (0, Some(0)),
240 n => (n, None),
241 }
242 }
243
244 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
245 DynChunkPuller::new(&self.extend, &self.queue, chunk_size)
246 }
247}