orx-concurrent-recursive-iter
A concurrent iterator (ConcurrentIter) that can be extended recursively by each of its items.
This is a no-std crate.
Concurrent Recursive Iter
ConcurrentRecursiveIter is a ConcurrentIter implementation which
- naturally shrinks as we iterate,
- but can also grow as it allows to add new items to the iterator, during iteration.
Assume the item type of the iterator is T. Growth of the iterator is expressed by the extend: E function with the signature Fn(&T) -> I where I: IntoIterator<Item = T> with a known length.
In other words, for each element e pulled from the iterator, the iterator internally calls extend(&e) before returning it to the caller. All elements included in the iterator that extend returned are added to the end of the concurrent iterator, to be pulled later on.
The recursive concurrent iterator internally uses a
ConcurrentQueuewhich allows for both concurrent push / extend and pop / pull operations.
A simple example, extending by 0 or 1 elements
Consider the following example. We initiate the iterator with two elements, 1 and 2.
Growth is defined by the extend function. For every element x pulled from the iterator, we will add 10 * x to the iterator if x is less than a thousand.
use *;
let initial = ;
let extend = ;
let iter = new;
let mut collected = vec!;
while let Some = iter.next
assert_eq!;
This sequential example allows us demonstrate the recursive iteration easily. Following is the list of events in order during the while let iteration:
iterhas elements[1, 2]- we make the
nextcall- 1 is pulled,
iterhas one element[2] extend(&1)is called which returns 10, this is added to iter,[2, 10].- only then, 1 is returned; i.e.,
xis set to 1 which is then used by the caller (added to thecollectedhere).
- 1 is pulled,
- we make the second
nextcall- 2 is pulled,
iterhas one element[10] extend(&2)is called which returns 20, this is added to iter,[10, 20].- then 2 is assigned to
x.
- 2 is pulled,
- ...
- we make another
nextcall whileiterhas one element[2000].- 2000 is pulled, leaving
iterempty extend(&2000)is called which returns None.- then 2000 is assigned to
x.
- 2000 is pulled, leaving
- finally, the iterator is empty and the
nextcall returnsNone.
A simple example, extending by 0 or multiple elements
The following is again a simple and sequential example, except that this time each element extends the recursive iterator by 0 or multiple elements.
use *;
let initial = ;
let extend = ;
let iter = new;
let collected: = iter.item_puller.collect;
assert_eq!;
Here, we start with only one initial element, 1:
- we make the first
nextcall:- 1 is pulled leaving the
iterempty. extend(&1)call returns two elements which are added to the iterator which then becomes[10, 20].
- 1 is pulled leaving the
- we make the second
nextcall:- 10 is pulled and
iterbecomes[20]. extend(&10)returns two more elements which results initer = [20, 100, 200].
- 10 is pulled and
- ...
- we make another
nextcall whileiterhas one element[400].- 400 is pulled, leaving
iterempty extend(&400)is called which returns nothing.- then 400 is assigned to
x.
- 400 is pulled, leaving
- finally, the iterator is empty and the
nextcall returnsNone.
The allocation problem
Notice in the above example that the extend method returns a vector. Therefore, each time it returns multiple elements, we need to allocate. This is due to the requirement that extend must return an ExactSizeIterator.
The same program can be implemented in the following way to avoid allocation:
use *;
let initial = ;
let extend = ;
let iter = new;
let collected: = iter.item_puller.collect;
assert_eq!;
However, this code unfortunately does not compile:
)
The problem is that FlatMap<std::option::IntoIter<usize>, impl FnMut(usize) -> [usize; 2]> does not implement ExactSizeIterator.
We might, however, argue that we know exactly the size of this iterator from the type signature:
- it is 0 if the option is
None, - it is 2 otherwise.
And we would be absolutely correct!
This seems like a missing implementation from the core library and will hopefully be added.
And in certain use cases when iterating over recursive data structures, we do not observe the allocation problem since the allocation is already done while defining the input collection. The following example demonstrates such a case.
Concurrent recursive iteration
Consider a recursive tree structure defined by the Node struct. Node::new call creates some random tree with a total of 177 nodes. These nodes are stored as descendants of the root.
We create our recursive iterator with one initial element which is the root. We define the extend function from a node as its children. As mentioned above, notice that we only return the slice of children and do not need allocation here.
This allows us to process each of the 177 nodes concurrently.
use *;
use ;
use ChaCha8Rng;
use ;
// this defines how the iterator must extend:
// each node drawn from the iterator adds its children to the end of the iterator
// initiate iter with a single element, `root`
// however, the iterator will `extend` on the fly as we keep drawing its elements
let root = new;
let iter = new;
let num_threads = 8;
let num_spawned = new;
let num_processed_nodes = new;
scope;
assert_eq!;
Contributing
Contributions are welcome! If you notice an error, have a question or think something could be improved, please open an issue or create a PR.
License
Dual-licensed under Apache 2.0 or MIT.