par_stream/
config.rs

1use crate::common::*;
2
3/// The default value returned by [get_buf_size_scale()].
4pub const DEFAULT_BUF_SIZE_SCALE: f64 = 2.0;
5
6static BUF_SIZE_SCALE: OnceCell<f64> = OnceCell::new();
7static DEFAULT_NUM_WORKERS: Lazy<usize> = Lazy::new(|| {
8    let value = num_cpus::get();
9    assert!(value > 0);
10    value
11});
12
13/// Sets the global scaling factor for buffer size.
14///
15/// The default buffer size will be determined by `scale * num_cpus`.
16/// The method must be called at most once and before calling of any other
17/// methods in this crate. Otherwise it returns an error with current value.
18///
19/// # Panics
20/// The `scale` must be positive and finite.
21pub fn set_buf_size_scale(scale: f64) -> Result<(), f64> {
22    assert!(scale.is_finite() && scale > 0.0);
23    BUF_SIZE_SCALE.set(scale)
24}
25
26/// Gets the global scaling factor for buffer size.
27///
28/// If [set_buf_size_scale] was not called before, it returns
29/// [DEFAULT_BUF_SIZE_SCALE]. Otherwise it returns the value accordingly.
30///
31/// Note that calling this function causes future calls to [set_buf_size_scale]
32/// to fail.
33pub fn get_buf_size_scale() -> f64 {
34    *BUF_SIZE_SCALE.get_or_init(|| DEFAULT_BUF_SIZE_SCALE)
35}
36
37fn default_buf_size() -> usize {
38    scale_positive(*DEFAULT_NUM_WORKERS, get_buf_size_scale())
39}
40
/// Multiplies `value` by `scale`, rounds to the nearest integer and clamps
/// the result to be at least 1.
///
/// # Panics
/// Panics if `value` is zero, or if `scale` is not positive and finite.
pub(crate) fn scale_positive(value: usize, scale: f64) -> usize {
    assert!(value > 0);
    assert!(scale.is_finite() && scale > 0.0);

    let product = value as f64 * scale;
    let rounded = product.round() as usize;

    // A tiny scale may round down to zero; never report less than one.
    cmp::max(1, rounded)
}
46
47pub use config_::*;
48mod config_ {
49    use super::*;
50
51    /// The determination strategy for the number of workers and buffer size.
52    #[derive(Debug, Clone, Copy, PartialEq)]
53    pub enum ParParamsConfig {
54        Default,
55        FixedWorkers {
56            num_workers: usize,
57        },
58        ScaleOfCpus {
59            scale: f64,
60        },
61        Manual {
62            num_workers: NumWorkers,
63            buf_size: BufSize,
64        },
65    }
66
67    impl ParParamsConfig {
68        pub fn to_params(&self) -> ParParams {
69            match *self {
70                Self::Default => {
71                    let num_workers = *DEFAULT_NUM_WORKERS;
72                    let buf_size = Some(scale_positive(num_workers, get_buf_size_scale()));
73
74                    ParParams {
75                        num_workers,
76                        buf_size,
77                    }
78                }
79                Self::FixedWorkers { num_workers } => {
80                    let buf_size = Some(scale_positive(num_workers, get_buf_size_scale()));
81
82                    ParParams {
83                        num_workers,
84                        buf_size,
85                    }
86                }
87                Self::ScaleOfCpus { scale } => {
88                    let num_workers = scale_positive(*DEFAULT_NUM_WORKERS, scale);
89                    let buf_size = Some(scale_positive(num_workers, get_buf_size_scale()));
90
91                    ParParams {
92                        num_workers,
93                        buf_size,
94                    }
95                }
96                Self::Manual {
97                    num_workers,
98                    buf_size,
99                } => {
100                    let num_workers = num_workers.get();
101                    let buf_size = buf_size.get();
102
103                    ParParams {
104                        num_workers,
105                        buf_size,
106                    }
107                }
108            }
109        }
110    }
111
112    /// The determination strategy for the number of workers.
113    #[derive(Debug, Clone, Copy, PartialEq)]
114    pub enum NumWorkers {
115        Default,
116        Fixed(usize),
117        ScaleOfCpus(f64),
118    }
119
120    impl NumWorkers {
121        pub fn get(&self) -> usize {
122            match *self {
123                Self::Default => *DEFAULT_NUM_WORKERS,
124                Self::Fixed(val) => val,
125                Self::ScaleOfCpus(scale) => scale_positive(*DEFAULT_NUM_WORKERS, scale),
126            }
127        }
128    }
129
130    impl Default for NumWorkers {
131        fn default() -> Self {
132            Self::Default
133        }
134    }
135
136    impl From<Option<usize>> for NumWorkers {
137        fn from(num_workers: Option<usize>) -> Self {
138            num_workers.map(Self::Fixed).unwrap_or(Self::Default)
139        }
140    }
141
142    impl From<usize> for NumWorkers {
143        fn from(value: usize) -> Self {
144            Self::Fixed(value)
145        }
146    }
147
148    impl From<f64> for NumWorkers {
149        fn from(scale: f64) -> Self {
150            Self::ScaleOfCpus(scale)
151        }
152    }
153
154    /// The buffer size determination strategy.
155    #[derive(Debug, Clone, Copy, PartialEq)]
156    pub enum BufSize {
157        Default,
158        Fixed(usize),
159        ScaleOfCpus(f64),
160        Unbounded,
161    }
162
163    impl BufSize {
164        pub fn get(&self) -> Option<usize> {
165            match *self {
166                Self::Default => default_buf_size().into(),
167                Self::Fixed(val) => val.into(),
168                Self::ScaleOfCpus(scale) => scale_positive(*DEFAULT_NUM_WORKERS, scale).into(),
169                Self::Unbounded => None,
170            }
171        }
172    }
173
174    impl Default for BufSize {
175        fn default() -> Self {
176            Self::Default
177        }
178    }
179
180    impl From<Option<usize>> for BufSize {
181        fn from(value: Option<usize>) -> Self {
182            value.map(Self::Fixed).unwrap_or(Self::Default)
183        }
184    }
185
186    impl From<usize> for BufSize {
187        fn from(value: usize) -> Self {
188            Self::Fixed(value)
189        }
190    }
191
192    impl From<f64> for BufSize {
193        fn from(scale: f64) -> Self {
194            Self::ScaleOfCpus(scale)
195        }
196    }
197}
198
199pub use params::*;
200mod params {
201    use super::*;
202
203    /// The parameters including `num_workers` and `buf_size`.
204    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
205    pub struct ParParams {
206        pub num_workers: usize,
207        pub buf_size: Option<usize>,
208    }
209
210    impl Default for ParParams {
211        fn default() -> Self {
212            ParParamsConfig::Default.to_params()
213        }
214    }
215
216    impl From<Option<ParParamsConfig>> for ParParams {
217        fn from(config: Option<ParParamsConfig>) -> Self {
218            config.map(|config| config.to_params()).unwrap_or_default()
219        }
220    }
221
222    impl From<ParParamsConfig> for ParParams {
223        fn from(config: ParParamsConfig) -> Self {
224            config.to_params()
225        }
226    }
227
228    impl From<usize> for ParParams {
229        fn from(num_workers: usize) -> Self {
230            ParParamsConfig::FixedWorkers { num_workers }.to_params()
231        }
232    }
233
234    impl From<f64> for ParParams {
235        fn from(scale: f64) -> Self {
236            ParParamsConfig::ScaleOfCpus { scale }.to_params()
237        }
238    }
239}