Trait ChunkPuller

pub trait ChunkPuller {
    type ChunkItem;
    type Chunk<'c>: ExactSizeIterator<Item = Self::ChunkItem> + Default
       where Self: 'c;

    // Required methods
    fn chunk_size(&self) -> usize;
    fn pull(&mut self) -> Option<Self::Chunk<'_>>;
    fn pull_with_idx(&mut self) -> Option<(usize, Self::Chunk<'_>)>;

    // Provided methods
    fn flattened<'c>(self) -> FlattenedChunkPuller<'c, Self> 
       where Self: Sized { ... }
    fn flattened_with_idx<'c>(self) -> FlattenedEnumeratedChunkPuller<'c, Self> 
       where Self: Sized { ... }
}

A chunk puller that is created from, remains linked to, and pulls its elements from a ConcurrentIter.

It can be created using the chunk_puller method of a concurrent iterator by providing a desired chunk size.

Unlike the ItemPuller, a chunk puller pulls many items at once:

  • Its pull method pulls a chunk from the concurrent iterator, where:
    • the pulled chunk implements ExactSizeIterator,
    • it has chunk_size elements as long as sufficient items remain; fewer items are pulled only when the concurrent iterator is running out of elements,
    • it has at least 1 element, as pull returns None if there are no items left.

Three points are important:

  • Items in each pulled chunk are guaranteed to be sequential in the data source.
  • Pulling elements in chunks rather than one by one as the ItemPuller does is an optimization technique which aims to reduce the overhead of updating the atomic state of the concurrent iterator. This optimization is relevant for cases where the work done on each pulled element is considerably small (a comparison sketch follows this list).
  • Pulling multiple elements or a chunk does not mean that the elements are copied and stored elsewhere; it means reserving multiple elements at once for the pulling thread.
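
To make the optimization point concrete before moving on to the full examples below, here is a minimal single-threaded sketch, not taken from the crate's documentation, contrasting item-by-item pulling (assuming the item_puller method described in the ItemPuller documentation) with chunked pulling. Both loops compute the same sum; the chunked version updates the shared atomic state roughly once per 64 items rather than once per item.

use orx_concurrent_iter::*;

let data: Vec<_> = (0..1_000).collect();

// item-by-item: every pulled item updates the shared atomic state once
let con_iter = data.con_iter();
let mut sum_by_item = 0;
for value in con_iter.item_puller() {
    sum_by_item += *value;
}

// in chunks: the shared atomic state is updated roughly once per 64 items,
// while iterating within a pulled chunk requires no synchronization
let con_iter = data.con_iter();
let mut sum_by_chunk = 0;
for value in con_iter.chunk_puller(64).flattened() {
    sum_by_chunk += *value;
}

assert_eq!(sum_by_item, sum_by_chunk);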

§Examples

use orx_concurrent_iter::*;

let num_threads = 4;
let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
let con_iter = data.con_iter();

let process = |_x: &String| {};

std::thread::scope(|s| {
    for _ in 0..num_threads {
        s.spawn(|| {
            // concurrently iterate over values in a `while let` loop
            // while pulling (up to) 10 elements every time
            let mut chunk_puller = con_iter.chunk_puller(10);
            while let Some(chunk) = chunk_puller.pull() {
                // chunk is an ExactSizeIterator
                for value in chunk {
                    process(value);
                }
            }
        });
    }
});

The above code conveniently allows for the iteration-by-chunks optimization. However, you might have noticed that we now have nested while let and for loops. In terms of convenience, we can do better than this without losing any performance.

This can be achieved using the flattened method of the chunk puller (see also flattened_with_idx).

use orx_concurrent_iter::*;

let num_threads = 4;
let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
let con_iter = data.con_iter();

let process = |_x: &String| {};

std::thread::scope(|s| {
    for _ in 0..num_threads {
        s.spawn(|| {
            // concurrently iterate over values in a `for` loop
            // while concurrently pulling (up to) 10 elements every time
            for value in con_iter.chunk_puller(10).flattened() {
                process(value);
            }
        });
    }
});

There is a bit of magic here that deserves an explanation.

Notice that this is a very convenient way to concurrently iterate over the elements using a simple for loop. However, it is important to note that, under the hood, this is equivalent to the program in the previous section where we used the pull method of the chunk puller.

The following happens under the hood:

  • We reach the concurrent iterator to pull 10 items at once from the data source. This is the intended performance optimization to reduce the updates of the atomic state.
  • Then, we iterate one-by-one over the pulled 10 items inside the thread as a regular iterator.
  • Once we complete processing these 10 items, we reach the concurrent iterator again. Provided that there are elements left, we pull another chunk of 10 items.
  • Then, we iterate one-by-one …
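
The following single-threaded sketch, not part of the crate's documentation, makes the reduced number of shared-state updates visible by counting how many times pull reaches the concurrent iterator:

use orx_concurrent_iter::*;

let data: Vec<_> = (0..100).collect();
let con_iter = data.con_iter();

let mut num_pulls = 0;
let mut chunk_puller = con_iter.chunk_puller(10);
while let Some(chunk) = chunk_puller.pull() {
    num_pulls += 1; // one update of the shared atomic state per chunk
    for _value in chunk {} // iterating the chunk requires no synchronization
}

// 100 items pulled in chunks of 10 => 10 successful pulls
// (plus one final pull returning None), rather than 100 item-by-item pulls
assert_eq!(num_pulls, 10);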

See the ItemPuller documentation for notes on how the pullers bring the convenience of Iterator methods to concurrent programs, demonstrated there by a 4-line implementation of a parallelized reduce. We can add the iteration-by-chunks optimization on top of this while keeping the implementation just as simple and concise, since the flattened chunk puller also implements Iterator.

In the following code, the sums are computed by 8 threads while each thread pulls elements in chunks of 64.

use orx_concurrent_iter::*;

fn parallel_reduce<T, F>(
    num_threads: usize,
    chunk: usize,
    con_iter: impl ConcurrentIter<Item = T>,
    reduce: F,
) -> Option<T>
where
    T: Send + Sync,
    F: Fn(T, T) -> T + Send + Sync,
{
    std::thread::scope(|s| {
        (0..num_threads)
            .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
            .collect::<Vec<_>>() // collect the handles so that all threads are spawned before joining any
            .into_iter()
            .filter_map(|x| x.join().unwrap()) // join threads
            .reduce(&reduce) // reduce thread results to final result
    })
}

let sum = parallel_reduce(8, 64, (0..0).into_con_iter(), |a, b| a + b);
assert_eq!(sum, None);

let n = 10_000;
let data: Vec<_> = (0..n).collect();
let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
assert_eq!(sum, Some(n * (n - 1) / 2));

Required Associated Types§

type ChunkItem

Type of the element that the concurrent iterator yields.

type Chunk<'c>: ExactSizeIterator<Item = Self::ChunkItem> + Default where Self: 'c

Type of the pulled chunks which implements ExactSizeIterator.
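
For a quick intuition, a Vec's concurrent iterator yields references, so both associated types are reference-based. A minimal single-threaded sketch, not taken from the crate's documentation:

use orx_concurrent_iter::*;

let data = vec![1, 2, 3, 4, 5];
let con_iter = data.con_iter(); // concurrent iterator over &i32

let mut chunk_puller = con_iter.chunk_puller(2); // ChunkItem is &i32 here
if let Some(mut chunk) = chunk_puller.pull() {
    assert_eq!(chunk.len(), 2); // Chunk<'_> is an ExactSizeIterator
    assert_eq!(chunk.next(), Some(&1)); // yielding &i32 items
}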

Required Methods§

fn chunk_size(&self) -> usize

Target length of the pulled chunks.

Note that the pulled chunk might have a length of:

  • chunk_size if there are at least chunk_size items in the concurrent iterator when the pull method is called; or
  • n items where 1 <= n < chunk_size if the concurrent iterator still has items, but fewer than chunk_size of them.

Notice that a pulled chunk cannot contain 0 elements; when no items are left, the pull method returns None, which signals that the iteration is complete. This design choice allows the use of while let Some(chunk) = chunk_puller.pull() { } loops.
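
As an illustration, the following single-threaded sketch, not taken from the crate's documentation, pulls chunks of 4 from 10 elements and observes two full chunks followed by one partial chunk before pull returns None:

use orx_concurrent_iter::*;

let data: Vec<_> = (0..10).collect();
let con_iter = data.con_iter();

let mut chunk_puller = con_iter.chunk_puller(4);
assert_eq!(chunk_puller.chunk_size(), 4);

let mut lengths = vec![];
while let Some(chunk) = chunk_puller.pull() {
    lengths.push(chunk.len()); // known length thanks to ExactSizeIterator
}

// with a single consumer: two full chunks and one trailing partial chunk
assert_eq!(lengths, vec![4, 4, 2]);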

fn pull(&mut self) -> Option<Self::Chunk<'_>>

Pulls the next chunk from the connected concurrent iterator.

The pulled chunk has a known length, and hence, implements ExactSizeIterator. It might have a length of:

  • chunk_size if there are at least chunk_size sequential items in the concurrent iterator when the pull method is called; or
  • n items where 1 <= n < chunk_size sequential items if the concurrent iterator still has items, but fewer than chunk_size of them.

Notice that a pulled chunk cannot contain 0 elements; when no items are left, the pull method returns None, which signals that the iteration is complete. This design choice allows the use of while let Some(chunk) = chunk_puller.pull() { } loops.
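
Since a pulled chunk has a known length, a thread can, for instance, reserve exactly that much capacity before consuming it. A minimal sketch, not taken from the crate's documentation:

use orx_concurrent_iter::*;

let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
let con_iter = data.con_iter();

std::thread::scope(|s| {
    for _ in 0..4 {
        s.spawn(|| {
            let mut local: Vec<&String> = Vec::new();
            let mut chunk_puller = con_iter.chunk_puller(16);
            while let Some(chunk) = chunk_puller.pull() {
                local.reserve(chunk.len()); // length known before iterating
                local.extend(chunk);
            }
            local.len() // number of items this thread pulled
        });
    }
});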

fn pull_with_idx(&mut self) -> Option<(usize, Self::Chunk<'_>)>

Pulls the next chunk from the connected concurrent iterator together with the index of the first element of the chunk.

Since all items in the pulled chunk are sequential, knowing the index of the first item of the chunk provides the complete information of all indices.

§Examples

Performing an enumerated iteration while using chunks is demonstrated below.

use orx_concurrent_iter::*;

let num_threads = 4;
let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
let con_iter = data.con_iter();

std::thread::scope(|s| {
    for _ in 0..num_threads {
        s.spawn(|| {
            let mut chunk_puller = con_iter.chunk_puller(4);
            while let Some((begin_idx, chunk)) = chunk_puller.pull_with_idx() {
                for (i, value) in chunk.enumerate() { // i is the index within chunk
                    let idx = begin_idx + i; // idx is the index in the input collection
                    assert_eq!(value, &idx.to_string());
                }
            }
        });
    }
});

Provided Methods§

fn flattened<'c>(self) -> FlattenedChunkPuller<'c, Self>
where Self: Sized,

Converts the ChunkPuller into a FlattenedChunkPuller which remains connected to, and keeps pulling its elements from, the same concurrent iterator, while allowing for:

  • avoiding nested loops:
    • a while let loop to pull the chunk, and then
    • a for loop to iterate over the chunk;
  • bringing Iterator methods to concurrent programs, since FlattenedChunkPuller implements the regular Iterator.
§Examples

See the ItemPuller documentation for notes on how the pullers bring the convenience of Iterator methods to concurrent programs, demonstrated there by a 4-line implementation of a parallelized reduce. We can add the iteration-by-chunks optimization on top of this while keeping the implementation just as simple and concise, since the flattened chunk puller also implements Iterator.

In the following code, the sums are computed by 8 threads while each thread pulls elements in chunks of 64.

use orx_concurrent_iter::*;

fn parallel_reduce<T, F>(
    num_threads: usize,
    chunk: usize,
    con_iter: impl ConcurrentIter<Item = T>,
    reduce: F,
) -> Option<T>
where
    T: Send + Sync,
    F: Fn(T, T) -> T + Send + Sync,
{
    std::thread::scope(|s| {
        (0..num_threads)
            .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
            .collect::<Vec<_>>() // collect the handles so that all threads are spawned before joining any
            .into_iter()
            .filter_map(|x| x.join().unwrap()) // join threads
            .reduce(&reduce) // reduce thread results to final result
    })
}

let sum = parallel_reduce(8, 64, (0..0).into_con_iter(), |a, b| a + b);
assert_eq!(sum, None);

let n = 10_000;
let data: Vec<_> = (0..n).collect();
let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
assert_eq!(sum, Some(n * (n - 1) / 2));

fn flattened_with_idx<'c>(self) -> FlattenedEnumeratedChunkPuller<'c, Self>
where Self: Sized,

Converts the ChunkPuller into a FlattenedEnumeratedChunkPuller which remains connected to, and keeps pulling its elements from, the same concurrent iterator, while allowing for:

  • avoiding nested loops:
    • a while let loop to pull the chunk, and then
    • a for loop to iterate over the chunk;
  • bringing Iterator methods to concurrent programs, since FlattenedEnumeratedChunkPuller implements the regular Iterator.

Similar to flattened except that the returned iterator additionally yields the indices of the elements in the concurrent iterator.

§Examples

use orx_concurrent_iter::*;

let num_threads = 4;
let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
let con_iter = data.con_iter();

std::thread::scope(|s| {
    for _ in 0..num_threads {
        s.spawn(|| {
            for (idx, value) in con_iter.chunk_puller(10).flattened_with_idx() {
                assert_eq!(value, &idx.to_string());
            }
        });
    }
});

Dyn Compatibility§

This trait is not dyn compatible; in particular, its generic associated type Chunk<'c> cannot be expressed behind a trait object.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.
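
Since a dyn ChunkPuller trait object cannot be formed, helpers over chunk pullers can instead be written generically. A hypothetical helper for illustration, not part of the crate:

use orx_concurrent_iter::*;

// drains any chunk puller and returns the number of items it pulled
fn drain_count<P: ChunkPuller>(mut chunk_puller: P) -> usize {
    let mut count = 0;
    while let Some(chunk) = chunk_puller.pull() {
        count += chunk.len(); // pulled chunks have known lengths
    }
    count
}

let data: Vec<_> = (0..100).collect();
let con_iter = data.con_iter();
assert_eq!(drain_count(con_iter.chunk_puller(7)), 100);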

Implementors§

impl<'i, T, P> ChunkPuller for ClonedChunkPuller<'i, T, P>
where T: Clone + 'i, P: ChunkPuller<ChunkItem = &'i T>,

type ChunkItem = T

type Chunk<'c> = Cloned<<P as ChunkPuller>::Chunk<'c>> where Self: 'c

impl<'i, T, P> ChunkPuller for CopiedChunkPuller<'i, T, P>
where T: Copy + 'i, P: ChunkPuller<ChunkItem = &'i T>,

type ChunkItem = T

type Chunk<'c> = Copied<<P as ChunkPuller>::Chunk<'c>> where Self: 'c

impl<P> ChunkPuller for EnumeratedChunkPuller<P>
where P: ChunkPuller,

type ChunkItem = (usize, <P as ChunkPuller>::ChunkItem)

type Chunk<'c> = EnumeratedChunk<<P as ChunkPuller>::Chunk<'c>> where Self: 'c
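
For instance, the chunk puller obtained from a copied concurrent iterator (presumably the CopiedChunkPuller listed above) yields chunks of owned values rather than references. A minimal sketch, not taken from the crate's documentation:

use orx_concurrent_iter::*;

let data: Vec<u64> = (0..10).collect();
let con_iter = data.con_iter().copied(); // items are u64 rather than &u64

let mut chunk_puller = con_iter.chunk_puller(4);
while let Some(chunk) = chunk_puller.pull() {
    // ChunkItem is u64 here, so the chunk yields owned values
    let _sum: u64 = chunk.sum();
}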