hyperi_rustlib/worker/
config.rs1use serde::{Deserialize, Serialize};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct WorkerPoolConfig {
19 #[serde(default = "default_min_threads")]
21 pub min_threads: usize,
22
23 #[serde(default)]
25 pub max_threads: usize,
26
27 #[serde(default = "default_grow_below")]
29 pub grow_below: f64,
30
31 #[serde(default = "default_shrink_above")]
33 pub shrink_above: f64,
34
35 #[serde(default = "default_emergency_above")]
37 pub emergency_above: f64,
38
39 #[serde(default = "default_memory_pressure_cap")]
41 pub memory_pressure_cap: f64,
42
43 #[serde(default = "default_scale_interval_secs")]
45 pub scale_interval_secs: u64,
46
47 #[serde(default = "default_async_concurrency")]
49 pub async_concurrency: usize,
50
51 #[serde(default = "default_health_saturation_timeout_secs")]
53 pub health_saturation_timeout_secs: u64,
54}
55
56fn default_min_threads() -> usize {
57 2
58}
59fn default_grow_below() -> f64 {
60 0.60
61}
62fn default_shrink_above() -> f64 {
63 0.85
64}
65fn default_emergency_above() -> f64 {
66 0.95
67}
68fn default_memory_pressure_cap() -> f64 {
69 0.80
70}
71fn default_scale_interval_secs() -> u64 {
72 5
73}
74fn default_async_concurrency() -> usize {
75 32
76}
77fn default_health_saturation_timeout_secs() -> u64 {
78 30
79}
80
81impl Default for WorkerPoolConfig {
82 fn default() -> Self {
83 Self {
84 min_threads: default_min_threads(),
85 max_threads: 0,
86 grow_below: default_grow_below(),
87 shrink_above: default_shrink_above(),
88 emergency_above: default_emergency_above(),
89 memory_pressure_cap: default_memory_pressure_cap(),
90 scale_interval_secs: default_scale_interval_secs(),
91 async_concurrency: default_async_concurrency(),
92 health_saturation_timeout_secs: default_health_saturation_timeout_secs(),
93 }
94 }
95}
96
97impl WorkerPoolConfig {
98 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
107 let pool_cfg: Self = if let Some(cfg) = crate::config::try_get() {
108 cfg.unmarshal_key(key).unwrap_or_default()
109 } else {
110 tracing::debug!("Config cascade not initialised, using default WorkerPoolConfig");
111 Self::default()
112 };
113 pool_cfg.validate()?;
114 Ok(pool_cfg)
115 }
116
117 pub fn validate(&self) -> Result<(), crate::config::ConfigError> {
123 if self.max_threads != 0 && self.min_threads > self.max_threads {
124 return Err(crate::config::ConfigError::InvalidValue {
125 key: "worker_pool.min_threads".into(),
126 reason: format!(
127 "min_threads ({}) > max_threads ({})",
128 self.min_threads, self.max_threads
129 ),
130 });
131 }
132 if self.grow_below >= self.shrink_above {
133 return Err(crate::config::ConfigError::InvalidValue {
134 key: "worker_pool.grow_below".into(),
135 reason: format!(
136 "grow_below ({}) >= shrink_above ({})",
137 self.grow_below, self.shrink_above
138 ),
139 });
140 }
141 if self.shrink_above >= self.emergency_above {
142 return Err(crate::config::ConfigError::InvalidValue {
143 key: "worker_pool.shrink_above".into(),
144 reason: format!(
145 "shrink_above ({}) >= emergency_above ({})",
146 self.shrink_above, self.emergency_above
147 ),
148 });
149 }
150 if self.async_concurrency == 0 {
152 return Err(crate::config::ConfigError::InvalidValue {
153 key: "worker_pool.async_concurrency".into(),
154 reason: "must be >= 1 (fan_out_async iterates via step_by)".into(),
155 });
156 }
157 if self.min_threads == 0 {
162 return Err(crate::config::ConfigError::InvalidValue {
163 key: "worker_pool.min_threads".into(),
164 reason: "must be >= 1 (zero permits busy-spin the pool)".into(),
165 });
166 }
167 Ok(())
168 }
169
170 pub fn resolve_max_threads(&mut self) {
176 let available = std::thread::available_parallelism().map_or(4, std::num::NonZero::get);
177
178 if self.max_threads == 0 {
179 self.max_threads = available;
180 } else {
181 self.max_threads = self.max_threads.min(available);
182 }
183 }
184}
185
186#[cfg(test)]
187mod tests {
188 use super::*;
189
190 #[test]
194 fn validate_rejects_zero_async_concurrency() {
195 let cfg = WorkerPoolConfig {
196 async_concurrency: 0,
197 ..Default::default()
198 };
199 let err = cfg.validate().unwrap_err();
200 assert!(matches!(
201 err,
202 crate::config::ConfigError::InvalidValue { .. }
203 ));
204 }
205
206 #[test]
207 fn validate_accepts_one_async_concurrency() {
208 let cfg = WorkerPoolConfig {
209 async_concurrency: 1,
210 ..Default::default()
211 };
212 assert!(cfg.validate().is_ok());
213 }
214
215 #[test]
218 fn validate_rejects_zero_min_threads() {
219 let cfg = WorkerPoolConfig {
220 min_threads: 0,
221 ..Default::default()
222 };
223 let err = cfg.validate().unwrap_err();
224 assert!(matches!(
225 err,
226 crate::config::ConfigError::InvalidValue { .. }
227 ));
228 }
229}