use std::collections::HashMap;
use cel::{ExecutionError, Program, Value};
use parking_lot::Mutex;
use serde_json::json;
use super::config::ScalingEngineConfig;
use super::transport_pressure::{
PressureTargets, ScalingTransport, TransportSignals, inbound_pressure, outbound_pressure,
};
/// One configured scaling pressure after load-time compilation.
struct CompiledPressure {
/// Name this pressure's value is reported under.
name: String,
/// Compiled CEL expression; when `None` the engine's smart default is used instead.
program: Option<Program>,
/// Disabled pressures are skipped during evaluation.
enabled: bool,
}
/// Evaluates the configured scaling pressures against transport, CPU and memory signals.
pub struct ScalingEngine {
/// Prefix for the gauge names emitted by `tick`; only read when the `metrics` feature is on.
#[cfg_attr(not(feature = "metrics"), allow(dead_code))]
namespace: String,
/// When false, `tick` returns without evaluating anything.
enabled: bool,
/// Divisor applied to the CPU ratio in the smart default; a non-positive value zeroes the CPU term.
cpu_target: f64,
/// Targets derived from the configured params, passed to the transport pressure helpers.
targets: PressureTargets,
/// Inbound transport kind, used when computing the inbound pressure ratio.
inbound_kind: ScalingTransport,
/// Outbound transport kind as supplied at construction; exposed through `outbound_kind()`.
outbound_kind: ScalingTransport,
/// User parameters exposed to expressions under `params`.
params: std::collections::BTreeMap<String, f64>,
/// Pressures in configuration order, or a single implicit `default` when none are configured.
pressures: Vec<CompiledPressure>,
/// Most recent finite value each pressure's expression produced, keyed by pressure name;
/// used as a fallback when a later evaluation of that expression fails.
last_good: Mutex<HashMap<String, f64>>,
}
impl ScalingEngine {
    /// Builds an engine from `config`, compiling every enabled pressure expression up front.
    ///
    /// Returns the engine together with a list of human-readable configuration problems.
    /// Problems never prevent construction: a pressure whose expression fails to compile
    /// or validate is kept without a program (so it reports the smart default), and
    /// duplicate names are reported but left in place.
    ///
    /// When `config.pressures` is empty, a single enabled pressure named `"default"` with
    /// no expression is installed.
    #[must_use]
    pub fn new(
        namespace: &str,
        config: &ScalingEngineConfig,
        inbound: ScalingTransport,
        outbound: ScalingTransport,
    ) -> (Self, Vec<String>) {
        let targets = PressureTargets::from_params(&config.params);
        let cpu_target = config.cpu_target();
        let mut errors = Vec::new();
        let pressures: Vec<CompiledPressure> = if config.pressures.is_empty() {
            vec![CompiledPressure {
                name: "default".to_string(),
                program: None,
                enabled: true,
            }]
        } else {
            config
                .pressures
                .iter()
                .map(|p| {
                    // Disabled pressures are never compiled, so a broken expression on a
                    // disabled pressure produces no error.
                    let program = if p.enabled {
                        match compile_and_check(&p.expression, &config.params) {
                            Ok(prog) => Some(prog),
                            Err(msg) => {
                                errors.push(format!(
                                    "scaling pressure '{}' is invalid -- falling back to the \
                                     rustlib smart default. {msg}",
                                    p.name
                                ));
                                None
                            }
                        }
                    } else {
                        None
                    };
                    CompiledPressure {
                        name: p.name.clone(),
                        program,
                        enabled: p.enabled,
                    }
                })
                .collect()
        };
        {
            // Names key both the emitted values and the `last_good` cache, so report clashes.
            let mut seen = std::collections::HashSet::new();
            for p in &pressures {
                if !seen.insert(p.name.as_str()) {
                    errors.push(format!(
                        "duplicate scaling pressure name '{}' -- names must be unique",
                        p.name
                    ));
                }
            }
        }
        let engine = Self {
            namespace: namespace.to_string(),
            enabled: config.enabled,
            cpu_target,
            targets,
            inbound_kind: inbound,
            outbound_kind: outbound,
            params: config.params.clone(),
            pressures,
            last_good: Mutex::new(HashMap::new()),
        };
        (engine, errors)
    }

