hyperi_rustlib/worker/
config.rs1use serde::{Deserialize, Serialize};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct WorkerPoolConfig {
19 #[serde(default = "default_min_threads")]
21 pub min_threads: usize,
22
23 #[serde(default)]
25 pub max_threads: usize,
26
27 #[serde(default = "default_grow_below")]
29 pub grow_below: f64,
30
31 #[serde(default = "default_shrink_above")]
33 pub shrink_above: f64,
34
35 #[serde(default = "default_emergency_above")]
37 pub emergency_above: f64,
38
39 #[serde(default = "default_memory_pressure_cap")]
41 pub memory_pressure_cap: f64,
42
43 #[serde(default = "default_scale_interval_secs")]
45 pub scale_interval_secs: u64,
46
47 #[serde(default = "default_async_concurrency")]
49 pub async_concurrency: usize,
50
51 #[serde(default = "default_health_saturation_timeout_secs")]
53 pub health_saturation_timeout_secs: u64,
54}
55
/// Serde default for `WorkerPoolConfig::min_threads`.
fn default_min_threads() -> usize {
    const MIN_THREADS: usize = 2;
    MIN_THREADS
}
/// Serde default for `WorkerPoolConfig::grow_below`.
fn default_grow_below() -> f64 {
    const GROW_BELOW: f64 = 0.60;
    GROW_BELOW
}
/// Serde default for `WorkerPoolConfig::shrink_above`.
fn default_shrink_above() -> f64 {
    const SHRINK_ABOVE: f64 = 0.85;
    SHRINK_ABOVE
}
/// Serde default for `WorkerPoolConfig::emergency_above`.
fn default_emergency_above() -> f64 {
    const EMERGENCY_ABOVE: f64 = 0.95;
    EMERGENCY_ABOVE
}
/// Serde default for `WorkerPoolConfig::memory_pressure_cap`.
fn default_memory_pressure_cap() -> f64 {
    const MEMORY_PRESSURE_CAP: f64 = 0.80;
    MEMORY_PRESSURE_CAP
}
/// Serde default for `WorkerPoolConfig::scale_interval_secs`.
fn default_scale_interval_secs() -> u64 {
    const SCALE_INTERVAL_SECS: u64 = 5;
    SCALE_INTERVAL_SECS
}
/// Serde default for `WorkerPoolConfig::async_concurrency`.
fn default_async_concurrency() -> usize {
    const ASYNC_CONCURRENCY: usize = 32;
    ASYNC_CONCURRENCY
}
/// Serde default for `WorkerPoolConfig::health_saturation_timeout_secs`.
fn default_health_saturation_timeout_secs() -> u64 {
    const HEALTH_SATURATION_TIMEOUT_SECS: u64 = 30;
    HEALTH_SATURATION_TIMEOUT_SECS
}
80
81impl Default for WorkerPoolConfig {
82 fn default() -> Self {
83 Self {
84 min_threads: default_min_threads(),
85 max_threads: 0,
86 grow_below: default_grow_below(),
87 shrink_above: default_shrink_above(),
88 emergency_above: default_emergency_above(),
89 memory_pressure_cap: default_memory_pressure_cap(),
90 scale_interval_secs: default_scale_interval_secs(),
91 async_concurrency: default_async_concurrency(),
92 health_saturation_timeout_secs: default_health_saturation_timeout_secs(),
93 }
94 }
95}
96
97impl WorkerPoolConfig {
98 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
107 let pool_cfg: Self = if let Some(cfg) = crate::config::try_get() {
108 cfg.unmarshal_key_or_warn(key).unwrap_or_default()
111 } else {
112 tracing::debug!("Config cascade not initialised, using default WorkerPoolConfig");
113 Self::default()
114 };
115 pool_cfg.validate()?;
116 Ok(pool_cfg)
117 }
118
119 pub fn validate(&self) -> Result<(), crate::config::ConfigError> {
125 if self.max_threads != 0 && self.min_threads > self.max_threads {
126 return Err(crate::config::ConfigError::InvalidValue {
127 key: "worker_pool.min_threads".into(),
128 reason: format!(
129 "min_threads ({}) > max_threads ({})",
130 self.min_threads, self.max_threads
131 ),
132 });
133 }
134 if self.grow_below >= self.shrink_above {
135 return Err(crate::config::ConfigError::InvalidValue {
136 key: "worker_pool.grow_below".into(),
137 reason: format!(
138 "grow_below ({}) >= shrink_above ({})",
139 self.grow_below, self.shrink_above
140 ),
141 });
142 }
143 if self.shrink_above >= self.emergency_above {
144 return Err(crate::config::ConfigError::InvalidValue {
145 key: "worker_pool.shrink_above".into(),
146 reason: format!(
147 "shrink_above ({}) >= emergency_above ({})",
148 self.shrink_above, self.emergency_above
149 ),
150 });
151 }
152 if self.async_concurrency == 0 {
154 return Err(crate::config::ConfigError::InvalidValue {
155 key: "worker_pool.async_concurrency".into(),
156 reason: "must be >= 1 (fan_out_async iterates via step_by)".into(),
157 });
158 }
159 if self.min_threads == 0 {
164 return Err(crate::config::ConfigError::InvalidValue {
165 key: "worker_pool.min_threads".into(),
166 reason: "must be >= 1 (zero permits busy-spin the pool)".into(),
167 });
168 }
169 Ok(())
170 }
171
172 pub fn resolve_max_threads(&mut self) {
178 let available = std::thread::available_parallelism().map_or(4, std::num::NonZero::get);
179
180 if self.max_threads == 0 {
181 self.max_threads = available;
182 } else {
183 self.max_threads = self.max_threads.min(available);
184 }
185 }
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `cfg` fails validation with `ConfigError::InvalidValue`.
    fn assert_invalid_value(cfg: &WorkerPoolConfig) {
        assert!(matches!(
            cfg.validate(),
            Err(crate::config::ConfigError::InvalidValue { .. })
        ));
    }

    /// A zero async concurrency limit must be refused.
    #[test]
    fn validate_rejects_zero_async_concurrency() {
        let cfg = WorkerPoolConfig {
            async_concurrency: 0,
            ..WorkerPoolConfig::default()
        };
        assert_invalid_value(&cfg);
    }

    /// The smallest non-zero async concurrency limit is accepted.
    #[test]
    fn validate_accepts_one_async_concurrency() {
        let cfg = WorkerPoolConfig {
            async_concurrency: 1,
            ..WorkerPoolConfig::default()
        };
        assert!(cfg.validate().is_ok());
    }

    /// A zero thread floor must be refused.
    #[test]
    fn validate_rejects_zero_min_threads() {
        let cfg = WorkerPoolConfig {
            min_threads: 0,
            ..WorkerPoolConfig::default()
        };
        assert_invalid_value(&cfg);
    }
}