orx_concurrent_recursive_iter/con_iter.rs
1use crate::{
2 ExactSize, Size, UnknownSize, chunk_puller::DynChunkPuller, dyn_seq_queue::DynSeqQueue,
3};
4use core::{marker::PhantomData, sync::atomic::Ordering};
5use orx_concurrent_iter::{ConcurrentIter, ExactSizeConcurrentIter};
6use orx_concurrent_queue::{ConcurrentQueue, DefaultConPinnedVec};
7use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
8use orx_split_vec::SplitVec;
9
// type aliases for exact and unknown sizes
11
/// A [`ConcurrentRecursiveIterCore`] with [`UnknownSize`].
///
/// This is the variant of the recursive concurrent iterator for which the total number
/// of elements to be yielded is not provided at construction.
pub type ConcurrentRecursiveIter<T, E, I, P = DefaultConPinnedVec<T>> =
    ConcurrentRecursiveIterCore<UnknownSize, T, E, I, P>;
15
/// A [`ConcurrentRecursiveIterCore`] with [`ExactSize`].
///
/// This is the variant of the recursive concurrent iterator which is constructed together
/// with the exact total number of elements that it will yield.
pub type ConcurrentRecursiveIterExact<T, E, I, P = DefaultConPinnedVec<T>> =
    ConcurrentRecursiveIterCore<ExactSize, T, E, I, P>;
