hyperi_rustlib/worker/scaler.rs
1// Project: hyperi-rustlib
2// File: src/worker/scaler.rs
3// Purpose: Scaling controller loop, watermark algorithm, CPU sampling
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Threading model and the CPU vs memory asymmetry
10//!
11//! **Foundational assumption: Tokio is HyperI's async + multithreading
12//! substrate.** We do not build our own runtime or our own cgroup-aware
13//! thread-count plumbing -- we lean on the existing wheels:
14//!
15//! - **Sizing** of both the Tokio runtime (`worker_threads`) and the rayon
16//! [`AdaptiveWorkerPool`] ceiling derives from
17//! [`std::thread::available_parallelism`], which on Linux is **cgroup-aware**
18//! (`cpu.max` / `cpuset`, Rust 1.74+). So both pools auto-size to the
19//! container's CPU budget at startup with no bespoke detection.
20//! - **CPU throttling** is the kernel CFS scheduler's job (`cpu.max`). We do
21//! not, and need not, replicate it.
22//!
23//! ## Why there is no cgroup-CPU backpressure signal (but there IS for memory)
24//!
25//! CPU and memory are not symmetric, so they are not plumbed symmetrically:
26//!
27//! | Resource | Over-use outcome | Self-correcting? | Dynamic backpressure |
28//! |----------|------------------|------------------|----------------------|
29//! | Memory | OOM-kill | No -- fatal | Yes -- the `MemoryGuard` + cgroup-first pressure signal |
30//! | CPU | CFS throttle | Yes -- graceful | No -- the kernel already handles it |
31//!
32//! Exceeding the CPU quota is graceful and automatic (the scheduler throttles
33//! us); exceeding the memory limit is fatal. So the dynamic pressure signal
34//! that actually matters is **memory**, which is cgroup-aware here. There is
35//! deliberately no `cpu.stat` reader: feeding a bespoke cgroup-CPU signal into
36//! the scaler would prop up a scale-DOWN that the cgroup case does not want
37//! (under a hard quota you want to USE your whole budget; CFS bounds it).
38//!
39//! ## What `ScalingInput::cpu_util` is for
40//!
41//! The host-wide CPU sample (via `sysinfo`) drives scale-DOWN as a
42//! **bare-metal / unlimited-deployment good-neighbour heuristic** -- backing
43//! off when sharing an un-capped node. Under a cgroup `cpu.max` it is largely
44//! redundant with CFS and the cgroup-aware static sizing above. If in-process
45//! scheduler busyness is ever needed as a finer signal, the existing wheel is
46//! `tokio-metrics` (runtime busy-ratio), not a hand-rolled parser.
47
48use std::sync::Arc;
49use std::time::Duration;
50
51use sysinfo::System;
52use tokio_util::sync::CancellationToken;
53
54use super::pool::AdaptiveWorkerPool;
55
/// Inputs to the watermark scaling algorithm.
///
/// The CPU watermarks (`grow_below`, `shrink_above`, `emergency_above`) are
/// compared directly against `cpu_util`, so they must use the same 0.0-1.0
/// scale. They are expected to satisfy
/// `grow_below <= shrink_above <= emergency_above`; the bands are tested in
/// that order, so an inverted set is resolved by whichever test runs first.
#[derive(Debug, Clone)]
pub struct ScalingInput {
    /// Host-wide CPU utilisation (0.0-1.0), a bare-metal good-neighbour
    /// scale-down heuristic -- NOT a cgroup mechanism. See the module docs
    /// for the CPU-vs-memory asymmetry and why there is no cgroup-CPU signal.
    pub cpu_util: f64,
    /// Memory pressure ratio (used / limit). Compared against
    /// `memory_pressure_cap`; exceeding it overrides every CPU band.
    pub memory_pressure: f64,
    /// Target thread count the algorithm steps up/down from (the pool's
    /// current admission target, not its in-flight count).
    pub current: usize,
    /// Lower bound of the returned target. If it exceeds `max_threads`,
    /// `max_threads` is used as the floor instead.
    pub min_threads: usize,
    /// Upper bound (hard ceiling) of the returned target.
    pub max_threads: usize,
    /// CPU utilisation strictly below which the target grows by 2 ("up").
    pub grow_below: f64,
    /// Inclusive upper edge of the "steady" band; CPU above this shrinks the
    /// target by 1 ("down").
    pub shrink_above: f64,
    /// CPU utilisation strictly above which the target shrinks by 2
    /// ("emergency_down").
    pub emergency_above: f64,
    /// Memory pressure strictly above which the target drops straight to the
    /// floor ("memory_cap"), regardless of CPU.
    pub memory_pressure_cap: f64,
}

