Skip to main content

hyperi_rustlib/worker/
scaler.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/scaler.rs
3// Purpose:   Scaling controller loop, watermark algorithm, CPU sampling
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use sysinfo::System;
13use tokio_util::sync::CancellationToken;
14
15use super::pool::AdaptiveWorkerPool;
16
/// Inputs to the watermark scaling algorithm.
///
/// As produced by the scaling controller, `cpu_util` and `memory_pressure`
/// are fractions (`usage / 100` and `used / total`). The threshold fields are
/// compared against them directly.
#[derive(Debug, Clone)]
pub struct ScalingInput {
    /// Current CPU utilisation.
    pub cpu_util: f64,
    /// Current memory pressure.
    pub memory_pressure: f64,
    /// Current number of worker permits.
    pub current: usize,
    /// Lower bound for the target thread count.
    pub min_threads: usize,
    /// Upper bound for the target thread count (treated as `min_threads`
    /// when it is smaller than `min_threads`).
    pub max_threads: usize,
    /// Grow by 2 when `cpu_util` is strictly below this value.
    pub grow_below: f64,
    /// Stay steady while `cpu_util` is at or below this value; above it, shrink.
    pub shrink_above: f64,
    /// Above this value, shrink by 2 instead of 1.
    pub emergency_above: f64,
    /// Drop straight to `min_threads` when `memory_pressure` exceeds this value.
    pub memory_pressure_cap: f64,
}

/// Result of the watermark scaling algorithm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScalingDecision {
    /// Target thread count after applying watermark bands.
    pub target: usize,
    /// Direction of change: "up", "down", "emergency_down", "memory_cap", or "steady".
    ///
    /// This names the band that was selected, not whether the count moved:
    /// when `current` already sits at a bound, a non-"steady" direction can
    /// come with `target == current`.
    pub direction: &'static str,
}

