Skip to main content

hyperi_rustlib/worker/
config.rs

// Project:   hyperi-rustlib
// File:      src/worker/config.rs
// Purpose:   Configuration for adaptive worker pool
// Language:  Rust
//
// License:   FSL-1.1-ALv2
// Copyright: (c) 2026 HYPERI PTY LIMITED

9use serde::{Deserialize, Serialize};
10
/// Configuration for the adaptive worker pool.
///
/// All values are overridable via the 8-layer config cascade
/// (CLI > ENV > .env > settings.{env}.yaml > settings.yaml > defaults > rustlib > hard-coded).
///
/// Every field is also emitted as a gauge metric for Grafana overlay.
///
/// Any field missing from the deserialized input falls back to its `default_*`
/// function (or `0` for `max_threads`), so a partial config section is accepted.
/// Deserialization itself checks no invariants; call `validate` (as
/// `from_cascade` does) before using a hand-built or deserialized value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerPoolConfig {
    /// Minimum active worker threads (floor for scaling).
    ///
    /// `validate` requires this to be >= 1 and, when `max_threads` is non-zero,
    /// <= `max_threads`.
    #[serde(default = "default_min_threads")]
    pub min_threads: usize,

    /// Maximum worker threads. 0 = auto-detect from cgroup / `available_parallelism`.
    ///
    /// The `0` sentinel is replaced with a concrete count by `resolve_max_threads`.
    #[serde(default)]
    pub max_threads: usize,

    /// CPU utilisation below this threshold triggers thread growth.
    ///
    /// Must be strictly less than `shrink_above` (checked by `validate`).
    /// NOTE(review): the defaults (0.60 / 0.85 / 0.95) suggest utilisation is a
    /// 0.0–1.0 fraction — confirm against the scaler.
    #[serde(default = "default_grow_below")]
    pub grow_below: f64,

    /// CPU utilisation above this threshold triggers gentle thread reduction.
    ///
    /// Must be strictly less than `emergency_above` (checked by `validate`).
    #[serde(default = "default_shrink_above")]
    pub shrink_above: f64,

    /// CPU utilisation above this threshold triggers aggressive thread reduction.
    #[serde(default = "default_emergency_above")]
    pub emergency_above: f64,

    /// Memory pressure above this threshold hard-caps threads at `min_threads`.
    #[serde(default = "default_memory_pressure_cap")]
    pub memory_pressure_cap: f64,

    /// How often to re-evaluate scaling (seconds).
    #[serde(default = "default_scale_interval_secs")]
    pub scale_interval_secs: u64,

    /// Maximum concurrent async fan-out tasks.
    ///
    /// `validate` rejects 0 (it is used as a `step_by` stride in `fan_out_async`).
    #[serde(default = "default_async_concurrency")]
    pub async_concurrency: usize,

    /// Seconds the pool must be saturated before reporting unhealthy.
    #[serde(default = "default_health_saturation_timeout_secs")]
    pub health_saturation_timeout_secs: u64,
}
55
// Fallback values for `WorkerPoolConfig` fields. They are wrapped in functions
// because `#[serde(default = "...")]` needs a function path, not a constant;
// the same functions also feed the `Default` impl so both stay in sync.
const DEFAULT_MIN_THREADS: usize = 2;
const DEFAULT_GROW_BELOW: f64 = 0.60;
const DEFAULT_SHRINK_ABOVE: f64 = 0.85;
const DEFAULT_EMERGENCY_ABOVE: f64 = 0.95;
const DEFAULT_MEMORY_PRESSURE_CAP: f64 = 0.80;
const DEFAULT_SCALE_INTERVAL_SECS: u64 = 5;
const DEFAULT_ASYNC_CONCURRENCY: usize = 32;
const DEFAULT_HEALTH_SATURATION_TIMEOUT_SECS: u64 = 30;

fn default_min_threads() -> usize {
    DEFAULT_MIN_THREADS
}

fn default_grow_below() -> f64 {
    DEFAULT_GROW_BELOW
}

fn default_shrink_above() -> f64 {
    DEFAULT_SHRINK_ABOVE
}

fn default_emergency_above() -> f64 {
    DEFAULT_EMERGENCY_ABOVE
}

fn default_memory_pressure_cap() -> f64 {
    DEFAULT_MEMORY_PRESSURE_CAP
}

fn default_scale_interval_secs() -> u64 {
    DEFAULT_SCALE_INTERVAL_SECS
}

fn default_async_concurrency() -> usize {
    DEFAULT_ASYNC_CONCURRENCY
}

