Skip to main content

hyperi_rustlib/scaling/
engine.rs

// Project:   hyperi-rustlib
// File:      src/scaling/engine.rs
// Purpose:   Horizontal scaling-pressure engine (CEL over local metrics)
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
//! The horizontal scaling-pressure ENGINE: CEL expressions evaluated over local,
//! correlated signals on the periodic scaling tick, producing N named
//! `scaling_pressure` gauges for any horizontal autoscaler (KEDA is the prime
//! tool; the engine is autoscaler-neutral).
//!
//! ## The correlated composite (rustlib's edge)
//!
//! An autoscaler sees only coarse, top-level, single metrics. An APP has rich
//! LOCAL context and can COMBINE + CORRELATE it -- CPU, the compound transport
//! pressure, and any domain signal -- into one **correlated composite**
//! pressure. That is the whole point of this engine (scaling ACR principle 4).
//!
//! ## Precedence
//!
//! `config.scaling.pressures` (CEL) > app-plumbed default > rustlib's
//! context-aware smart default (computed in Rust; used when no expressions are
//! configured, and as the per-pressure fallback when one fails).
//!
//! ## Smart default (context-aware, per spec section 4)
//!
//! ```text
//! circuit_open ? 0 : 100 * min(1, max(cpu_utilisation_ratio / cpu_target,
//!                                     transport_inbound_pressure_ratio))
//! ```
//!
//! Outbound pressure is EMITTED (gratis) but not in the default max -- it is
//! downstream-bound (scaling ACR). Memory is excluded (self-regulation's job).
36
37use std::collections::HashMap;
38
39use cel::{Program, Value};
40use parking_lot::Mutex;
41use serde_json::json;
42
43use super::config::ScalingEngineConfig;
44use super::transport_pressure::{
45    PressureTargets, ScalingTransport, TransportSignals, inbound_pressure, outbound_pressure,
46};
47
48/// One configured pressure output. `program` is `None` for the rustlib smart
49/// default (computed in Rust) and for any user expression that failed to
50/// compile/validate (it falls back to the smart default, loudly).
51struct CompiledPressure {
52    name: String,
53    program: Option<Program>,
54    enabled: bool,
55}
56
57/// The CEL-over-local-metrics horizontal scaling-pressure engine.
58///
59/// Construct with [`ScalingEngine::new`], then call [`ScalingEngine::tick`] on
60/// the periodic scaling interval (NOT the data hot-path). Evaluation errors hold
61/// the last-good value (fail-safe), missing signals contribute 0 (never NaN).
62pub struct ScalingEngine {
63    #[cfg_attr(not(feature = "metrics"), allow(dead_code))]
64    namespace: String,
65    enabled: bool,
66    cpu_target: f64,
67    targets: PressureTargets,
68    inbound_kind: ScalingTransport,
69    outbound_kind: ScalingTransport,
70    params: std::collections::BTreeMap<String, f64>,
71    pressures: Vec<CompiledPressure>,
72    /// Last successfully-evaluated value per pressure name (fail-safe hold).
73    last_good: Mutex<HashMap<String, f64>>,
74}
75
76impl ScalingEngine {
77    /// Build the engine from config + the resolved inbound/outbound transport
78    /// kinds. Returns the engine plus a list of friendly, operator-facing
79    /// validation errors (each failing expression falls back to the smart
80    /// default; the caller should log these LOUDLY).
81    ///
82    /// `inbound`/`outbound` come from config (`scaling.transport.*`) or are
83    /// auto-derived by the runtime from the transports it built.
84    #[must_use]
85    pub fn new(
86        namespace: &str,
87        config: &ScalingEngineConfig,
88        inbound: ScalingTransport,
89        outbound: ScalingTransport,
90    ) -> (Self, Vec<String>) {
91        let targets = PressureTargets::from_params(&config.params);
92        let cpu_target = config.cpu_target();
93        let mut errors = Vec::new();
94
95        let pressures: Vec<CompiledPressure> = if config.pressures.is_empty() {
96            // No expressions configured -> the rustlib smart default.
97            vec![CompiledPressure {
98                name: "default".to_string(),
99                program: None,
100                enabled: true,
101            }]
102        } else {
103            config
104                .pressures
105                .iter()
106                .map(|p| {
107                    let program = if p.enabled {
108                        match compile_and_check(&p.expression, &config.params) {
109                            Ok(prog) => Some(prog),
110                            Err(msg) => {
111                                errors.push(format!(
112                                    "scaling pressure '{}' is invalid -- falling back to the \
113                                     rustlib smart default. {msg}",
114                                    p.name
115                                ));
116                                None
117                            }
118                        }
119                    } else {
120                        None
121                    };
122                    CompiledPressure {
123                        name: p.name.clone(),
124                        program,
125                        enabled: p.enabled,
126                    }
127                })
128                .collect()
129        };
130
131        // Duplicate names would collide on one gauge series + share a last_good
132        // slot -- flag at load, consistent with the rest of the validation.
133        {
134            let mut seen = std::collections::HashSet::new();
135            for p in &pressures {
136                if !seen.insert(p.name.as_str()) {
137                    errors.push(format!(
138                        "duplicate scaling pressure name '{}' -- names must be unique",
139                        p.name
140                    ));
141                }
142            }
143        }
144
145        let engine = Self {
146            namespace: namespace.to_string(),
147            enabled: config.enabled,
148            cpu_target,
149            targets,
150            inbound_kind: inbound,
151            outbound_kind: outbound,
152            params: config.params.clone(),
153            pressures,
154            last_good: Mutex::new(HashMap::new()),
155        };
156        (engine, errors)
157    }
158
159    /// Whether the engine is enabled.
160    #[must_use]
161    pub fn is_enabled(&self) -> bool {
162        self.enabled
163    }
164
165    /// The resolved inbound transport kind (drives the compound inbound term).
166    #[must_use]
167    pub fn inbound_kind(&self) -> ScalingTransport {
168        self.inbound_kind
169    }
170
171    /// The resolved outbound transport kind.
172    #[must_use]
173    pub fn outbound_kind(&self) -> ScalingTransport {
174        self.outbound_kind
175    }
176
177    /// The rustlib context-aware smart default (0-100). Pure; no CEL.
178    #[must_use]
179    fn smart_default(&self, cpu_ratio: f64, inbound: f64, circuit_open: bool) -> f64 {
180        if circuit_open {
181            return 0.0;
182        }
183        let cpu_term = if self.cpu_target > 0.0 {
184            cpu_ratio / self.cpu_target
185        } else {
186            0.0
187        };
188        let composite = cpu_term.max(inbound);
189        100.0 * composite.clamp(0.0, 1.0)
190    }
191
192    /// Evaluate all configured pressures for the current tick.
193    ///
194    /// Returns `(name, value)` per enabled pressure. Pure (no metric emission);
195    /// [`tick`](Self::tick) wraps this and publishes the gauges.
196    #[must_use]
197    pub fn evaluate(
198        &self,
199        signals: &TransportSignals,
200        cpu_ratio: f64,
201        memory_ratio: f64,
202    ) -> Vec<(String, f64)> {
203        let inbound = inbound_pressure(self.inbound_kind, signals, &self.targets);
204        let outbound = outbound_pressure(signals, &self.targets);
205
206        // Build the CEL context ONCE per tick (only if a pressure needs it).
207        let ctx = if self
208            .pressures
209            .iter()
210            .any(|p| p.enabled && p.program.is_some())
211        {
212            Some(self.eval_context(signals, cpu_ratio, inbound, outbound, memory_ratio))
213        } else {
214            None
215        };
216
217        let mut out = Vec::with_capacity(self.pressures.len());
218        for p in &self.pressures {
219            if !p.enabled {
220                continue;
221            }
222            let value = match &p.program {
223                // Smart default (no expression, or failed compile).
224                None => self.smart_default(cpu_ratio, inbound, signals.circuit_open),
225                Some(program) => {
226                    let evaluated = ctx.as_ref().and_then(|m| eval_program(program, m));
227                    match evaluated {
228                        Some(v) if v.is_finite() => {
229                            self.last_good.lock().insert(p.name.clone(), v);
230                            v
231                        }
232                        // Eval error / non-numeric -> hold last-good, else fall
233                        // back to the smart default (fail-safe; never panic/NaN).
234                        _ => self
235                            .last_good
236                            .lock()
237                            .get(&p.name)
238                            .copied()
239                            .unwrap_or_else(|| {
240                                self.smart_default(cpu_ratio, inbound, signals.circuit_open)
241                            }),
242                    }
243                }
244            };
245            out.push((p.name.clone(), value));
246        }
247        out
248    }
249
250    /// Evaluate and publish the gauges for this tick.
251    ///
252    /// Emits `{ns}_scaling_pressure{name=...}` per pressure, plus the gratis
253    /// compound `{ns}_transport_inbound_pressure_ratio` /
254    /// `{ns}_transport_outbound_pressure_ratio` and `{ns}_scaling_circuit_open`.
255    #[allow(unused_variables)]
256    pub fn tick(&self, signals: &TransportSignals, cpu_ratio: f64, memory_ratio: f64) {
257        if !self.enabled {
258            return;
259        }
260        let inbound = inbound_pressure(self.inbound_kind, signals, &self.targets);
261        let outbound = outbound_pressure(signals, &self.targets);
262        let values = self.evaluate(signals, cpu_ratio, memory_ratio);
263
264        #[cfg(feature = "metrics")]
265        {
266            let ns = &self.namespace;
267            for (name, value) in &values {
268                metrics::gauge!(format!("{ns}_scaling_pressure"), "name" => name.clone())
269                    .set(*value);
270            }
271            // Gratis compound transport pressure (IN and OUT) -- observability +
272            // KEDA-direct. Ratios floored at 0, unclamped above for proportional
273            // scale-up.
274            metrics::gauge!(format!("{ns}_transport_inbound_pressure_ratio")).set(inbound);
275            metrics::gauge!(format!("{ns}_transport_outbound_pressure_ratio")).set(outbound);
276            // 0/1 state gauge (NOT a bool type) -- the only default gate.
277            metrics::gauge!(format!("{ns}_scaling_circuit_open")).set(if signals.circuit_open {
278                1.0
279            } else {
280                0.0
281            });
282        }
283    }
284
285    /// Build the CEL evaluation context (derived vars + `params` + `metrics`).
286    fn eval_context(
287        &self,
288        signals: &TransportSignals,
289        cpu_ratio: f64,
290        inbound: f64,
291        outbound: f64,
292        memory_ratio: f64,
293    ) -> serde_json::Map<String, serde_json::Value> {
294        let mut m = serde_json::Map::new();
295        m.insert("cpu_utilisation_ratio".into(), json!(cpu_ratio));
296        m.insert("circuit_open".into(), json!(signals.circuit_open));
297        m.insert("transport_inbound_pressure_ratio".into(), json!(inbound));
298        m.insert("transport_outbound_pressure_ratio".into(), json!(outbound));
299        m.insert("memory_ratio".into(), json!(memory_ratio));
300
301        let params: serde_json::Map<String, serde_json::Value> = self
302            .params
303            .iter()
304            .map(|(k, v)| (k.clone(), json!(v)))
305            .collect();
306        m.insert("params".into(), serde_json::Value::Object(params));
307
308        m.insert(
309            "metrics".into(),
310            serde_json::Value::Object(signal_metrics(signals)),
311        );
312        m
313    }
314
315    /// Operator-facing list of identifiers available to expressions.
316    #[must_use]
317    pub fn available_surface(&self) -> String {
318        let params: Vec<&str> = self.params.keys().map(String::as_str).collect();
319        format!(
320            "top-level: cpu_utilisation_ratio, circuit_open, \
321             transport_inbound_pressure_ratio, transport_outbound_pressure_ratio, memory_ratio; \
322             params.{{{}}}; metrics.{{kafka_assigned_lag, redis_pending, inflight, shed_rate, \
323             send_backpressure_rate, refused_rate, produce_queue_depth}}",
324            params.join(", ")
325        )
326    }
327}
328
329/// The curated `metrics` map exposed to expressions (the scaling-relevant local
330/// signals, by name). Absent signals are simply omitted.
331fn signal_metrics(s: &TransportSignals) -> serde_json::Map<String, serde_json::Value> {
332    let mut m = serde_json::Map::new();
333    let mut put = |k: &str, v: Option<f64>| {
334        if let Some(v) = v {
335            m.insert(k.to_string(), json!(v));
336        }
337    };
338    put("kafka_assigned_lag", s.kafka_assigned_lag);
339    put("redis_pending", s.redis_pending);
340    put("inflight", s.inflight);
341    put("shed_rate", s.shed_rate);
342    put("send_backpressure_rate", s.send_backpressure_rate);
343    put("refused_rate", s.refused_rate);
344    put("produce_queue_depth", s.produce_queue_depth);
345    m
346}
347
348/// Compile a user expression and dry-run it against a representative context so
349/// unknown identifiers / type errors surface at LOAD, not first tick. Returns a
350/// friendly error string on failure.
351fn compile_and_check(
352    expr: &str,
353    params: &std::collections::BTreeMap<String, f64>,
354) -> Result<Program, String> {
355    let program = Program::compile(expr).map_err(|e| format!("compile error: {e}"))?;
356
357    // Representative zero-context: all derived vars present, every param key,
358    // and the full metrics surface, so a reference to a KNOWN name succeeds and
359    // only genuine typos/type errors fail.
360    let mut m = serde_json::Map::new();
361    m.insert("cpu_utilisation_ratio".into(), json!(0.0));
362    m.insert("circuit_open".into(), json!(false));
363    m.insert("transport_inbound_pressure_ratio".into(), json!(0.0));
364    m.insert("transport_outbound_pressure_ratio".into(), json!(0.0));
365    m.insert("memory_ratio".into(), json!(0.0));
366    let pmap: serde_json::Map<String, serde_json::Value> =
367        params.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
368    m.insert("params".into(), serde_json::Value::Object(pmap));
369    let mut metrics = serde_json::Map::new();
370    for k in [
371        "kafka_assigned_lag",
372        "redis_pending",
373        "inflight",
374        "shed_rate",
375        "send_backpressure_rate",
376        "refused_rate",
377        "produce_queue_depth",
378    ] {
379        metrics.insert(k.to_string(), json!(0.0));
380    }
381    m.insert("metrics".into(), serde_json::Value::Object(metrics));
382
383    match eval_program_checked(&program, &m) {
384        Ok(Value::Float(_) | Value::Int(_) | Value::UInt(_)) => Ok(program),
385        Ok(other) => Err(format!(
386            "expression must evaluate to a number, got {other:?}"
387        )),
388        Err(e) => Err(format!(
389            "evaluation error: {e}. Available -- top-level: cpu_utilisation_ratio, \
390             circuit_open, transport_inbound_pressure_ratio, \
391             transport_outbound_pressure_ratio, memory_ratio; params.{{{}}}; \
392             metrics.{{kafka_assigned_lag, redis_pending, inflight, shed_rate, \
393             send_backpressure_rate, refused_rate, produce_queue_depth}}",
394            params.keys().cloned().collect::<Vec<_>>().join(", ")
395        )),
396    }
397}
398
399/// Execute a compiled program against a JSON context map; returns the f64 value
400/// (coercing int/uint/bool) or `None` on any error / non-numeric result.
401fn eval_program(
402    program: &Program,
403    map: &serde_json::Map<String, serde_json::Value>,
404) -> Option<f64> {
405    let ctx = crate::expression::build_context(map.iter()).ok()?;
406    match program.execute(&ctx).ok()? {
407        Value::Float(f) => Some(f),
408        Value::Int(i) => Some(i as f64),
409        Value::UInt(u) => Some(u as f64),
410        Value::Bool(b) => Some(if b { 1.0 } else { 0.0 }),
411        _ => None,
412    }
413}
414
415/// Like [`eval_program`] but surfaces the error string (for load-time checking).
416fn eval_program_checked(
417    program: &Program,
418    map: &serde_json::Map<String, serde_json::Value>,
419) -> Result<Value, String> {
420    let ctx = crate::expression::build_context(map.iter()).map_err(|e| format!("{e}"))?;
421    program.execute(&ctx).map_err(|e| format!("{e}"))
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scaling::config::{PressureExpr, ScalingEngineConfig};

    /// Absolute tolerance used when comparing evaluated pressures.
    const TOLERANCE: f64 = 1e-6;

    /// Config holding the given expressions and `params` entries; every other
    /// field comes from `Default`.
    fn cfg(pressures: Vec<PressureExpr>, params: &[(&str, f64)]) -> ScalingEngineConfig {
        let mut config = ScalingEngineConfig {
            pressures,
            ..Default::default()
        };
        config
            .params
            .extend(params.iter().map(|&(key, value)| (key.to_string(), value)));
        config
    }

    /// An enabled pressure expression.
    fn enabled_expr(name: &str, expression: &str) -> PressureExpr {
        PressureExpr {
            name: name.into(),
            expression: expression.into(),
            enabled: true,
        }
    }

    /// Engine in namespace "t" with Kafka on both sides.
    fn kafka_engine(config: &ScalingEngineConfig) -> (ScalingEngine, Vec<String>) {
        ScalingEngine::new(
            "t",
            config,
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        )
    }

    /// Signals carrying only an assigned Kafka lag.
    fn with_lag(lag: f64) -> TransportSignals {
        TransportSignals {
            kafka_assigned_lag: Some(lag),
            ..Default::default()
        }
    }

    /// Assert that `actual` is within `TOLERANCE` of `expected`.
    fn assert_close(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < TOLERANCE);
    }

    #[test]
    fn smart_default_cpu_only_when_no_transport() {
        let config = cfg(vec![], &[("cpu_target", 0.70)]);
        let (eng, errs) = ScalingEngine::new(
            "t",
            &config,
            ScalingTransport::File, // non-scalable inbound
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty());

        // CPU at target -> 100 * (0.70/0.70) = 100.
        let results = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "default");
        assert_close(results[0].1, 100.0);
    }

    #[test]
    fn smart_default_takes_max_of_cpu_and_inbound_kafka() {
        let config = cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 100_000.0)]);
        let (eng, _) = kafka_engine(&config);

        // CPU 0.35 -> cpu_term 0.5; lag 80k/100k = 0.8 -> max 0.8 -> 80.
        let results = eng.evaluate(&with_lag(80_000.0), 0.35, 0.0);
        assert_close(results[0].1, 80.0);
    }

    #[test]
    fn circuit_open_gates_to_zero() {
        let config = cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 1.0)]);
        let (eng, _) = kafka_engine(&config);

        let signals = TransportSignals {
            kafka_assigned_lag: Some(1_000_000.0),
            circuit_open: true,
            ..Default::default()
        };
        let results = eng.evaluate(&signals, 0.99, 0.0);
        assert!(results[0].1.abs() < f64::EPSILON);
    }

    #[test]
    fn user_expression_evaluated() {
        let config = cfg(
            vec![enabled_expr("cpu", "cpu_utilisation_ratio * 100.0")],
            &[],
        );
        let (eng, errs) = kafka_engine(&config);
        assert!(errs.is_empty(), "errors: {errs:?}");

        let results = eng.evaluate(&TransportSignals::default(), 0.42, 0.0);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "cpu");
        assert_close(results[0].1, 42.0);
    }

    #[test]
    fn user_expression_can_read_params_and_metrics() {
        let config = cfg(
            vec![enabled_expr(
                "lag",
                "metrics.kafka_assigned_lag / params.lag_target",
            )],
            &[("lag_target", 1000.0)],
        );
        let (eng, errs) = kafka_engine(&config);
        assert!(errs.is_empty(), "errors: {errs:?}");

        let results = eng.evaluate(&with_lag(500.0), 0.0, 0.0);
        assert_close(results[0].1, 0.5);
    }

    #[test]
    fn syntax_error_falls_back_with_friendly_message() {
        // Trailing operator: a syntax error.
        let config = cfg(
            vec![enabled_expr("broken", "cpu_utilisation_ratio +")],
            &[("cpu_target", 0.70)],
        );
        let (eng, errs) = kafka_engine(&config);
        assert_eq!(errs.len(), 1);
        assert!(errs[0].contains("broken"), "msg: {}", errs[0]);

        // Falls back to smart default (still produces a value, no panic).
        let results = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert_close(results[0].1, 100.0);
    }

    #[test]
    fn unknown_identifier_caught_at_load() {
        // The identifier is deliberately misspelled.
        let config = cfg(
            vec![enabled_expr("typo", "cpu_utilisation_ratoi * 100")],
            &[],
        );
        let (_eng, errs) = kafka_engine(&config);
        assert_eq!(errs.len(), 1, "should catch the unknown identifier at load");
        assert!(errs[0].contains("typo"));
    }

    #[test]
    fn multi_output_independent_gauges() {
        let config = cfg(
            vec![
                enabled_expr("a", "cpu_utilisation_ratio * 100.0"),
                enabled_expr("b", "transport_inbound_pressure_ratio * 100.0"),
            ],
            &[("lag_target", 100.0)],
        );
        let (eng, errs) = kafka_engine(&config);
        assert!(errs.is_empty(), "errors: {errs:?}");

        let results = eng.evaluate(&with_lag(50.0), 0.30, 0.0);
        assert_eq!(results.len(), 2);
        assert_close(results[0].1, 30.0); // a: cpu
        assert_close(results[1].1, 50.0); // b: inbound 50/100=0.5 *100
    }
}