Limit the concurrency of an individual rayon parallel iterator method with a convenient macro.
§Example
This example demonstrates applying a concurrency-limited map to an iterator with the iter_concurrent_limit macro. map is one of many methods supported by the macro.
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
const N: usize = 1000;
let op = |i: usize| -> usize {
    let alloc = vec![i; N]; // max 2 concurrent allocations in this example
    alloc.into_par_iter().sum::<usize>() // runs on all threads
};
let sum_iter = iter_concurrent_limit!(2, (0..100), map, op);
let output = sum_iter
    .map(|alloc_sum| -> usize {
        alloc_sum / N // runs on all threads
    })
    .collect::<Vec<usize>>();
assert_eq!(output, (0..100).collect::<Vec<usize>>());
The equivalent sum_iter expression using iter_subdivide is:
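use rayon_iter_concurrent_limit::iter_subdivide;
let sum_iter = iter_subdivide(2, (0..100).into_par_iter()) // 2 chunks of 50 items
    .flat_map_iter(|chunk| chunk.into_iter().map(op)); // op runs sequentially within each chunk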
The equivalent expression without using functionality in this crate is:
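use rayon::iter::IndexedParallelIterator; // provides chunks
let sum_iter = (0..100)
    .into_par_iter()
    .chunks((100 + 2 - 1) / 2) // chunk size = ceil(100 / 2) = 50, so 2 chunks
    .flat_map_iter(|chunk| chunk.into_iter().map(op));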
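As a rough, std-only illustration of why the chunked form bounds concurrency, the sketch below swaps rayon for std::thread::scope (an assumption made so the example compiles without external crates). The input is split into chunks of ceil(len / limit) items, each chunk runs on its own scoped thread, and items within a chunk are processed one after another, so at most limit calls to op can overlap. The helper chunked_map is hypothetical and is not part of this crate or of rayon.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper (not part of this crate or rayon): applies `op` to every item
/// with at most `limit` concurrent executions. The input is split into chunks of
/// ceil(len / limit) items; each chunk runs on its own scoped thread and processes
/// its items sequentially.
fn chunked_map<T, U, F>(items: &[T], limit: usize, op: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync,
{
    assert!(limit > 0, "limit must be non-zero");
    // Ceiling division, clamped to 1 because `slice::chunks` panics on a zero size.
    let chunk_size = ((items.len() + limit - 1) / limit).max(1);
    let op = &op;
    thread::scope(|s| {
        // At most `limit` chunks exist, so at most `limit` threads call `op` at once.
        let handles: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().map(op).collect::<Vec<U>>()))
            .collect();
        // Joining in spawn order flattens the per-chunk outputs in input order.
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("worker thread panicked"))
            .collect::<Vec<U>>()
    })
}

fn main() {
    let active = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);
    let items: Vec<usize> = (0..100).collect();
    let output = chunked_map(&items, 2, |&i| {
        // Track how many calls are running right now and remember the maximum.
        let now = active.fetch_add(1, Ordering::SeqCst) + 1;
        peak.fetch_max(now, Ordering::SeqCst);
        let alloc = vec![i; 1000]; // stand-in for a large allocation
        let sum: usize = alloc.iter().sum();
        active.fetch_sub(1, Ordering::SeqCst);
        sum / 1000
    });
    assert_eq!(output, items);
    assert!(peak.load(Ordering::SeqCst) <= 2);
    println!("peak concurrent executions: {}", peak.load(Ordering::SeqCst));
}
```

Unlike rayon, this spawns one OS thread per chunk instead of scheduling chunks on a shared pool; it only models the bound on concurrent executions of op, not rayon's work stealing.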
§Motivation
Consider this example:
use rayon::iter::{IntoParallelIterator, ParallelIterator};
let op = |_: usize| {
    // operation involving a large allocation
};
(0..100).into_par_iter().for_each(op);
In this case, it may be necessary to limit the number of concurrent executions of op due to memory constraints.
The number of threads could be limited with rayon::ThreadPool::install like so:
let thread_pool = rayon::ThreadPoolBuilder::new().num_threads(1).build()?;
thread_pool.install(|| {
    // The iterator is consumed inside install, so it runs on the 1-thread pool.
    (0..100).into_par_iter().for_each(op);
});
However, this has some limitations and footguns:
- Any parallel operations within op will use the same thread-limited thread pool, unless install is called internally with a different thread pool.
- If install is called internally, op can yield and multiple instances of op may run concurrently on a thread. This is detailed in the install documentation.
- An iterator must be consumed in the install scope of a ThreadPool, otherwise it will not use that thread pool.
§Solution
This crate provides iter_concurrent_limit, a macro that enables many rayon::iter::ParallelIterator methods to execute their operands with limited concurrency.
The Examples section of iter_concurrent_limit has usage examples for each method supported by the macro.
§Implementation
The macro limits concurrency by calling IndexedParallelIterator::chunks on the parallel iterator (via the iter_subdivide function), which reduces the number of work items handed to rayon to at most the concurrency limit.
Internally, iter_subdivide calculates the chunk size as the ceiling of iterator.len() divided by the concurrency limit, i.e. iterator.len().div_ceil(concurrent_limit).
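The chunk-size arithmetic can be checked in isolation. The hypothetical chunk_size function below computes the ceiling of len / concurrent_limit with plain integer arithmetic (Rust 1.73+ also provides usize::div_ceil for the same result) and verifies that the resulting number of chunks never exceeds the limit.

```rust
/// Hypothetical helper mirroring the described calculation: ceil(len / concurrent_limit).
/// Note: len = 0 gives 0; std's `slice::chunks` panics on a zero chunk size,
/// so a caller chunking an empty input would need to special-case it.
fn chunk_size(len: usize, concurrent_limit: usize) -> usize {
    assert!(concurrent_limit > 0, "concurrent_limit must be non-zero");
    (len + concurrent_limit - 1) / concurrent_limit
}

fn main() {
    // 100 items, limit 2: chunks of 50, so exactly 2 chunks.
    assert_eq!(chunk_size(100, 2), 50);
    // 101 items, limit 2: chunks of 51; the second chunk holds the remaining 50.
    assert_eq!(chunk_size(101, 2), 51);
    // 3 items, limit 8: one item per chunk, so only 3 chunks (fewer than the limit).
    assert_eq!(chunk_size(3, 8), 1);
    // For non-empty inputs, the number of chunks never exceeds the limit.
    for len in 1..200 {
        for limit in 1..20 {
            let size = chunk_size(len, limit);
            let num_chunks = (len + size - 1) / size;
            assert!(num_chunks <= limit);
        }
    }
    println!("all chunk-size checks passed");
}
```

Because the chunk size is rounded up, the last chunk may be shorter than the others, and short inputs can produce fewer chunks than the limit; both cases still respect the bound.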
The function passed to the macro is called sequentially on the items in each chunk, but in parallel over the chunks.
The output of the function is flattened for methods with an iterator output, like map and filter.
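A sequential, std-only sketch of the flattening step follows. The hypothetical helpers chunked_map_seq and chunked_filter_seq apply the operator to each chunk in turn and flatten the per-chunk outputs: for map every chunk yields one output per input, while for filter a chunk may yield fewer, and flattening concatenates what remains in order. No parallelism is involved here; in the parallel version each chunk would be one rayon work item.

```rust
/// Hypothetical helper: ceil(len / limit), clamped to 1 because `slice::chunks` panics on 0.
fn chunk_len(len: usize, limit: usize) -> usize {
    assert!(limit > 0, "limit must be non-zero");
    ((len + limit - 1) / limit).max(1)
}

/// Sequential sketch of a chunked `map`: every input yields exactly one output.
fn chunked_map_seq<T, U, F: Fn(&T) -> U>(items: &[T], limit: usize, op: F) -> Vec<U> {
    let op = &op;
    items
        .chunks(chunk_len(items.len(), limit))
        // Each chunk would be one rayon work item; here the chunks simply run in order.
        .flat_map(move |chunk| chunk.iter().map(op))
        .collect()
}

/// Sequential sketch of a chunked `filter`: a chunk may yield fewer outputs than inputs.
fn chunked_filter_seq<T: Clone, P: Fn(&T) -> bool>(items: &[T], limit: usize, pred: P) -> Vec<T> {
    let pred = &pred;
    items
        .chunks(chunk_len(items.len(), limit))
        .flat_map(move |chunk| chunk.iter().filter(move |x| pred(*x)).cloned())
        .collect()
}

fn main() {
    let items: Vec<u32> = (0..10).collect();
    // map: the flattened result matches a plain, unchunked map.
    let doubled = chunked_map_seq(&items, 3, |x| x * 2);
    assert_eq!(doubled, items.iter().map(|x| x * 2).collect::<Vec<_>>());
    // filter: per-chunk outputs of varying length are concatenated in input order.
    let evens = chunked_filter_seq(&items, 3, |x| x % 2 == 0);
    assert_eq!(evens, vec![0, 2, 4, 6, 8]);
    println!("{:?} {:?}", doubled, evens);
}
```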
§Limitations
- Iterators passed to iter_concurrent_limit or iter_subdivide must implement std::iter::IntoIterator and rayon::iter::IntoParallelIterator, and the created parallel iterator must implement rayon::iter::IndexedParallelIterator.
- Only a subset of relevant ParallelIterator/IndexedParallelIterator methods are currently supported by the iter_concurrent_limit macro.
- If the operator/predicate passed to iter_concurrent_limit is a closure, its signature might have to be made explicit.
§Macros
- Apply a method on a rayon::iter::IndexedParallelIterator with a limit on the number of concurrent executions of the function passed to the method.
§Functions
- Subdivide a rayon::iter::IndexedParallelIterator into num_chunks chunks.