// Project:   hyperi-rustlib
// File:      src/scaling/pressure.rs
// Purpose:   Lock-free scaling pressure calculator
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Lock-free scaling pressure calculator for KEDA autoscaling.
//!
//! Apps register components at construction, then update values atomically
//! from their pipeline. [`ScalingPressure::calculate`] returns a value in the
//! range 0.0-100.0.
//!
//! ## Gate Logic
//!
//! Two hard gates are evaluated, in this order, before the weighted composite:
//!
//! 1. **Circuit breaker** -- if the circuit breaker is signalled open, returns 0.0
//!    (sink is down, scaling won't help)
//! 2. **Memory pressure** -- if `used/limit >= threshold`, returns 100.0
//!    (scale immediately before OOM)
//!
//! If neither gate fires, the weighted composite is calculated.

use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

use super::config::{ScalingComponent, ScalingPressureConfig};

/// Active gate preventing normal composite calculation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GateType {
    /// Circuit breaker open → 0.0 (scaling won't help).
    CircuitBreaker,
    /// Memory pressure high → 100.0 (scale before OOM).
    MemoryPressure,
}

impl GateType {
    /// Stable snake_case label for this gate.
    ///
    /// This is the exact text produced by the [`fmt::Display`] impl, exposed
    /// as a `&'static str` so callers can use it without allocating.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            GateType::CircuitBreaker => "circuit_breaker",
            GateType::MemoryPressure => "memory_pressure",
        }
    }
}

impl fmt::Display for GateType {
    /// Writes the gate's snake_case label (see [`GateType::as_str`]).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The label is a constant string, so no format machinery is needed.
        f.write_str(self.as_str())
    }
}

/// Per-component diagnostic snapshot.
///
/// Plain data describing one registered component at the moment the
/// snapshot was taken.
#[derive(Debug, Clone)]
pub struct ComponentSnapshot {
    /// Component name.
    pub name: String,
    /// Current raw value, exactly as last reported for this component.
    pub raw_value: f64,
    /// Weighted contribution to the composite. It peaks at `weight * 100.0`,
    /// which is reached once `raw_value` meets or exceeds `saturation`.
    pub score: f64,
    /// Configured weight.
    pub weight: f64,
    /// Configured saturation point (the raw value that counts as "fully loaded").
    pub saturation: f64,
}

62/// Full diagnostic snapshot of scaling pressure state.
63#[derive(Debug, Clone)]
64pub struct PressureSnapshot {
65    /// Calculated scaling pressure (0.0-100.0).
66    pub value: f64,
67    /// Active gate, if any.
68    pub gate_active: Option<GateType>,
69    /// Per-component breakdown.
70    pub components: Vec<ComponentSnapshot>,
71    /// Current memory usage ratio (0.0-1.0).
72    pub memory_ratio: f64,
73    /// Whether the circuit breaker is signalled open.
74    pub circuit_open: bool,
75}
76
/// Internal entry for a registered component.
///
/// `name`, `weight` and `saturation` are fixed at registration; only `value`
/// changes afterwards, which is why it alone is atomic.
struct ComponentEntry {
    /// Name used to look the component up when its value is updated.
    name: String,
    /// Weight applied to this component's share of the composite.
    weight: f64,
    /// Raw value at which this component counts as fully saturated.
    saturation: f64,
    /// Current value stored as f64 bits in AtomicU64
    /// (`f64::to_bits` on write, `f64::from_bits` on read).
    value: AtomicU64,
}

