1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#[cfg(feature = "parallel")]
mod in_parallel;
mod serial;
#[cfg(not(feature = "parallel"))]
pub use serial::*;
#[cfg(feature = "parallel")]
pub use in_parallel::*;
mod eager;
pub use eager::{EagerIter, EagerIterIf};
/// Serial stand-in used when the `parallel` feature is disabled.
///
/// Nothing is tuned here: `desired_chunk_size` and `thread_limit` are handed back unchanged,
/// while `_num_items` and `_available_threads` are ignored.
///
/// Returns `(chunk_size, thread_limit, num_threads)`, where the last element is the value of
/// `num_threads(thread_limit)`.
#[cfg(not(feature = "parallel"))]
pub fn optimize_chunk_size_and_thread_limit(
    desired_chunk_size: usize,
    _num_items: Option<usize>,
    thread_limit: Option<usize>,
    _available_threads: Option<usize>,
) -> (usize, Option<usize>, usize) {
    let effective_threads = num_threads(thread_limit);
    (desired_chunk_size, thread_limit, effective_threads)
}
/// Derive a chunk size and a thread count for processing items in chunks (`parallel` feature).
///
/// * `desired_chunk_size` - only consulted when `num_items` is `None`; it is then kept as is for a
///   single thread, or clamped into `50..=1000` otherwise.
/// * `num_items` - total amount of items, if known. When present, the chunk size is chosen so that
///   each thread gets at least two chunks, bounded to `1..=1000`.
/// * `thread_limit` - `None` and `Some(0)` mean "use `available_threads`"; any other value
///   replaces it.
/// * `available_threads` - falls back to `num_cpus::get()` when `None`.
///
/// Returns `(chunk_size, Some(thread_limit), thread_limit)`, i.e. the computed thread count is
/// reported both as the new limit and as the number of threads.
///
/// # Panics
///
/// With `num_items` known, the item count is divided by the effective thread count, so an
/// effective count of zero (`available_threads` of `Some(0)` combined with a `thread_limit` of
/// `None` or `Some(0)`) panics with a division by zero.
#[cfg(feature = "parallel")]
pub fn optimize_chunk_size_and_thread_limit(
    desired_chunk_size: usize,
    num_items: Option<usize>,
    thread_limit: Option<usize>,
    available_threads: Option<usize>,
) -> (usize, Option<usize>, usize) {
    const MIN_CHUNK_SIZE: usize = 50;
    const MAX_CHUNK_SIZE: usize = 1000;
    const MIN_CHUNKS_PER_THREAD: usize = 2;

    let detected_threads = available_threads.unwrap_or_else(num_cpus::get);
    // A limit of zero is treated like no limit at all.
    let threads = match thread_limit {
        Some(limit) if limit != 0 => limit,
        _ => detected_threads,
    };

    let (chunk_size, threads_in_use) = match num_items {
        Some(items) => {
            let chunk_size =
                (items / (threads * MIN_CHUNKS_PER_THREAD)).clamp(1, MAX_CHUNK_SIZE);
            let num_chunks = items / chunk_size;
            // Too few chunks to keep every thread busy: scale the thread count down instead.
            let threads_in_use = if num_chunks > threads {
                threads
            } else {
                (num_chunks / MIN_CHUNKS_PER_THREAD).max(1)
            };
            (chunk_size, threads_in_use)
        }
        None => {
            let chunk_size = if threads == 1 {
                desired_chunk_size
            } else {
                desired_chunk_size.clamp(MIN_CHUNK_SIZE, MAX_CHUNK_SIZE)
            };
            (chunk_size, threads)
        }
    };

    (chunk_size, Some(threads_in_use), threads_in_use)
}
/// Number of threads in use when the `parallel` feature is disabled: always `1`,
/// whatever `_thread_limit` asks for.
#[cfg(not(feature = "parallel"))]
pub(crate) fn num_threads(_thread_limit: Option<usize>) -> usize {
    1
}
/// Number of threads to use when the `parallel` feature is enabled.
///
/// A `thread_limit` of `None` or `Some(0)` yields `num_cpus::get()`; any other value is
/// returned as is.
#[cfg(feature = "parallel")]
pub(crate) fn num_threads(thread_limit: Option<usize>) -> usize {
    let logical_cores = num_cpus::get();
    match thread_limit {
        Some(limit) if limit != 0 => limit,
        _ => logical_cores,
    }
}
/// Aggregates individual items into a single final result.
///
/// `in_parallel_if` accepts an implementation whose `Input` equals the output type of its
/// `consume` function, and returns `Result<Self::Output, Self::Error>`.
pub trait Reducer {
    /// The type of item accepted by [`feed()`][Reducer::feed()].
    type Input;
    /// The type of the final result produced by [`finalize()`][Reducer::finalize()].
    type Output;
    /// The error type returned by both [`feed()`][Reducer::feed()] and
    /// [`finalize()`][Reducer::finalize()].
    type Error;
    /// Hand a single `item` to the reducer, which may mutate its own state in response.
    ///
    /// Returns `Err` to signal a failure while handling `item`.
    fn feed(&mut self, item: Self::Input) -> Result<(), Self::Error>;
    /// Consume the reducer and produce the final result, or an error.
    fn finalize(self) -> Result<Self::Output, Self::Error>;
}
/// Run `in_parallel` only if more than one thread is usable and `condition()` returns `true`;
/// otherwise run `serial::in_parallel`.
///
/// `condition` is invoked at most once, and not at all when `num_threads(thread_limit)` is `1`
/// or less. All remaining arguments are forwarded unchanged to whichever implementation is
/// chosen:
///
/// * `input` - the items to process.
/// * `thread_limit` - also used here to determine the usable thread count.
/// * `new_thread_state` - creates a state `S` from a `usize`
///   (NOTE(review): presumably a thread index - confirm against `in_parallel`).
/// * `consume` - turns an item `I` into an output `O`, with mutable access to a state `S`.
/// * `reducer` - a [`Reducer`] whose `Input` is `O`; its `Output` and `Error` types make up the
///   returned `Result`.
pub fn in_parallel_if<I, S, O, R>(
    condition: impl FnOnce() -> bool,
    input: impl Iterator<Item = I> + Send,
    thread_limit: Option<usize>,
    new_thread_state: impl Fn(usize) -> S + Send + Sync,
    consume: impl Fn(I, &mut S) -> O + Send + Sync,
    reducer: R,
) -> Result<<R as Reducer>::Output, <R as Reducer>::Error>
where
    R: Reducer<Input = O>,
    I: Send,
    O: Send,
{
    // Short-circuits just like the positive form: `condition` is skipped for a single thread.
    if num_threads(thread_limit) <= 1 || !condition() {
        return serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer);
    }
    in_parallel(input, thread_limit, new_thread_state, consume, reducer)
}