hyperi_rustlib/worker/
scaler.rs1use std::sync::Arc;
10use std::time::Duration;
11
12use sysinfo::System;
13use tokio_util::sync::CancellationToken;
14
15use super::pool::AdaptiveWorkerPool;
16
/// Snapshot of everything [`ScalingDecision::evaluate`] needs to choose a
/// thread target: the current load readings, the current pool size, and the
/// configured bounds and thresholds.
#[derive(Debug, Clone)]
pub struct ScalingInput {
    /// CPU utilisation reading, compared against `grow_below`,
    /// `shrink_above` and `emergency_above`.
    pub cpu_util: f64,
    /// Memory pressure reading, compared against `memory_pressure_cap`.
    pub memory_pressure: f64,
    /// Thread count the pool is currently running with.
    pub current: usize,
    /// Lower bound for the computed target.
    pub min_threads: usize,
    /// Upper bound for the computed target.
    pub max_threads: usize,
    /// CPU utilisation strictly below this value grows the pool.
    pub grow_below: f64,
    /// CPU utilisation strictly above this value shrinks the pool.
    pub shrink_above: f64,
    /// CPU utilisation strictly above this value shrinks the pool faster.
    pub emergency_above: f64,
    /// Memory pressure strictly above this value forces the pool to
    /// `min_threads`.
    pub memory_pressure_cap: f64,
}
30
/// Outcome of one scaling evaluation: the thread count to apply and a short
/// label describing why it was chosen.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScalingDecision {
    /// Thread count the pool should be set to.
    pub target: usize,
    /// Label for the branch that produced `target`. `evaluate` emits one of
    /// `"up"`, `"steady"`, `"down"`, `"emergency_down"` or `"memory_cap"`.
    pub direction: &'static str,
}
39
40impl ScalingDecision {
41 #[must_use]
46 pub fn evaluate(input: &ScalingInput) -> Self {
47 if input.memory_pressure > input.memory_pressure_cap {
49 return Self {
50 target: input.min_threads,
51 direction: "memory_cap",
52 };
53 }
54
55 let (raw_target, direction) = if input.cpu_util < input.grow_below {
56 (input.current.saturating_add(2), "up")
57 } else if input.cpu_util <= input.shrink_above {
58 (input.current, "steady")
59 } else if input.cpu_util <= input.emergency_above {
60 (input.current.saturating_sub(1), "down")
61 } else {
62 (input.current.saturating_sub(2), "emergency_down")
63 };
64
65 let target = raw_target.clamp(input.min_threads, input.max_threads);
67
68 Self { target, direction }
69 }
70}
71
72pub(crate) struct ScalingController {
77 pool: Arc<AdaptiveWorkerPool>,
78 system: System,
79}
80
81impl ScalingController {
82 pub fn new(pool: Arc<AdaptiveWorkerPool>) -> Self {
83 Self {
84 pool,
85 system: System::new(),
86 }
87 }
88
89 pub async fn run(mut self, cancel: CancellationToken) {
91 let interval_secs = {
92 let cfg = self.pool.config.read();
93 cfg.scale_interval_secs
94 };
95
96 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
97
98 loop {
99 tokio::select! {
100 () = cancel.cancelled() => {
101 tracing::info!("Worker pool scaling controller shutting down");
102 break;
103 }
104 _ = interval.tick() => {
105 self.tick();
106 }
107 }
108 }
109 }
110
111 fn tick(&mut self) {
112 self.system.refresh_cpu_all();
114 let cpu_util = f64::from(self.system.global_cpu_usage()) / 100.0;
115
116 self.system.refresh_memory();
118 let sysinfo_mem_pressure = if self.system.total_memory() > 0 {
119 self.system.used_memory() as f64 / self.system.total_memory() as f64
120 } else {
121 0.0
122 };
123
124 #[cfg(feature = "memory")]
125 let memory_guard_pressure = self
126 .pool
127 .memory_guard
128 .lock()
129 .as_ref()
130 .map_or(0.0, |g| g.pressure_ratio());
131 #[cfg(not(feature = "memory"))]
132 let memory_guard_pressure = 0.0;
133
134 let effective_memory_pressure = sysinfo_mem_pressure.max(memory_guard_pressure);
135
136 let cfg = self.pool.config.read().clone();
138
139 let current_permits = self.pool.active_threads();
140
141 let decision = ScalingDecision::evaluate(&ScalingInput {
142 cpu_util,
143 memory_pressure: effective_memory_pressure,
144 current: current_permits,
145 min_threads: cfg.min_threads,
146 max_threads: cfg.max_threads,
147 grow_below: cfg.grow_below,
148 shrink_above: cfg.shrink_above,
149 emergency_above: cfg.emergency_above,
150 memory_pressure_cap: cfg.memory_pressure_cap,
151 });
152
153 if decision.direction == "steady" {
154 tracing::debug!(
155 cpu = format!("{cpu_util:.2}"),
156 current = current_permits,
157 "Worker pool steady"
158 );
159 } else {
160 tracing::debug!(
161 cpu = format!("{cpu_util:.2}"),
162 mem = format!("{effective_memory_pressure:.2}"),
163 current = current_permits,
164 target = decision.target,
165 direction = decision.direction,
166 "Worker pool scaling"
167 );
168 metrics::counter!("worker_pool_scale_events_total", "direction" => decision.direction)
169 .increment(1);
170 }
171
172 self.pool.semaphore.set_permits(decision.target);
174
175 metrics::gauge!("worker_pool_active_threads").set(decision.target as f64);
177 metrics::gauge!("worker_pool_target_threads").set(decision.target as f64);
178 metrics::gauge!("worker_pool_cpu_utilisation").set(cpu_util);
179 metrics::gauge!("worker_pool_memory_utilisation").set(effective_memory_pressure);
180 metrics::gauge!("worker_pool_saturation")
181 .set(decision.target as f64 / cfg.max_threads.max(1) as f64);
182
183 #[cfg(feature = "scaling")]
185 if let Some(sp) = self.pool.scaling_pressure.lock().as_ref() {
186 let saturation = decision.target as f64 / cfg.max_threads.max(1) as f64;
187 sp.set_component("worker_pool_saturation", saturation);
188 }
189 }
190}