86/// Lock-free scaling pressure calculator.
87///
88/// Produces a 0.0-100.0 composite metric for KEDA (or any autoscaler)
89/// based on weighted application signals with two hard gates.
90///
91/// All updates are lock-free (`Relaxed` ordering) -- safe to call from
92/// any thread without contention.
93///
94/// # Example
95///
96/// ```rust
97/// use hyperi_rustlib::scaling::{ScalingPressure, ScalingPressureConfig, ScalingComponent};
98///
99/// let pressure = ScalingPressure::new(
100///     ScalingPressureConfig::default(),
101///     vec![
102///         ScalingComponent::new("kafka_lag", 0.50, 100_000.0),
103///         ScalingComponent::new("memory", 0.50, 1.0),
104///     ],
105/// );
106///
107/// pressure.set_component("kafka_lag", 50_000.0);
108/// pressure.set_memory(400_000_000, 1_000_000_000);
109///
110/// let value = pressure.calculate();
111/// assert!(value > 0.0 && value < 100.0);
112/// ```
113pub struct ScalingPressure {
114    enabled: bool,
115    memory_gate_threshold: f64,
116    components: Vec<ComponentEntry>,
117    circuit_open: AtomicBool,
118    memory_used: AtomicU64,
119    memory_limit: AtomicU64,
120}
121
122impl ScalingPressure {
123    /// Create a new scaling pressure calculator.
124    ///
125    /// Components define the weighted signals. Their order doesn't matter.
126    #[must_use]
127    pub fn new(config: ScalingPressureConfig, components: Vec<ScalingComponent>) -> Self {
128        let entries = components
129            .into_iter()
130            .map(|c| ComponentEntry {
131                name: c.name,
132                weight: c.weight,
133                saturation: c.saturation,
134                value: AtomicU64::new(0_f64.to_bits()),
135            })
136            .collect();
137
138        Self {
139            enabled: config.enabled,
140            memory_gate_threshold: config.memory_gate_threshold,
141            components: entries,
142            circuit_open: AtomicBool::new(false),
143            memory_used: AtomicU64::new(0),
144            memory_limit: AtomicU64::new(0),
145        }
146    }
147
148    /// Set a component's current value (lock-free).
149    ///
150    /// If `name` doesn't match any registered component, this is a no-op.
151    pub fn set_component(&self, name: &str, value: f64) {
152        for entry in &self.components {
153            if entry.name == name {
154                entry.value.store(value.to_bits(), Ordering::Relaxed);
155                return;
156            }
157        }
158    }
159
160    /// Signal whether the circuit breaker is open.
161    ///
162    /// When open, `calculate()` returns 0.0 (scaling won't help when the
163    /// downstream sink is unavailable).
164    pub fn set_circuit_open(&self, open: bool) {
165        self.circuit_open.store(open, Ordering::Relaxed);
166    }
167
168    /// Update memory usage for the memory gate.
169    ///
170    /// When `used_bytes / limit_bytes >= memory_gate_threshold`,
171    /// `calculate()` returns 100.0 to trigger immediate scale-up.
172    pub fn set_memory(&self, used_bytes: u64, limit_bytes: u64) {
173        self.memory_used.store(used_bytes, Ordering::Relaxed);
174        self.memory_limit.store(limit_bytes, Ordering::Relaxed);
175    }
176
177    /// Calculate composite scaling pressure (0.0-100.0).
178    ///
179    /// Gate logic:
180    /// 1. Disabled → 0.0
181    /// 2. Circuit breaker open → 0.0
182    /// 3. Memory pressure ≥ threshold → 100.0
183    /// 4. Otherwise → weighted composite capped at 100.0
184    #[must_use]
185    pub fn calculate(&self) -> f64 {
186        if !self.enabled {
187            return 0.0;
188        }
189
190        // Gate 1: Circuit breaker open -- sink is down, scaling won't help
191        if self.circuit_open.load(Ordering::Relaxed) {
192            return 0.0;
193        }
194
195        // Compute memory ratio
196        let memory_used = self.memory_used.load(Ordering::Relaxed) as f64;
197        let memory_limit = self.memory_limit.load(Ordering::Relaxed) as f64;
198        let memory_ratio = if memory_limit > 0.0 {
199            memory_used / memory_limit
200        } else {
201            0.0
202        };
203
204        // Gate 2: High memory pressure -- scale immediately before OOM
205        if memory_ratio >= self.memory_gate_threshold {
206            return 100.0;
207        }
208
209        // Weighted composite
210        let mut total = 0.0_f64;
211        for entry in &self.components {
212            let value = f64::from_bits(entry.value.load(Ordering::Relaxed));
213            let score = if entry.saturation > 0.0 {
214                (value / entry.saturation).min(1.0) * entry.weight * 100.0
215            } else {
216                0.0
217            };
218            total += score;
219        }
220
221        total.min(100.0)
222    }
223
224    /// Diagnostic snapshot with per-component breakdown.
225    #[must_use]
226    pub fn snapshot(&self) -> PressureSnapshot {
227        let circuit_open = self.circuit_open.load(Ordering::Relaxed);
228
229        let memory_used = self.memory_used.load(Ordering::Relaxed) as f64;
230        let memory_limit = self.memory_limit.load(Ordering::Relaxed) as f64;
231        let memory_ratio = if memory_limit > 0.0 {
232            memory_used / memory_limit
233        } else {
234            0.0
235        };
236
237        // Determine active gate
238        let gate_active = if !self.enabled {
239            None
240        } else if circuit_open {
241            Some(GateType::CircuitBreaker)
242        } else if memory_ratio >= self.memory_gate_threshold {
243            Some(GateType::MemoryPressure)
244        } else {
245            None
246        };
247
248        let components: Vec<ComponentSnapshot> = self
249            .components
250            .iter()
251            .map(|entry| {
252                let raw_value = f64::from_bits(entry.value.load(Ordering::Relaxed));
253                let score = if entry.saturation > 0.0 {
254                    (raw_value / entry.saturation).min(1.0) * entry.weight * 100.0
255                } else {
256                    0.0
257                };
258                ComponentSnapshot {
259                    name: entry.name.clone(),
260                    raw_value,
261                    score,
262                    weight: entry.weight,
263                    saturation: entry.saturation,
264                }
265            })
266            .collect();
267
268        PressureSnapshot {
269            value: self.calculate(),
270            gate_active,
271            components,
272            memory_ratio,
273            circuit_open,
274        }
275    }
276
277    /// Whether the engine is enabled.
278    #[must_use]
279    pub fn is_enabled(&self) -> bool {
280        self.enabled
281    }
282}
283
#[cfg(test)]
mod tests {
    //! Unit tests for the scaling pressure calculator.
    //!
    //! The gate tests rely on the default configuration being enabled with a
    //! memory gate threshold of 0.8.