/// Result of the watermark scaling algorithm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScalingDecision {
    /// Target thread count after applying watermark bands.
    pub target: usize,
    /// Direction of change: "up", "down", "emergency_down", "memory_cap", or "steady".
    ///
    /// This names the band the input fell into. Because the raw target is
    /// clamped afterwards, `target` can equal `current` even when the
    /// direction is not "steady" (e.g. "up" while already at `max_threads`).
    pub direction: &'static str,
}

impl ScalingDecision {
    /// Evaluate the watermark scaling algorithm.
    ///
    /// Given current CPU utilisation, memory pressure, and thresholds, returns
    /// the target thread count and the direction of the scaling change:
    ///
    /// | Condition                                   | Raw target    | Direction          |
    /// |---------------------------------------------|---------------|--------------------|
    /// | `memory_pressure > memory_pressure_cap`     | floor         | `"memory_cap"`     |
    /// | `cpu_util < grow_below`                     | `current + 2` | `"up"`             |
    /// | `cpu_util <= shrink_above`                  | `current`     | `"steady"`         |
    /// | `cpu_util <= emergency_above`               | `current - 1` | `"down"`           |
    /// | otherwise (including a NaN `cpu_util`)      | `current - 2` | `"emergency_down"` |
    ///
    /// Arithmetic saturates. The raw target is then clamped to
    /// `[floor, max_threads]`, where `floor = min(min_threads, max_threads)`.
    /// With a sane config that is exactly `[min_threads, max_threads]`; with an
    /// inverted config (`min_threads > max_threads`, e.g. after a bad hot
    /// reload) `max_threads` wins rather than `usize::clamp` panicking.
    #[must_use]
    pub fn evaluate(input: &ScalingInput) -> Self {
        // `usize::clamp` panics if min > max; never let a bad config take the
        // scaling task down. `max_threads` is treated as the hard ceiling.
        let ceiling = input.max_threads;
        let floor = input.min_threads.min(ceiling);

        // Memory pressure overrides everything -- prevent OOM
        if input.memory_pressure > input.memory_pressure_cap {
            return Self {
                target: floor,
                direction: "memory_cap",
            };
        }

        // NaN compares false everywhere, so a NaN cpu_util lands in the final
        // (most conservative) branch.
        let (raw_target, direction) = if input.cpu_util < input.grow_below {
            (input.current.saturating_add(2), "up")
        } else if input.cpu_util <= input.shrink_above {
            (input.current, "steady")
        } else if input.cpu_util <= input.emergency_above {
            (input.current.saturating_sub(1), "down")
        } else {
            (input.current.saturating_sub(2), "emergency_down")
        };

        Self {
            target: raw_target.clamp(floor, ceiling),
            direction,
        }
    }
}
113
114/// Background scaling controller.
115///
116/// Samples CPU and memory every `scale_interval_secs`, applies the watermark
117/// algorithm, and adjusts the semaphore permits on the worker pool.
118pub(crate) struct ScalingController {
119 pool: Arc<AdaptiveWorkerPool>,
120 system: System,
121}
122
123impl ScalingController {
124 pub fn new(pool: Arc<AdaptiveWorkerPool>) -> Self {
125 Self {
126 pool,
127 system: System::new(),
128 }
129 }
130
131 /// Run the scaling loop until cancelled.
132 pub async fn run(mut self, cancel: CancellationToken) {
133 let interval_secs = {
134 let cfg = self.pool.config.read();
135 cfg.scale_interval_secs
136 };
137
138 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
139
140 loop {
141 tokio::select! {
142 () = cancel.cancelled() => {
143 tracing::info!("Worker pool scaling controller shutting down");
144 break;
145 }
146 _ = interval.tick() => {
147 self.tick();
148 }
149 }
150 }
151 }
152
153 fn tick(&mut self) {
154 // Sample CPU
155 self.system.refresh_cpu_all();
156 let cpu_util = f64::from(self.system.global_cpu_usage()) / 100.0;
157
158 // Memory signal, container-first. Priority:
159 // 1. Attached MemoryGuard pressure (app-tracked bytes vs the cgroup
160 // limit) -- the most accurate signal for THIS service.
161 // 2. The cgroup's own current/limit -- what the OOM killer acts on.
162 // 3. Host used/total (sysinfo) -- ONLY as a bare-metal fallback.
163 // Host memory must NOT drive container decisions: on a large shared
164 // host it is unrelated to this container's cgroup limit (it can be
165 // high from other tenants, or mask this container nearing its own cap).
166 #[cfg(feature = "memory")]
167 let guard_pressure = self
168 .pool
169 .memory_guard
170 .lock()
171 .as_ref()
172 .map(|g| g.pressure_ratio());
173 #[cfg(not(feature = "memory"))]
174 let guard_pressure: Option<f64> = None;
175
176 #[cfg(feature = "memory")]
177 let cgroup_pressure = crate::memory::detect_memory_pressure();
178 #[cfg(not(feature = "memory"))]
179 let cgroup_pressure: Option<f64> = None;
180
181 let effective_memory_pressure = guard_pressure.or(cgroup_pressure).unwrap_or_else(|| {
182 self.system.refresh_memory();
183 if self.system.total_memory() > 0 {
184 self.system.used_memory() as f64 / self.system.total_memory() as f64
185 } else {
186 0.0
187 }
188 });
189
190 // Read config (may have been hot-reloaded via Arc<RwLock<>>)
191 let cfg = self.pool.config.read().clone();
192
193 // Control input is the CURRENT TARGET (the ceiling we are adjusting),
194 // not the in-flight count -- the watermark algorithm evolves the
195 // target up/down from where it currently sits.
196 let current_target = self.pool.target_threads();
197
198 let decision = ScalingDecision::evaluate(&ScalingInput {
199 cpu_util,
200 memory_pressure: effective_memory_pressure,
201 current: current_target,
202 min_threads: cfg.min_threads,
203 max_threads: cfg.max_threads,
204 grow_below: cfg.grow_below,
205 shrink_above: cfg.shrink_above,
206 emergency_above: cfg.emergency_above,
207 memory_pressure_cap: cfg.memory_pressure_cap,
208 });
209
210 if decision.direction == "steady" {
211 tracing::debug!(
212 cpu = format!("{cpu_util:.2}"),
213 current = current_target,
214 "Worker pool steady"
215 );
216 } else {
217 tracing::debug!(
218 cpu = format!("{cpu_util:.2}"),
219 mem = format!("{effective_memory_pressure:.2}"),
220 current = current_target,
221 target = decision.target,
222 direction = decision.direction,
223 "Worker pool scaling"
224 );
225 metrics::counter!("worker_pool_scale_events_total", "direction" => decision.direction)
226 .increment(1);
227 }
228
229 // Apply the new target concurrency.
230 self.pool.semaphore.set_target(decision.target);
231
232 // Emit operational metrics -- active (leased, in-flight) and target
233 // (admission ceiling) are DISTINCT; do not conflate them.
234 let leased = self.pool.active_threads();
235 metrics::gauge!("worker_pool_active_threads").set(leased as f64);
236 metrics::gauge!("worker_pool_target_threads").set(decision.target as f64);
237 metrics::gauge!("worker_pool_available_threads")
238 .set(decision.target.saturating_sub(leased) as f64);
239 metrics::gauge!("worker_pool_cpu_utilisation").set(cpu_util);
240 metrics::gauge!("worker_pool_memory_utilisation").set(effective_memory_pressure);
241 metrics::gauge!("worker_pool_saturation")
242 .set(decision.target as f64 / cfg.max_threads.max(1) as f64);
243
244 // Feed back into ScalingPressure if attached
245 #[cfg(feature = "scaling")]
246 if let Some(sp) = self.pool.scaling_pressure.lock().as_ref() {
247 let saturation = decision.target as f64 / cfg.max_threads.max(1) as f64;
248 sp.set_component("worker_pool_saturation", saturation);
249 }
250 }
251}