
Limit the concurrency of an individual rayon parallel iterator method with a convenient macro.

§Example

This example demonstrates applying a concurrency-limited map to an iterator with the iter_concurrent_limit macro. map is one of many supported methods of the macro.

use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
const N: usize = 1000;
let op = |i: usize| -> usize {
    let alloc = vec![i; N]; // max 2 concurrent allocations in this example
    alloc.into_par_iter().sum::<usize>() // runs on all threads
};
let sum_iter = iter_concurrent_limit!(2, (0..100), map, op);
let output = sum_iter
    .map(|alloc_sum| -> usize {
        alloc_sum / N // runs on all threads
    })
    .collect::<Vec<usize>>();
assert_eq!(output, (0..100).collect::<Vec<usize>>());

The equivalent sum_iter expression using iter_subdivide is:

let sum_iter = iter_subdivide(2, (0..100).into_par_iter())
    .flat_map_iter(|chunk| chunk.into_iter().map(op));

The equivalent expression without using functionality in this crate is:

let sum_iter = (0..100)
    .into_par_iter()
    .chunks((100 + 2 - 1) / 2)
    .flat_map_iter(|chunk| chunk.into_iter().map(op));
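
The chunk size in the last expression is a ceiling division of the iterator length by the concurrency limit. As a rough, std-only sketch of that arithmetic (the helper names `chunk_size` and `chunk_count` are hypothetical and are not part of this crate or of rayon):

```rust
/// Hypothetical helper: ceiling division of `len` by `limit`,
/// mirroring the `(100 + 2 - 1) / 2` expression above.
/// Assumes `limit > 0`.
fn chunk_size(len: usize, limit: usize) -> usize {
    (len + limit - 1) / limit
}

/// Hypothetical helper: how many chunks result when `len` items are
/// split into chunks of `chunk_size(len, limit)` items each.
fn chunk_count(len: usize, limit: usize) -> usize {
    let size = chunk_size(len, limit);
    if size == 0 {
        0 // an empty input produces no chunks
    } else {
        (len + size - 1) / size
    }
}
```

For 100 items and a limit of 2, this gives two chunks of 50 items. The chunk count never exceeds the limit, but it can be lower: 5 items with a limit of 4 are split into chunks of 2, 2 and 1 items, so only three chunks are formed.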

§Motivation

Consider this example:

use rayon::iter::{IntoParallelIterator, ParallelIterator};
let op = |_: usize| {
    // operation involving a large allocation
};
(0..100).into_par_iter().for_each(op);

In this case, it may be necessary to limit the number of concurrent executions of op due to memory constraints. The number of threads could be limited with rayon::ThreadPool::install like so:

let thread_pool = rayon::ThreadPoolBuilder::new().num_threads(1).build()?;
thread_pool.install(|| {
    (0..100).into_par_iter().for_each(op);
});

However, this has some limitations and footguns:

  • Any parallel operations within op will use the same thread-limited thread pool, unless install is called internally with a different thread pool.
  • If install is called internally, op can yield and multiple instances of op may run concurrently on a thread. This is detailed in the install documentation.
  • An iterator must be consumed in the install scope of a ThreadPool, otherwise it will not use that thread pool.

§Solution

This crate provides iter_concurrent_limit, a macro that enables many rayon::iter::ParallelIterator methods to execute their operands with limited concurrency.

The Examples section of iter_concurrent_limit has usage examples for each method supported by the macro.

§Implementation

The macro limits concurrency by calling IndexedParallelIterator::chunks on the parallel iterator (via the iter_subdivide function), which reduces the number of work items that rayon schedules. Internally, iter_subdivide calculates the chunk size as iterator.len().ceiling_div(concurrent_limit). The function passed to the macro is called sequentially on the items within each chunk, and in parallel across the chunks. For methods with an iterator output, like map and filter, the output of the function is flattened.
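
To illustrate why chunking bounds concurrency, here is a minimal std-only model of the strategy described above. It is a sketch under stated assumptions, not the crate's implementation: it spawns one scoped OS thread per chunk instead of using rayon's work-stealing pool, and the name `map_with_limit` is hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical std-only model of the chunking strategy: one scoped thread
/// per chunk, with `op` applied sequentially inside each chunk.
/// Returns the mapped items in input order together with the peak number
/// of `op` calls that were observed running at the same time.
fn map_with_limit<F>(items: &[usize], limit: usize, op: F) -> (Vec<usize>, usize)
where
    F: Fn(usize) -> usize + Sync,
{
    assert!(limit > 0, "the concurrency limit must be non-zero");
    // Ceiling division; clamped to 1 because `chunks` panics on a size of 0.
    let chunk_size = ((items.len() + limit - 1) / limit).max(1);
    let running = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);

    let output = thread::scope(|scope| {
        let handles: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| {
                let (op, running, peak) = (&op, &running, &peak);
                scope.spawn(move || {
                    // Items within a chunk are processed one at a time.
                    chunk
                        .iter()
                        .map(|&item| {
                            let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                            peak.fetch_max(now, Ordering::SeqCst);
                            let result = op(item);
                            running.fetch_sub(1, Ordering::SeqCst);
                            result
                        })
                        .collect::<Vec<usize>>()
                })
            })
            .collect();
        // Join in spawn order and flatten the per-chunk outputs.
        handles
            .into_iter()
            .flat_map(|handle| handle.join().expect("worker thread panicked"))
            .collect::<Vec<usize>>()
    });

    (output, peak.load(Ordering::SeqCst))
}
```

Calling `map_with_limit(&items, 2, |i| i * 2)` on 100 items processes two chunks of 50 items each. Because each chunk runs `op` sequentially, the observed peak cannot exceed the number of chunks, which in turn cannot exceed the limit.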

§Limitations

Macros§

Functions§