1use std::collections::HashMap;
38
39use cel::{ExecutionError, Program, Value};
40use parking_lot::Mutex;
41use serde_json::json;
42
43use super::config::ScalingEngineConfig;
44use super::transport_pressure::{
45 PressureTargets, ScalingTransport, TransportSignals, inbound_pressure, outbound_pressure,
46};
47
48struct CompiledPressure {
52 name: String,
53 program: Option<Program>,
54 enabled: bool,
55}
56
57pub struct ScalingEngine {
63 #[cfg_attr(not(feature = "metrics"), allow(dead_code))]
64 namespace: String,
65 enabled: bool,
66 cpu_target: f64,
67 targets: PressureTargets,
68 inbound_kind: ScalingTransport,
69 outbound_kind: ScalingTransport,
70 params: std::collections::BTreeMap<String, f64>,
71 pressures: Vec<CompiledPressure>,
72 last_good: Mutex<HashMap<String, f64>>,
74}
75
76impl ScalingEngine {
77 #[must_use]
85 pub fn new(
86 namespace: &str,
87 config: &ScalingEngineConfig,
88 inbound: ScalingTransport,
89 outbound: ScalingTransport,
90 ) -> (Self, Vec<String>) {
91 let targets = PressureTargets::from_params(&config.params);
92 let cpu_target = config.cpu_target();
93 let mut errors = Vec::new();
94
95 let pressures: Vec<CompiledPressure> = if config.pressures.is_empty() {
96 vec![CompiledPressure {
98 name: "default".to_string(),
99 program: None,
100 enabled: true,
101 }]
102 } else {
103 config
104 .pressures
105 .iter()
106 .map(|p| {
107 let program = if p.enabled {
108 match compile_and_check(&p.expression, &config.params) {
109 Ok(prog) => Some(prog),
110 Err(msg) => {
111 errors.push(format!(
112 "scaling pressure '{}' is invalid -- falling back to the \
113 rustlib smart default. {msg}",
114 p.name
115 ));
116 None
117 }
118 }
119 } else {
120 None
121 };
122 CompiledPressure {
123 name: p.name.clone(),
124 program,
125 enabled: p.enabled,
126 }
127 })
128 .collect()
129 };
130
131 {
134 let mut seen = std::collections::HashSet::new();
135 for p in &pressures {
136 if !seen.insert(p.name.as_str()) {
137 errors.push(format!(
138 "duplicate scaling pressure name '{}' -- names must be unique",
139 p.name
140 ));
141 }
142 }
143 }
144
145 let engine = Self {
146 namespace: namespace.to_string(),
147 enabled: config.enabled,
148 cpu_target,
149 targets,
150 inbound_kind: inbound,
151 outbound_kind: outbound,
152 params: config.params.clone(),
153 pressures,
154 last_good: Mutex::new(HashMap::new()),
155 };
156 (engine, errors)
157 }
158
159 #[must_use]
161 pub fn is_enabled(&self) -> bool {
162 self.enabled
163 }
164
165 #[must_use]
167 pub fn inbound_kind(&self) -> ScalingTransport {
168 self.inbound_kind
169 }
170
171 #[must_use]
173 pub fn outbound_kind(&self) -> ScalingTransport {
174 self.outbound_kind
175 }
176
177 #[must_use]
179 fn smart_default(&self, cpu_ratio: f64, inbound: f64, circuit_open: bool) -> f64 {
180 if circuit_open {
181 return 0.0;
182 }
183 let cpu_term = if self.cpu_target > 0.0 {
184 cpu_ratio / self.cpu_target
185 } else {
186 0.0
187 };
188 let composite = cpu_term.max(inbound);
189 100.0 * composite.clamp(0.0, 1.0)
190 }
191
192 #[must_use]
197 pub fn evaluate(
198 &self,
199 signals: &TransportSignals,
200 cpu_ratio: f64,
201 memory_ratio: f64,
202 ) -> Vec<(String, f64)> {
203 let inbound = inbound_pressure(self.inbound_kind, signals, &self.targets);
204 let outbound = outbound_pressure(signals, &self.targets);
205
206 let ctx = if self
208 .pressures
209 .iter()
210 .any(|p| p.enabled && p.program.is_some())
211 {
212 Some(self.eval_context(signals, cpu_ratio, inbound, outbound, memory_ratio))
213 } else {
214 None
215 };
216
217 let mut out = Vec::with_capacity(self.pressures.len());
218 for p in &self.pressures {
219 if !p.enabled {
220 continue;
221 }
222 let value = match &p.program {
223 None => self.smart_default(cpu_ratio, inbound, signals.circuit_open),
225 Some(program) => {
226 let evaluated = ctx.as_ref().and_then(|m| eval_program(program, m));
227 match evaluated {
228 Some(v) if v.is_finite() => {
229 self.last_good.lock().insert(p.name.clone(), v);
230 v
231 }
232 _ => self
235 .last_good
236 .lock()
237 .get(&p.name)
238 .copied()
239 .unwrap_or_else(|| {
240 self.smart_default(cpu_ratio, inbound, signals.circuit_open)
241 }),
242 }
243 }
244 };
245 out.push((p.name.clone(), value));
246 }
247 out
248 }
249
250 #[allow(unused_variables)]
256 pub fn tick(&self, signals: &TransportSignals, cpu_ratio: f64, memory_ratio: f64) {
257 if !self.enabled {
258 return;
259 }
260 let inbound = inbound_pressure(self.inbound_kind, signals, &self.targets);
261 let outbound = outbound_pressure(signals, &self.targets);
262 let values = self.evaluate(signals, cpu_ratio, memory_ratio);
263
264 #[cfg(feature = "metrics")]
265 {
266 let ns = &self.namespace;
267 for (name, value) in &values {
268 metrics::gauge!(format!("{ns}_scaling_pressure"), "name" => name.clone())
269 .set(*value);
270 }
271 metrics::gauge!(format!("{ns}_transport_inbound_pressure_ratio")).set(inbound);
275 metrics::gauge!(format!("{ns}_transport_outbound_pressure_ratio")).set(outbound);
276 metrics::gauge!(format!("{ns}_scaling_circuit_open")).set(if signals.circuit_open {
278 1.0
279 } else {
280 0.0
281 });
282 }
283 }
284
285 fn eval_context(
287 &self,
288 signals: &TransportSignals,
289 cpu_ratio: f64,
290 inbound: f64,
291 outbound: f64,
292 memory_ratio: f64,
293 ) -> serde_json::Map<String, serde_json::Value> {
294 let mut m = serde_json::Map::new();
295 m.insert("cpu_utilisation_ratio".into(), json!(cpu_ratio));
296 m.insert("circuit_open".into(), json!(signals.circuit_open));
297 m.insert("transport_inbound_pressure_ratio".into(), json!(inbound));
298 m.insert("transport_outbound_pressure_ratio".into(), json!(outbound));
299 m.insert("memory_ratio".into(), json!(memory_ratio));
300
301 let params: serde_json::Map<String, serde_json::Value> = self
302 .params
303 .iter()
304 .map(|(k, v)| (k.clone(), json!(v)))
305 .collect();
306 m.insert("params".into(), serde_json::Value::Object(params));
307
308 m.insert(
309 "metrics".into(),
310 serde_json::Value::Object(signal_metrics(signals)),
311 );
312
313 let custom: serde_json::Map<String, serde_json::Value> = signals
318 .custom
319 .iter()
320 .map(|(k, v)| (k.clone(), json!(v)))
321 .collect();
322 m.insert("custom".into(), serde_json::Value::Object(custom));
323 m
324 }
325
326 #[must_use]
328 pub fn available_surface(&self) -> String {
329 let params: Vec<&str> = self.params.keys().map(String::as_str).collect();
330 format!(
331 "top-level: cpu_utilisation_ratio, circuit_open, \
332 transport_inbound_pressure_ratio, transport_outbound_pressure_ratio, memory_ratio; \
333 params.{{{}}}; metrics.{{kafka_assigned_lag, redis_pending, inflight, shed_rate, \
334 send_backpressure_rate, refused_rate, produce_queue_depth}}; \
335 custom.<app-pushed domain signals, validated at runtime not load>",
336 params.join(", ")
337 )
338 }
339}
340
341fn signal_metrics(s: &TransportSignals) -> serde_json::Map<String, serde_json::Value> {
344 let mut m = serde_json::Map::new();
345 let mut put = |k: &str, v: Option<f64>| {
346 if let Some(v) = v {
347 m.insert(k.to_string(), json!(v));
348 }
349 };
350 put("kafka_assigned_lag", s.kafka_assigned_lag);
351 put("redis_pending", s.redis_pending);
352 put("inflight", s.inflight);
353 put("shed_rate", s.shed_rate);
354 put("send_backpressure_rate", s.send_backpressure_rate);
355 put("refused_rate", s.refused_rate);
356 put("produce_queue_depth", s.produce_queue_depth);
357 m
358}
359
360fn compile_and_check(
380 expr: &str,
381 params: &std::collections::BTreeMap<String, f64>,
382) -> Result<Program, String> {
383 let program = Program::compile(expr).map_err(|e| format!("compile error: {e}"))?;
384
385 let mut m = serde_json::Map::new();
390 m.insert("cpu_utilisation_ratio".into(), json!(0.0));
391 m.insert("circuit_open".into(), json!(false));
392 m.insert("transport_inbound_pressure_ratio".into(), json!(0.0));
393 m.insert("transport_outbound_pressure_ratio".into(), json!(0.0));
394 m.insert("memory_ratio".into(), json!(0.0));
395 let pmap: serde_json::Map<String, serde_json::Value> =
396 params.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
397 m.insert("params".into(), serde_json::Value::Object(pmap));
398 let mut metrics = serde_json::Map::new();
399 for k in [
400 "kafka_assigned_lag",
401 "redis_pending",
402 "inflight",
403 "shed_rate",
404 "send_backpressure_rate",
405 "refused_rate",
406 "produce_queue_depth",
407 ] {
408 metrics.insert(k.to_string(), json!(0.0));
409 }
410 m.insert("metrics".into(), serde_json::Value::Object(metrics));
411 m.insert(
414 "custom".into(),
415 serde_json::Value::Object(serde_json::Map::new()),
416 );
417
418 let surface = || {
419 format!(
420 "Available -- top-level: cpu_utilisation_ratio, circuit_open, \
421 transport_inbound_pressure_ratio, transport_outbound_pressure_ratio, memory_ratio; \
422 params.{{{}}}; metrics.{{kafka_assigned_lag, redis_pending, inflight, shed_rate, \
423 send_backpressure_rate, refused_rate, produce_queue_depth}}; \
424 custom.<app-pushed at runtime>",
425 params.keys().cloned().collect::<Vec<_>>().join(", ")
426 )
427 };
428
429 let ctx = crate::expression::build_context(m.iter())
434 .map_err(|e| format!("context build error: {e}. {}", surface()))?;
435
436 match program.execute(&ctx) {
437 Ok(Value::Float(_) | Value::Int(_) | Value::UInt(_)) => Ok(program),
438 Ok(other) => Err(format!(
439 "expression must evaluate to a number, got {other:?}"
440 )),
441 Err(ExecutionError::NoSuchKey(key)) => {
445 tracing::warn!(
446 missing_key = %key,
447 expression = expr,
448 "scaling pressure references a map key not present at load (likely a \
449 custom.<name> domain signal pushed at runtime) -- keeping the expression; \
450 it will be validated on each scaling tick and fall back to the smart \
451 default if it errors."
452 );
453 Ok(program)
454 }
455 Err(e) => Err(format!("evaluation error: {e}. {}", surface())),
458 }
459}
460
461fn eval_program(
464 program: &Program,
465 map: &serde_json::Map<String, serde_json::Value>,
466) -> Option<f64> {
467 let ctx = crate::expression::build_context(map.iter()).ok()?;
468 match program.execute(&ctx).ok()? {
469 Value::Float(f) => Some(f),
470 Value::Int(i) => Some(i as f64),
471 Value::UInt(u) => Some(u as f64),
472 Value::Bool(b) => Some(if b { 1.0 } else { 0.0 }),
473 _ => None,
474 }
475}
476
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scaling::config::{PressureExpr, ScalingEngineConfig};

    /// Builds a config with the given pressures and `params` entries; all other fields
    /// use their defaults.
    fn cfg(pressures: Vec<PressureExpr>, params: &[(&str, f64)]) -> ScalingEngineConfig {
        let mut c = ScalingEngineConfig {
            pressures,
            ..Default::default()
        };
        for (k, v) in params {
            c.params.insert((*k).to_string(), *v);
        }
        c
    }

    #[test]
    fn smart_default_cpu_only_when_no_transport() {
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![], &[("cpu_target", 0.70)]),
            ScalingTransport::File,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty());
        let v = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].0, "default");
        assert!((v[0].1 - 100.0).abs() < 1e-6);
    }

    #[test]
    fn smart_default_takes_max_of_cpu_and_inbound_kafka() {
        let (eng, _) = ScalingEngine::new(
            "t",
            &cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 100_000.0)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        let s = TransportSignals {
            kafka_assigned_lag: Some(80_000.0),
            ..Default::default()
        };
        let v = eng.evaluate(&s, 0.35, 0.0);
        assert!((v[0].1 - 80.0).abs() < 1e-6);
    }

    #[test]
    fn circuit_open_gates_to_zero() {
        let (eng, _) = ScalingEngine::new(
            "t",
            &cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 1.0)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        let s = TransportSignals {
            kafka_assigned_lag: Some(1_000_000.0),
            circuit_open: true,
            ..Default::default()
        };
        assert!(eng.evaluate(&s, 0.99, 0.0)[0].1.abs() < f64::EPSILON);
    }

    #[test]
    fn user_expression_evaluated() {
        let p = PressureExpr {
            name: "cpu".into(),
            expression: "cpu_utilisation_ratio * 100.0".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");
        let v = eng.evaluate(&TransportSignals::default(), 0.42, 0.0);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].0, "cpu");
        assert!((v[0].1 - 42.0).abs() < 1e-6);
    }

    #[test]
    fn user_expression_can_read_params_and_metrics() {
        let p = PressureExpr {
            name: "lag".into(),
            expression: "metrics.kafka_assigned_lag / params.lag_target".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("lag_target", 1000.0)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");
        let s = TransportSignals {
            kafka_assigned_lag: Some(500.0),
            ..Default::default()
        };
        assert!((eng.evaluate(&s, 0.0, 0.0)[0].1 - 0.5).abs() < 1e-6);
    }

    #[test]
    fn custom_domain_signal_flows_end_to_end() {
        let p = PressureExpr {
            name: "ch".into(),
            expression: "custom.clickhouse_backlog / params.ch_target".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("ch_target", 1000.0)]),
            ScalingTransport::File,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "custom.* must not hard-reject: {errs:?}");

        let mut signals = TransportSignals::default();
        signals.custom.insert("clickhouse_backlog".into(), 2500.0);
        let v = eng.evaluate(&signals, 0.0, 0.0);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].0, "ch");
        assert!(
            (v[0].1 - 2.5).abs() < 1e-9,
            "custom signal should flow end-to-end, got {}",
            v[0].1
        );
    }

    #[test]
    fn custom_signal_absent_at_runtime_falls_back() {
        let p = PressureExpr {
            name: "ch".into(),
            expression: "custom.never_pushed / params.ch_target".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("ch_target", 1000.0), ("cpu_target", 0.70)]),
            ScalingTransport::File,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");
        let v = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert!(
            (v[0].1 - 100.0).abs() < 1e-6,
            "absent custom signal must fall back to smart default, got {}",
            v[0].1
        );
    }

    #[test]
    fn last_good_value_is_held_when_custom_signal_disappears() {
        let p = PressureExpr {
            name: "ch".into(),
            expression: "custom.backlog / params.ch_target".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("ch_target", 1000.0), ("cpu_target", 0.70)]),
            ScalingTransport::File,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");

        let mut signals = TransportSignals::default();
        signals.custom.insert("backlog".into(), 2500.0);
        assert!((eng.evaluate(&signals, 0.0, 0.0)[0].1 - 2.5).abs() < 1e-9);

        // The signal vanishes. The last good value (2.5) must be held instead of
        // jumping to the smart default (which would be 100.0 here).
        let v = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert!(
            (v[0].1 - 2.5).abs() < 1e-9,
            "expected last good value, got {}",
            v[0].1
        );
    }

    #[test]
    fn syntax_error_falls_back_with_friendly_message() {
        let p = PressureExpr {
            name: "broken".into(),
            expression: "cpu_utilisation_ratio +".into(),
            enabled: true,
        };
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[("cpu_target", 0.70)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert_eq!(errs.len(), 1);
        assert!(errs[0].contains("broken"), "msg: {}", errs[0]);
        let v = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert!((v[0].1 - 100.0).abs() < 1e-6);
    }

    #[test]
    fn unknown_identifier_caught_at_load() {
        let p = PressureExpr {
            name: "typo".into(),
            expression: "cpu_utilisation_ratoi * 100".into(),
            enabled: true,
        };
        let (_eng, errs) = ScalingEngine::new(
            "t",
            &cfg(vec![p], &[]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert_eq!(errs.len(), 1, "should catch the unknown identifier at load");
        assert!(errs[0].contains("typo"));
    }

    #[test]
    fn disabled_pressure_is_skipped_and_not_compiled() {
        let ps = vec![
            PressureExpr {
                name: "on".into(),
                expression: "cpu_utilisation_ratio * 100.0".into(),
                enabled: true,
            },
            PressureExpr {
                name: "off".into(),
                // Invalid on purpose: a disabled pressure must not be compiled.
                expression: "cpu_utilisation_ratio +".into(),
                enabled: false,
            },
        ];
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(ps, &[]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");
        let v = eng.evaluate(&TransportSignals::default(), 0.5, 0.0);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].0, "on");
        assert!((v[0].1 - 50.0).abs() < 1e-6);
    }

    #[test]
    fn duplicate_pressure_names_are_reported() {
        let ps = vec![
            PressureExpr {
                name: "dup".into(),
                expression: "cpu_utilisation_ratio".into(),
                enabled: true,
            },
            PressureExpr {
                name: "dup".into(),
                expression: "cpu_utilisation_ratio".into(),
                enabled: true,
            },
        ];
        let (_eng, errs) = ScalingEngine::new(
            "t",
            &cfg(ps, &[]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert_eq!(errs.len(), 1, "errors: {errs:?}");
        assert!(errs[0].contains("duplicate"), "msg: {}", errs[0]);
    }

    #[test]
    fn multi_output_independent_gauges() {
        let ps = vec![
            PressureExpr {
                name: "a".into(),
                expression: "cpu_utilisation_ratio * 100.0".into(),
                enabled: true,
            },
            PressureExpr {
                name: "b".into(),
                expression: "transport_inbound_pressure_ratio * 100.0".into(),
                enabled: true,
            },
        ];
        let (eng, errs) = ScalingEngine::new(
            "t",
            &cfg(ps, &[("lag_target", 100.0)]),
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty(), "errors: {errs:?}");
        let s = TransportSignals {
            kafka_assigned_lag: Some(50.0),
            ..Default::default()
        };
        let v = eng.evaluate(&s, 0.30, 0.0);
        assert_eq!(v.len(), 2);
        assert!((v[0].1 - 30.0).abs() < 1e-6);
        assert!((v[1].1 - 50.0).abs() < 1e-6);
    }
}