git_features/parallel/
mod.rs

//! Run computations in parallel, or not, based on the `parallel` feature toggle.
2//!
3//! ### in_parallel(…)
4//!
5//! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage
6//! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling
7//! thread to aggregate the results into a single output, which is returned by [`in_parallel()`].
8//!
//! Interruptions can be achieved by letting the reducer's [`feed(…)`][Reduce::feed()] method fail.
10//!
11//! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself
12//! or the data to work on.
13//!
14//! This mode of operation doesn't lend itself perfectly to being wrapped for `async` as it appears like a single long-running
15//! operation which runs as fast as possible, which is cancellable only by merit of stopping the input or stopping the output
16//! aggregation.
17//!
18//! ### `reduce::Stepwise`
19//!
20//! The [`Stepwise`][reduce::Stepwise] iterator works exactly as [`in_parallel()`] except that the processing of the output produced by
21//! `consume(I, &mut State) -> O` is made accessible by the `Iterator` trait's `next()` method. As produced work is not
22//! buffered, the owner of the iterator controls the progress made.
23//!
24//! Getting the final output of the [`Reduce`] is achieved through the consuming [`Stepwise::finalize()`][reduce::Stepwise::finalize()] method, which
25//! is functionally equivalent to calling [`in_parallel()`].
26//!
27//! In an `async` context this means that progress is only made each time `next()` is called on the iterator, while merely dropping
28//! the iterator will wind down the computation without any result.
29//!
30//! #### Maintaining Safety
31//!
32//! In order to assure that threads don't outlive the data they borrow because their handles are leaked, we enforce
33//! the `'static` lifetime for its inputs, making it less intuitive to use. It is, however, possible to produce
34//! suitable input iterators as long as they can hold something on the heap.
35#[cfg(feature = "parallel")]
36mod in_parallel;
37#[cfg(feature = "parallel")]
38pub use in_parallel::{in_parallel, in_parallel_with_slice, join, threads};
39
40mod serial;
41#[cfg(not(feature = "parallel"))]
42pub use serial::{in_parallel, in_parallel_with_slice, join, threads};
43
44mod in_order;
45pub use in_order::{InOrderIter, SequenceId};
46
47mod eager_iter;
48pub use eager_iter::{EagerIter, EagerIterIf};
49
50/// A no-op returning the input _(`desired_chunk_size`, `Some(thread_limit)`, `thread_limit)_ used
51/// when the `parallel` feature toggle is not set.
52#[cfg(not(feature = "parallel"))]
53pub fn optimize_chunk_size_and_thread_limit(
54    desired_chunk_size: usize,
55    _num_items: Option<usize>,
56    thread_limit: Option<usize>,
57    _available_threads: Option<usize>,
58) -> (usize, Option<usize>, usize) {
59    (desired_chunk_size, thread_limit, num_threads(thread_limit))
60}
61
/// Return the 'optimal' _(`size of chunks`, `amount of threads as Option`, `amount of threads`)_ to use in
/// [`in_parallel()`] for the given `desired_chunk_size`, `num_items`, `thread_limit` and `available_threads`.
///
/// * `desired_chunk_size` is the amount of items per chunk you think should be used.
///    It is only taken into account if `num_items` is `None`. With more than one thread it is clamped
///    to the range `50..=1000`, with exactly one thread it is returned unchanged.
/// * `num_items` is the total amount of items in the iteration, if `Some`.
///    In that case the chunk size is derived from it such that each thread gets at least two chunks,
///    limited to the range `1..=1000`, and the amount of threads is reduced if there are too few chunks
///    to keep all of them busy.
/// * `thread_limit` is the amount of threads to use, if `Some` and not `0`. It replaces the amount of
///    available threads. `None` and `Some(0)` both mean that there is no limit.
/// * `available_threads` is the total amount of threads available, if `Some`.
///    Otherwise the actual amount of available threads is determined by querying the system.
///
/// The effective amount of threads is never less than `1`, even if `available_threads` is `Some(0)`.
/// The second and third element of the returned tuple always carry the same thread count.
///
/// `Note` that this implementation is available only if the `parallel` feature toggle is set.
#[cfg(feature = "parallel")]
pub fn optimize_chunk_size_and_thread_limit(
    desired_chunk_size: usize,
    num_items: Option<usize>,
    thread_limit: Option<usize>,
    available_threads: Option<usize>,
) -> (usize, Option<usize>, usize) {
    // Bounds applied to the chunk size, and the amount of chunks each thread should get at least.
    const MIN_CHUNK_SIZE: usize = 50;
    const MAX_CHUNK_SIZE: usize = 1000;
    const MIN_CHUNKS_PER_THREAD: usize = 2;

    let detected_threads = available_threads.unwrap_or_else(num_cpus::get);
    // A non-zero limit overrides the detected amount of threads, `None` and `Some(0)` mean "no limit".
    // Clamping to 1 keeps a caller-supplied `Some(0)` from causing a division by zero below and from
    // producing a thread count of zero.
    let available_threads = match thread_limit {
        None | Some(0) => detected_threads,
        Some(limit) => limit,
    }
    .max(1);

    let (chunk_size, thread_limit) = match num_items {
        Some(items) => {
            // `saturating_mul` avoids an overflow for absurdly large thread limits.
            let chunk_size =
                (items / available_threads.saturating_mul(MIN_CHUNKS_PER_THREAD)).clamp(1, MAX_CHUNK_SIZE);
            let num_chunks = items / chunk_size;
            // With too few chunks to go around, use fewer threads rather than leaving some idle.
            let thread_limit = if num_chunks <= available_threads {
                (num_chunks / MIN_CHUNKS_PER_THREAD).max(1)
            } else {
                available_threads
            };
            (chunk_size, thread_limit)
        }
        None => {
            // A single thread processes everything anyway, so the caller's chunk size is used as is.
            let chunk_size = if available_threads == 1 {
                desired_chunk_size
            } else {
                desired_chunk_size.clamp(MIN_CHUNK_SIZE, MAX_CHUNK_SIZE)
            };
            (chunk_size, available_threads)
        }
    };
    (chunk_size, Some(thread_limit), thread_limit)
}
112
/// Report the amount of threads to use, which is always `1` as the `parallel` feature toggle is unset.
///
/// `_thread_limit` is ignored and exists only to match the signature of the parallel implementation.
#[cfg(not(feature = "parallel"))]
pub fn num_threads(_thread_limit: Option<usize>) -> usize {
    1
}
118
/// Returns the amount of threads to use: `thread_limit` if it is `Some` and not `0`, or the amount of
/// logical cores of the system otherwise.
///
/// Only available with the `parallel` feature toggle set.
#[cfg(feature = "parallel")]
pub fn num_threads(thread_limit: Option<usize>) -> usize {
    let logical_cores = num_cpus::get();
    match thread_limit {
        // No limit, or an explicit `0`, both mean "use everything the machine offers".
        None | Some(0) => logical_cores,
        Some(limit) => limit,
    }
}
129
/// Run [`in_parallel()`] if more than one thread may be used for `thread_limit` and `condition()` returns
/// `true`, and fall back to the serial implementation otherwise.
///
/// `condition()` is only called if the thread count check passed.
///
/// For parameters, see the documentation of [`in_parallel()`]
#[cfg(feature = "parallel")]
pub fn in_parallel_if<I, S, O, R>(
    condition: impl FnOnce() -> bool,
    input: impl Iterator<Item = I> + Send,
    thread_limit: Option<usize>,
    new_thread_state: impl Fn(usize) -> S + Send + Clone,
    consume: impl Fn(I, &mut S) -> O + Send + Clone,
    reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    // Check the thread count first so `condition()` is never evaluated when only one thread is available.
    let use_threads = num_threads(thread_limit) > 1 && condition();
    if !use_threads {
        return serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer);
    }
    in_parallel(input, thread_limit, new_thread_state, consume, reducer)
}
153
154/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
155///
156/// For parameters, see the documentation of [`in_parallel()`]
157///
158/// Note that the non-parallel version is equivalent to [`in_parallel()`].
159#[cfg(not(feature = "parallel"))]
160pub fn in_parallel_if<I, S, O, R>(
161    _condition: impl FnOnce() -> bool,
162    input: impl Iterator<Item = I>,
163    thread_limit: Option<usize>,
164    new_thread_state: impl Fn(usize) -> S,
165    consume: impl Fn(I, &mut S) -> O,
166    reducer: R,
167) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
168where
169    R: Reduce<Input = O>,
170    I: Send,
171    O: Send,
172{
173    serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
174}
175
176///
177pub mod reduce;
178pub use reduce::Reduce;