1use std::collections::HashMap;
38
39use cel::{Program, Value};
40use parking_lot::Mutex;
41use serde_json::json;
42
43use super::config::ScalingEngineConfig;
44use super::transport_pressure::{
45 PressureTargets, ScalingTransport, TransportSignals, inbound_pressure, outbound_pressure,
46};
47
48struct CompiledPressure {
52 name: String,
53 program: Option<Program>,
54 enabled: bool,
55}
56
57pub struct ScalingEngine {
63 #[cfg_attr(not(feature = "metrics"), allow(dead_code))]
64 namespace: String,
65 enabled: bool,
66 cpu_target: f64,
67 targets: PressureTargets,
68 inbound_kind: ScalingTransport,
69 outbound_kind: ScalingTransport,
70 params: std::collections::BTreeMap<String, f64>,
71 pressures: Vec<CompiledPressure>,
72 last_good: Mutex<HashMap<String, f64>>,
74}
75
76impl ScalingEngine {
77 #[must_use]
85 pub fn new(
86 namespace: &str,
87 config: &ScalingEngineConfig,
88 inbound: ScalingTransport,
89 outbound: ScalingTransport,
90 ) -> (Self, Vec<String>) {
91 let targets = PressureTargets::from_params(&config.params);
92 let cpu_target = config.cpu_target();
93 let mut errors = Vec::new();
94
95 let pressures: Vec<CompiledPressure> = if config.pressures.is_empty() {
96 vec![CompiledPressure {
98 name: "default".to_string(),
99 program: None,
100 enabled: true,
101 }]
102 } else {
103 config
104 .pressures
105 .iter()
106 .map(|p| {
107 let program = if p.enabled {
108 match compile_and_check(&p.expression, &config.params) {
109 Ok(prog) => Some(prog),
110 Err(msg) => {
111 errors.push(format!(
112 "scaling pressure '{}' is invalid -- falling back to the \
113 rustlib smart default. {msg}",
114 p.name
115 ));
116 None
117 }
118 }
119 } else {
120 None
121 };
122 CompiledPressure {
123 name: p.name.clone(),
124 program,
125 enabled: p.enabled,
126 }
127 })
128 .collect()
129 };
130
131 {
134 let mut seen = std::collections::HashSet::new();
135 for p in &pressures {
136 if !seen.insert(p.name.as_str()) {
137 errors.push(format!(
138 "duplicate scaling pressure name '{}' -- names must be unique",
139 p.name
140 ));
141 }
142 }
143 }
144
145 let engine = Self {
146 namespace: namespace.to_string(),
147 enabled: config.enabled,
148 cpu_target,
149 targets,
150 inbound_kind: inbound,
151 outbound_kind: outbound,
152 params: config.params.clone(),
153 pressures,
154 last_good: Mutex::new(HashMap::new()),
155 };
156 (engine, errors)
157 }
158
159 #[must_use]
161 pub fn is_enabled(&self) -> bool {
162 self.enabled
163 }
164
165 #[must_use]
167 pub fn inbound_kind(&self) -> ScalingTransport {
168 self.inbound_kind
169 }
170
171 #[must_use]
173 pub fn outbound_kind(&self) -> ScalingTransport {
174 self.outbound_kind
175 }
176
177 #[must_use]
179 fn smart_default(&self, cpu_ratio: f64, inbound: f64, circuit_open: bool) -> f64 {
180 if circuit_open {
181 return 0.0;
182 }
183 let cpu_term = if self.cpu_target > 0.0 {
184 cpu_ratio / self.cpu_target
185 } else {
186 0.0
187 };
188 let composite = cpu_term.max(inbound);
189 100.0 * composite.clamp(0.0, 1.0)
190 }
191
192 #[must_use]
197 pub fn evaluate(
198 &self,
199 signals: &TransportSignals,
200 cpu_ratio: f64,
201 memory_ratio: f64,
202 ) -> Vec<(String, f64)> {
203 let inbound = inbound_pressure(self.inbound_kind, signals, &self.targets);
204 let outbound = outbound_pressure(signals, &self.targets);
205
206 let ctx = if self
208 .pressures
209 .iter()
210 .any(|p| p.enabled && p.program.is_some())
211 {
212 Some(self.eval_context(signals, cpu_ratio, inbound, outbound, memory_ratio))
213 } else {
214 None
215 };
216
217 let mut out = Vec::with_capacity(self.pressures.len());
218 for p in &self.pressures {
219 if !p.enabled {
220 continue;
221 }
222 let value = match &p.program {
223 None => self.smart_default(cpu_ratio, inbound, signals.circuit_open),
225 Some(program) => {
226 let evaluated = ctx.as_ref().and_then(|m| eval_program(program, m));
227 match evaluated {
228 Some(v) if v.is_finite() => {
229 self.last_good.lock().insert(p.name.clone(), v);
230 v
231 }
232 _ => self
235 .last_good
236 .lock()
237 .get(&p.name)
238 .copied()
239 .unwrap_or_else(|| {
240 self.smart_default(cpu_ratio, inbound, signals.circuit_open)
241 }),
242 }
243 }
244 };
245 out.push((p.name.clone(), value));
246 }
247 out
248 }
249
250 #[allow(unused_variables)]
256 pub fn tick(&self, signals: &TransportSignals, cpu_ratio: f64, memory_ratio: f64) {
257 if !self.enabled {
258 return;
259 }
260 let inbound = inbound_pressure(self.inbound_kind, signals, &self.targets);
261 let outbound = outbound_pressure(signals, &self.targets);
262 let values = self.evaluate(signals, cpu_ratio, memory_ratio);
263
264 #[cfg(feature = "metrics")]
265 {
266 let ns = &self.namespace;
267 for (name, value) in &values {
268 metrics::gauge!(format!("{ns}_scaling_pressure"), "name" => name.clone())
269 .set(*value);
270 }
271 metrics::gauge!(format!("{ns}_transport_inbound_pressure_ratio")).set(inbound);
275 metrics::gauge!(format!("{ns}_transport_outbound_pressure_ratio")).set(outbound);
276 metrics::gauge!(format!("{ns}_scaling_circuit_open")).set(if signals.circuit_open {
278 1.0
279 } else {
280 0.0
281 });
282 }
283 }
284
285 fn eval_context(
287 &self,
288 signals: &TransportSignals,
289 cpu_ratio: f64,
290 inbound: f64,
291 outbound: f64,
292 memory_ratio: f64,
293 ) -> serde_json::Map<String, serde_json::Value> {
294 let mut m = serde_json::Map::new();
295 m.insert("cpu_utilisation_ratio".into(), json!(cpu_ratio));
296 m.insert("circuit_open".into(), json!(signals.circuit_open));
297 m.insert("transport_inbound_pressure_ratio".into(), json!(inbound));
298 m.insert("transport_outbound_pressure_ratio".into(), json!(outbound));
299 m.insert("memory_ratio".into(), json!(memory_ratio));
300
301 let params: serde_json::Map<String, serde_json::Value> = self
302 .params
303 .iter()
304 .map(|(k, v)| (k.clone(), json!(v)))
305 .collect();
306 m.insert("params".into(), serde_json::Value::Object(params));
307
308 m.insert(
309 "metrics".into(),
310 serde_json::Value::Object(signal_metrics(signals)),
311 );
312 m
313 }
314
315 #[must_use]
317 pub fn available_surface(&self) -> String {
318 let params: Vec<&str> = self.params.keys().map(String::as_str).collect();
319 format!(
320 "top-level: cpu_utilisation_ratio, circuit_open, \
321 transport_inbound_pressure_ratio, transport_outbound_pressure_ratio, memory_ratio; \
322 params.{{{}}}; metrics.{{kafka_assigned_lag, redis_pending, inflight, shed_rate, \
323 send_backpressure_rate, refused_rate, produce_queue_depth}}",
324 params.join(", ")
325 )
326 }
327}
328
329fn signal_metrics(s: &TransportSignals) -> serde_json::Map<String, serde_json::Value> {
332 let mut m = serde_json::Map::new();
333 let mut put = |k: &str, v: Option<f64>| {
334 if let Some(v) = v {
335 m.insert(k.to_string(), json!(v));
336 }
337 };
338 put("kafka_assigned_lag", s.kafka_assigned_lag);
339 put("redis_pending", s.redis_pending);
340 put("inflight", s.inflight);
341 put("shed_rate", s.shed_rate);
342 put("send_backpressure_rate", s.send_backpressure_rate);
343 put("refused_rate", s.refused_rate);
344 put("produce_queue_depth", s.produce_queue_depth);
345 m
346}
347
348fn compile_and_check(
352 expr: &str,
353 params: &std::collections::BTreeMap<String, f64>,
354) -> Result<Program, String> {
355 let program = Program::compile(expr).map_err(|e| format!("compile error: {e}"))?;
356
357 let mut m = serde_json::Map::new();
361 m.insert("cpu_utilisation_ratio".into(), json!(0.0));
362 m.insert("circuit_open".into(), json!(false));
363 m.insert("transport_inbound_pressure_ratio".into(), json!(0.0));
364 m.insert("transport_outbound_pressure_ratio".into(), json!(0.0));
365 m.insert("memory_ratio".into(), json!(0.0));
366 let pmap: serde_json::Map<String, serde_json::Value> =
367 params.iter().map(|(k, v)| (k.clone(), json!(v))).collect();
368 m.insert("params".into(), serde_json::Value::Object(pmap));
369 let mut metrics = serde_json::Map::new();
370 for k in [
371 "kafka_assigned_lag",
372 "redis_pending",
373 "inflight",
374 "shed_rate",
375 "send_backpressure_rate",
376 "refused_rate",
377 "produce_queue_depth",
378 ] {
379 metrics.insert(k.to_string(), json!(0.0));
380 }
381 m.insert("metrics".into(), serde_json::Value::Object(metrics));
382
383 match eval_program_checked(&program, &m) {
384 Ok(Value::Float(_) | Value::Int(_) | Value::UInt(_)) => Ok(program),
385 Ok(other) => Err(format!(
386 "expression must evaluate to a number, got {other:?}"
387 )),
388 Err(e) => Err(format!(
389 "evaluation error: {e}. Available -- top-level: cpu_utilisation_ratio, \
390 circuit_open, transport_inbound_pressure_ratio, \
391 transport_outbound_pressure_ratio, memory_ratio; params.{{{}}}; \
392 metrics.{{kafka_assigned_lag, redis_pending, inflight, shed_rate, \
393 send_backpressure_rate, refused_rate, produce_queue_depth}}",
394 params.keys().cloned().collect::<Vec<_>>().join(", ")
395 )),
396 }
397}
398
399fn eval_program(
402 program: &Program,
403 map: &serde_json::Map<String, serde_json::Value>,
404) -> Option<f64> {
405 let ctx = crate::expression::build_context(map.iter()).ok()?;
406 match program.execute(&ctx).ok()? {
407 Value::Float(f) => Some(f),
408 Value::Int(i) => Some(i as f64),
409 Value::UInt(u) => Some(u as f64),
410 Value::Bool(b) => Some(if b { 1.0 } else { 0.0 }),
411 _ => None,
412 }
413}
414
415fn eval_program_checked(
417 program: &Program,
418 map: &serde_json::Map<String, serde_json::Value>,
419) -> Result<Value, String> {
420 let ctx = crate::expression::build_context(map.iter()).map_err(|e| format!("{e}"))?;
421 program.execute(&ctx).map_err(|e| format!("{e}"))
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scaling::config::{PressureExpr, ScalingEngineConfig};

    /// Absolute tolerance for comparing computed pressures.
    const TOLERANCE: f64 = 1e-6;

    /// Config with the given pressures and params; everything else default.
    fn cfg(pressures: Vec<PressureExpr>, params: &[(&str, f64)]) -> ScalingEngineConfig {
        let mut config = ScalingEngineConfig {
            pressures,
            ..Default::default()
        };
        config
            .params
            .extend(params.iter().map(|(key, value)| ((*key).to_string(), *value)));
        config
    }

    /// An enabled pressure with the given name and expression.
    fn enabled_expr(name: &str, expression: &str) -> PressureExpr {
        PressureExpr {
            name: name.into(),
            expression: expression.into(),
            enabled: true,
        }
    }

    /// Engine in namespace `"t"` with Kafka on both sides.
    fn kafka_engine(config: &ScalingEngineConfig) -> (ScalingEngine, Vec<String>) {
        ScalingEngine::new(
            "t",
            config,
            ScalingTransport::Kafka,
            ScalingTransport::Kafka,
        )
    }

    /// Signals carrying only a Kafka assigned-lag reading.
    fn lag_signals(lag: f64) -> TransportSignals {
        TransportSignals {
            kafka_assigned_lag: Some(lag),
            ..Default::default()
        }
    }

    #[track_caller]
    fn assert_close(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < TOLERANCE);
    }

    #[test]
    fn smart_default_cpu_only_when_no_transport() {
        let config = cfg(vec![], &[("cpu_target", 0.70)]);
        let (eng, errs) = ScalingEngine::new(
            "t",
            &config,
            ScalingTransport::File,
            ScalingTransport::Kafka,
        );
        assert!(errs.is_empty());

        let v = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].0, "default");
        assert_close(v[0].1, 100.0);
    }

    #[test]
    fn smart_default_takes_max_of_cpu_and_inbound_kafka() {
        let config = cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 100_000.0)]);
        let (eng, _) = kafka_engine(&config);

        let v = eng.evaluate(&lag_signals(80_000.0), 0.35, 0.0);
        assert_close(v[0].1, 80.0);
    }

    #[test]
    fn circuit_open_gates_to_zero() {
        let config = cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 1.0)]);
        let (eng, _) = kafka_engine(&config);

        let s = TransportSignals {
            circuit_open: true,
            ..lag_signals(1_000_000.0)
        };
        assert!(eng.evaluate(&s, 0.99, 0.0)[0].1.abs() < f64::EPSILON);
    }

    #[test]
    fn user_expression_evaluated() {
        let config = cfg(
            vec![enabled_expr("cpu", "cpu_utilisation_ratio * 100.0")],
            &[],
        );
        let (eng, errs) = kafka_engine(&config);
        assert!(errs.is_empty(), "errors: {errs:?}");

        let v = eng.evaluate(&TransportSignals::default(), 0.42, 0.0);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].0, "cpu");
        assert_close(v[0].1, 42.0);
    }

    #[test]
    fn user_expression_can_read_params_and_metrics() {
        let config = cfg(
            vec![enabled_expr(
                "lag",
                "metrics.kafka_assigned_lag / params.lag_target",
            )],
            &[("lag_target", 1000.0)],
        );
        let (eng, errs) = kafka_engine(&config);
        assert!(errs.is_empty(), "errors: {errs:?}");

        let v = eng.evaluate(&lag_signals(500.0), 0.0, 0.0);
        assert_close(v[0].1, 0.5);
    }

    #[test]
    fn syntax_error_falls_back_with_friendly_message() {
        let config = cfg(
            vec![enabled_expr("broken", "cpu_utilisation_ratio +")],
            &[("cpu_target", 0.70)],
        );
        let (eng, errs) = kafka_engine(&config);
        assert_eq!(errs.len(), 1);
        assert!(errs[0].contains("broken"), "msg: {}", errs[0]);

        // The broken pressure still reports, using the smart default.
        let v = eng.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert_close(v[0].1, 100.0);
    }

    #[test]
    fn unknown_identifier_caught_at_load() {
        let config = cfg(
            vec![enabled_expr("typo", "cpu_utilisation_ratoi * 100")],
            &[],
        );
        let (_eng, errs) = kafka_engine(&config);
        assert_eq!(errs.len(), 1, "should catch the unknown identifier at load");
        assert!(errs[0].contains("typo"));
    }

    #[test]
    fn multi_output_independent_gauges() {
        let config = cfg(
            vec![
                enabled_expr("a", "cpu_utilisation_ratio * 100.0"),
                enabled_expr("b", "transport_inbound_pressure_ratio * 100.0"),
            ],
            &[("lag_target", 100.0)],
        );
        let (eng, errs) = kafka_engine(&config);
        assert!(errs.is_empty(), "errors: {errs:?}");

        let v = eng.evaluate(&lag_signals(50.0), 0.30, 0.0);
        assert_eq!(v.len(), 2);
        assert_close(v[0].1, 30.0);
        assert_close(v[1].1, 50.0);
    }
}