fn default_health_saturation_timeout_secs() -> u64 {
    DEFAULT_HEALTH_SATURATION_TIMEOUT_SECS
}
80
81impl Default for WorkerPoolConfig {
82    fn default() -> Self {
83        Self {
84            min_threads: default_min_threads(),
85            max_threads: 0,
86            grow_below: default_grow_below(),
87            shrink_above: default_shrink_above(),
88            emergency_above: default_emergency_above(),
89            memory_pressure_cap: default_memory_pressure_cap(),
90            scale_interval_secs: default_scale_interval_secs(),
91            async_concurrency: default_async_concurrency(),
92            health_saturation_timeout_secs: default_health_saturation_timeout_secs(),
93        }
94    }
95}
96
97impl WorkerPoolConfig {
98    /// Load config from the cascade under the given key (e.g. "worker_pool").
99    ///
100    /// Falls back to defaults if the config cascade is not initialised or the
101    /// key is absent. Validates after loading.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if validation fails (e.g. thresholds out of order).
106    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
107        let pool_cfg: Self = if let Some(cfg) = crate::config::try_get() {
108            cfg.unmarshal_key(key).unwrap_or_default()
109        } else {
110            tracing::debug!("Config cascade not initialised, using default WorkerPoolConfig");
111            Self::default()
112        };
113        pool_cfg.validate()?;
114        Ok(pool_cfg)
115    }
116
117    /// Validate configuration invariants.
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if thresholds are out of order or min > max.
122    pub fn validate(&self) -> Result<(), crate::config::ConfigError> {
123        if self.max_threads != 0 && self.min_threads > self.max_threads {
124            return Err(crate::config::ConfigError::InvalidValue {
125                key: "worker_pool.min_threads".into(),
126                reason: format!(
127                    "min_threads ({}) > max_threads ({})",
128                    self.min_threads, self.max_threads
129                ),
130            });
131        }
132        if self.grow_below >= self.shrink_above {
133            return Err(crate::config::ConfigError::InvalidValue {
134                key: "worker_pool.grow_below".into(),
135                reason: format!(
136                    "grow_below ({}) >= shrink_above ({})",
137                    self.grow_below, self.shrink_above
138                ),
139            });
140        }
141        if self.shrink_above >= self.emergency_above {
142            return Err(crate::config::ConfigError::InvalidValue {
143                key: "worker_pool.shrink_above".into(),
144                reason: format!(
145                    "shrink_above ({}) >= emergency_above ({})",
146                    self.shrink_above, self.emergency_above
147                ),
148            });
149        }
150        // `fan_out_async` does `step_by(async_concurrency)`; 0 panics.
151        if self.async_concurrency == 0 {
152            return Err(crate::config::ConfigError::InvalidValue {
153                key: "worker_pool.async_concurrency".into(),
154                reason: "must be >= 1 (fan_out_async iterates via step_by)".into(),
155            });
156        }
157        // Codex F11: zero CPU workers leaves the rayon semaphore
158        // spinning in `yield_now()` forever. Reject upfront; the
159        // scaler's clamp uses min_threads as the floor, so this
160        // also guarantees the scaler never drives permits below 1.
161        if self.min_threads == 0 {
162            return Err(crate::config::ConfigError::InvalidValue {
163                key: "worker_pool.min_threads".into(),
164                reason: "must be >= 1 (zero permits busy-spin the pool)".into(),
165            });
166        }
167        Ok(())
168    }
169
170    /// Resolve `max_threads` to the effective CPU count.
171    ///
172    /// - `max_threads = 0` → auto-detect from `available_parallelism` (cgroup-aware)
173    /// - `max_threads > 0` → cap at `min(configured, available_parallelism)`
174    ///   to avoid creating more threads than physical cores
175    pub fn resolve_max_threads(&mut self) {
176        let available = std::thread::available_parallelism().map_or(4, std::num::NonZero::get);
177
178        if self.max_threads == 0 {
179            self.max_threads = available;
180        } else {
181            self.max_threads = self.max_threads.min(available);
182        }
183    }
184}
185
186#[cfg(test)]
187mod tests {
188    use super::*;
189
190    /// Codex F9 regression: `async_concurrency: 0` previously
191    /// passed validation and panicked at `step_by(0)` in
192    /// `fan_out_async`.
193    #[test]
194    fn validate_rejects_zero_async_concurrency() {
195        let cfg = WorkerPoolConfig {
196            async_concurrency: 0,
197            ..Default::default()
198        };
199        let err = cfg.validate().unwrap_err();
200        assert!(matches!(
201            err,
202            crate::config::ConfigError::InvalidValue { .. }
203        ));
204    }
205
206    #[test]
207    fn validate_accepts_one_async_concurrency() {
208        let cfg = WorkerPoolConfig {
209            async_concurrency: 1,
210            ..Default::default()
211        };
212        assert!(cfg.validate().is_ok());
213    }
214
215    /// Codex F11 regression: `min_threads: 0` previously passed
216    /// validation and pinned the rayon semaphore in a yield loop.
217    #[test]
218    fn validate_rejects_zero_min_threads() {
219        let cfg = WorkerPoolConfig {
220            min_threads: 0,
221            ..Default::default()
222        };
223        let err = cfg.validate().unwrap_err();
224        assert!(matches!(
225            err,
226            crate::config::ConfigError::InvalidValue { .. }
227        ));
228    }
229}