pub struct ConcurrentRecursiveIter<T, E, I, P = DefaultConPinnedVec<T>>
where
T: Send,
E: Fn(&T) -> I + Sync,
I: IntoIterator<Item = T>,
I::IntoIter: ExactSizeIterator,
P: ConcurrentPinnedVec<T>,
<P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
{ /* private fields */ }
A recursive ConcurrentIter which:
- naturally shrinks as we iterate,
- but can also grow, since new items can be added to the iterator during iteration.
Growth of the iterator is expressed by the extend: E function with the signature Fn(&T) -> I,
where I: IntoIterator<Item = T> and its iterator has a known length (I::IntoIter: ExactSizeIterator).
In other words, for each element e pulled from the iterator, we call extend(&e) before
returning it to the caller. All elements that extend returns
are added to the end of the concurrent iterator, to be pulled later on.
The recursive concurrent iterator internally uses a ConcurrentQueue which allows for both
concurrent push / extend and pop / pull operations.
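The extend-before-yield rule described above can be sketched with a single-threaded, std-only model. This is not the crate's implementation: drain_recursively and children are hypothetical names introduced for illustration, and a plain VecDeque stands in for the ConcurrentQueue.

```rust
use std::collections::VecDeque;

/// Hypothetical single-threaded model of the recursive iterator (not the crate's API).
/// Returns the elements in the order in which they would be yielded.
fn drain_recursively<T, E, I>(extend: E, initial_elements: impl IntoIterator<Item = T>) -> Vec<T>
where
    E: Fn(&T) -> I,
    I: IntoIterator<Item = T>,
{
    // assumption: a VecDeque stands in for the internal ConcurrentQueue
    let mut queue: VecDeque<T> = initial_elements.into_iter().collect();
    let mut yielded = Vec::new();
    while let Some(element) = queue.pop_front() {
        // first add everything `extend(&element)` returns to the end of the queue ...
        queue.extend(extend(&element));
        // ... and only then hand the element to the caller
        yielded.push(element);
    }
    yielded
}

/// Implicit binary tree: node `n` has children `2n` and `2n + 1` while `n < 4`.
fn children(n: &u32) -> Vec<u32> {
    if *n < 4 { vec![2 * *n, 2 * *n + 1] } else { vec![] }
}
```

Under this model, drain_recursively(children, [1]) yields the values 1 through 7 in breadth-first order, although only the root 1 is present initially.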
§Example
The following example demonstrates a use case for the recursive concurrent iterator. Notice that the iterator is instantiated with:
- a single element which is the root node,
- and the extend method which defines how to extend the iterator from each node.
Including the root, there exist 177 nodes in the tree. We observe that all these nodes are concurrently added to the iterator, popped and processed.
use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
use orx_concurrent_iter::ConcurrentIter;
use std::sync::atomic::{AtomicUsize, Ordering};
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;

struct Node {
    value: u64,
    children: Vec<Node>,
}

impl Node {
    fn new(rng: &mut impl Rng, value: u64) -> Self {
        let num_children = match value {
            0 => 0,
            n => rng.random_range(0..(n as usize)),
        };
        let children = (0..num_children)
            .map(|i| Self::new(rng, i as u64))
            .collect();
        Self { value, children }
    }
}

fn process(node_value: u64) {
    // fake computation
    std::thread::sleep(std::time::Duration::from_millis(node_value));
}

// this defines how the iterator must extend:
// each node drawn from the iterator adds its children to the end of the iterator
fn extend<'a, 'b>(node: &'a &'b Node) -> &'b [Node] {
    &node.children
}

// initiate iter with a single element, `root`
// however, the iterator will `extend` on the fly as we keep drawing its elements
let root = Node::new(&mut ChaCha8Rng::seed_from_u64(42), 70);
let iter = ConcurrentRecursiveIter::new(extend, [&root]);

let num_threads = 8;
let num_spawned = AtomicUsize::new(0);
let num_processed_nodes = AtomicUsize::new(0);

std::thread::scope(|s| {
    let mut handles = vec![];
    for _ in 0..num_threads {
        handles.push(s.spawn(|| {
            // allow all threads to be spawned
            _ = num_spawned.fetch_add(1, Ordering::Relaxed);
            while num_spawned.load(Ordering::Relaxed) < num_threads {}

            // `next` will first extend `iter` with children of `node`,
            // and only then yield the `node`
            while let Some(node) = iter.next() {
                process(node.value);
                _ = num_processed_nodes.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }
});

assert_eq!(num_processed_nodes.into_inner(), 177);
Implementations§
impl<T, E, I> ConcurrentRecursiveIter<T, E, I, DefaultConPinnedVec<T>>
pub fn new(extend: E, initial_elements: impl IntoIterator<Item = T>) -> Self
Creates a new dynamic concurrent iterator:
- The iterator will initially contain initial_elements.
- Before yielding each element, say e, to the caller, the elements returned by extend(&e) will be added to the concurrent iterator, to be yielded later.
This constructor uses a ConcurrentQueue with the default pinned concurrent
collection under the hood. In order to create the iterator using a different queue,
use the From/Into traits, as demonstrated below.
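To illustrate why a single initial element is enough to feed several threads, here is a std-only model of the same contract. It is a sketch under stated assumptions, not the crate's implementation: ModelRecursiveIter and count_and_sum are hypothetical names, and a Mutex<VecDeque<T>> replaces the ConcurrentQueue.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the recursive iterator (not the crate's type).
struct ModelRecursiveIter<T, E> {
    // assumption: a mutex-guarded VecDeque replaces the ConcurrentQueue
    queue: Mutex<VecDeque<T>>,
    extend: E,
}

impl<T, E, I> ModelRecursiveIter<T, E>
where
    E: Fn(&T) -> I,
    I: IntoIterator<Item = T>,
{
    /// Mirrors the shape of `new(extend, initial_elements)`.
    fn new(extend: E, initial_elements: impl IntoIterator<Item = T>) -> Self {
        let queue = Mutex::new(initial_elements.into_iter().collect());
        Self { queue, extend }
    }

    /// Pops the front element, pushes `extend(&element)` to the back, then yields it.
    /// Both steps run under one lock, so `None` means no more elements can appear.
    fn next(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        let element = queue.pop_front()?;
        let new_elements = (self.extend)(&element);
        queue.extend(new_elements);
        Some(element)
    }
}

/// Processes the implicit tree in which node `n` has children `0..n`, starting from
/// the single root 4, and returns (number of nodes, sum of node values).
fn count_and_sum(num_threads: usize) -> (usize, u64) {
    let iter = ModelRecursiveIter::new(|n: &u64| 0..*n, [4]);
    let count = AtomicUsize::new(0);
    let sum = AtomicU64::new(0);
    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                // every thread pulls from the same shared model iterator
                while let Some(n) = iter.next() {
                    count.fetch_add(1, Ordering::Relaxed);
                    sum.fetch_add(n, Ordering::Relaxed);
                }
            });
        }
    });
    (count.into_inner(), sum.into_inner())
}
```

In this model, count_and_sum returns (16, 15) for any positive number of threads: each node is yielded exactly once, and only the order in which threads receive nodes varies. The single lock keeps the sketch short; the real iterator relies on its ConcurrentQueue instead.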
§Examples
The following is a simple example to demonstrate how the dynamic iterator works.
use orx_concurrent_recursive_iter::ConcurrentRecursiveIter;
use orx_concurrent_iter::ConcurrentIter;
let extend = |x: &usize| (*x < 5).then_some(x + 1);
let initial_elements = [1];
let iter = ConcurrentRecursiveIter::new(extend, initial_elements);
let all: Vec<_> = iter.item_puller().collect();
assert_eq!(all, [1, 2, 3, 4, 5]);
§Examples - From
In the above example, the underlying pinned vector of the dynamic iterator created
with new is a SplitVec with a Doubling growth strategy.
Alternatively, we can use a SplitVec with a Linear growth strategy, or a
pre-allocated FixedVec as the underlying storage. In order to do so, we can
use the From trait.
Trait Implementations§
impl<T, E, I, P> ConcurrentIter for ConcurrentRecursiveIter<T, E, I, P>
where
T: Send,
E: Fn(&T) -> I + Sync,
I: IntoIterator<Item = T>,
I::IntoIter: ExactSizeIterator,
P: ConcurrentPinnedVec<T>,
<P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
type SequentialIter = DynSeqQueue<T, P, E, I>
Type of the sequential iterator returned by the into_seq_iter method.
type ChunkPuller<'i> = DynChunkPuller<'i, T, E, I, P> where Self: 'i
Type of the chunk puller returned by the chunk_puller method.
fn into_seq_iter(self) -> Self::SequentialIter
Converts the concurrent iterator into its sequential counterpart. The sequential iterator is a regular Iterator, and hence,
does not have any overhead related to atomic states. Therefore, it is
useful where the program decides to iterate on a single thread rather
than concurrently by multiple threads.
fn skip_to_end(&self)
fn next(&self) -> Option<Self::Item>
fn next_with_idx(&self) -> Option<(usize, Self::Item)>
fn size_hint(&self) -> (usize, Option<usize>)
fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_>
Creates a ChunkPuller from the concurrent iterator.
The created chunk puller can be used to pull chunk_size elements at once from the
data source, rather than pulling one by one.
fn try_get_len(&self) -> Option<usize>
Returns Some(x) if the number of remaining items is known with certainty and if it
is equal to x.
fn item_puller(&self) -> ItemPuller<'_, Self>
where
Self: Sized,
Creates an ItemPuller from the concurrent iterator.
The created item puller can be used to pull elements one by one from the
data source.
fn item_puller_with_idx(&self) -> EnumeratedItemPuller<'_, Self>
where
Self: Sized,
Creates an EnumeratedItemPuller from the concurrent iterator.
The created item puller can be used to pull elements one by one from the
data source together with the index of the elements.
fn copied<'a, T>(self) -> ConIterCopied<'a, Self, T>
fn cloned<'a, T>(self) -> ConIterCloned<'a, Self, T>
fn enumerate(self) -> Enumerate<Self>
where
Self: Sized,
fn chain_inexact<C>(self, other: C) -> ChainUnknownLenI<Self, <C as IntoConcurrentIter>::IntoIter>
Creates a chain of this and other concurrent iterators.
impl<T, E, I, P> From<(E, ConcurrentQueue<T, P>)> for ConcurrentRecursiveIter<T, E, I, P>
where
T: Send,
E: Fn(&T) -> I + Sync,
I: IntoIterator<Item = T>,
I::IntoIter: ExactSizeIterator,
P: ConcurrentPinnedVec<T>,
<P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
fn from((extend, queue): (E, ConcurrentQueue<T, P>)) -> Self
Auto Trait Implementations§
impl<T, E, I, P = <SplitVec<T> as IntoConcurrentPinnedVec<T>>::ConPinnedVec> !Freeze for ConcurrentRecursiveIter<T, E, I, P>
impl<T, E, I, P> RefUnwindSafe for ConcurrentRecursiveIter<T, E, I, P>
impl<T, E, I, P> Send for ConcurrentRecursiveIter<T, E, I, P>
impl<T, E, I, P> Sync for ConcurrentRecursiveIter<T, E, I, P>
impl<T, E, I, P> Unpin for ConcurrentRecursiveIter<T, E, I, P>
impl<T, E, I, P> UnwindSafe for ConcurrentRecursiveIter<T, E, I, P>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<I> IntoConcurrentIter for I
where
I: ConcurrentIter,
type Item = <I as ConcurrentIter>::Item
fn into_con_iter(self) -> <I as IntoConcurrentIter>::IntoIter
Converts the source into a concurrent iterator, i.e., ConcurrentIter,
using its into_con_iter method.