Skip to main content

hyperi_rustlib/scaling/
engine.rs

1// Project:   hyperi-rustlib
2// File:      src/scaling/engine.rs
3// Purpose:   Horizontal scaling-pressure engine (CEL over local metrics)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! The horizontal scaling-pressure ENGINE: CEL expressions evaluated over local,
10//! correlated signals on the periodic scaling tick, producing N named
11//! `scaling_pressure` gauges for any horizontal autoscaler (KEDA is the prime
12//! tool; the engine is autoscaler-neutral).
13//!
14//! ## The correlated composite (rustlib's edge)
15//!
16//! An autoscaler sees only coarse, top-level, single metrics. An APP has rich
17//! LOCAL context and can COMBINE + CORRELATE it -- CPU, the compound transport
18//! pressure, and any domain signal -- into one **correlated composite**
19//! pressure. That is the whole point of this engine (scaling ACR principle 4).
20//!
21//! ## Precedence
22//!
23//! `config.scaling.pressures` (CEL) > app-plumbed default > rustlib's
24//! context-aware smart default (computed in Rust; used when no expressions are
25//! configured, and as the per-pressure fallback when one fails).
26//!
27//! ## Smart default (context-aware, per spec section 4)
28//!
29//! ```text
30//! circuit_open ? 0 : 100 * min(1, max(cpu_utilisation_ratio / cpu_target,
31//!                                     transport_inbound_pressure_ratio))
32//! ```
33//!
34//! Outbound pressure is EMITTED (gratis) but not in the default max -- it is
35//! downstream-bound (scaling ACR). Memory is excluded (self-regulation's job).
36
37use std::collections::HashMap;
38
39use cel::{ExecutionError, Program, Value};
40use parking_lot::Mutex;
41use serde_json::json;
42
43use super::config::ScalingEngineConfig;
44use super::transport_pressure::{
45    PressureTargets, ScalingTransport, TransportSignals, inbound_pressure, outbound_pressure,
46};
47
48/// One configured pressure output. `program` is `None` for the rustlib smart
49/// default (computed in Rust) and for any user expression that failed to
50/// compile/validate (it falls back to the smart default, loudly).
51struct CompiledPressure {
52    name: String,
53    program: Option<Program>,
54    enabled: bool,
55}
56
57/// The CEL-over-local-metrics horizontal scaling-pressure engine.
58///
59/// Construct with [`ScalingEngine::new`], then call [`ScalingEngine::tick`] on
60/// the periodic scaling interval (NOT the data hot-path). Evaluation errors hold
61/// the last-good value (fail-safe), missing signals contribute 0 (never NaN).
62pub struct ScalingEngine {
63    #[cfg_attr(not(feature = "metrics"), allow(dead_code))]
64    namespace: String,
65    enabled: bool,
66    cpu_target: f64,
67    targets: PressureTargets,
68    inbound_kind: ScalingTransport,
69    outbound_kind: ScalingTransport,
70    params: std::collections::BTreeMap<String, f64>,
71    pressures: Vec<CompiledPressure>,
72    /// Last successfully-evaluated value per pressure name (fail-safe hold).
73    last_good: Mutex<HashMap<String, f64>>,
74}
75
76impl ScalingEngine {
77    /// Build the engine from config + the resolved inbound/outbound transport
78    /// kinds. Returns the engine plus a list of friendly, operator-facing
79    /// validation errors (each failing expression falls back to the smart
80    /// default; the caller should log these LOUDLY).
81    ///
82    /// `inbound`/`outbound` come from config (`scaling.transport.*`) or are
83    /// auto-derived by the runtime from the transports it built.
84    #[must_use]
85    pub fn new(
86        namespace: &str,
87        config: &ScalingEngineConfig,
88        inbound: ScalingTransport,
89        outbound: ScalingTransport,
90    ) -> (Self, Vec<String>) {
91        let targets = PressureTargets::from_params(&config.params);
92        let cpu_target = config.cpu_target();
93        let mut errors = Vec::new();
94
95        let pressures: Vec<CompiledPressure> = if config.pressures.is_empty() {
96            // No expressions configured -> the rustlib smart default.
97            vec![CompiledPressure {
98                name: "default".to_string(),
99                program: None,
100                enabled: true,
101            }]
102        } else {
103            config
104                .pressures
105                .iter()
106                .map(|p| {
107                    let program = if p.enabled {
108                        match compile_and_check(&p.expression, &config.params) {
109                            Ok(prog) => Some(prog),
110                            Err(msg) => {
111                                errors.push(format!(
112                                    "scaling pressure '{}' is invalid -- falling back to the \
113                                     rustlib smart default. {msg}",
114                                    p.name
115                                ));
116                                None
117                            }
118                        }
119                    } else {
120                        None
121                    };
122                    CompiledPressure {
123                        name: p.name.clone(),
124                        program,
125                        enabled: p.enabled,
126                    }
127                })
128                .collect()
129        };
130
131        // Duplicate names would collide on one gauge series + share a last_good
132        // slot -- flag at load, consistent with the rest of the validation.
133        {
134            let mut seen = std::collections::HashSet::new();
135            for p in &pressures {
136                if !seen.insert(p.name.as_str()) {
137                    errors.push(format!(
138                        "duplicate scaling pressure name '{}' -- names must be unique",
139                        p.name
140                    ));
141                }
142            }
143        }
144
145        let engine = Self {
146            namespace: namespace.to_string(),
147            enabled: config.enabled,
148            cpu_target,
149            targets,
150            inbound_kind: inbound,
151            outbound_kind: outbound,
152            params: config.params.clone(),
153            pressures,
154            last_good: Mutex::new(HashMap::new()),
155        };
156        (engine, errors)
157    }
158
159    /// Whether the engine is enabled.
160    #[must_use]
161    pub fn is_enabled(&self) -> bool {
162        self.enabled
163    }
164
165    /// The resolved inbound transport kind (drives the compound inbound term).
166    #[must_use]
167    pub fn inbound_kind(&self) -> ScalingTransport {
168        self.inbound_kind
169    }
170
171    /// The resolved outbound transport kind.
172    #[must_use]
173    pub fn outbound_kind(&self) -> ScalingTransport {
174        self.outbound_kind
175    }
176
177    /// The rustlib context-aware smart default (0-100). Pure; no CEL.
178    #[must_use]
179    fn smart_default(&self, cpu_ratio: f64, inbound: f64, circuit_open: bool) -> f64 {
180        if circuit_open {
181            return 0.0;
182        }
183        let cpu_term = if self.cpu_target > 0.0 {
184            cpu_ratio / self.cpu_target
185        } else {
186            0.0
187        };
188        let composite = cpu_term.max(inbound);
189        100.0 * composite.clamp(0.0, 1.0)
190    }
191
192    /// Evaluate all configured pressures for the current tick.
193    ///
194    /// Returns `(name, value)` per enabled pressure. Pure (no metric emission);
195    /// [`tick`](Self::tick) wraps this and publishes the gauges.
196    #[must_use]
197    pub fn evaluate(
198        &self,
199        signals: &TransportSignals,
200        cpu_ratio: f64,
201        memory_ratio: f64,
202    ) -> Vec<(String, f64)> {
203        let inbound = inbound_pressure(self.inbound_kind, signals, &self.targets);
204        let outbound = outbound_pressure(signals, &self.targets);
205
206        // Build the CEL context ONCE per tick (only if a pressure needs it).
207        let ctx = if self
208            .pressures
209            .iter()
210            .any(|p| p.enabled && p.program.is_some())
211        {
212            Some(self.eval_context(signals, cpu_ratio, inbound, outbound, memory_ratio))
213        } else {
214            None
215        };
216
217        let mut out = Vec::with_capacity(self.pressures.len());
218        for p in &self.pressures {
219            if !p.enabled {
220                continue;
221            }
222            let value = match &p.program {
223                // Smart default (no expression, or failed compile).
224                None => self.smart_default(cpu_ratio, inbound, signals.circuit_open),
225                Some(program) => {
226                    let evaluated = ctx.as_ref().and_then(|m| eval_program(program, m));
227                    match evaluated {
228                        Some(v) if v.is_finite() => {
229                            self.last_good.lock().insert(p.name.clone(), v);
230                            v
231                        }
232                        // Eval error / non-numeric -> hold last-good, else fall
233                        // back to the smart default (fail-safe; never panic/NaN).
234                        _ => self
235                            .last_good
236                            .lock()
237                            .get(&p.name)
238                            .copied()
239                            .unwrap_or_else(|| {
240                                self.smart_default(cpu_ratio, inbound, signals.circuit_open)
241                            }),
242                    }
243                }
244            };
245            out.push((p.name.clone(), value));
246        }
247        out
248    }
249
250    /// Evaluate and publish the gauges for this tick.
251    ///
252    /// Emits `{ns}_scaling_pressure{name=...}` per pressure, plus the gratis
253    /// compound `{ns}_transport_inbound_pressure_ratio` /
254    /// `{ns}_transport_outbound_pressure_ratio` and `{ns}_scaling_circuit_open`.
255    #[allow(unused_variables)]
256    pub fn tick(&self, signals: &TransportSignals, cpu_ratio: f64, memory_ratio: f64) {
257        if !self.enabled {
258            return;
259        }
260        let inbound = inbound_pressure(self.inbound_kind, signals, &self.targets);
261        let outbound = outbound_pressure(signals, &self.targets);
262        let values = self.evaluate(signals, cpu_ratio, memory_ratio);
263
264        #[cfg(feature = "metrics")]
265        {
266            let ns = &self.namespace;
267            for (name, value) in &values {
268                metrics::gauge!(format!("{ns}_scaling_pressure"), "name" => name.clone())
269                    .set(*value);
270            }
271            // Gratis compound transport pressure (IN and OUT) -- observability +
272            // KEDA-direct. Ratios floored at 0, unclamped above for proportional
273            // scale-up.
274            metrics::gauge!(format!("{ns}_transport_inbound_pressure_ratio")).set(inbound);
275            metrics::gauge!(format!("{ns}_transport_outbound_pressure_ratio")).set(outbound);
276            // 0/1 state gauge (NOT a bool type) -- the only default gate.
277            metrics::gauge!(format!("{ns}_scaling_circuit_open")).set(if signals.circuit_open {
278                1.0
279            } else {
280                0.0
281            });
282        }
283    }
284
285    /// Build the CEL evaluation context (derived vars + `params` + `metrics`).
286    fn eval_context(
287        &self,
288        signals: &TransportSignals,
289        cpu_ratio: f64,
290        inbound: f64,
291        outbound: f64,
292        memory_ratio: f64,
293    ) -> serde_json::Map<String, serde_json::Value> {
294        let mut m = serde_json::Map::new();
295        m.insert("cpu_utilisation_ratio".into(), json!(cpu_ratio));
296        m.insert("circuit_open".into(), json!(signals.circuit_open));
297        m.insert("transport_inbound_pressure_ratio".into(), json!(inbound));
298        m.insert("transport_outbound_pressure_ratio".into(), json!(outbound));
299        m.insert("memory_ratio".into(), json!(memory_ratio));
300
301        let params: serde_json::Map<String, serde_json::Value> = self
302            .params
303            .iter()
304            .map(|(k, v)| (k.clone(), json!(v)))
305            .collect();
306        m.insert("params".into(), serde_json::Value::Object(params));
307
308        m.insert(
309            "metrics".into(),
310            serde_json::Value::Object(signal_metrics(signals)),
311        );
312
313        // App-pushed DOMAIN signals under a DEDICATED `custom` map (so
314        // expressions use `custom.<name>`), SEPARATE from the strict `metrics`
315        // map of fixed transport signals. Names are not known at config-load,
316        // so this map is whatever the app has pushed via `set_custom`.
317        let custom: serde_json::Map<String, serde_json::Value> = signals
318            .custom
319            .iter()
320            .map(|(k, v)| (k.clone(), json!(v)))
321            .collect();
322        m.insert("custom".into(), serde_json::Value::Object(custom));
323        m
324    }
325
326    /// Operator-facing list of identifiers available to expressions.
327    #[must_use]
328    pub fn available_surface(&self) -> String {
329        let params: Vec<&str> = self.params.keys().map(String::as_str).collect();
330        format!(
331            "top-level: cpu_utilisation_ratio, circuit_open, \
332             transport_inbound_pressure_ratio, transport_outbound_pressure_ratio, memory_ratio; \
333             params.{{{}}}; metrics.{{kafka_assigned_lag, redis_pending, inflight, shed_rate, \
334             send_backpressure_rate, refused_rate, produce_queue_depth}}; \
335             custom.<app-pushed domain signals, validated at runtime not load>",
336            params.join(", ")
337        )
338    }
339}
340
341/// The curated `metrics` map exposed to expressions (the scaling-relevant local
342/// signals, by name). Absent signals are simply omitted.
343fn signal_metrics(s: &TransportSignals) -> serde_json::Map<String, serde_json::Value> {
344    let mut m = serde_json::Map::new();
345    let mut put = |k: &str, v: Option<f64>| {
346        if let Some(v) = v {
347            m.insert(k.to_string(), json!(v));
348        }
349    };
350    put("kafka_assigned_lag", s.kafka_assigned_lag);
351    put("redis_pending", s.redis_pending);
352    put("inflight", s.inflight);
353    put("shed_rate", s.shed_rate);
354    put("send_backpressure_rate", s.send_backpressure_rate);
355    put("refused_rate", s.refused_rate);
356    put("produce_queue_depth", s.produce_queue_depth);
357    m
358}
359
360/// Compile a user expression and dry-run it against a representative context so
361/// unknown identifiers / type errors surface at LOAD, not first tick. Returns a
362/// friendly error string on a HARD failure.
363///
364/// ## Validation contract (custom-signal aware)
365///
366/// `custom.<name>` domain signals are pushed by the app at RUNTIME and so are
367/// NOT present in the load-time dry-run context. We therefore distinguish (via
368/// the typed `cel::ExecutionError`):
369///
370/// - `Program::compile` failure (syntax) -> HARD reject.
371/// - dry-run `NoSuchKey` (a missing MAP key, e.g. `custom.<name>` not yet
372///   pushed, or any not-yet-present `metrics.*`/`params.*`) -> DOWNGRADE to a
373///   load `warn!` and KEEP the program. The runtime guard falls back to
374///   last-good / smart-default if it really is broken at tick time.
375/// - dry-run `UndeclaredReference` (an unknown TOP-LEVEL identifier -- a typo in
376///   a closed set) -> HARD reject.
377/// - any other eval error (type mismatch, bad overload, ...) or a non-numeric
378///   result -> HARD reject.
379fn compile_and_check(
380    expr: &str,
381    params: &std::collections::BTreeMap<String, f64>,
382) -> Result<Program, String> {
383    let program = Program::compile(expr).map_err(|e| format!("compile error: {e}"))?;
384
385    // Representative zero-context: all derived vars present, every param key,
386    // and the full metrics surface, so a reference to a KNOWN name succeeds and
387    // only genuine typos/type errors fail. `custom.*` is deliberately NOT
388    // pre-populated -- it is runtime-validated (see the contract above).
389    let mut m = serde_json::Map::new();
390    m.insert("cpu_utilisation_ratio".into(), json!(0.0));
391    m.insert("circuit_open".into(), json!(false));
392    m.insert("transport_inbound_pressure_ratio".into(), json!(0.0));
393    m.insert("transport_outbound_pressure_ratio".into(), json!(0.0));
394    m.insert("memory_ratio".into(), json!(0.0));
395    let pmap: serde_json::Map<String, serde_json::Value> =
396        params.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
397    m.insert("params".into(), serde_json::Value::Object(pmap));
398    let mut metrics = serde_json::Map::new();
399    for k in [
400        "kafka_assigned_lag",
401        "redis_pending",
402        "inflight",
403        "shed_rate",
404        "send_backpressure_rate",
405        "refused_rate",
406        "produce_queue_depth",
407    ] {
408        metrics.insert(k.to_string(), json!(0.0));
409    }
410    m.insert("metrics".into(), serde_json::Value::Object(metrics));
411    // An EMPTY custom map: a reference to `custom.<name>` produces a `NoSuchKey`,
412    // which we downgrade below (apps push the real keys at runtime).
413    m.insert(
414        "custom".into(),
415        serde_json::Value::Object(serde_json::Map::new()),
416    );
417
418    let surface = || {
419        format!(
420            "Available -- top-level: cpu_utilisation_ratio, circuit_open, \
421             transport_inbound_pressure_ratio, transport_outbound_pressure_ratio, memory_ratio; \
422             params.{{{}}}; metrics.{{kafka_assigned_lag, redis_pending, inflight, shed_rate, \
423             send_backpressure_rate, refused_rate, produce_queue_depth}}; \
424             custom.<app-pushed at runtime>",
425            params.keys().cloned().collect::<Vec<_>>().join(", ")
426        )
427    };
428
429    // Build the dry-run context, then execute capturing the TYPED execution
430    // error so we can branch on `NoSuchKey` (a missing map key -- downgrade) vs
431    // everything else (hard reject). A `build_context` failure is itself a hard
432    // reject (it realistically never errs for these JSON maps).
433    let ctx = crate::expression::build_context(m.iter())
434        .map_err(|e| format!("context build error: {e}. {}", surface()))?;
435
436    match program.execute(&ctx) {
437        Ok(Value::Float(_) | Value::Int(_) | Value::UInt(_)) => Ok(program),
438        Ok(other) => Err(format!(
439            "expression must evaluate to a number, got {other:?}"
440        )),
441        // A missing MAP key -- almost always a `custom.<name>` the app pushes at
442        // runtime (or a not-yet-present metrics/params field). Do NOT hard-reject:
443        // warn and keep the program; the runtime guard is the safety net.
444        Err(ExecutionError::NoSuchKey(key)) => {
445            tracing::warn!(
446                missing_key = %key,
447                expression = expr,
448                "scaling pressure references a map key not present at load (likely a \
449                 custom.<name> domain signal pushed at runtime) -- keeping the expression; \
450                 it will be validated on each scaling tick and fall back to the smart \
451                 default if it errors."
452            );
453            Ok(program)
454        }
455        // Unknown TOP-LEVEL identifier (a typo in the closed set) -> hard reject;
456        // ditto any other eval error (type mismatch, bad overload, ...).
457        Err(e) => Err(format!("evaluation error: {e}. {}", surface())),
458    }
459}
460
461/// Execute a compiled program against a JSON context map; returns the f64 value
462/// (coercing int/uint/bool) or `None` on any error / non-numeric result.
463fn eval_program(
464    program: &Program,
465    map: &serde_json::Map<String, serde_json::Value>,
466) -> Option<f64> {
467    let ctx = crate::expression::build_context(map.iter()).ok()?;
468    match program.execute(&ctx).ok()? {
469        Value::Float(f) => Some(f),
470        Value::Int(i) => Some(i as f64),
471        Value::UInt(u) => Some(u as f64),
472        Value::Bool(b) => Some(if b { 1.0 } else { 0.0 }),
473        _ => None,
474    }
475}
476
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scaling::config::{PressureExpr, ScalingEngineConfig};

    /// Build a config with the given pressure expressions and params.
    fn cfg(pressures: Vec<PressureExpr>, params: &[(&str, f64)]) -> ScalingEngineConfig {
        let mut c = ScalingEngineConfig {
            pressures,
            ..Default::default()
        };
        for (k, v) in params {
            c.params.insert((*k).to_string(), *v);
        }
        c
    }

    #[test]
    fn smart_default_cpu_only_when_no_transport() {
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![], &[("cpu_target", 0.70)]),
            ScalingTransport::File, // non-scalable inbound
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty());
        // CPU at target -> 100 * (0.70/0.70) = 100.
        let v = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].0, "default");
        assert!((v[0].1 - 100.0).abs() < 1e-6);
    }

    #[test]
    fn smart_default_takes_max_of_cpu_and_inbound_kafka() {
        let (eng, _) = ScalingEngine::new(
            "t",
            &cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 100_000.0)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        // CPU 0.35 -> cpu_term 0.5; lag 80k/100k = 0.8 -> max 0.8 -> 80.
        let s = TransportSignals {
            kafka_assigned_lag: Some(80_000.0),
            ..Default::default()
        };
        let v = eng.evaluate(&s, 0.35, 0.0);
        assert!((v[0].1 - 80.0).abs() < 1e-6);
    }

    #[test]
    fn circuit_open_gates_to_zero() {
        let (eng, _) = ScalingEngine::new(
            "t",
            &cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 1.0)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        let s = TransportSignals {
            kafka_assigned_lag: Some(1_000_000.0),
            circuit_open: true,
            ..Default::default()
        };
        assert!(eng.evaluate(&s, 0.99, 0.0)[0].1.abs() < f64::EPSILON);
    }

    #[test]
    fn user_expression_evaluated() {
        let p = PressureExpr {
            name: "cpu".into(),
            expression: "cpu_utilisation_ratio * 100.0".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");
        let v = eng.evaluate(&TransportSignals::default(), 0.42, 0.0);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].0, "cpu");
        assert!((v[0].1 - 42.0).abs() < 1e-6);
    }

    #[test]
    fn user_expression_can_read_params_and_metrics() {
        let p = PressureExpr {
            name: "lag".into(),
            expression: "metrics.kafka_assigned_lag / params.lag_target".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("lag_target", 1000.0)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");
        let s = TransportSignals {
            kafka_assigned_lag: Some(500.0),
            ..Default::default()
        };
        assert!((eng.evaluate(&s, 0.0, 0.0)[0].1 - 0.5).abs() < 1e-6);
    }

    #[test]
    fn custom_domain_signal_flows_end_to_end() {
        // An app pushes a DOMAIN signal at runtime; a pressure expression scales
        // on it under the dedicated `custom` map. The custom name is NOT known at
        // config-load, so it must NOT be a hard load error (warn-and-keep), and
        // it must evaluate to the expected value once the signal is present.
        let p = PressureExpr {
            name: "ch".into(),
            expression: "custom.clickhouse_backlog / params.ch_target".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("ch_target", 1000.0)]),
            ScalingTransport::File, // CPU-only smart default for the fallback
            ScalingTransport::Kafka,
        );
        // custom.* is runtime-validated -> no hard load error.
        assert!(errs.is_empty(), "custom.* must not hard-reject: {errs:?}");

        // App pushes the signal: backlog 2500 / target 1000 = 2.5.
        let mut signals = TransportSignals::default();
        signals.custom.insert("clickhouse_backlog".into(), 2500.0);
        let v = eng.evaluate(&signals, 0.0, 0.0);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].0, "ch");
        assert!(
            (v[0].1 - 2.5).abs() < 1e-9,
            "custom signal should flow end-to-end, got {}",
            v[0].1
        );
    }

    #[test]
    fn custom_signal_absent_at_runtime_falls_back() {
        // Same expression, but the app never pushes the signal -> the runtime
        // eval errors (NoSuchKey) -> fall back to the smart default (no last-good
        // yet). With File inbound + CPU 0.70/0.70 the smart default is 100.
        let p = PressureExpr {
            name: "ch".into(),
            expression: "custom.never_pushed / params.ch_target".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("ch_target", 1000.0), ("cpu_target", 0.70)]),
            ScalingTransport::File,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");
        let v = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert!(
            (v[0].1 - 100.0).abs() < 1e-6,
            "absent custom signal must fall back to smart default, got {}",
            v[0].1
        );
    }

    #[test]
    fn syntax_error_falls_back_with_friendly_message() {
        let p = PressureExpr {
            name: "broken".into(),
            expression: "cpu_utilisation_ratio +".into(), // syntax error
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("cpu_target", 0.70)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert_eq!(errs.len(), 1);
        assert!(errs[0].contains("broken"), "msg: {}", errs[0]);
        // Falls back to smart default (still produces a value, no panic).
        let v = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert!((v[0].1 - 100.0).abs() < 1e-6);
    }

    #[test]
    fn unknown_identifier_caught_at_load() {
        let p = PressureExpr {
            name: "typo".into(),
            expression: "cpu_utilisation_ratoi * 100".into(), // typo'd ident
            enabled: true,
        };
        let (_eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert_eq!(errs.len(), 1, "should catch the unknown identifier at load");
        assert!(errs[0].contains("typo"));
    }

    #[test]
    fn multi_output_independent_gauges() {
        let ps = vec![
            PressureExpr {
                name: "a".into(),
                expression: "cpu_utilisation_ratio * 100.0".into(),
                enabled: true,
            },
            PressureExpr {
                name: "b".into(),
                expression: "transport_inbound_pressure_ratio * 100.0".into(),
                enabled: true,
            },
        ];
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(ps, &[("lag_target", 100.0)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");
        let s = TransportSignals {
            kafka_assigned_lag: Some(50.0),
            ..Default::default()
        };
        let v = eng.evaluate(&s, 0.30, 0.0);
        assert_eq!(v.len(), 2);
        assert!((v[0].1 - 30.0).abs() < 1e-6); // a: cpu
        assert!((v[1].1 - 50.0).abs() < 1e-6); // b: inbound 50/100=0.5 *100
    }

    #[test]
    fn duplicate_names_are_flagged_at_load() {
        // Two outputs sharing a name would collide on one gauge series and one
        // last-good slot -> exactly one load error for the second occurrence.
        let ps = vec![
            PressureExpr {
                name: "a".into(),
                expression: "cpu_utilisation_ratio * 100.0".into(),
                enabled: true,
            },
            PressureExpr {
                name: "a".into(),
                expression: "memory_ratio * 100.0".into(),
                enabled: true,
            },
        ];
        let (_eng, errs) = ScalingEngine::new(
            "t",
            &cfg(ps, &[]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert_eq!(errs.len(), 1, "errors: {errs:?}");
        assert!(errs[0].contains("duplicate"), "msg: {}", errs[0]);
    }

    #[test]
    fn disabled_pressure_is_neither_validated_nor_emitted() {
        // A disabled output with a broken expression must not produce a load
        // error, and must not appear in the evaluated outputs.
        let ps = vec![
            PressureExpr {
                name: "off".into(),
                expression: "cpu_utilisation_ratio +".into(),
                enabled: false,
            },
            PressureExpr {
                name: "on".into(),
                expression: "cpu_utilisation_ratio * 100.0".into(),
                enabled: true,
            },
        ];
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(ps, &[]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "disabled expressions must not be validated: {errs:?}");
        let v = eng.evaluate(&TransportSignals::default(), 0.25, 0.0);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].0, "on");
        assert!((v[0].1 - 25.0).abs() < 1e-6);
    }

    #[test]
    fn eval_error_holds_last_good_value() {
        // Once an expression has evaluated successfully, a later eval error
        // (the custom signal disappears) holds that value instead of dropping to
        // the smart default (which would be 100 here: File inbound, CPU at target).
        let p = PressureExpr {
            name: "ch".into(),
            expression: "custom.backlog * 1.0".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("cpu_target", 0.70)]),
            ScalingTransport::File,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");

        let mut signals = TransportSignals::default();
        signals.custom.insert("backlog".into(), 40.0);
        let first = eng.evaluate(&signals, 0.70, 0.0);
        assert!((first[0].1 - 40.0).abs() < 1e-9, "got {}", first[0].1);

        let held = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert!(
            (held[0].1 - 40.0).abs() < 1e-9,
            "eval error must hold last-good, got {}",
            held[0].1
        );
    }

    #[test]
    fn non_numeric_result_rejected_at_load() {
        // A boolean-valued expression is not a pressure -> hard load error, and
        // the output falls back to the smart default.
        let p = PressureExpr {
            name: "flag".into(),
            expression: "circuit_open".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("cpu_target", 0.70)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert_eq!(errs.len(), 1, "errors: {errs:?}");
        assert!(errs[0].contains("flag"), "msg: {}", errs[0]);
        let v = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert!((v[0].1 - 100.0).abs() < 1e-6);
    }
}