impl ScalingDecision {
    /// Evaluate the watermark scaling algorithm.
    ///
    /// Bands, checked in order:
    /// - `memory_pressure > memory_pressure_cap` -> `min_threads`, "memory_cap"
    /// - `cpu_util < grow_below` -> `current + 2`, "up"
    /// - `cpu_util <= shrink_above` -> `current`, "steady"
    /// - `cpu_util <= emergency_above` -> `current - 1`, "down"
    /// - otherwise -> `current - 2`, "emergency_down"
    ///
    /// Non-memory results are clamped to `[min_threads, max_threads]`. If the
    /// bounds are inverted (`min_threads > max_threads`), `min_threads` wins
    /// rather than panicking. A NaN `cpu_util` fails every comparison and
    /// therefore lands in "emergency_down".
    #[must_use]
    pub fn evaluate(input: &ScalingInput) -> Self {
        // Memory pressure overrides everything -- prevent OOM
        if input.memory_pressure > input.memory_pressure_cap {
            return Self {
                target: input.min_threads,
                direction: "memory_cap",
            };
        }

        let (raw_target, direction) = if input.cpu_util < input.grow_below {
            (input.current.saturating_add(2), "up")
        } else if input.cpu_util <= input.shrink_above {
            (input.current, "steady")
        } else if input.cpu_util <= input.emergency_above {
            (input.current.saturating_sub(1), "down")
        } else {
            (input.current.saturating_sub(2), "emergency_down")
        };

        // `Ord::clamp` panics when min > max. The config may be hot-reloaded,
        // so an inverted min/max pair must not take down the controller;
        // treat the minimum as authoritative instead.
        let upper = input.max_threads.max(input.min_threads);
        let target = raw_target.clamp(input.min_threads, upper);

        Self { target, direction }
    }
}
71
72/// Background scaling controller.
73///
74/// Samples CPU and memory every `scale_interval_secs`, applies the watermark
75/// algorithm, and adjusts the semaphore permits on the worker pool.
76pub(crate) struct ScalingController {
77    pool: Arc<AdaptiveWorkerPool>,
78    system: System,
79}
80
81impl ScalingController {
82    pub fn new(pool: Arc<AdaptiveWorkerPool>) -> Self {
83        Self {
84            pool,
85            system: System::new(),
86        }
87    }
88
89    /// Run the scaling loop until cancelled.
90    pub async fn run(mut self, cancel: CancellationToken) {
91        let interval_secs = {
92            let cfg = self.pool.config.read();
93            cfg.scale_interval_secs
94        };
95
96        let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
97
98        loop {
99            tokio::select! {
100                () = cancel.cancelled() => {
101                    tracing::info!("Worker pool scaling controller shutting down");
102                    break;
103                }
104                _ = interval.tick() => {
105                    self.tick();
106                }
107            }
108        }
109    }
110
111    fn tick(&mut self) {
112        // Sample CPU
113        self.system.refresh_cpu_all();
114        let cpu_util = f64::from(self.system.global_cpu_usage()) / 100.0;
115
116        // Sample memory -- dual source: sysinfo process RSS + MemoryGuard if attached
117        self.system.refresh_memory();
118        let sysinfo_mem_pressure = if self.system.total_memory() > 0 {
119            self.system.used_memory() as f64 / self.system.total_memory() as f64
120        } else {
121            0.0
122        };
123
124        #[cfg(feature = "memory")]
125        let memory_guard_pressure = self
126            .pool
127            .memory_guard
128            .lock()
129            .as_ref()
130            .map_or(0.0, |g| g.pressure_ratio());
131        #[cfg(not(feature = "memory"))]
132        let memory_guard_pressure = 0.0;
133
134        let effective_memory_pressure = sysinfo_mem_pressure.max(memory_guard_pressure);
135
136        // Read config (may have been hot-reloaded via Arc<RwLock<>>)
137        let cfg = self.pool.config.read().clone();
138
139        let current_permits = self.pool.active_threads();
140
141        let decision = ScalingDecision::evaluate(&ScalingInput {
142            cpu_util,
143            memory_pressure: effective_memory_pressure,
144            current: current_permits,
145            min_threads: cfg.min_threads,
146            max_threads: cfg.max_threads,
147            grow_below: cfg.grow_below,
148            shrink_above: cfg.shrink_above,
149            emergency_above: cfg.emergency_above,
150            memory_pressure_cap: cfg.memory_pressure_cap,
151        });
152
153        if decision.direction == "steady" {
154            tracing::debug!(
155                cpu = format!("{cpu_util:.2}"),
156                current = current_permits,
157                "Worker pool steady"
158            );
159        } else {
160            tracing::debug!(
161                cpu = format!("{cpu_util:.2}"),
162                mem = format!("{effective_memory_pressure:.2}"),
163                current = current_permits,
164                target = decision.target,
165                direction = decision.direction,
166                "Worker pool scaling"
167            );
168            metrics::counter!("worker_pool_scale_events_total", "direction" => decision.direction)
169                .increment(1);
170        }
171
172        // Adjust semaphore permits
173        self.pool.semaphore.set_permits(decision.target);
174
175        // Emit operational metrics
176        metrics::gauge!("worker_pool_active_threads").set(decision.target as f64);
177        metrics::gauge!("worker_pool_target_threads").set(decision.target as f64);
178        metrics::gauge!("worker_pool_cpu_utilisation").set(cpu_util);
179        metrics::gauge!("worker_pool_memory_utilisation").set(effective_memory_pressure);
180        metrics::gauge!("worker_pool_saturation")
181            .set(decision.target as f64 / cfg.max_threads.max(1) as f64);
182
183        // Feed back into ScalingPressure if attached
184        #[cfg(feature = "scaling")]
185        if let Some(sp) = self.pool.scaling_pressure.lock().as_ref() {
186            let saturation = decision.target as f64 / cfg.max_threads.max(1) as f64;
187            sp.set_component("worker_pool_saturation", saturation);
188        }
189    }
190}