Skip to main content

hyperi_rustlib/worker/
config.rs

// Project:   hyperi-rustlib
// File:      src/worker/config.rs
// Purpose:   Configuration for adaptive worker pool
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use serde::{Deserialize, Serialize};
10
11/// Configuration for the adaptive worker pool.
12///
13/// All values are overridable via the 8-layer config cascade
14/// (CLI > ENV > .env > settings.{env}.yaml > settings.yaml > defaults > rustlib > hard-coded).
15///
16/// Every field is also emitted as a gauge metric for Grafana overlay.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct WorkerPoolConfig {
19    /// Minimum active worker threads (floor for scaling).
20    #[serde(default = "default_min_threads")]
21    pub min_threads: usize,
22
23    /// Maximum worker threads. 0 = auto-detect from cgroup / `available_parallelism`.
24    #[serde(default)]
25    pub max_threads: usize,
26
27    /// CPU utilisation below this threshold triggers thread growth.
28    #[serde(default = "default_grow_below")]
29    pub grow_below: f64,
30
31    /// CPU utilisation above this threshold triggers gentle thread reduction.
32    #[serde(default = "default_shrink_above")]
33    pub shrink_above: f64,
34
35    /// CPU utilisation above this threshold triggers aggressive thread reduction.
36    #[serde(default = "default_emergency_above")]
37    pub emergency_above: f64,
38
39    /// Memory pressure above this threshold hard-caps threads at `min_threads`.
40    #[serde(default = "default_memory_pressure_cap")]
41    pub memory_pressure_cap: f64,
42
43    /// How often to re-evaluate scaling (seconds).
44    #[serde(default = "default_scale_interval_secs")]
45    pub scale_interval_secs: u64,
46
47    /// Maximum concurrent async fan-out tasks.
48    #[serde(default = "default_async_concurrency")]
49    pub async_concurrency: usize,
50
51    /// Seconds the pool must be saturated before reporting unhealthy.
52    #[serde(default = "default_health_saturation_timeout_secs")]
53    pub health_saturation_timeout_secs: u64,
54}
55
// Built-in fallback values. The functions below return these constants so
// they can be referenced by path (e.g. from `#[serde(default = "...")]`).
const DEFAULT_MIN_THREADS: usize = 2;
const DEFAULT_GROW_BELOW: f64 = 0.60;
const DEFAULT_SHRINK_ABOVE: f64 = 0.85;
const DEFAULT_EMERGENCY_ABOVE: f64 = 0.95;
const DEFAULT_MEMORY_PRESSURE_CAP: f64 = 0.80;
const DEFAULT_SCALE_INTERVAL_SECS: u64 = 5;
const DEFAULT_ASYNC_CONCURRENCY: usize = 32;
const DEFAULT_HEALTH_SATURATION_TIMEOUT_SECS: u64 = 30;

/// Fallback for `min_threads`.
fn default_min_threads() -> usize {
    DEFAULT_MIN_THREADS
}

/// Fallback for `grow_below`.
fn default_grow_below() -> f64 {
    DEFAULT_GROW_BELOW
}

/// Fallback for `shrink_above`.
fn default_shrink_above() -> f64 {
    DEFAULT_SHRINK_ABOVE
}

/// Fallback for `emergency_above`.
fn default_emergency_above() -> f64 {
    DEFAULT_EMERGENCY_ABOVE
}

/// Fallback for `memory_pressure_cap`.
fn default_memory_pressure_cap() -> f64 {
    DEFAULT_MEMORY_PRESSURE_CAP
}

/// Fallback for `scale_interval_secs`.
fn default_scale_interval_secs() -> u64 {
    DEFAULT_SCALE_INTERVAL_SECS
}

/// Fallback for `async_concurrency`.
fn default_async_concurrency() -> usize {
    DEFAULT_ASYNC_CONCURRENCY
}

