Limit the concurrency of an individual rayon parallel iterator method with a convenient macro.
§Example
This example demonstrates applying a concurrency-limited map to an iterator with the iter_concurrent_limit macro.
map is one of many supported methods of the macro.
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
const N: usize = 1000;
let op = |i: usize| -> usize {
    let alloc = vec![i; N]; // max 2 concurrent allocations in this example
    alloc.into_par_iter().sum::<usize>() // runs on all threads
};
let sum_iter = iter_concurrent_limit!(2, (0..100), map, op);
let output = sum_iter
    .map(|alloc_sum| -> usize {
        alloc_sum / N // runs on all threads
    })
    .collect::<Vec<usize>>();
assert_eq!(output, (0..100).into_iter().collect::<Vec<usize>>());
The equivalent sum_iter expression using iter_subdivide is:
let sum_iter = iter_subdivide(2, (0..100).into_par_iter())
    .flat_map_iter(|chunk| chunk.into_iter().map(op));
The equivalent expression without using functionality in this crate is:
let sum_iter = (0..100)
    .into_par_iter()
    .chunks((100 + 2 - 1) / 2)
    .flat_map_iter(|chunk| chunk.into_iter().map(op));
§Motivation
Consider this example:
use rayon::iter::{IntoParallelIterator, ParallelIterator};
let op = |_: usize| {
    // operation involving a large allocation
};
(0..100).into_par_iter().for_each(op);
In this case, it may be necessary to limit the number of concurrent executions of op due to memory constraints.
The number of threads could be limited with rayon::ThreadPool::install like so:
let thread_pool = rayon::ThreadPoolBuilder::new().num_threads(1).build()?;
thread_pool.install(|| {
    (0..100).into_par_iter().for_each(op);
});
However, this has some limitations and footguns:
- Any parallel operations within op will use the same thread-limited thread pool, unless install is called internally with a different thread pool.
- If install is called internally, op can yield and multiple instances of op may run concurrently on a thread. This is detailed in the install documentation.
- An iterator must be consumed in the install scope of a ThreadPool, otherwise it will not use that thread pool.
§Solution
This crate provides iter_concurrent_limit, a macro that enables many rayon::iter::ParallelIterator methods to execute their operands with limited concurrency.
The Examples section of iter_concurrent_limit has usage examples for each method supported by the macro.
§Implementation
The macro limits concurrency by calling IndexedParallelIterator::chunks on the parallel iterator (using the iter_subdivide method) to reduce the number of work items for rayon.
Internally, the iter_subdivide method calculates the chunk size as iterator.len().ceiling_div(concurrent_limit).
The function passed to the macro is called sequentially on the items in each chunk, but in parallel over the chunks.
The output of the function is flattened for methods with an iterator output, like map and filter.
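The chunking strategy described above can be illustrated without rayon. The following is a minimal sketch using only the standard library, not the crate's actual implementation: `map_with_limit` is a hypothetical helper, and it spawns one scoped thread per chunk where the crate instead hands the chunks to rayon. It shows the same three steps: ceiling division for the chunk size, sequential calls within a chunk, and flattening of the per-chunk outputs.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper (not part of the crate): apply `op` to every item with
/// at most `concurrent_limit` calls in flight, preserving input order.
fn map_with_limit<T, R, F>(items: &[T], concurrent_limit: usize, op: F) -> Vec<R>
where
    T: Sync,
    R: Send,
    F: Fn(&T) -> R + Sync,
{
    assert!(concurrent_limit > 0, "concurrent_limit must be non-zero");
    if items.is_empty() {
        return Vec::new();
    }
    // Ceiling division, the same arithmetic as `(100 + 2 - 1) / 2` in the example.
    let chunk_size = (items.len() + concurrent_limit - 1) / concurrent_limit;
    let op = &op;
    thread::scope(|scope| {
        // One worker per chunk; items within a chunk are processed sequentially.
        let workers: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| scope.spawn(move || chunk.iter().map(op).collect::<Vec<R>>()))
            .collect();
        // Join in spawn order and flatten, so the output order matches the input.
        workers
            .into_iter()
            .flat_map(|worker| worker.join().expect("worker panicked"))
            .collect()
    })
}

fn main() {
    let in_flight = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);
    let items: Vec<usize> = (0..100).collect();

    let output = map_with_limit(&items, 2, |&i| {
        // Track how many calls to the operation overlap in time.
        let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
        peak.fetch_max(now, Ordering::SeqCst);
        let result = i * 2;
        in_flight.fetch_sub(1, Ordering::SeqCst);
        result
    });

    assert_eq!(output, items.iter().map(|i| i * 2).collect::<Vec<_>>());
    assert!(peak.load(Ordering::SeqCst) <= 2);
}
```

One consequence of the ceiling division is that the number of chunks can be smaller than the limit: 5 items with a limit of 4 give a chunk size of 2 and therefore only 3 chunks, so the limit is an upper bound rather than a guaranteed level of concurrency.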
§Limitations
- Iterators passed to iter_concurrent_limit or iter_subdivide must implement std::iter::IntoIterator and rayon::iter::IntoParallelIterator, and the created parallel iterator must implement rayon::iter::IndexedParallelIterator.
- Only a subset of relevant ParallelIterator/IndexedParallelIterator methods is currently supported by the iter_concurrent_limit macro.
- If the operator/predicate passed to iter_concurrent_limit is a closure, its signature might have to be made explicit.
Macros§
- iter_concurrent_limit: Apply a method on a rayon::iter::IndexedParallelIterator with a limit on the number of concurrent executions of the function passed to the method.
Functions§
- iter_subdivide: Subdivide a rayon::iter::IndexedParallelIterator into num_chunks chunks.