Skip to main content

hyperi_rustlib/worker/
scaler.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/scaler.rs
3// Purpose:   Scaling controller loop, watermark algorithm, CPU sampling
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Threading model and the CPU vs memory asymmetry
10//!
11//! **Foundational assumption: Tokio is HyperI's async + multithreading
12//! substrate.** We do not build our own runtime or our own cgroup-aware
13//! thread-count plumbing -- we lean on the existing wheels:
14//!
15//! - **Sizing** of both the Tokio runtime (`worker_threads`) and the rayon
16//!   [`AdaptiveWorkerPool`] ceiling derives from
17//!   [`std::thread::available_parallelism`], which on Linux is **cgroup-aware**
18//!   (`cpu.max` / `cpuset`, Rust 1.74+). So both pools auto-size to the
19//!   container's CPU budget at startup with no bespoke detection.
20//! - **CPU throttling** is the kernel CFS scheduler's job (`cpu.max`). We do
21//!   not, and need not, replicate it.
22//!
23//! ## Why there is no cgroup-CPU backpressure signal (but there IS for memory)
24//!
25//! CPU and memory are not symmetric, so they are not plumbed symmetrically:
26//!
27//! | Resource | Over-use outcome | Self-correcting? | Dynamic backpressure |
28//! |----------|------------------|------------------|----------------------|
29//! | Memory   | OOM-kill         | No -- fatal      | Yes -- the `MemoryGuard` + cgroup-first pressure signal |
30//! | CPU      | CFS throttle     | Yes -- graceful  | No -- the kernel already handles it |
31//!
32//! Exceeding the CPU quota is graceful and automatic (the scheduler throttles
33//! us); exceeding the memory limit is fatal. So the dynamic pressure signal
34//! that actually matters is **memory**, which is cgroup-aware here. There is
35//! deliberately no `cpu.stat` reader: feeding a bespoke cgroup-CPU signal into
36//! the scaler would prop up a scale-DOWN that the cgroup case does not want
37//! (under a hard quota you want to USE your whole budget; CFS bounds it).
38//!
39//! ## What `ScalingInput::cpu_util` is for
40//!
41//! The host-wide CPU sample (via `sysinfo`) drives scale-DOWN as a
42//! **bare-metal / unlimited-deployment good-neighbour heuristic** -- backing
43//! off when sharing an un-capped node. Under a cgroup `cpu.max` it is largely
44//! redundant with CFS and the cgroup-aware static sizing above. If in-process
45//! scheduler busyness is ever needed as a finer signal, the existing wheel is
46//! `tokio-metrics` (runtime busy-ratio), not a hand-rolled parser.
47
48use std::sync::Arc;
49use std::time::Duration;
50
51use sysinfo::System;
52use tokio_util::sync::CancellationToken;
53
54use super::pool::AdaptiveWorkerPool;
55
56/// Inputs to the watermark scaling algorithm.
57#[derive(Debug, Clone)]
58pub struct ScalingInput {
59    /// Host-wide CPU utilisation (0.0-1.0), a bare-metal good-neighbour
60    /// scale-down heuristic -- NOT a cgroup mechanism. See the module docs
61    /// for the CPU-vs-memory asymmetry and why there is no cgroup-CPU signal.
62    pub cpu_util: f64,
63    pub memory_pressure: f64,
64    pub current: usize,
65    pub min_threads: usize,
66    pub max_threads: usize,
67    pub grow_below: f64,
68    pub shrink_above: f64,
69    pub emergency_above: f64,
70    pub memory_pressure_cap: f64,
71}
72
73/// Result of the watermark scaling algorithm.
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct ScalingDecision {
76    /// Target thread count after applying watermark bands.
77    pub target: usize,
78    /// Direction of change: "up", "down", "emergency_down", "memory_cap", or "steady".
79    pub direction: &'static str,
80}
81
82impl ScalingDecision {
83    /// Evaluate the watermark scaling algorithm.
84    ///
85    /// Given current CPU utilisation, memory pressure, and thresholds, returns
86    /// the target thread count and the direction of the scaling change.
87    #[must_use]
88    pub fn evaluate(input: &ScalingInput) -> Self {
89        // Memory pressure overrides everything -- prevent OOM
90        if input.memory_pressure > input.memory_pressure_cap {
91            return Self {
92                target: input.min_threads,
93                direction: "memory_cap",
94            };
95        }
96
97        let (raw_target, direction) = if input.cpu_util < input.grow_below {
98            (input.current.saturating_add(2), "up")
99        } else if input.cpu_util <= input.shrink_above {
100            (input.current, "steady")
101        } else if input.cpu_util <= input.emergency_above {
102            (input.current.saturating_sub(1), "down")
103        } else {
104            (input.current.saturating_sub(2), "emergency_down")
105        };
106
107        // Clamp to [min, max]
108        let target = raw_target.clamp(input.min_threads, input.max_threads);
109
110        Self { target, direction }
111    }
112}
113
114/// Background scaling controller.
115///
116/// Samples CPU and memory every `scale_interval_secs`, applies the watermark
117/// algorithm, and adjusts the semaphore permits on the worker pool.
118pub(crate) struct ScalingController {
119    pool: Arc<AdaptiveWorkerPool>,
120    system: System,
121}
122
123impl ScalingController {
124    pub fn new(pool: Arc<AdaptiveWorkerPool>) -> Self {
125        Self {
126            pool,
127            system: System::new(),
128        }
129    }
130
131    /// Run the scaling loop until cancelled.
132    pub async fn run(mut self, cancel: CancellationToken) {
133        let interval_secs = {
134            let cfg = self.pool.config.read();
135            cfg.scale_interval_secs
136        };
137
138        let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
139
140        loop {
141            tokio::select! {
142                () = cancel.cancelled() => {
143                    tracing::info!("Worker pool scaling controller shutting down");
144                    break;
145                }
146                _ = interval.tick() => {
147                    self.tick();
148                }
149            }
150        }
151    }
152
153    fn tick(&mut self) {
154        // Sample CPU
155        self.system.refresh_cpu_all();
156        let cpu_util = f64::from(self.system.global_cpu_usage()) / 100.0;
157
158        // Memory signal, container-first. Priority:
159        //   1. Attached MemoryGuard pressure (app-tracked bytes vs the cgroup
160        //      limit) -- the most accurate signal for THIS service.
161        //   2. The cgroup's own current/limit -- what the OOM killer acts on.
162        //   3. Host used/total (sysinfo) -- ONLY as a bare-metal fallback.
163        // Host memory must NOT drive container decisions: on a large shared
164        // host it is unrelated to this container's cgroup limit (it can be
165        // high from other tenants, or mask this container nearing its own cap).
166        #[cfg(feature = "memory")]
167        let guard_pressure = self
168            .pool
169            .memory_guard
170            .lock()
171            .as_ref()
172            .map(|g| g.pressure_ratio());
173        #[cfg(not(feature = "memory"))]
174        let guard_pressure: Option<f64> = None;
175
176        #[cfg(feature = "memory")]
177        let cgroup_pressure = crate::memory::detect_memory_pressure();
178        #[cfg(not(feature = "memory"))]
179        let cgroup_pressure: Option<f64> = None;
180
181        let effective_memory_pressure = guard_pressure.or(cgroup_pressure).unwrap_or_else(|| {
182            self.system.refresh_memory();
183            if self.system.total_memory() > 0 {
184                self.system.used_memory() as f64 / self.system.total_memory() as f64
185            } else {
186                0.0
187            }
188        });
189
190        // Read config (may have been hot-reloaded via Arc<RwLock<>>)
191        let cfg = self.pool.config.read().clone();
192
193        // Control input is the CURRENT TARGET (the ceiling we are adjusting),
194        // not the in-flight count -- the watermark algorithm evolves the
195        // target up/down from where it currently sits.
196        let current_target = self.pool.target_threads();
197
198        let decision = ScalingDecision::evaluate(&ScalingInput {
199            cpu_util,
200            memory_pressure: effective_memory_pressure,
201            current: current_target,
202            min_threads: cfg.min_threads,
203            max_threads: cfg.max_threads,
204            grow_below: cfg.grow_below,
205            shrink_above: cfg.shrink_above,
206            emergency_above: cfg.emergency_above,
207            memory_pressure_cap: cfg.memory_pressure_cap,
208        });
209
210        if decision.direction == "steady" {
211            tracing::debug!(
212                cpu = format!("{cpu_util:.2}"),
213                current = current_target,
214                "Worker pool steady"
215            );
216        } else {
217            tracing::debug!(
218                cpu = format!("{cpu_util:.2}"),
219                mem = format!("{effective_memory_pressure:.2}"),
220                current = current_target,
221                target = decision.target,
222                direction = decision.direction,
223                "Worker pool scaling"
224            );
225            metrics::counter!("worker_pool_scale_events_total", "direction" => decision.direction)
226                .increment(1);
227        }
228
229        // Apply the new target concurrency.
230        self.pool.semaphore.set_target(decision.target);
231
232        // Emit operational metrics -- active (leased, in-flight) and target
233        // (admission ceiling) are DISTINCT; do not conflate them.
234        let leased = self.pool.active_threads();
235        metrics::gauge!("worker_pool_active_threads").set(leased as f64);
236        metrics::gauge!("worker_pool_target_threads").set(decision.target as f64);
237        metrics::gauge!("worker_pool_available_threads")
238            .set(decision.target.saturating_sub(leased) as f64);
239        metrics::gauge!("worker_pool_cpu_utilisation").set(cpu_util);
240        metrics::gauge!("worker_pool_memory_utilisation").set(effective_memory_pressure);
241        metrics::gauge!("worker_pool_saturation")
242            .set(decision.target as f64 / cfg.max_threads.max(1) as f64);
243
244        // Feed back into ScalingPressure if attached
245        #[cfg(feature = "scaling")]
246        if let Some(sp) = self.pool.scaling_pressure.lock().as_ref() {
247            let saturation = decision.target as f64 / cfg.max_threads.max(1) as f64;
248            sp.set_component("worker_pool_saturation", saturation);
249        }
250    }
251}