    /// Whether the engine is enabled; `tick` does nothing when this is false.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        self.enabled
    }

    /// The inbound transport kind supplied at construction.
    #[must_use]
    pub fn inbound_kind(&self) -> ScalingTransport {
        self.inbound_kind
    }

    /// The outbound transport kind supplied at construction.
    #[must_use]
    pub fn outbound_kind(&self) -> ScalingTransport {
        self.outbound_kind
    }

    /// Built-in pressure used when a pressure has no usable expression.
    ///
    /// Returns `0.0` while the circuit is open. Otherwise returns
    /// `100 * max(cpu_ratio / cpu_target, inbound)` with the maximum clamped to `[0, 1]`.
    /// The CPU term is `0.0` when `cpu_target` is not positive. The result is never NaN.
    #[must_use]
    fn smart_default(&self, cpu_ratio: f64, inbound: f64, circuit_open: bool) -> f64 {
        if circuit_open {
            return 0.0;
        }
        let cpu_term = if self.cpu_target > 0.0 {
            cpu_ratio / self.cpu_target
        } else {
            0.0
        };
        let composite = cpu_term.max(inbound);
        // `f64::max` already ignores a single NaN operand, but when both terms are NaN the
        // result is NaN and `clamp` would pass it through. Report "no pressure" instead.
        if composite.is_nan() {
            return 0.0;
        }
        100.0 * composite.clamp(0.0, 1.0)
    }

    /// Evaluates every enabled pressure and returns `(name, value)` pairs in
    /// configuration order.
    ///
    /// A pressure without a program reports the smart default. A pressure with a program
    /// reports the expression result when it is finite (and remembers it); otherwise it
    /// reports the last finite value remembered for that name, or the smart default when
    /// there is none.
    #[must_use]
    pub fn evaluate(
        &self,
        signals: &TransportSignals,
        cpu_ratio: f64,
        memory_ratio: f64,
    ) -> Vec<(String, f64)> {
        let inbound = inbound_pressure(self.inbound_kind, signals, &self.targets);
        let outbound = outbound_pressure(signals, &self.targets);
        self.evaluate_with(signals, cpu_ratio, memory_ratio, inbound, outbound)
    }

    /// Core of [`Self::evaluate`], taking transport pressure ratios the caller has
    /// already computed so they are not derived twice per tick.
    fn evaluate_with(
        &self,
        signals: &TransportSignals,
        cpu_ratio: f64,
        memory_ratio: f64,
        inbound: f64,
        outbound: f64,
    ) -> Vec<(String, f64)> {
        // The expression context is only built when at least one program will run.
        let ctx = if self
            .pressures
            .iter()
            .any(|p| p.enabled && p.program.is_some())
        {
            Some(self.eval_context(signals, cpu_ratio, inbound, outbound, memory_ratio))
        } else {
            None
        };
        let mut out = Vec::with_capacity(self.pressures.len());
        for p in &self.pressures {
            if !p.enabled {
                continue;
            }
            let value = match &p.program {
                None => self.smart_default(cpu_ratio, inbound, signals.circuit_open),
                Some(program) => {
                    let evaluated = ctx.as_ref().and_then(|m| eval_program(program, m));
                    match evaluated {
                        Some(v) if v.is_finite() => {
                            self.last_good.lock().insert(p.name.clone(), v);
                            v
                        }
                        // Failed or non-finite: prefer the last finite value, then the default.
                        _ => {
                            let remembered = self.last_good.lock().get(&p.name).copied();
                            remembered.unwrap_or_else(|| {
                                self.smart_default(cpu_ratio, inbound, signals.circuit_open)
                            })
                        }
                    }
                }
            };
            out.push((p.name.clone(), value));
        }
        out
    }

    /// Runs one scaling evaluation and, with the `metrics` feature, publishes the results
    /// as gauges prefixed with the engine's namespace.
    ///
    /// Does nothing when the engine is disabled. The pressures are evaluated even without
    /// the `metrics` feature, which keeps the remembered last-good values current.
    // Without the `metrics` feature `values` has no consumer, hence the allow.
    #[allow(unused_variables)]
    pub fn tick(&self, signals: &TransportSignals, cpu_ratio: f64, memory_ratio: f64) {
        if !self.enabled {
            return;
        }
        // Computed once and shared, so the published ratios are exactly the ones the
        // pressure expressions saw.
        let inbound = inbound_pressure(self.inbound_kind, signals, &self.targets);
        let outbound = outbound_pressure(signals, &self.targets);
        let values = self.evaluate_with(signals, cpu_ratio, memory_ratio, inbound, outbound);
        #[cfg(feature = "metrics")]
        {
            let ns = &self.namespace;
            for (name, value) in &values {
                metrics::gauge!(format!("{ns}_scaling_pressure"), "name" => name.clone())
                    .set(*value);
            }
            metrics::gauge!(format!("{ns}_transport_inbound_pressure_ratio")).set(inbound);
            metrics::gauge!(format!("{ns}_transport_outbound_pressure_ratio")).set(outbound);
            metrics::gauge!(format!("{ns}_scaling_circuit_open")).set(if signals.circuit_open {
                1.0
            } else {
                0.0
            });
        }
    }

    /// Builds the variable map that pressure expressions are evaluated against.
    ///
    /// Top-level entries are the CPU, memory and transport ratios plus `circuit_open`;
    /// `params` holds the configured parameters, `metrics` the transport signals that are
    /// currently present, and `custom` the application-pushed signals.
    fn eval_context(
        &self,
        signals: &TransportSignals,
        cpu_ratio: f64,
        inbound: f64,
        outbound: f64,
        memory_ratio: f64,
    ) -> serde_json::Map<String, serde_json::Value> {
        let mut m = serde_json::Map::new();
        m.insert("cpu_utilisation_ratio".into(), json!(cpu_ratio));
        m.insert("circuit_open".into(), json!(signals.circuit_open));
        m.insert("transport_inbound_pressure_ratio".into(), json!(inbound));
        m.insert("transport_outbound_pressure_ratio".into(), json!(outbound));
        m.insert("memory_ratio".into(), json!(memory_ratio));
        let params: serde_json::Map<String, serde_json::Value> = self
            .params
            .iter()
            .map(|(k, v)| (k.clone(), json!(v)))
            .collect();
        m.insert("params".into(), serde_json::Value::Object(params));
        m.insert(
            "metrics".into(),
            serde_json::Value::Object(signal_metrics(signals)),
        );
        let custom: serde_json::Map<String, serde_json::Value> = signals
            .custom
            .iter()
            .map(|(k, v)| (k.clone(), json!(v)))
            .collect();
        m.insert("custom".into(), serde_json::Value::Object(custom));
        m
    }

    /// Human-readable list of the names an expression may reference, including the
    /// currently configured parameter names.
    #[must_use]
    pub fn available_surface(&self) -> String {
        let params: Vec<&str> = self.params.keys().map(String::as_str).collect();
        format!(
            "top-level: cpu_utilisation_ratio, circuit_open, \
             transport_inbound_pressure_ratio, transport_outbound_pressure_ratio, memory_ratio; \
             params.{{{}}}; metrics.{{kafka_assigned_lag, redis_pending, inflight, shed_rate, \
             send_backpressure_rate, refused_rate, produce_queue_depth}}; \
             custom.<app-pushed domain signals, validated at runtime not load>",
            params.join(", ")
        )
    }
}
/// Collects the transport signals that currently have a value into a JSON object.
///
/// Signals that are `None` are left out entirely, so an expression referencing one of
/// them sees a missing key rather than a zero.
fn signal_metrics(s: &TransportSignals) -> serde_json::Map<String, serde_json::Value> {
    let readings: [(&str, Option<f64>); 7] = [
        ("kafka_assigned_lag", s.kafka_assigned_lag),
        ("redis_pending", s.redis_pending),
        ("inflight", s.inflight),
        ("shed_rate", s.shed_rate),
        ("send_backpressure_rate", s.send_backpressure_rate),
        ("refused_rate", s.refused_rate),
        ("produce_queue_depth", s.produce_queue_depth),
    ];
    readings
        .iter()
        .filter_map(|&(key, reading)| reading.map(|present| (key.to_string(), json!(present))))
        .collect()
}
/// Compiles `expr` and dry-runs it once against a zero-valued probe context.
///
/// The probe binds every top-level variable, each declared param, and every built-in
/// metric set to zero, together with an empty `custom` map.
///
/// A dry run that fails with a missing map key is logged as a warning and the program
/// is still accepted, since the key may be a `custom` signal that only exists at runtime.
///
/// # Errors
///
/// Returns a message when the expression does not compile, when the probe context cannot
/// be built, when the dry run yields a non-numeric value, or when it fails with any error
/// other than a missing map key.
fn compile_and_check(
    expr: &str,
    params: &std::collections::BTreeMap<String, f64>,
) -> Result<Program, String> {
    let program = match Program::compile(expr) {
        Ok(compiled) => compiled,
        Err(e) => return Err(format!("compile error: {e}")),
    };

    let declared_params: serde_json::Map<String, serde_json::Value> = params
        .iter()
        .map(|(key, value)| (key.clone(), json!(value)))
        .collect();
    let zeroed_metrics: serde_json::Map<String, serde_json::Value> = [
        "kafka_assigned_lag",
        "redis_pending",
        "inflight",
        "shed_rate",
        "send_backpressure_rate",
        "refused_rate",
        "produce_queue_depth",
    ]
    .iter()
    .map(|key| ((*key).to_string(), json!(0.0)))
    .collect();

    // Same bindings, in the same order, as the runtime evaluation context.
    let mut probe = serde_json::Map::new();
    for (key, value) in [
        ("cpu_utilisation_ratio", json!(0.0)),
        ("circuit_open", json!(false)),
        ("transport_inbound_pressure_ratio", json!(0.0)),
        ("transport_outbound_pressure_ratio", json!(0.0)),
        ("memory_ratio", json!(0.0)),
        ("params", serde_json::Value::Object(declared_params)),
        ("metrics", serde_json::Value::Object(zeroed_metrics)),
        ("custom", serde_json::Value::Object(serde_json::Map::new())),
    ] {
        probe.insert(key.to_string(), value);
    }

    // Built lazily: only the error paths need the description.
    let describe_surface = || {
        let declared: Vec<&str> = params.keys().map(String::as_str).collect();
        format!(
            "Available -- top-level: cpu_utilisation_ratio, circuit_open, \
             transport_inbound_pressure_ratio, transport_outbound_pressure_ratio, memory_ratio; \
             params.{{{}}}; metrics.{{kafka_assigned_lag, redis_pending, inflight, shed_rate, \
             send_backpressure_rate, refused_rate, produce_queue_depth}}; \
             custom.<app-pushed at runtime>",
            declared.join(", ")
        )
    };

    let ctx = match crate::expression::build_context(probe.iter()) {
        Ok(built) => built,
        Err(e) => {
            return Err(format!(
                "context build error: {e}. {}",
                describe_surface()
            ));
        }
    };

    let outcome = match program.execute(&ctx) {
        Ok(produced) => produced,
        Err(ExecutionError::NoSuchKey(key)) => {
            tracing::warn!(
                missing_key = %key,
                expression = expr,
                "scaling pressure references a map key not present at load (likely a \
                 custom.<name> domain signal pushed at runtime) -- keeping the expression; \
                 it will be validated on each scaling tick and fall back to the smart \
                 default if it errors."
            );
            return Ok(program);
        }
        Err(e) => return Err(format!("evaluation error: {e}. {}", describe_surface())),
    };

    match outcome {
        Value::Float(_) | Value::Int(_) | Value::UInt(_) => Ok(program),
        other => Err(format!(
            "expression must evaluate to a number, got {other:?}"
        )),
    }
}
/// Runs `program` against the variables in `map` and converts the result to `f64`.
///
/// Floats are returned unchanged, signed and unsigned integers are cast, and booleans
/// become `1.0` or `0.0`. Returns `None` when the context cannot be built, when execution
/// fails, or when the result is any other kind of value.
fn eval_program(
    program: &Program,
    map: &serde_json::Map<String, serde_json::Value>,
) -> Option<f64> {
    let ctx = match crate::expression::build_context(map.iter()) {
        Ok(built) => built,
        Err(_) => return None,
    };
    let produced = match program.execute(&ctx) {
        Ok(value) => value,
        Err(_) => return None,
    };
    let number = match produced {
        Value::Float(float) => float,
        Value::Int(signed) => signed as f64,
        Value::UInt(unsigned) => unsigned as f64,
        Value::Bool(true) => 1.0,
        Value::Bool(false) => 0.0,
        _ => return None,
    };
    Some(number)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scaling::config::{PressureExpr, ScalingEngineConfig};

    /// Config holding `pressures` plus the given `(name, value)` params.
    fn cfg(pressures: Vec<PressureExpr>, params: &[(&str, f64)]) -> ScalingEngineConfig {
        let mut config = ScalingEngineConfig {
            pressures,
            ..Default::default()
        };
        config
            .params
            .extend(params.iter().map(|&(key, value)| (key.to_string(), value)));
        config
    }

    /// An enabled pressure called `name` driven by `expression`.
    fn pressure(name: &str, expression: &str) -> PressureExpr {
        PressureExpr {
            name: name.into(),
            expression: expression.into(),
            enabled: true,
        }
    }

    /// Default signals with only the Kafka assigned lag populated.
    fn kafka_lag(lag: f64) -> TransportSignals {
        TransportSignals {
            kafka_assigned_lag: Some(lag),
            ..Default::default()
        }
    }

    /// Engine in namespace `"t"` with the given inbound transport and a Kafka outbound.
    fn engine(
        config: &ScalingEngineConfig,
        inbound: ScalingTransport,
    ) -> (ScalingEngine, Vec<String>) {
        ScalingEngine::new("t", config, inbound, ScalingTransport::Kafka)
    }

    #[test]
    fn smart_default_cpu_only_when_no_transport() {
        let config = cfg(vec![], &[("cpu_target", 0.70)]);
        let (scaler, errs) = engine(&config, ScalingTransport::File);
        assert!(errs.is_empty());

        let readings = scaler.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert_eq!(readings.len(), 1);
        assert_eq!(readings[0].0, "default");
        assert!((readings[0].1 - 100.0).abs() < 1e-6);
    }

    #[test]
    fn smart_default_takes_max_of_cpu_and_inbound_kafka() {
        let config = cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 100_000.0)]);
        let (scaler, _) = engine(&config, ScalingTransport::Kafka);

        let readings = scaler.evaluate(&kafka_lag(80_000.0), 0.35, 0.0);
        assert!((readings[0].1 - 80.0).abs() < 1e-6);
    }

    #[test]
    fn circuit_open_gates_to_zero() {
        let config = cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 1.0)]);
        let (scaler, _) = engine(&config, ScalingTransport::Kafka);
        let signals = TransportSignals {
            circuit_open: true,
            ..kafka_lag(1_000_000.0)
        };

        assert!(scaler.evaluate(&signals, 0.99, 0.0)[0].1.abs() < f64::EPSILON);
    }

    #[test]
    fn user_expression_evaluated() {
        let config = cfg(vec![pressure("cpu", "cpu_utilisation_ratio * 100.0")], &[]);
        let (scaler, errs) = engine(&config, ScalingTransport::Kafka);
        assert!(errs.is_empty(), "errors: {errs:?}");

        let readings = scaler.evaluate(&TransportSignals::default(), 0.42, 0.0);
        assert_eq!(readings.len(), 1);
        assert_eq!(readings[0].0, "cpu");
        assert!((readings[0].1 - 42.0).abs() < 1e-6);
    }

    #[test]
    fn user_expression_can_read_params_and_metrics() {
        let config = cfg(
            vec![pressure(
                "lag",
                "metrics.kafka_assigned_lag / params.lag_target",
            )],
            &[("lag_target", 1000.0)],
        );
        let (scaler, errs) = engine(&config, ScalingTransport::Kafka);
        assert!(errs.is_empty(), "errors: {errs:?}");

        let readings = scaler.evaluate(&kafka_lag(500.0), 0.0, 0.0);
        assert!((readings[0].1 - 0.5).abs() < 1e-6);
    }

    #[test]
    fn custom_domain_signal_flows_end_to_end() {
        let config = cfg(
            vec![pressure(
                "ch",
                "custom.clickhouse_backlog / params.ch_target",
            )],
            &[("ch_target", 1000.0)],
        );
        let (scaler, errs) = engine(&config, ScalingTransport::File);
        assert!(errs.is_empty(), "custom.* must not hard-reject: {errs:?}");

        let mut signals = TransportSignals::default();
        signals.custom.insert("clickhouse_backlog".into(), 2500.0);
        let readings = scaler.evaluate(&signals, 0.0, 0.0);
        assert_eq!(readings.len(), 1);
        assert_eq!(readings[0].0, "ch");
        assert!(
            (readings[0].1 - 2.5).abs() < 1e-9,
            "custom signal should flow end-to-end, got {}",
            readings[0].1
        );
    }

    #[test]
    fn custom_signal_absent_at_runtime_falls_back() {
        let config = cfg(
            vec![pressure("ch", "custom.never_pushed / params.ch_target")],
            &[("ch_target", 1000.0), ("cpu_target", 0.70)],
        );
        let (scaler, errs) = engine(&config, ScalingTransport::File);
        assert!(errs.is_empty(), "errors: {errs:?}");

        let readings = scaler.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert!(
            (readings[0].1 - 100.0).abs() < 1e-6,
            "absent custom signal must fall back to smart default, got {}",
            readings[0].1
        );
    }

    #[test]
    fn syntax_error_falls_back_with_friendly_message() {
        let config = cfg(
            vec![pressure("broken", "cpu_utilisation_ratio +")],
            &[("cpu_target", 0.70)],
        );
        let (scaler, errs) = engine(&config, ScalingTransport::Kafka);
        assert_eq!(errs.len(), 1);
        assert!(errs[0].contains("broken"), "msg: {}", errs[0]);

        let readings = scaler.evaluate(&TransportSignals::default(), 0.70, 0.0);
        assert!((readings[0].1 - 100.0).abs() < 1e-6);
    }

    #[test]
    fn unknown_identifier_caught_at_load() {
        let config = cfg(vec![pressure("typo", "cpu_utilisation_ratoi * 100")], &[]);
        let (_scaler, errs) = engine(&config, ScalingTransport::Kafka);
        assert_eq!(errs.len(), 1, "should catch the unknown identifier at load");
        assert!(errs[0].contains("typo"));
    }

    #[test]
    fn multi_output_independent_gauges() {
        let config = cfg(
            vec![
                pressure("a", "cpu_utilisation_ratio * 100.0"),
                pressure("b", "transport_inbound_pressure_ratio * 100.0"),
            ],
            &[("lag_target", 100.0)],
        );
        let (scaler, errs) = engine(&config, ScalingTransport::Kafka);
        assert!(errs.is_empty(), "errors: {errs:?}");

        let readings = scaler.evaluate(&kafka_lag(50.0), 0.30, 0.0);
        assert_eq!(readings.len(), 2);
        assert!((readings[0].1 - 30.0).abs() < 1e-6);
        assert!((readings[1].1 - 50.0).abs() < 1e-6);
    }
}