1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
pub trait IntoParStreamConfig: Sized {
fn into_par_stream_config(self) -> ParStreamConfig;
fn into_par_stream_params(self) -> ParStreamParams {
self.into_par_stream_config().into()
}
}
impl IntoParStreamConfig for Option<usize> {
    /// Maps `None` to [`ParStreamConfig::Auto`] and `Some(n)` to
    /// [`ParStreamConfig::Absolute`] with `n` workers.
    ///
    /// # Panics
    ///
    /// Panics if the value is `Some(0)`; at least one worker is required.
    fn into_par_stream_config(self) -> ParStreamConfig {
        match self {
            // Reuse the `usize` conversion so the `>= 1` validation and its
            // panic message are defined in exactly one place.
            Some(num_workers) => num_workers.into_par_stream_config(),
            None => ParStreamConfig::Auto,
        }
    }
}
impl IntoParStreamConfig for usize {
    /// Interprets the integer as a fixed number of workers and wraps it in
    /// [`ParStreamConfig::Absolute`].
    ///
    /// # Panics
    ///
    /// Panics if the value is `0`.
    fn into_par_stream_config(self) -> ParStreamConfig {
        let num_workers = self;
        assert!(
            num_workers > 0,
            "the number of workers must be a number greater or equal to 1"
        );
        ParStreamConfig::Absolute(num_workers)
    }
}
impl IntoParStreamConfig for f64 {
    /// Interprets the float as a scaling factor and wraps it in
    /// [`ParStreamConfig::Scale`].
    ///
    /// # Panics
    ///
    /// Panics if the value is NaN, infinite, negative or zero.
    fn into_par_stream_config(self) -> ParStreamConfig {
        // Zero has to be rejected too: the conversion to `ParStreamParams`
        // computes `ceil(cpus * scale)`, which would yield zero workers and
        // break the "at least one worker" rule enforced by the other
        // conversions. Any strictly positive scale rounds up to >= 1.
        assert!(
            self.is_finite() && self > 0.0,
            "the scaling number must be positive"
        );
        ParStreamConfig::Scale(self)
    }
}
impl IntoParStreamConfig for (usize, usize) {
    /// Interprets the tuple as `(num_workers, buf_size)` and wraps both values
    /// in [`ParStreamConfig::Custom`].
    ///
    /// # Panics
    ///
    /// Panics if either the number of workers or the buffer size is `0`.
    fn into_par_stream_config(self) -> ParStreamConfig {
        let (num_workers, buf_size) = self;

        // Messages follow the wording used by the `usize` conversion.
        assert!(
            num_workers >= 1,
            "the number of workers must be a number greater or equal to 1"
        );
        assert!(
            buf_size >= 1,
            "the buffer size must be a number greater or equal to 1"
        );

        ParStreamConfig::Custom {
            num_workers,
            buf_size,
        }
    }
}
/// Requested parallelism, before it is resolved to concrete numbers.
///
/// The `From<ParStreamConfig>` implementation of `ParStreamParams` turns each
/// variant into a worker count and a buffer size.
#[derive(Debug, Clone)]
pub enum ParStreamConfig {
    /// Use the value reported by `num_cpus::get()` as the worker count; the
    /// buffer size is twice that.
    Auto,
    /// Use exactly this many workers; the buffer size is twice that.
    Absolute(usize),
    /// Use `ceil(num_cpus::get() * scale)` workers; the buffer size is twice
    /// that.
    Scale(f64),
    /// Use the given worker count and buffer size verbatim.
    Custom {
        /// Number of workers.
        num_workers: usize,
        /// Buffer size.
        buf_size: usize,
    },
}
/// Concrete parallelism parameters obtained by resolving a `ParStreamConfig`.
///
/// Both fields are crate-private; values are produced through the
/// `From<ParStreamConfig>` conversion.
#[derive(Debug, Clone)]
pub struct ParStreamParams {
    /// Number of workers.
    pub(crate) num_workers: usize,
    /// Buffer size.
    pub(crate) buf_size: usize,
}
impl From<ParStreamConfig> for ParStreamParams {
fn from(from: ParStreamConfig) -> Self {
use ParStreamConfig::*;
let (num_workers, buf_size) = match from {
Auto => {
let num_workers = num_cpus::get();
let buf_size = num_workers * 2;
(num_workers, buf_size)
}
Absolute(num_workers) => {
let buf_size = num_workers * 2;
(num_workers, buf_size)
}
Scale(scale) => {
let num_workers = (num_cpus::get() as f64 * scale).ceil() as usize;
let buf_size = num_workers * 2;
(num_workers, buf_size)
}
Custom {
num_workers,
buf_size,
} => (num_workers, buf_size),
};
Self {
num_workers,
buf_size,
}
}
}