19
20// core
21
/// A recursive [`ConcurrentIter`] which:
/// * naturally shrinks as we iterate,
/// * but can also grow, as it allows new items to be added to the iterator during iteration.
///
/// Growth of the iterator is expressed by the `extend: E` function with the signature `Fn(&T) -> I`,
/// where `I: IntoIterator<Item = T>` with a known length.
///
/// In other words, for each element `e` pulled from the iterator, we call `extend(&e)` before
/// returning it to the caller. All elements included in the iterator that `extend` returned
/// are added to the end of the concurrent iterator, to be pulled later on.
///
/// *The recursive concurrent iterator internally uses a [`ConcurrentQueue`] which allows for both
/// concurrent push / extend and pop / pull operations.*
///
/// # Example
///
/// The following example demonstrates a use case for the recursive concurrent iterator.
/// Notice that the iterator is instantiated with:
/// * a single element which is the root node,
/// * and the extend method which defines how to extend the iterator from each node.
///
/// Including the root, there exist 177 nodes in the tree. We observe that all these
/// nodes are concurrently added to the iterator, popped and processed.
///
/// ```
/// use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
/// use orx_concurrent_iter::ConcurrentIter;
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use rand::{Rng, SeedableRng};
/// use rand_chacha::ChaCha8Rng;
///
/// struct Node {
///     value: u64,
///     children: Vec<Node>,
/// }
///
/// impl Node {
///     fn new(rng: &mut impl Rng, value: u64) -> Self {
///         let num_children = match value {
///             0 => 0,
///             n => rng.random_range(0..(n as usize)),
///         };
///         let children = (0..num_children)
///             .map(|i| Self::new(rng, i as u64))
///             .collect();
///         Self { value, children }
///     }
/// }
///
/// fn process(node_value: u64) {
///     // fake computation
///     std::thread::sleep(std::time::Duration::from_millis(node_value));
/// }
///
/// // this defines how the iterator must extend:
/// // each node drawn from the iterator adds its children to the end of the iterator
/// fn extend<'a, 'b>(node: &'a &'b Node) -> &'b [Node] {
///     &node.children
/// }
///
/// // initiate iter with a single element, `root`
/// // however, the iterator will `extend` on the fly as we keep drawing its elements
/// let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
/// let iter = ConcurrentRecursiveIter::new(extend, [&root]);
///
/// let num_threads = 8;
/// let num_spawned = AtomicUsize::new(0);
/// let num_processed_nodes = AtomicUsize::new(0);
///
/// std::thread::scope(|s| {
///     let mut handles = vec![];
///     for _ in 0..num_threads {
///         handles.push(s.spawn(|| {
///             // allow all threads to be spawned
///             _ = num_spawned.fetch_add(1, Ordering::Relaxed);
///             while num_spawned.load(Ordering::Relaxed) < num_threads {}
///
///             // `next` will first extend `iter` with children of `node`,
///             // and only then yield the `node`
///             while let Some(node) = iter.next() {
///                 process(node.value);
///                 _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
///             }
///         }));
///     }
/// });
///
/// assert_eq!(num_processed_nodes.into_inner(), 177);
/// ```
pub struct ConcurrentRecursiveIterCore<S, T, E, I, P = DefaultConPinnedVec<T>>
where
    S: Size,
    T: Send,
    E: Fn(&T) -> I + Sync,
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
    P: ConcurrentPinnedVec<T>,
    <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
{
    /// Concurrent queue holding the elements waiting to be yielded; elements returned by
    /// `extend` are appended to it.
    queue: ConcurrentQueue<T, P>,
    /// Function that maps an element about to be yielded to the new elements to be
    /// added to the iterator.
    extend: E,
    /// Size information: [`UnknownSize`], or [`ExactSize`] wrapping the total length
    /// provided at construction.
    exact_len: S,
    /// Marker for the size type parameter `S`.
    p: PhantomData<S>,
}
126
127// new with unknown size
128
129impl<T, E, I, P> From<(E, ConcurrentQueue<T, P>)>
130 for ConcurrentRecursiveIterCore<UnknownSize, T, E, I, P>
131where
132 T: Send,
133 E: Fn(&T) -> I + Sync,
134 I: IntoIterator<Item = T>,
135 I::IntoIter: ExactSizeIterator,
136 P: ConcurrentPinnedVec<T>,
137 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
138{
139 fn from((extend, queue): (E, ConcurrentQueue<T, P>)) -> Self {
140 Self {
141 queue,
142 extend,
143 exact_len: UnknownSize,
144 p: PhantomData,
145 }
146 }
147}
148
149impl<T, E, I> ConcurrentRecursiveIterCore<UnknownSize, T, E, I, DefaultConPinnedVec<T>>
150where
151 T: Send,
152 E: Fn(&T) -> I + Sync,
153 I: IntoIterator<Item = T>,
154 I::IntoIter: ExactSizeIterator,
155{
156 /// Creates a new dynamic concurrent iterator:
157 ///
158 /// * The iterator will initially contain `initial_elements`.
159 /// * Before yielding each element, say `e`, to the caller, the elements returned
160 /// by `extend(&e)` will be added to the concurrent iterator, to be yield later.
161 ///
162 /// This constructor uses a [`ConcurrentQueue`] with the default pinned concurrent
163 /// collection under the hood. In order to crate the iterator using a different queue
164 /// use the `From`/`Into` traits, as demonstrated below.
165 ///
166 /// # UnknownSize vs ExactSize
167 ///
168 /// Size refers to the total number of elements that will be returned by the iterator,
169 /// which is the total of initial elements and all elements created by the recursive
170 /// extend calls.
171 ///
172 /// Note that the iterator created with this method will have [`UnknownSize`].
173 /// In order to create a recursive iterator with a known exact length, you may use
174 /// [`new_exact`] function.
175 ///
176 /// Providing an `exact_len` impacts the following:
177 /// * When an exact length is provided, the recursive iterator implements
178 /// [`ExactSizeConcurrentIter`] in addition to [`ConcurrentIter`].
179 /// This enables the `len` method to access the number of remaining elements in a
180 /// concurrent program. When this is not necessary, the exact length argument
181 /// can simply be skipped.
182 /// * On the other hand, a known length is very useful for performance optimization
183 /// when the recursive iterator is used as the input of a parallel iterator of the
184 /// [orx_parallel](https://crates.io/crates/orx-parallel) crate.
185 ///
186 /// [`new_exact`]: ConcurrentRecursiveIterExact::new_exact
187 ///
188 /// # Examples
189 ///
190 /// The following is a simple example to demonstrate how the dynamic iterator works.
191 ///
192 /// ```
193 /// use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
194 /// use orx_concurrent_iter::ConcurrentIter;
195 ///
196 /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
197 /// let initial_elements = [1];
198 ///
199 /// let iter = ConcurrentRecursiveIter::new(extend, initial_elements);
200 /// let all: Vec<_> = iter.item_puller().collect();
201 ///
202 /// assert_eq!(all, [1, 2, 3, 4, 5]);
203 /// ```
204 ///
205 /// # Examples - From
206 ///
207 /// In the above example, the underlying pinned vector of the dynamic iterator created
208 /// with `new` is a [`SplitVec`] with a [`Doubling`] growth strategy.
209 ///
210 /// Alternatively, we can use a `SplitVec` with a [`Linear`] growth strategy, or a
211 /// pre-allocated [`FixedVec`] as the underlying storage. In order to do so, we can
212 /// use the `From` trait.
213 ///
214 /// ```
215 /// use orx_concurrent_recursive_iter::*;
216 /// use orx_concurrent_queue::ConcurrentQueue;
217 ///
218 /// let initial_elements = [1];
219 ///
220 /// // SplitVec with Linear growth
221 /// let queue = ConcurrentQueue::with_linear_growth(10, 4);
222 /// queue.extend(initial_elements);
223 /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
224 /// let iter = ConcurrentRecursiveIter::from((extend, queue));
225 ///
226 /// let all: Vec<_> = iter.item_puller().collect();
227 /// assert_eq!(all, [1, 2, 3, 4, 5]);
228 ///
229 /// // FixedVec with fixed capacity
230 /// let queue = ConcurrentQueue::with_fixed_capacity(5);
231 /// queue.extend(initial_elements);
232 /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
233 /// let iter = ConcurrentRecursiveIter::from((extend, queue));
234 ///
235 /// let all: Vec<_> = iter.item_puller().collect();
236 /// assert_eq!(all, [1, 2, 3, 4, 5]);
237 /// ```
238 ///
239 /// [`SplitVec`]: orx_split_vec::SplitVec
240 /// [`FixedVec`]: orx_fixed_vec::FixedVec
241 /// [`Doubling`]: orx_split_vec::Doubling
242 /// [`Linear`]: orx_split_vec::Linear
243 pub fn new(extend: E, initial_elements: impl IntoIterator<Item = T>) -> Self {
244 let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
245 vec.extend(initial_elements);
246 let queue = vec.into();
247 (extend, queue).into()
248 }
249}
250
251// new with exact size
252
253impl<T, E, I, P> From<(E, ConcurrentQueue<T, P>, usize)>
254 for ConcurrentRecursiveIterCore<ExactSize, T, E, I, P>
255where
256 T: Send,
257 E: Fn(&T) -> I + Sync,
258 I: IntoIterator<Item = T>,
259 I::IntoIter: ExactSizeIterator,
260 P: ConcurrentPinnedVec<T>,
261 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
262{
263 fn from((extend, queue, exact_len): (E, ConcurrentQueue<T, P>, usize)) -> Self {
264 Self {
265 queue,
266 extend,
267 exact_len: ExactSize(exact_len),
268 p: PhantomData,
269 }
270 }
271}
272
273impl<T, E, I> ConcurrentRecursiveIterCore<ExactSize, T, E, I, DefaultConPinnedVec<T>>
274where
275 T: Send,
276 E: Fn(&T) -> I + Sync,
277 I: IntoIterator<Item = T>,
278 I::IntoIter: ExactSizeIterator,
279{
280 /// Creates a new dynamic concurrent iterator:
281 ///
282 /// * The iterator will initially contain `initial_elements`.
283 /// * Before yielding each element, say `e`, to the caller, the elements returned
284 /// by `extend(&e)` will be added to the concurrent iterator, to be yield later.
285 ///
286 /// This constructor uses a [`ConcurrentQueue`] with the default pinned concurrent
287 /// collection under the hood. In order to crate the iterator using a different queue
288 /// use the `From`/`Into` traits, as demonstrated below.
289 ///
290 /// # UnknownSize vs ExactSize
291 ///
292 /// Size refers to the total number of elements that will be returned by the iterator,
293 /// which is the total of initial elements and all elements created by the recursive
294 /// extend calls.
295 ///
296 /// Note that the iterator created with this method will have [`ExactSize`].
297 /// In order to create a recursive iterator with an unknown length, you may use
298 /// [`new`] function.
299 ///
300 /// Providing an `exact_len` impacts the following:
301 /// * When an exact length is provided, the recursive iterator implements
302 /// [`ExactSizeConcurrentIter`] in addition to [`ConcurrentIter`].
303 /// This enables the `len` method to access the number of remaining elements in a
304 /// concurrent program. When this is not necessary, the exact length argument
305 /// can simply be skipped.
306 /// * On the other hand, a known length is very useful for performance optimization
307 /// when the recursive iterator is used as the input of a parallel iterator of the
308 /// [orx_parallel](https://crates.io/crates/orx-parallel) crate.
309 ///
310 /// [`new`]: ConcurrentRecursiveIter::new
311 ///
312 /// # Examples
313 ///
314 /// The following is a simple example to demonstrate how the dynamic iterator works.
315 ///
316 /// ```
317 /// use orx_concurrent_recursive_iter::*;
318 ///
319 /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
320 /// let initial_elements = [1];
321 ///
322 /// let iter = ConcurrentRecursiveIterExact::new_exact(extend, initial_elements, 5);
323 /// assert_eq!(iter.len(), 5);
324 ///
325 /// assert_eq!(iter.next(), Some(1));
326 /// assert_eq!(iter.len(), 4);
327 ///
328 /// assert_eq!(iter.next(), Some(2));
329 /// assert_eq!(iter.len(), 3);
330 ///
331 /// let remaining: Vec<_> = iter.item_puller().collect();
332 /// assert_eq!(remaining, [3, 4, 5]);
333 /// assert_eq!(iter.len(), 0);
334 ///
335 /// assert_eq!(iter.next(), None);
336 /// assert_eq!(iter.len(), 0);
337 /// ```
338 ///
339 /// # Examples - From
340 ///
341 /// In the above example, the underlying pinned vector of the dynamic iterator created
342 /// with `new` is a [`SplitVec`] with a [`Doubling`] growth strategy.
343 ///
344 /// Alternatively, we can use a `SplitVec` with a [`Linear`] growth strategy, or a
345 /// pre-allocated [`FixedVec`] as the underlying storage. In order to do so, we can
346 /// use the `From` trait.
347 ///
348 /// Note that:
349 /// * `From<(Extend, ConcurrentQueue<T, P>)>` creates a recursive concurrent iter with
350 /// [`UnknownSize`], while
351 /// * `From<(Extend, ConcurrentQueue<T, P>, usize)>` creates one with [`ExactSize`].
352 ///
353 /// ```
354 /// use orx_concurrent_recursive_iter::*;
355 /// use orx_concurrent_queue::ConcurrentQueue;
356 ///
357 /// let initial_elements = [1];
358 ///
359 /// // SplitVec with Linear growth
360 /// let queue = ConcurrentQueue::with_linear_growth(10, 4);
361 /// queue.extend(initial_elements);
362 /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
363 /// let iter = ConcurrentRecursiveIterExact::from((extend, queue, 5));
364 ///
365 /// let all: Vec<_> = iter.item_puller().collect();
366 /// assert_eq!(all, [1, 2, 3, 4, 5]);
367 ///
368 /// // FixedVec with fixed capacity
369 /// let queue = ConcurrentQueue::with_fixed_capacity(5);
370 /// queue.extend(initial_elements);
371 /// let extend = |x: &usize| (*x < 5).then_some(x + 1);
372 /// let iter = ConcurrentRecursiveIterExact::from((extend, queue, 5));
373 ///
374 /// let all: Vec<_> = iter.item_puller().collect();
375 /// assert_eq!(all, [1, 2, 3, 4, 5]);
376 /// ```
377 ///
378 /// [`SplitVec`]: orx_split_vec::SplitVec
379 /// [`FixedVec`]: orx_fixed_vec::FixedVec
380 /// [`Doubling`]: orx_split_vec::Doubling
381 /// [`Linear`]: orx_split_vec::Linear
382 pub fn new_exact(
383 extend: E,
384 initial_elements: impl IntoIterator<Item = T>,
385 exact_len: usize,
386 ) -> Self {
387 let mut vec = SplitVec::with_doubling_growth_and_max_concurrent_capacity();
388 vec.extend(initial_elements);
389 let queue = vec.into();
390 (extend, queue, exact_len).into()
391 }
392}
393
394// con iter
395
396impl<S, T, E, I, P> ConcurrentIter for ConcurrentRecursiveIterCore<S, T, E, I, P>
397where
398 S: Size,
399 T: Send,
400 E: Fn(&T) -> I + Sync,
401 I: IntoIterator<Item = T>,
402 I::IntoIter: ExactSizeIterator,
403 P: ConcurrentPinnedVec<T>,
404 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
405{
406 type Item = T;
407
408 type SequentialIter = DynSeqQueue<T, P, E, I>;
409
410 type ChunkPuller<'i>
411 = DynChunkPuller<'i, T, E, I, P>
412 where
413 Self: 'i;
414
415 fn into_seq_iter(self) -> Self::SequentialIter {
416 // SAFETY: we destruct the queue and immediately convert it into a sequential
417 // queue together with `popped..written` valid range information.
418 let (vec, written, popped) = unsafe { self.queue.destruct() };
419 DynSeqQueue::new(vec, written, popped, self.extend)
420 }
421
422 fn skip_to_end(&self) {
423 let len = self.queue.num_write_reserved(Ordering::Acquire);
424 let _remaining_to_drop = self.queue.pull(len);
425 }
426
427 fn next(&self) -> Option<Self::Item> {
428 let n = self.queue.pop()?;
429 let children = (self.extend)(&n);
430 self.queue.extend(children);
431 Some(n)
432 }
433
434 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
435 let (idx, n) = self.queue.pop_with_idx()?;
436 let children = (self.extend)(&n);
437 self.queue.extend(children);
438 Some((idx, n))
439 }
440
441 fn size_hint(&self) -> (usize, Option<usize>) {
442 match self.exact_len.exact_len() {
443 Some(exact_len) => {
444 let popped = self.queue.num_popped(Ordering::Relaxed);
445 let remaining = exact_len - popped;
446 (remaining, Some(remaining))
447 }
448 None => {
449 let min = self.queue.len();
450 match min {
451 0 => (0, Some(0)),
452 n => (n, None),
453 }
454 }
455 }
456 }
457
458 fn is_completed_when_none_returned(&self) -> bool {
459 let popped = self.queue.num_popped(Ordering::Relaxed);
460 let write_reserved = self.queue.num_write_reserved(Ordering::Relaxed);
461 popped >= write_reserved
462 }
463
464 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
465 DynChunkPuller::new(&self.extend, &self.queue, chunk_size)
466 }
467}
468
469impl<T, E, I, P> ExactSizeConcurrentIter for ConcurrentRecursiveIterCore<ExactSize, T, E, I, P>
470where
471 T: Send,
472 E: Fn(&T) -> I + Sync,
473 I: IntoIterator<Item = T>,
474 I::IntoIter: ExactSizeIterator,
475 P: ConcurrentPinnedVec<T>,
476 <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
477{
478 fn len(&self) -> usize {
479 self.exact_len.0 - self.queue.num_popped(Ordering::Relaxed)
480 }
481}