Crate orx_concurrent_recursive_iter


§orx-concurrent-recursive-iter


A concurrent iterator (ConcurrentIter) that can be extended recursively by each of its items.

This is a no-std crate.

§Concurrent Recursive Iter

ConcurrentRecursiveIter is a ConcurrentIter implementation which

  • naturally shrinks as we iterate,
  • but can also grow, since new items can be added to the iterator during iteration.

Assume the item type of the iterator is T. Growth of the iterator is expressed by the extend: E function with the signature Fn(&T, &Queue<T>), as used in the examples below. The function receives the pulled element together with a handle to the queue, and adds new items through queue.push or queue.extend.

In other words, for each element e pulled from the iterator, the iterator internally calls extend(&e, &queue) before returning e to the caller. All elements that extend adds to the queue are appended to the end of the concurrent iterator, to be pulled later on.

The recursive concurrent iterator internally uses a ConcurrentQueue which allows for both concurrent push / extend and pop / pull operations.
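
The pull, extend, then yield order can be modeled with standard library types alone. The following is a minimal sketch under stated assumptions, not the crate's implementation: RecursiveModel and countdown are hypothetical names, a Mutex<VecDeque<T>> stands in for the ConcurrentQueue, and extend receives &mut VecDeque<T> rather than &Queue<T>.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical stand-in for the recursive iterator (not the crate's type).
struct RecursiveModel<T, E> {
    queue: Mutex<VecDeque<T>>,
    extend: E,
}

impl<T, E> RecursiveModel<T, E>
where
    E: Fn(&T, &mut VecDeque<T>),
{
    fn new(initial: impl IntoIterator<Item = T>, extend: E) -> Self {
        Self {
            queue: Mutex::new(initial.into_iter().collect()),
            extend,
        }
    }

    /// Pulls the front element, lets it extend the queue, and only then yields it.
    fn next(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        let item = queue.pop_front()?;
        (self.extend)(&item, &mut *queue);
        Some(item)
    }
}

/// Usage: starting from 3, every positive x adds x - 1 to the end of the queue.
fn countdown() -> Vec<usize> {
    let model = RecursiveModel::new([3usize], |x: &usize, queue: &mut VecDeque<usize>| {
        if *x > 0 {
            queue.push_back(x - 1);
        }
    });
    std::iter::from_fn(|| model.next()).collect()
}
```

Because the lock is held across both the pop and the extend call, an empty queue observed under the lock means that no further elements can arrive. This keeps the sketch simple at the cost of serializing all extend calls, which a real concurrent queue would try to avoid.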

§A simple example, extending by 0 or 1 elements

Consider the following example. We initiate the iterator with two elements, 1 and 2.

Growth is defined by the extend function. For every element x pulled from the iterator, we will add 10 * x to the iterator if x is less than a thousand.

use orx_concurrent_recursive_iter::*;

let initial = [1, 2];
let extend = |x: &usize, queue: &Queue<usize>| {
    if *x < 1000 {
        queue.push(x * 10);
    }
};

let iter = ConcurrentRecursiveIter::new(initial, extend);

let mut collected = vec![];
while let Some(x) = iter.next() {
    collected.push(x);
}

assert_eq!(collected, vec![1, 2, 10, 20, 100, 200, 1000, 2000]);

This sequential example allows us to demonstrate the recursive iteration easily. The following is the list of events, in order, during the while let iteration:

  • iter has elements [1, 2]
  • we make the next call
    • 1 is pulled, iter has one element [2]
    • extend(&1, &queue) is called which pushes 10 to the end of iter, [2, 10].
    • only then, 1 is returned; i.e., x is set to 1 which is then used by the caller (added to collected here).
  • we make the second next call
    • 2 is pulled, iter has one element [10]
    • extend(&2, &queue) is called which pushes 20 to the end of iter, [10, 20].
    • then 2 is assigned to x.
  • the following calls proceed in the same way, until we make a next call while iter has one element [2000].
    • 2000 is pulled, leaving iter empty
    • extend(&2000, &queue) is called which pushes nothing, since 2000 is not less than 1000.
    • then 2000 is assigned to x.
  • finally, the iterator is empty and the next call returns None.
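
The event list can be replayed step by step, including the intermediate calls it leaves out. The sketch below uses a plain VecDeque in place of the iterator, so it only mirrors the ordering and is not the crate's code; trace_first_example is a hypothetical helper name.

```rust
use std::collections::VecDeque;

/// Replays the walkthrough with a plain `VecDeque`.
/// Each entry is (yielded element, queue contents after the call).
fn trace_first_example() -> Vec<(usize, Vec<usize>)> {
    let mut queue = VecDeque::from([1usize, 2]);
    let mut events: Vec<(usize, Vec<usize>)> = Vec::new();
    while let Some(x) = queue.pop_front() {
        // same growth rule as the example: add 10 * x if x is less than a thousand
        if x < 1000 {
            queue.push_back(x * 10);
        }
        events.push((x, queue.iter().copied().collect()));
    }
    events
}
```

The first two entries are (1, [2, 10]) and (2, [10, 20]), and the last two are (1000, [2000]) and (2000, []), matching the first and last steps listed above.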

§A simple example, extending by 0 or multiple elements

The following is again a simple and sequential example, except that this time each element extends the recursive iterator by 0 or multiple elements.

use orx_concurrent_recursive_iter::*;

let initial = [1];
let extend = |x: &usize, queue: &Queue<usize>| if *x < 100 {
    queue.extend([x * 10, x * 20]);
};
let iter = ConcurrentRecursiveIter::new(initial, extend);

let collected: Vec<_> = iter.item_puller().collect();
assert_eq!(collected, vec![1, 10, 20, 100, 200, 200, 400]);

Here, we start with only one initial element, 1:

  • we make the first next call:
    • 1 is pulled leaving the iter empty.
    • extend(&1, &queue) call adds two elements to the iterator which then becomes [10, 20].
  • we make the second next call:
    • 10 is pulled and iter becomes [20].
    • extend(&10, &queue) adds two more elements which results in iter = [20, 100, 200].
  • the following calls proceed in the same way, until we make a next call while iter has one element [400].
    • 400 is pulled, leaving iter empty
    • extend(&400, &queue) does nothing this time, since 400 is not less than 100.
    • then 400 is yielded and added to collected.
  • finally, the iterator is empty and the next call returns None.
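
To see how the queue grows and shrinks when an element adds two items at once, the sketch below records the queue contents after every call. It is an illustration with a plain VecDeque, not the crate's Queue, and queue_snapshots is a hypothetical helper name.

```rust
use std::collections::VecDeque;

/// Returns the queue contents after each `next` call of the walkthrough.
fn queue_snapshots() -> Vec<Vec<usize>> {
    let mut queue = VecDeque::from([1usize]);
    let mut snapshots: Vec<Vec<usize>> = Vec::new();
    while let Some(x) = queue.pop_front() {
        // same growth rule as the example: add 10 * x and 20 * x if x is less than 100
        if x < 100 {
            queue.extend([x * 10, x * 20]);
        }
        snapshots.push(queue.iter().copied().collect());
    }
    snapshots
}
```

The value 200 appears twice in the third snapshot, [100, 200, 200, 400], because it is produced both as 20 * 10 and as 10 * 20. This is why collected contains 200 twice.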

§Concurrent recursive iteration

Consider a recursive tree structure defined by the Node struct. With the seed and root value used below, the Node::new call creates a random tree with a total of 177 nodes, all stored as descendants of the root.

We create our recursive iterator with one initial element which is the root. We define the extend function from a node as its children. Notice that we only pass a reference to the children vector to the queue and do not need to allocate a new collection here.

This allows us to process each of the 177 nodes concurrently.

use orx_concurrent_recursive_iter::*;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Node {
    value: u64,
    children: Vec<Node>,
}

impl Node {
    fn new(rng: &mut impl Rng, value: u64) -> Self {
        let num_children = match value {
            0 => 0,
            n => rng.random_range(0..(n as usize)),
        };
        let children = (0..num_children)
            .map(|i| Self::new(rng, i as u64))
            .collect();
        Self { value, children }
    }
}

fn process(node_value: u64) {
    // fake computation
    std::thread::sleep(std::time::Duration::from_millis(node_value));
}

// this defines how the iterator must extend:
// each node drawn from the iterator adds its children to the end of the iterator
fn extend<'a, 'b>(node: &'a &'b Node, queue: &Queue<&'b Node>) {
    queue.extend(&node.children);
}

// initiate iter with a single element, `root`
// however, the iterator will `extend` on the fly as we keep drawing its elements
let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
let iter = ConcurrentRecursiveIter::new([&root], extend);

let num_threads = 8;
let num_spawned = AtomicUsize::new(0);
let num_processed_nodes = AtomicUsize::new(0);

std::thread::scope(|s| {
    let mut handles = vec![];
    for _ in 0..num_threads {
        handles.push(s.spawn(|| {
            // allow all threads to be spawned
            _ = num_spawned.fetch_add(1, Ordering::Relaxed);
            while num_spawned.load(Ordering::Relaxed) < num_threads {}

            // `next` will first extend `iter` with children of `node`,
            // and only then yield the `node`
            while let Some(node) = iter.next() {
                process(node.value);
                _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }
});

assert_eq!(num_processed_nodes.into_inner(), 177);
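
The same traversal pattern can be tried without external dependencies. The sketch below is a simplified model rather than the crate's implementation: it assumes a deterministic tree instead of a random one, replaces the iterator with a Mutex<VecDeque<&Node>>, and the names build, next and traverse are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Mutex;

struct Node {
    value: u64,
    children: Vec<Node>,
}

/// Deterministic tree (an assumption of this sketch): a node with value `v`
/// has children with values `0..v`, which gives `2^v` nodes in total.
fn build(value: u64) -> Node {
    Node {
        value,
        children: (0..value).map(build).collect(),
    }
}

/// Pops a node and pushes references to its children under the same lock,
/// so an empty queue reliably means that the traversal is complete.
fn next<'a>(queue: &Mutex<VecDeque<&'a Node>>) -> Option<&'a Node> {
    let mut queue = queue.lock().unwrap();
    let node = queue.pop_front()?;
    queue.extend(node.children.iter());
    Some(node)
}

/// Visits every node with `num_threads` threads;
/// returns (number of visited nodes, sum of their values).
fn traverse(root: &Node, num_threads: usize) -> (usize, u64) {
    let queue = Mutex::new(VecDeque::from([root]));
    let processed = AtomicUsize::new(0);
    let sum = AtomicU64::new(0);

    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                while let Some(node) = next(&queue) {
                    sum.fetch_add(node.value, Ordering::Relaxed);
                    processed.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });

    (processed.into_inner(), sum.into_inner())
}
```

Only references to nodes enter the queue, so no node is cloned. A thread that finds the queue empty stops pulling, which is safe here because extension always happens before the lock is released.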

§Contributing

Contributions are welcome! If you notice an error, have a question or think something could be improved, please open an issue or create a PR.

§License

Dual-licensed under Apache 2.0 or MIT.

Modules§

chain
Chain of two or more concurrent iterators.
cloned
Cloned transformation of concurrent iterators.
copied
Copied transformation of concurrent iterators.
enumerate
Enumerated transformation of concurrent iterators.
implementations
Implementations of concurrent iterators.
iter
Module for creating special iterators.

Structs§

ConcurrentRecursiveIter
A recursive ConcurrentIter which:
EnumeratedItemPuller
A regular Iterator which is created from and linked to and pulls its elements from a ConcurrentIter.
FlattenedChunkPuller
Flattened version of a ChunkPuller which conveniently implements Iterator.
FlattenedEnumeratedChunkPuller
Flattened version of a ChunkPuller which conveniently implements Iterator.
ItemPuller
A regular Iterator which is created from and linked to and pulls its elements from a ConcurrentIter.
Queue
A queue of elements that will be returned by the ConcurrentRecursiveIter.

Traits§

ChunkPuller
A chunk puller which is created from and linked to and pulls its elements from a ConcurrentIter.
ConcurrentCollection
A type implementing ConcurrentCollection is a collection owning the elements such that
ConcurrentCollectionMut
A type implementing ConcurrentCollectionMut is a collection owning the elements such that
ConcurrentDrainableOverSlice
A type which can create a concurrent draining iterator over any of its sub-slices.
ConcurrentIter
An iterator which can safely be used concurrently by multiple threads.
ConcurrentIterable
ConcurrentIterable types are those from which concurrent iterators can be created multiple times using the con_iter method, since this method call does not consume the source.
ExactSizeConcurrentIter
A concurrent iterator which knows the exact number of its remaining elements.
IntoConcurrentIter
Trait to convert a source (collection or generator) into a concurrent iterator; i.e., ConcurrentIter, using its into_con_iter method.
IterIntoConcurrentIter
Any regular iterator implements IterIntoConcurrentIter trait allowing them to be used as a concurrent iterator; i.e., ConcurrentIter, by calling iter_into_con_iter.