Struct ConcurrentRecursiveIter 

Source
pub struct ConcurrentRecursiveIter<T, E, I, P = DefaultConPinnedVec<T>>
where T: Send, E: Fn(&T) -> I + Sync, I: IntoIterator<Item = T>, I::IntoIter: ExactSizeIterator, P: ConcurrentPinnedVec<T>, <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
{ /* private fields */ }

A recursive ConcurrentIter which:

  • naturally shrinks as we iterate,
  • but can also grow, since new items can be added to the iterator during iteration.

Growth of the iterator is expressed by the extend: E function with the signature Fn(&T) -> I, where I: IntoIterator<Item = T> with a known length.
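The "known length" requirement corresponds to the bound I::IntoIter: ExactSizeIterator in the struct declaration. The following is a minimal std-only sketch of which return types satisfy such a bound; the helper name known_len is hypothetical and is not part of the crate.

```rust
/// Hypothetical helper (not part of the crate) with the same bounds as `I`:
/// any `IntoIterator` whose iterator reports an exact length.
fn known_len<T, I>(items: I) -> usize
where
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
{
    items.into_iter().len()
}

fn main() {
    // Vec, arrays, slices and Option all produce exact-size iterators.
    assert_eq!(known_len(vec![1, 2, 3]), 3);
    assert_eq!(known_len([10u8, 20]), 2);
    assert_eq!(known_len(&[1.0f64, 2.0, 3.0][..]), 3);
    assert_eq!(known_len(Some('a')), 1);
    assert_eq!(known_len(None::<char>), 0);

    // An adapter such as `filter` cannot know its length up front, so
    // `known_len(vec![1, 2, 3].into_iter().filter(|x| x % 2 == 1))`
    // would be rejected by the compiler.
}
```

This is why an extend function may return an Option, a slice or a Vec, but not a lazily filtered iterator.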

In other words, for each element e pulled from the iterator, extend(&e) is called before e is returned to the caller. All elements of the iterator returned by extend are appended to the end of the concurrent iterator, to be pulled later.

The recursive concurrent iterator internally uses a ConcurrentQueue which allows for both concurrent push / extend and pop / pull operations.
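The pull rule described above can be illustrated with a sequential model. This sketch assumes a plain std VecDeque in place of the ConcurrentQueue and a hypothetical function drain_recursive; it only mirrors the ordering of "extend first, yield afterwards" and is not the crate's implementation.

```rust
use std::collections::VecDeque;

/// Hypothetical sequential model: pops from the front, appends the result of
/// `extend(&item)` to the back, and only then hands the item out.
fn drain_recursive<T, E, I>(initial: impl IntoIterator<Item = T>, extend: E) -> Vec<T>
where
    E: Fn(&T) -> I,
    I: IntoIterator<Item = T>,
{
    let mut queue: VecDeque<T> = initial.into_iter().collect();
    let mut yielded = Vec::new();
    while let Some(item) = queue.pop_front() {
        // New elements are added before `item` is yielded.
        queue.extend(extend(&item));
        yielded.push(item);
    }
    yielded
}

fn main() {
    // Each element below 5 adds its successor.
    let successor = |x: &usize| (*x < 5).then_some(x + 1);
    assert_eq!(drain_recursive([1], successor), [1, 2, 3, 4, 5]);

    // Each node below 4 adds two children; the yield order is breadth-first.
    let children = |x: &u32| if *x < 4 { vec![2 * x, 2 * x + 1] } else { vec![] };
    assert_eq!(drain_recursive([1], children), [1, 2, 3, 4, 5, 6, 7]);
}
```

With a single thread the order is deterministic; with several threads pulling concurrently, the order in which elements are observed may differ.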

§Example

The following example demonstrates a use case for the recursive concurrent iterator. Notice that the iterator is instantiated with:

  • a single element which is the root node,
  • and the extend method which defines how to extend the iterator from each node.

Including the root, there exist 177 nodes in the tree. We observe that all these nodes are concurrently added to the iterator, popped and processed.

use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
use orx_concurrent_iter::ConcurrentIter;
use std::sync::atomic::{AtomicUsize, Ordering};
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;

struct Node {
    value: u64,
    children: Vec<Node>,
}

impl Node {
    fn new(rng: &mut impl Rng, value: u64) -> Self {
        let num_children = match value {
            0 => 0,
            n => rng.random_range(0..(n as usize)),
        };
        let children = (0..num_children)
            .map(|i| Self::new(rng, i as u64))
            .collect();
        Self { value, children }
    }
}

fn process(node_value: u64) {
    // fake computation
    std::thread::sleep(std::time::Duration::from_millis(node_value));
}

// this defines how the iterator must extend:
// each node drawn from the iterator adds its children to the end of the iterator
fn extend<'a, 'b>(node: &'a &'b Node) -> &'b [Node] {
    &node.children
}

// initiate iter with a single element, `root`
// however, the iterator will `extend` on the fly as we keep drawing its elements
let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
let iter = ConcurrentRecursiveIter::new(extend, [&root]);

let num_threads = 8;
let num_spawned = AtomicUsize::new(0);
let num_processed_nodes = AtomicUsize::new(0);

std::thread::scope(|s| {
    let mut handles = vec![];
    for _ in 0..num_threads {
        handles.push(s.spawn(|| {
            // allow all threads to be spawned
            _ = num_spawned.fetch_add(1, Ordering::Relaxed);
            while num_spawned.load(Ordering::Relaxed) < num_threads {}

            // `next` will first extend `iter` with the children of `node`,
            // and only then yield the `node`
            while let Some(node) = iter.next() {
                process(node.value);
                _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }
});

assert_eq!(num_processed_nodes.into_inner(), 177);
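To see why extending before yielding is convenient for concurrent traversal, here is a std-only sketch of the same idea. It assumes a Mutex<VecDeque> as a stand-in for the ConcurrentQueue, and the names build, pull and count_nodes are hypothetical. Because popping a node and pushing its children happen in one critical section in this model, a thread that observes an empty queue can stop safely: no other thread still holds a node whose children are yet to be added.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct Node {
    children: Vec<Node>,
}

/// Deterministic tree: `build(n)` has one child for each of `build(0..n)`,
/// which gives 2^n nodes in total.
fn build(n: u64) -> Node {
    Node { children: (0..n).map(build).collect() }
}

/// Mutex-based stand-in for the concurrent pull (an assumption of this
/// sketch): pop one node and push its children in one critical section.
fn pull<'a>(queue: &Mutex<VecDeque<&'a Node>>) -> Option<&'a Node> {
    let mut guard = queue.lock().unwrap();
    let node = guard.pop_front()?;
    guard.extend(node.children.iter());
    Some(node)
}

/// Visits every node of the tree with `num_threads` scoped threads.
fn count_nodes(root: &Node, num_threads: usize) -> usize {
    let queue = Mutex::new(VecDeque::from([root]));
    let processed = AtomicUsize::new(0);
    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                while let Some(_node) = pull(&queue) {
                    processed.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });
    processed.into_inner()
}

fn main() {
    let root = build(5);
    assert_eq!(count_nodes(&root, 4), 32);
}
```

Threads that find the queue empty early simply exit, so the work may be shared unevenly, but every node is still visited exactly once.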

Implementations§

Source§

impl<T, E, I> ConcurrentRecursiveIter<T, E, I, DefaultConPinnedVec<T>>
where T: Send, E: Fn(&T) -> I + Sync, I: IntoIterator<Item = T>, I::IntoIter: ExactSizeIterator,

Source

pub fn new(extend: E, initial_elements: impl IntoIterator<Item = T>) -> Self

Creates a new dynamic concurrent iterator:

  • The iterator will initially contain initial_elements.
  • Before yielding each element, say e, to the caller, the elements returned by extend(&e) will be added to the concurrent iterator, to be yielded later.

This constructor uses a ConcurrentQueue with the default pinned concurrent collection under the hood. In order to create the iterator using a different queue, use the From/Into traits, as demonstrated below.

§Examples

The following is a simple example to demonstrate how the dynamic iterator works.

use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
use orx_concurrent_iter::ConcurrentIter;

let extend = |x: &usize| (*x < 5).then_some(x + 1);
let initial_elements = [1];

let iter = ConcurrentRecursiveIter::new(extend, initial_elements);
let all: Vec<_> = iter.item_puller().collect();

assert_eq!(all, [1, 2, 3, 4, 5]);
§Examples - From

In the above example, the underlying pinned vector of the dynamic iterator created with new is a SplitVec with a Doubling growth strategy.

Alternatively, we can use a SplitVec with a Linear growth strategy, or a pre-allocated FixedVec as the underlying storage. In order to do so, we can use the From trait.
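The shape of this conversion, a tuple of the extend function and a caller-built queue, can be sketched with std types only. Everything here is an assumption made for illustration: ModelIter, pull and collect_with_capacity are hypothetical names, and a VecDeque with a chosen capacity stands in for a ConcurrentQueue backed by a different pinned vector. The crate's actual queue constructors are not shown.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the iterator: an `extend` function plus a
/// queue supplied by the caller.
struct ModelIter<T, E> {
    extend: E,
    queue: VecDeque<T>,
}

/// Same shape as the tuple-based conversion: `(extend, queue)` in, iterator out.
impl<T, E> From<(E, VecDeque<T>)> for ModelIter<T, E> {
    fn from((extend, queue): (E, VecDeque<T>)) -> Self {
        Self { extend, queue }
    }
}

impl<T, E> ModelIter<T, E> {
    /// Pops the front element, appends `extend(&element)`, then yields it.
    fn pull<I>(&mut self) -> Option<T>
    where
        E: Fn(&T) -> I,
        I: IntoIterator<Item = T>,
    {
        let item = self.queue.pop_front()?;
        self.queue.extend((self.extend)(&item));
        Some(item)
    }
}

/// Builds the model from a queue whose capacity the caller picked, then drains it.
fn collect_with_capacity(capacity: usize) -> Vec<usize> {
    // The caller decides how the storage is set up ...
    let mut queue = VecDeque::with_capacity(capacity);
    queue.push_back(1usize);
    let extend = |x: &usize| (*x < 5).then_some(x + 1);
    // ... and converts the pair into the iterator with `into`.
    let mut iter: ModelIter<_, _> = (extend, queue).into();
    std::iter::from_fn(|| iter.pull()).collect()
}

fn main() {
    assert_eq!(collect_with_capacity(64), [1, 2, 3, 4, 5]);
}
```

The point of the pattern is that the initial elements and the storage strategy live in the queue, so the iterator itself needs no additional constructor for each storage type.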

Trait Implementations§

Source§

impl<T, E, I, P> ConcurrentIter for ConcurrentRecursiveIter<T, E, I, P>
where T: Send, E: Fn(&T) -> I + Sync, I: IntoIterator<Item = T>, I::IntoIter: ExactSizeIterator, P: ConcurrentPinnedVec<T>, <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,

Source§

type Item = T

Type of the element that the concurrent iterator yields.
Source§

type SequentialIter = DynSeqQueue<T, P, E, I>

Type of the sequential iterator that the concurrent iterator can be converted into using the into_seq_iter method.
Source§

type ChunkPuller<'i> = DynChunkPuller<'i, T, E, I, P> where Self: 'i

Type of the chunk puller that can be created using the chunk_puller method.
Source§

fn into_seq_iter(self) -> Self::SequentialIter

Converts the concurrent iterator into its regular sequential counterpart. The sequential iterator is a regular Iterator and hence has no overhead related to atomic state. It is therefore useful when the program decides to iterate on a single thread rather than concurrently on multiple threads.
Source§

fn skip_to_end(&self)

Immediately jumps to the end of the iterator, skipping the remaining elements.
Source§

fn next(&self) -> Option<Self::Item>

Returns the next element of the iterator. It returns None if there are no more elements left.
Source§

fn next_with_idx(&self) -> Option<(usize, Self::Item)>

Returns the next element of the iterator together with its index. It returns None if there are no more elements left.
Source§

fn size_hint(&self) -> (usize, Option<usize>)

Returns the bounds on the remaining length of the iterator.
Source§

fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_>

Creates a ChunkPuller from the concurrent iterator. The created chunk puller can be used to pull chunk_size elements at once from the data source, rather than pulling one by one.
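The motivation for pulling in chunks can be illustrated with a std-only model: each pull pays the synchronization cost once for up to chunk_size elements instead of once per element. The sketch assumes a Mutex<VecDeque> as the shared source, and pull_chunk and sum_in_chunks are hypothetical names; it does not model the extension step of the recursive iterator and is not the crate's ChunkPuller.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

/// Hypothetical model of a chunk pull: takes up to `chunk_size` elements
/// under a single lock acquisition; `None` once the source is empty.
fn pull_chunk<T>(queue: &Mutex<VecDeque<T>>, chunk_size: usize) -> Option<Vec<T>> {
    let mut guard = queue.lock().unwrap();
    let n = chunk_size.min(guard.len());
    if n == 0 {
        return None;
    }
    let chunk: Vec<T> = guard.drain(..n).collect();
    Some(chunk)
}

/// Sums all items using `num_threads` threads that pull chunk by chunk.
fn sum_in_chunks(items: Vec<u64>, chunk_size: usize, num_threads: usize) -> u64 {
    let queue = Mutex::new(VecDeque::from(items));
    let total = AtomicU64::new(0);
    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                while let Some(chunk) = pull_chunk(&queue, chunk_size) {
                    // One atomic update per chunk rather than per element.
                    let partial: u64 = chunk.iter().sum();
                    total.fetch_add(partial, Ordering::Relaxed);
                }
            });
        }
    });
    total.into_inner()
}

fn main() {
    assert_eq!(sum_in_chunks((1..=100).collect(), 8, 4), 5050);
}
```

Larger chunks reduce contention but make the last chunks less evenly distributed across threads, so the choice of chunk_size is a trade-off.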
Source§

fn try_get_len(&self) -> Option<usize>

Returns Some(x) if the number of remaining items is known with certainty and is equal to x.
Source§

fn item_puller(&self) -> ItemPuller<'_, Self>
where Self: Sized,

Creates an ItemPuller from the concurrent iterator. The created item puller can be used to pull elements one by one from the data source.
Source§

fn item_puller_with_idx(&self) -> EnumeratedItemPuller<'_, Self>
where Self: Sized,

Creates an EnumeratedItemPuller from the concurrent iterator. The created item puller can be used to pull elements one by one from the data source, together with the index of each element.
Source§

fn copied<'a, T>(self) -> ConIterCopied<'a, Self, T>
where T: Copy, Self: Sized + ConcurrentIter<Item = &'a T>,

Creates an iterator which copies all of its elements.
Source§

fn cloned<'a, T>(self) -> ConIterCloned<'a, Self, T>
where T: Clone, Self: Sized + ConcurrentIter<Item = &'a T>,

Creates an iterator which clones all of its elements.
Source§

fn enumerate(self) -> Enumerate<Self>
where Self: Sized,

Creates an iterator which gives the current iteration count as well as the next value.
Source§

fn chain_inexact<C>(self, other: C) -> ChainUnknownLenI<Self, <C as IntoConcurrentIter>::IntoIter>
where C: IntoConcurrentIter<Item = Self::Item>, Self: Sized,

Creates a chain of this and other concurrent iterators.
Source§

impl<T, E, I, P> From<(E, ConcurrentQueue<T, P>)> for ConcurrentRecursiveIter<T, E, I, P>
where T: Send, E: Fn(&T) -> I + Sync, I: IntoIterator<Item = T>, I::IntoIter: ExactSizeIterator, P: ConcurrentPinnedVec<T>, <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,

Source§

fn from((extend, queue): (E, ConcurrentQueue<T, P>)) -> Self

Converts to this type from the input type.

Auto Trait Implementations§

§

impl<T, E, I, P = <SplitVec<T> as IntoConcurrentPinnedVec<T>>::ConPinnedVec> !Freeze for ConcurrentRecursiveIter<T, E, I, P>

§

impl<T, E, I, P> RefUnwindSafe for ConcurrentRecursiveIter<T, E, I, P>

§

impl<T, E, I, P> Send for ConcurrentRecursiveIter<T, E, I, P>
where E: Send, P: Send,

§

impl<T, E, I, P> Sync for ConcurrentRecursiveIter<T, E, I, P>

§

impl<T, E, I, P> Unpin for ConcurrentRecursiveIter<T, E, I, P>
where E: Unpin, P: Unpin, T: Unpin,

§

impl<T, E, I, P> UnwindSafe for ConcurrentRecursiveIter<T, E, I, P>
where E: UnwindSafe, P: UnwindSafe, T: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<I> IntoConcurrentIter for I
where I: ConcurrentIter,

Source§

type Item = <I as ConcurrentIter>::Item

Type of the element that the concurrent iterator yields.
Source§

type IntoIter = I

Type of the concurrent iterator that this type can be converted into.
Source§

fn into_con_iter(self) -> <I as IntoConcurrentIter>::IntoIter

Converts a source (collection or generator) into a concurrent iterator, i.e., a ConcurrentIter, using its into_con_iter method.
Source§

impl<T> SoM<T> for T

Source§

fn get_ref(&self) -> &T

Returns a reference to self.
Source§

fn get_mut(&mut self) -> &mut T

Returns a mutable reference to self.
Source§

impl<T> SoR<T> for T

Source§

fn get_ref(&self) -> &T

Returns a reference to self.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.