/// Fallback for `health_saturation_timeout_secs`.
fn default_health_saturation_timeout_secs() -> u64 {
    DEFAULT_HEALTH_SATURATION_TIMEOUT_SECS
}
80
81impl Default for WorkerPoolConfig {
82    fn default() -> Self {
83        Self {
84            min_threads: default_min_threads(),
85            max_threads: 0,
86            grow_below: default_grow_below(),
87            shrink_above: default_shrink_above(),
88            emergency_above: default_emergency_above(),
89            memory_pressure_cap: default_memory_pressure_cap(),
90            scale_interval_secs: default_scale_interval_secs(),
91            async_concurrency: default_async_concurrency(),
92            health_saturation_timeout_secs: default_health_saturation_timeout_secs(),
93        }
94    }
95}
96
97impl WorkerPoolConfig {
98    /// Load config from the cascade under the given key (e.g. "worker_pool").
99    ///
100    /// Falls back to defaults if the config cascade is not initialised or the
101    /// key is absent. Validates after loading.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if validation fails (e.g. thresholds out of order).
106    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
107        let pool_cfg: Self = if let Some(cfg) = crate::config::try_get() {
108            // `or_warn`: absent key -> default (silent); present-but-malformed
109            // -> WARN + default (was silently swallowed pre-2.8.11).
110            cfg.unmarshal_key_or_warn(key).unwrap_or_default()
111        } else {
112            tracing::debug!("Config cascade not initialised, using default WorkerPoolConfig");
113            Self::default()
114        };
115        pool_cfg.validate()?;
116        Ok(pool_cfg)
117    }
118
119    /// Validate configuration invariants.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if thresholds are out of order or min > max.
124    pub fn validate(&self) -> Result<(), crate::config::ConfigError> {
125        if self.max_threads != 0 && self.min_threads > self.max_threads {
126            return Err(crate::config::ConfigError::InvalidValue {
127                key: "worker_pool.min_threads".into(),
128                reason: format!(
129                    "min_threads ({}) > max_threads ({})",
130                    self.min_threads, self.max_threads
131                ),
132            });
133        }
134        if self.grow_below >= self.shrink_above {
135            return Err(crate::config::ConfigError::InvalidValue {
136                key: "worker_pool.grow_below".into(),
137                reason: format!(
138                    "grow_below ({}) >= shrink_above ({})",
139                    self.grow_below, self.shrink_above
140                ),
141            });
142        }
143        if self.shrink_above >= self.emergency_above {
144            return Err(crate::config::ConfigError::InvalidValue {
145                key: "worker_pool.shrink_above".into(),
146                reason: format!(
147                    "shrink_above ({}) >= emergency_above ({})",
148                    self.shrink_above, self.emergency_above
149                ),
150            });
151        }
152        // `fan_out_async` does `step_by(async_concurrency)`; 0 panics.
153        if self.async_concurrency == 0 {
154            return Err(crate::config::ConfigError::InvalidValue {
155                key: "worker_pool.async_concurrency".into(),
156                reason: "must be >= 1 (fan_out_async iterates via step_by)".into(),
157            });
158        }
159        // Zero CPU workers leaves the rayon semaphore
160        // spinning in `yield_now()` forever. Reject upfront; the
161        // scaler's clamp uses min_threads as the floor, so this
162        // also guarantees the scaler never drives permits below 1.
163        if self.min_threads == 0 {
164            return Err(crate::config::ConfigError::InvalidValue {
165                key: "worker_pool.min_threads".into(),
166                reason: "must be >= 1 (zero permits busy-spin the pool)".into(),
167            });
168        }
169        Ok(())
170    }
171
172    /// Resolve `max_threads` to the effective CPU count.
173    ///
174    /// - `max_threads = 0` -> auto-detect from `available_parallelism` (cgroup-aware)
175    /// - `max_threads > 0` -> cap at `min(configured, available_parallelism)`
176    ///   to avoid creating more threads than physical cores
177    pub fn resolve_max_threads(&mut self) {
178        let available = std::thread::available_parallelism().map_or(4, std::num::NonZero::get);
179
180        if self.max_threads == 0 {
181            self.max_threads = available;
182        } else {
183            self.max_threads = self.max_threads.min(available);
184        }
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConfigError;

    /// True when `validate` rejects `cfg` with `ConfigError::InvalidValue`.
    fn is_invalid_value(cfg: &WorkerPoolConfig) -> bool {
        matches!(cfg.validate(), Err(ConfigError::InvalidValue { .. }))
    }

    /// The built-in defaults must always pass validation.
    #[test]
    fn validate_accepts_default_config() {
        assert!(WorkerPoolConfig::default().validate().is_ok());
    }

    /// Regression: `async_concurrency: 0` previously
    /// passed validation and panicked at `step_by(0)` in
    /// `fan_out_async`.
    #[test]
    fn validate_rejects_zero_async_concurrency() {
        let cfg = WorkerPoolConfig {
            async_concurrency: 0,
            ..Default::default()
        };
        assert!(is_invalid_value(&cfg));
    }

    #[test]
    fn validate_accepts_one_async_concurrency() {
        let cfg = WorkerPoolConfig {
            async_concurrency: 1,
            ..Default::default()
        };
        assert!(cfg.validate().is_ok());
    }

    /// Regression: `min_threads: 0` previously passed
    /// validation and pinned the rayon semaphore in a yield loop.
    #[test]
    fn validate_rejects_zero_min_threads() {
        let cfg = WorkerPoolConfig {
            min_threads: 0,
            ..Default::default()
        };
        assert!(is_invalid_value(&cfg));
    }

    /// An explicit `max_threads` below `min_threads` is rejected.
    #[test]
    fn validate_rejects_min_threads_above_max_threads() {
        let cfg = WorkerPoolConfig {
            min_threads: 8,
            max_threads: 4,
            ..Default::default()
        };
        assert!(is_invalid_value(&cfg));
    }

    /// `grow_below` must be strictly less than `shrink_above`.
    #[test]
    fn validate_rejects_grow_below_not_under_shrink_above() {
        let cfg = WorkerPoolConfig {
            grow_below: 0.90,
            ..Default::default()
        };
        assert!(is_invalid_value(&cfg));
    }

    /// `shrink_above` must be strictly less than `emergency_above`.
    #[test]
    fn validate_rejects_shrink_above_not_under_emergency_above() {
        let cfg = WorkerPoolConfig {
            shrink_above: 0.97,
            ..Default::default()
        };
        assert!(is_invalid_value(&cfg));
    }

    /// The auto-detect sentinel (`0`) must be replaced by a real thread count.
    #[test]
    fn resolve_max_threads_replaces_auto_detect_sentinel() {
        let mut cfg = WorkerPoolConfig::default();
        cfg.resolve_max_threads();
        assert!(cfg.max_threads >= 1);
    }

    /// An explicit limit that already fits the host is left untouched.
    #[test]
    fn resolve_max_threads_keeps_explicit_limit_of_one() {
        let mut cfg = WorkerPoolConfig {
            min_threads: 1,
            max_threads: 1,
            ..Default::default()
        };
        cfg.resolve_max_threads();
        assert_eq!(cfg.max_threads, 1);
    }
}