§orx-concurrent-recursive-iter
A concurrent iterator (ConcurrentIter) that can be extended recursively by each of its items.
This is a no-std crate.
§Concurrent Recursive Iter
ConcurrentRecursiveIter is a ConcurrentIter implementation which
- naturally shrinks as we iterate,
- but can also grow, since new items can be added to the iterator during iteration.
Assume the item type of the iterator is T. Growth of the iterator is expressed by the extend: E function with the signature Fn(&T) -> I where I: IntoIterator<Item = T> with a known length.
In other words, for each element e pulled from the iterator, the iterator internally calls extend(&e) before returning it to the caller. All elements included in the iterator that extend returned are added to the end of the concurrent iterator, to be pulled later on.
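The order of events described above can be mimicked with a small single-threaded model built only on the standard library. This is a sketch for illustration, not the crate's implementation: the function name drain_recursively is hypothetical, a plain VecDeque stands in for the internal queue, and the known-length requirement on the returned iterator is not enforced.

```rust
use std::collections::VecDeque;

/// Hypothetical sequential model of the recursive iteration (not the crate's code).
/// Each pulled item first extends the queue and is only then yielded.
fn drain_recursively<T, I, E>(initial: impl IntoIterator<Item = T>, extend: E) -> Vec<T>
where
    E: Fn(&T) -> I,
    I: IntoIterator<Item = T>,
{
    let mut queue: VecDeque<T> = initial.into_iter().collect();
    let mut yielded = Vec::new();
    while let Some(item) = queue.pop_front() {
        // extend(&item) runs before the item is handed to the caller,
        // and everything it returns goes to the end of the queue
        queue.extend(extend(&item));
        yielded.push(item);
    }
    yielded
}
```

For instance, calling drain_recursively with the initial elements [1, 2] and an extend closure that returns Some(x * 10) while x is below 1000 yields the elements in the order in which they were queued.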
The recursive concurrent iterator internally uses a ConcurrentQueue, which allows for both concurrent push / extend and pop / pull operations.
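To see why a queue with concurrent push and pop is needed, the following std-only sketch swaps the ConcurrentQueue for a Mutex-guarded VecDeque. This is a deliberate simplification and says nothing about how the crate is implemented; Tree, LockedRecursiveIter and count_nodes_concurrently are hypothetical names introduced for illustration.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical tree type, used only for this sketch.
struct Tree {
    children: Vec<Tree>,
}

/// Simplified stand-in: a lock-based queue instead of a concurrent queue.
struct LockedRecursiveIter<'a> {
    queue: Mutex<VecDeque<&'a Tree>>,
}

impl<'a> LockedRecursiveIter<'a> {
    fn new(root: &'a Tree) -> Self {
        Self { queue: Mutex::new(VecDeque::from([root])) }
    }

    /// Pops a node, pushes its children, and only then returns the node.
    fn next(&self) -> Option<&'a Tree> {
        let mut queue = self.queue.lock().unwrap();
        let node = queue.pop_front()?;
        // children are pushed under the same lock that popped the parent,
        // so in this sketch an empty queue means nothing is left to process
        queue.extend(node.children.iter());
        Some(node)
    }
}

/// Visits every node of the tree from several threads and counts the visits.
fn count_nodes_concurrently(root: &Tree, num_threads: usize) -> usize {
    let iter = LockedRecursiveIter::new(root);
    let count = AtomicUsize::new(0);
    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                while let Some(_node) = iter.next() {
                    count.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });
    count.into_inner()
}
```

In this sketch every pull serializes on a single lock, which is the kind of contention a queue supporting concurrent push and pop is meant to reduce.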
§A simple example, extending by 0 or 1 elements
Consider the following example. We initiate the iterator with two elements, 1 and 2.
Growth is defined by the extend function. For every element x pulled from the iterator, we will add 10 * x to the iterator if x is less than a thousand.
use orx_concurrent_recursive_iter::*;
let initial = [1, 2];
let extend = |x: &usize| (*x < 1000).then_some(x * 10);
let iter = ConcurrentRecursiveIter::new(extend, initial);
let mut collected = vec![];
while let Some(x) = iter.next() {
    collected.push(x);
}
assert_eq!(collected, vec![1, 2, 10, 20, 100, 200, 1000, 2000]);
This sequential example allows us to demonstrate the recursive iteration easily. The following is the list of events, in order, during the while let iteration:
- iter has elements [1, 2]
- we make the next call
  - 1 is pulled, iter has one element [2]
  - extend(&1) is called which returns 10, this is added to iter, [2, 10].
  - only then, 1 is returned; i.e., x is set to 1 which is then used by the caller (added to the collected here).
- we make the second next call
  - 2 is pulled, iter has one element [10]
  - extend(&2) is called which returns 20, this is added to iter, [10, 20].
  - then 2 is assigned to x.
- …
- we make another next call while iter has one element [2000].
  - 2000 is pulled, leaving iter empty
  - extend(&2000) is called which returns None.
  - then 2000 is assigned to x.
- finally, the iterator is empty and the next call returns None.
§A simple example, extending by 0 or multiple elements
The following is again a simple and sequential example, except that this time each element extends the recursive iterator by 0 or multiple elements.
use orx_concurrent_recursive_iter::*;
let initial = [1];
let extend = |x: &usize| match *x < 100 {
    true => vec![x * 10, x * 20],
    false => vec![],
};
let iter = ConcurrentRecursiveIter::new(extend, initial);
let collected: Vec<_> = iter.item_puller().collect();
assert_eq!(collected, vec![1, 10, 20, 100, 200, 200, 400]);
Here, we start with only one initial element, 1:
- we make the first next call:
  - 1 is pulled leaving the iter empty.
  - extend(&1) call returns two elements which are added to the iterator which then becomes [10, 20].
- we make the second next call:
  - 10 is pulled and iter becomes [20].
  - extend(&10) returns two more elements which results in iter = [20, 100, 200].
- …
- we make another next call while iter has one element [400].
  - 400 is pulled, leaving iter empty
  - extend(&400) is called which returns nothing.
  - then 400 is yielded to the caller.
- finally, the iterator is empty and the next call returns None.
§The allocation problem
Notice in the above example that the extend function returns a vector. Therefore, each time it returns multiple elements, we need to allocate a vector. This is due to the requirement that the iterator returned by extend must have a known length; i.e., it must be an ExactSizeIterator.
The same program can be implemented in the following way to avoid allocation:
use orx_concurrent_recursive_iter::*;
let initial = [1];
let extend = |x: &usize| {
    (*x < 100)
        .then_some(*x)
        .into_iter()
        .flat_map(|x| [x * 10, x * 20])
};
let iter = ConcurrentRecursiveIter::new(extend, initial);
let collected: Vec<_> = iter.item_puller().collect();
assert_eq!(collected, vec![1, 10, 20, 100, 200, 200, 400]);
However, this code unfortunately does not compile:
`FlatMap<std::option::IntoIter<usize>, [usize; 2], impl FnMut(usize) -> [usize; 2]>: ExactSizeIterator` is not satisfied
The problem is that FlatMap<std::option::IntoIter<usize>, [usize; 2], impl FnMut(usize) -> [usize; 2]> does not implement ExactSizeIterator.
We might, however, argue that we know exactly the size of this iterator from the type signature:
- it is 0 if the option is None,
- it is 2 otherwise.
And we would be absolutely correct!
This seems like a missing implementation from the core library and will hopefully be added.
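Until then, one possible workaround is to build the fixed-size array unconditionally and truncate it with take, since both std::array::IntoIter and std::iter::Take implement ExactSizeIterator. The sketch below is not taken from the crate; the function name extend_no_alloc is hypothetical, and it is an assumption (not verified here) that it can be passed to ConcurrentRecursiveIter::new in place of the closure above.

```rust
use std::array::IntoIter;
use std::iter::Take;

/// Hypothetical allocation-free variant of the `extend` function above.
/// The returned iterator lives on the stack and knows its exact length.
fn extend_no_alloc(x: &usize) -> Take<IntoIter<usize, 2>> {
    // keep both elements when x < 100, and none of them otherwise
    let len = if *x < 100 { 2 } else { 0 };
    [x * 10, x * 20].into_iter().take(len)
}
```

The trade-off is that both candidate elements are always computed, even when they are discarded, which is acceptable only when producing them is cheap.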
And in certain use cases when iterating over recursive data structures, we do not observe the allocation problem since the allocation is already done while defining the input collection. The following example demonstrates such a case.
§Concurrent recursive iteration
Consider a recursive tree structure defined by the Node struct. The Node::new call below creates a random tree with a total of 177 nodes. These nodes are stored as descendants of the root.
We create our recursive iterator with one initial element which is the root. We define the extend function from a node as its children. As mentioned above, notice that we only return the slice of children and do not need allocation here.
This allows us to process each of the 177 nodes concurrently.
use orx_concurrent_recursive_iter::*;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use std::sync::atomic::{AtomicUsize, Ordering};
struct Node {
    value: u64,
    children: Vec<Node>,
}
impl Node {
    fn new(rng: &mut impl Rng, value: u64) -> Self {
        let num_children = match value {
            0 => 0,
            n => rng.random_range(0..(n as usize)),
        };
        let children = (0..num_children)
            .map(|i| Self::new(rng, i as u64))
            .collect();
        Self { value, children }
    }
}
fn process(node_value: u64) {
    // fake computation
    std::thread::sleep(std::time::Duration::from_millis(node_value));
}
// this defines how the iterator must extend:
// each node drawn from the iterator adds its children to the end of the iterator
fn extend<'a, 'b>(node: &'a &'b Node) -> &'b [Node] {
    &node.children
}
// initiate iter with a single element, `root`
// however, the iterator will `extend` on the fly as we keep drawing its elements
let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
let iter = ConcurrentRecursiveIter::new(extend, [&root]);
let num_threads = 8;
let num_spawned = AtomicUsize::new(0);
let num_processed_nodes = AtomicUsize::new(0);
std::thread::scope(|s| {
    let mut handles = vec![];
    for _ in 0..num_threads {
        handles.push(s.spawn(|| {
            // allow all threads to be spawned
            _ = num_spawned.fetch_add(1, Ordering::Relaxed);
            while num_spawned.load(Ordering::Relaxed) < num_threads {}
            // `next` will first extend `iter` with children of `node`,
            // and only then yield the `node`
            while let Some(node) = iter.next() {
                process(node.value);
                _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }
});
assert_eq!(num_processed_nodes.into_inner(), 177);
§Contributing
Contributions are welcome! If you notice an error, have a question or think something could be improved, please open an issue or create a PR.
§License
Dual-licensed under Apache 2.0 or MIT.
Modules§
- chain
- Chain of two or more concurrent iterators.
- cloned
- Cloned transformation of concurrent iterators.
- copied
- Copied transformation of concurrent iterators.
- enumerate
- Enumerated transformation of concurrent iterators.
- implementations
- Implementations of concurrent iterators.
- iter
- Module for creating special iterators.
Structs§
- ConcurrentRecursiveIter
- A recursive ConcurrentIter which:
- EnumeratedItemPuller
- A regular Iterator which is created from and linked to and pulls its elements from a ConcurrentIter.
- FlattenedChunkPuller
- Flattened version of a ChunkPuller which conveniently implements Iterator.
- FlattenedEnumeratedChunkPuller
- Flattened version of a ChunkPuller which conveniently implements Iterator.
- ItemPuller
- A regular Iterator which is created from and linked to and pulls its elements from a ConcurrentIter.
Traits§
- ChunkPuller
- A chunk puller which is created from and linked to and pulls its elements from a ConcurrentIter.
- ConcurrentCollection
- A type implementing ConcurrentCollection is a collection owning the elements such that
- ConcurrentCollectionMut
- A type implementing ConcurrentCollectionMut is a collection owning the elements such that
- ConcurrentDrainableOverSlice
- A type which can create a concurrent draining iterator over any of its sub-slices.
- ConcurrentIter
- An iterator which can safely be used concurrently by multiple threads.
- ConcurrentIterable
- ConcurrentIterable types are those from which concurrent iterators can be created multiple times using the con_iter method, since this method call does not consume the source.
- ExactSizeConcurrentIter
- A concurrent iterator which has a certain information of the number of remaining elements.
- IntoConcurrentIter
- Trait to convert a source (collection or generator) into a concurrent iterator; i.e., ConcurrentIter, using its into_con_iter method.
- IterIntoConcurrentIter
- Any regular iterator implements IterIntoConcurrentIter trait allowing them to be used as a concurrent iterator; i.e., ConcurrentIter, by calling iter_into_con_iter.