    use super::*;
    use crate::scaling::ScalingComponent;

    /// Five components whose weights sum to 1.0.
    fn test_components() -> Vec<ScalingComponent> {
        vec![
            ScalingComponent::new("kafka_lag", 0.35, 100_000.0),
            ScalingComponent::new("buffer_depth", 0.25, 10_000.0),
            ScalingComponent::new("insert_latency", 0.15, 5.0),
            ScalingComponent::new("memory", 0.15, 1.0),
            ScalingComponent::new("errors", 0.10, 100.0),
        ]
    }

    /// Calculator with the default config and the standard test components.
    fn test_pressure() -> ScalingPressure {
        ScalingPressure::new(ScalingPressureConfig::default(), test_components())
    }

    #[test]
    fn test_zero_load() {
        let p = test_pressure();
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Zero load should produce 0.0, got {value}"
        );
    }

    #[test]
    fn test_single_component_at_saturation() {
        let p = test_pressure();
        // kafka_lag at saturation (100,000) → contributes weight * 100 = 35.0
        p.set_component("kafka_lag", 100_000.0);
        let value = p.calculate();
        assert!(
            (value - 35.0).abs() < 0.01,
            "kafka_lag at saturation should contribute 35.0, got {value}"
        );
    }

    #[test]
    fn test_single_component_half_saturation() {
        let p = test_pressure();
        // kafka_lag at 50% saturation → contributes 0.5 * 0.35 * 100 = 17.5
        p.set_component("kafka_lag", 50_000.0);
        let value = p.calculate();
        assert!(
            (value - 17.5).abs() < 0.01,
            "kafka_lag at half saturation should contribute 17.5, got {value}"
        );
    }

    #[test]
    fn test_all_components_saturated() {
        let p = test_pressure();
        p.set_component("kafka_lag", 200_000.0); // Over saturation
        p.set_component("buffer_depth", 20_000.0);
        p.set_component("insert_latency", 10.0);
        p.set_component("memory", 2.0);
        p.set_component("errors", 200.0);
        let value = p.calculate();
        assert!(
            (value - 100.0).abs() < 0.01,
            "All saturated should produce 100.0, got {value}"
        );
    }

    #[test]
    fn test_capped_at_100() {
        let p = test_pressure();
        // Way over saturation for all components
        p.set_component("kafka_lag", 1_000_000.0);
        p.set_component("buffer_depth", 1_000_000.0);
        p.set_component("insert_latency", 1_000.0);
        p.set_component("memory", 100.0);
        p.set_component("errors", 100_000.0);
        let value = p.calculate();
        assert!(
            (value - 100.0).abs() < f64::EPSILON,
            "Should be capped at 100.0, got {value}"
        );
    }

    #[test]
    fn test_circuit_breaker_gate() {
        let p = test_pressure();
        p.set_component("kafka_lag", 100_000.0);
        p.set_circuit_open(true);
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Circuit breaker open should produce 0.0, got {value}"
        );
    }

    #[test]
    fn test_memory_gate() {
        let p = test_pressure();
        // 80% memory = exactly at threshold
        p.set_memory(800, 1000);
        let value = p.calculate();
        assert!(
            (value - 100.0).abs() < f64::EPSILON,
            "Memory at threshold should produce 100.0, got {value}"
        );
    }

    #[test]
    fn test_memory_gate_above_threshold() {
        let p = test_pressure();
        p.set_memory(900, 1000);
        let value = p.calculate();
        assert!(
            (value - 100.0).abs() < f64::EPSILON,
            "Memory above threshold should produce 100.0, got {value}"
        );
    }

    #[test]
    fn test_memory_below_threshold_uses_composite() {
        let p = test_pressure();
        // 70% memory -- below 0.8 threshold
        p.set_memory(700, 1000);
        p.set_component("kafka_lag", 50_000.0);
        let value = p.calculate();
        // Should be composite, not 100.0
        assert!(
            value > 0.0 && value < 100.0,
            "Memory below threshold should use composite, got {value}"
        );
    }

    #[test]
    fn test_circuit_breaker_takes_precedence_over_memory_gate() {
        // Both gates apply: memory is above the threshold (would give 100.0)
        // and the circuit breaker is open (gives 0.0). The circuit breaker is
        // checked first, so it wins.
        let p = test_pressure();
        p.set_memory(900, 1000);
        p.set_circuit_open(true);
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Circuit breaker should take precedence, got {value}"
        );
    }

    #[test]
    fn test_disabled() {
        let config = ScalingPressureConfig {
            enabled: false,
            ..Default::default()
        };
        let p = ScalingPressure::new(config, test_components());
        p.set_component("kafka_lag", 100_000.0);
        p.set_memory(900, 1000);
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Disabled should produce 0.0, got {value}"
        );
    }

    #[test]
    fn test_unknown_component_is_noop() {
        let p = test_pressure();
        // Should not panic or affect result
        p.set_component("nonexistent", 999.0);
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Unknown component should not affect result, got {value}"
        );
    }

    #[test]
    fn test_zero_memory_limit() {
        let p = test_pressure();
        // Zero limit should not trigger memory gate (avoid div by zero)
        p.set_memory(100, 0);
        p.set_component("kafka_lag", 50_000.0);
        let value = p.calculate();
        assert!(
            value > 0.0,
            "Zero memory limit should not trigger gate, got {value}"
        );
    }

    #[test]
    fn test_zero_saturation_component() {
        let p = ScalingPressure::new(
            ScalingPressureConfig::default(),
            vec![ScalingComponent::new("broken", 0.50, 0.0)],
        );
        p.set_component("broken", 100.0);
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Zero saturation component should contribute 0.0, got {value}"
        );
    }

    #[test]
    fn test_snapshot() {
        let p = test_pressure();
        p.set_component("kafka_lag", 50_000.0);
        p.set_component("buffer_depth", 5_000.0);
        p.set_memory(500, 1000);

        let snap = p.snapshot();
        assert!(!snap.circuit_open);
        assert!(snap.gate_active.is_none());
        assert!((snap.memory_ratio - 0.5).abs() < f64::EPSILON);
        assert_eq!(snap.components.len(), 5);
        assert!(snap.value > 0.0);

        // Check kafka_lag component
        let lag = snap
            .components
            .iter()
            .find(|c| c.name == "kafka_lag")
            .unwrap();
        assert!((lag.raw_value - 50_000.0).abs() < f64::EPSILON);
        assert!((lag.score - 17.5).abs() < 0.01);
    }

    #[test]
    fn test_snapshot_with_gate() {
        let p = test_pressure();
        p.set_circuit_open(true);

        let snap = p.snapshot();
        assert!(snap.circuit_open);
        assert_eq!(snap.gate_active, Some(GateType::CircuitBreaker));
        assert!(snap.value.abs() < f64::EPSILON);
    }

    #[test]
    fn test_snapshot_with_memory_gate() {
        let p = test_pressure();
        p.set_memory(900, 1000);

        let snap = p.snapshot();
        assert!(!snap.circuit_open);
        assert_eq!(snap.gate_active, Some(GateType::MemoryPressure));
        assert!((snap.value - 100.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_snapshot_disabled_reports_no_gate() {
        let p = ScalingPressure::new(
            ScalingPressureConfig {
                enabled: false,
                ..Default::default()
            },
            test_components(),
        );
        p.set_circuit_open(true);
        p.set_memory(900, 1000);

        let snap = p.snapshot();
        // The raw flag is still reported, but a disabled engine has no gate.
        assert!(snap.circuit_open);
        assert!(snap.gate_active.is_none());
        assert!(snap.value.abs() < f64::EPSILON);
    }

    #[test]
    fn test_gate_type_display() {
        assert_eq!(GateType::CircuitBreaker.to_string(), "circuit_breaker");
        assert_eq!(GateType::MemoryPressure.to_string(), "memory_pressure");
    }

    #[test]
    fn test_is_enabled() {
        let p = test_pressure();
        assert!(p.is_enabled());

        let disabled = ScalingPressure::new(
            ScalingPressureConfig {
                enabled: false,
                ..Default::default()
            },
            vec![],
        );
        assert!(!disabled.is_enabled());
    }

    #[test]
    fn test_mixed_load() {
        let p = test_pressure();
        // Realistic mixed load scenario
        p.set_component("kafka_lag", 20_000.0); // 20% of saturation
        p.set_component("buffer_depth", 3_000.0); // 30% of saturation
        p.set_component("insert_latency", 1.0); // 20% of saturation
        p.set_component("memory", 0.4); // 40% of saturation
        p.set_component("errors", 5.0); // 5% of saturation

        let value = p.calculate();
        // Expected: 0.2*35 + 0.3*25 + 0.2*15 + 0.4*15 + 0.05*10
        // = 7.0 + 7.5 + 3.0 + 6.0 + 0.5 = 24.0
        assert!(
            (value - 24.0).abs() < 0.01,
            "Mixed load should produce ~24.0, got {value}"
        );
    }
}
550            (value - 24.0).abs() < 0.01,
551            "Mixed load should produce ~24.0, got {value}"
552        );
553    }
554}