orx-concurrent-recursive-iter 2.0.0

A concurrent iterator that can be extended recursively by each of its items.
Documentation
# orx-concurrent-recursive-iter

[![orx-concurrent-recursive-iter crate](https://img.shields.io/crates/v/orx-concurrent-recursive-iter.svg)](https://crates.io/crates/orx-concurrent-recursive-iter)
[![orx-concurrent-recursive-iter crate](https://img.shields.io/crates/d/orx-concurrent-recursive-iter.svg)](https://crates.io/crates/orx-concurrent-recursive-iter)
[![orx-concurrent-recursive-iter documentation](https://docs.rs/orx-concurrent-recursive-iter/badge.svg)](https://docs.rs/orx-concurrent-recursive-iter)

A concurrent iterator ([ConcurrentIter](https://docs.rs/orx-concurrent-iter/latest/orx_concurrent_iter/trait.ConcurrentIter.html)) that can be extended recursively by each of its items.

> This is a **no-std** crate.

## Concurrent Recursive Iter

[`ConcurrentRecursiveIter`](https://docs.rs/orx-concurrent-iter/latest/orx_concurrent_recursive_iter/struct.ConcurrentRecursiveIter.html) is a [ConcurrentIter](https://docs.rs/orx-concurrent-iter/latest/orx_concurrent_iter/trait.ConcurrentIter.html) implementation which

* naturally shrinks as we iterate,
* but can also grow as it allows to add new items to the iterator, during iteration.

Assume the item type of the iterator is `T`. Growth of the iterator is expressed by the `extend: E` function with the signature `Fn(&T) -> I` where `I: IntoIterator<Item = T>` with a known length.

In other words, for each element `e` pulled from the iterator, the iterator internally calls `extend(&e)` before returning it to the caller. All elements included in the iterator that `extend` returned are added to the end of the concurrent iterator, to be pulled later on.

> The recursive concurrent iterator internally uses a [`ConcurrentQueue`]https://docs.rs/orx-concurrent-queue/latest/orx_concurrent_queue/struct.ConcurrentQueue.html which allows for both concurrent push / extend and pop / pull operations.

### A simple example, extending by 0 or 1 elements

Consider the following example. We initiate the iterator with two elements, 1 and 2.

Growth is defined by the `extend` function. For every element `x` pulled from the iterator, we will add `10 * x` to the iterator if `x` is less than a thousand.

```rust
use orx_concurrent_recursive_iter::*;

let initial = [1, 2];
let extend = |x: &usize, queue: &Queue<usize>| {
    if *x < 1000 {
        queue.push(x * 10);
    }
};

let iter = ConcurrentRecursiveIter::new(initial, extend);

let mut collected = vec![];
while let Some(x) = iter.next() {
    collected.push(x);
}

assert_eq!(collected, vec![1, 2, 10, 20, 100, 200, 1000, 2000]);
```

This sequential example allows us demonstrate the recursive iteration easily. Following is the list of events in order during the `while let` iteration:
* `iter` has elements `[1, 2]`
* we make the `next` call
  * 1 is pulled, `iter` has one element `[2]`
  * `extend(&1)` is called which returns 10, this is added to iter, `[2, 10]`.
  * only then, 1 is returned; i.e., `x` is set to 1 which is then used by the caller (added to the `collected` here).
* we make the second `next` call
  * 2 is pulled, `iter` has one element `[10]`
  * `extend(&2)` is called which returns 20, this is added to iter, `[10, 20]`.
  * then 2 is assigned to `x`.
* ...
* we make another `next` call while `iter` has one element `[2000]`.
  * 2000 is pulled, leaving `iter` empty
  * `extend(&2000)` is called which returns None.
  * then 2000 is assigned to `x`.
* finally, the iterator is empty and the `next` call returns `None`.

### A simple example, extending by 0 or multiple elements

The following is again a simple and sequential example, except that this time each element extends the recursive iterator by 0 or multiple elements.

```rust
use orx_concurrent_recursive_iter::*;

let initial = [1];
let extend = |x: &usize, queue: &Queue<usize>| if *x < 100 {
    queue.extend([x * 10, x * 20]);
};
let iter = ConcurrentRecursiveIter::new(initial, extend);

let collected: Vec<_> = iter.item_puller().collect();
assert_eq!(collected, vec![1, 10, 20, 100, 200, 200, 400]);
```

Here, we start with only one initial element, 1:

* we make the first `next` call:
  * 1 is pulled leaving the `iter` empty.
  * `extend(&1, &queue)` call adds two elements to the iterator which then becomes `[10, 20]`.
* we make the second `next` call:
  * 10 is pulled and `iter` becomes `[20]`.
  * `extend(&10, &queue)` adds two more elements which results in `iter = [20, 100, 200]`.
* ...
* we make another `next` call while `iter` has one element `[400]`.
  * 400 is pulled, leaving `iter` empty
  * `extend(&400, &queue)` does nothing this time.
  * then 400 is assigned to `x`.
* finally, the iterator is empty and the `next` call returns `None`.

### Concurrent recursive iteration

Consider a recursive tree structure defined by the `Node` struct. `Node::new` call creates some random tree with a total of 177 nodes. These nodes are stored as descendants of the `root`.

We create our recursive iterator with one initial element which is the root. We define the `extend` function from a node as its children. As mentioned above, notice that we only return the slice of children and do not need allocation here.

This allows us to `process` each of the 177 nodes concurrently.

```rust
use orx_concurrent_recursive_iter::*;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Node {
    value: u64,
    children: Vec<Node>,
}

impl Node {
    fn new(rng: &mut impl Rng, value: u64) -> Self {
        let num_children = match value {
            0 => 0,
            n => rng.random_range(0..(n as usize)),
        };
        let children = (0..num_children)
            .map(|i| Self::new(rng, i as u64))
            .collect();
        Self { value, children }
    }
}

fn process(node_value: u64) {
    // fake computation
    std::thread::sleep(std::time::Duration::from_millis(node_value));
}

// this defines how the iterator must extend:
// each node drawn from the iterator adds its children to the end of the iterator
fn extend<'a, 'b>(node: &'a &'b Node, queue: &Queue<&'b Node>) {
    queue.extend(&node.children);
}

// initiate iter with a single element, `root`
// however, the iterator will `extend` on the fly as we keep drawing its elements
let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
let iter = ConcurrentRecursiveIter::new([&root], extend);

let num_threads = 8;
let num_spawned = AtomicUsize::new(0);
let num_processed_nodes = AtomicUsize::new(0);

std::thread::scope(|s| {
    let mut handles = vec![];
    for _ in 0..num_threads {
        handles.push(s.spawn(|| {
            // allow all threads to be spawned
            _ = num_spawned.fetch_add(1, Ordering::Relaxed);
            while num_spawned.load(Ordering::Relaxed) < num_threads {}

            // `next` will first extend `iter` with children of `node,
            // and only then yield the `node`
            while let Some(node) = iter.next() {
                process(node.value);
                _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }
});

assert_eq!(num_processed_nodes.into_inner(), 177);
```

## Contributing

Contributions are welcome! If you notice an error, have a question or think something could be improved, please open an [issue](https://github.com/orxfun/orx-concurrent-recursive-iter/issues/new) or create a PR.

## License

Dual-licensed under [Apache 2.0](LICENSE-APACHE) or [MIT](LICENSE-MIT).