#![cfg(all(feature = "scaling", feature = "expression"))]
use hyperi_rustlib::scaling::{
PressureExpr, PressureTargets, ScalingEngine, ScalingEngineConfig, ScalingTransport,
TransportSignals, inbound_pressure, outbound_pressure,
};
/// Small deterministic pseudo-random generator used to drive the property tests.
///
/// The state update and output mixing follow the SplitMix64 scheme, so a given seed
/// always replays the same sequence of draws.
struct Rng(u64);

impl Rng {
    /// Creates a generator whose internal state starts at `seed`.
    fn new(seed: u64) -> Self {
        Rng(seed)
    }

    /// Advances the state by a fixed increment and returns the mixed 64-bit output.
    fn next_u64(&mut self) -> u64 {
        const INCREMENT: u64 = 0x9E37_79B9_7F4A_7C15;
        const MIX_A: u64 = 0xBF58_476D_1CE4_E5B9;
        const MIX_B: u64 = 0x94D0_49BB_1331_11EB;

        self.0 = self.0.wrapping_add(INCREMENT);
        let stage_one = (self.0 ^ (self.0 >> 30)).wrapping_mul(MIX_A);
        let stage_two = (stage_one ^ (stage_one >> 27)).wrapping_mul(MIX_B);
        stage_two ^ (stage_two >> 31)
    }

    /// Builds a fraction in `[0, 1)` from the top 53 bits of the next output and
    /// scales it by `max`.
    // Only 53 bits are kept, which an `f64` represents exactly, so the casts are lossless.
    #[allow(clippy::cast_precision_loss)]
    fn f64_to(&mut self, max: f64) -> f64 {
        let top_bits = self.next_u64() >> 11;
        let unit = top_bits as f64 / (1u64 << 53) as f64;
        unit * max
    }

    /// Returns `true` when the lowest bit of the next output is set.
    fn bool(&mut self) -> bool {
        (self.next_u64() & 1) != 0
    }

    /// Picks one of the eight transport kinds from the low three bits of the next output.
    fn kind(&mut self) -> ScalingTransport {
        match self.next_u64() & 7 {
            0 => ScalingTransport::Kafka,
            1 => ScalingTransport::Redis,
            2 => ScalingTransport::Http,
            3 => ScalingTransport::Grpc,
            4 => ScalingTransport::File,
            5 => ScalingTransport::Pipe,
            6 => ScalingTransport::Memory,
            _ => ScalingTransport::Other,
        }
    }
}
/// Builds a `ScalingEngineConfig` from the given pressures and named parameters.
///
/// All other fields keep their `Default` values. Each `(name, value)` pair is inserted
/// into `params` in order, so a repeated name keeps the last value given.
fn cfg(pressures: Vec<PressureExpr>, params: &[(&str, f64)]) -> ScalingEngineConfig {
    let mut config = ScalingEngineConfig {
        pressures,
        ..Default::default()
    };
    for &(key, value) in params {
        config.params.insert(key.to_owned(), value);
    }
    config
}
/// Builds an enabled `PressureExpr` holding owned copies of `name` and `expression`.
fn expr(name: &str, expression: &str) -> PressureExpr {
    let name = String::from(name);
    let expression = String::from(expression);
    PressureExpr {
        name,
        expression,
        enabled: true,
    }
}
/// With no configured pressures the engine returns a single value. Across random inbound
/// kinds, targets and signals that value must be finite, lie in `[0, 100]`, and be zero
/// whenever the circuit is open.
#[test]
fn property_smart_default_is_bounded_and_finite() {
    let mut rng = Rng::new(0xDEAD_BEEF);
    for _ in 0..20_000 {
        // The order of draws defines the generated cases; keep it stable.
        let inbound_kind = rng.kind();
        let lag_target = if rng.bool() {
            rng.f64_to(500_000.0)
        } else {
            0.0
        };
        let cpu_target = 0.1 + rng.f64_to(0.9);
        let config = cfg(
            vec![],
            &[("cpu_target", cpu_target), ("lag_target", lag_target)],
        );
        let (engine, errs) =
            ScalingEngine::new("prop", &config, inbound_kind, ScalingTransport::Kafka);
        assert!(errs.is_empty());

        let signals = TransportSignals {
            kafka_assigned_lag: Some(rng.f64_to(5_000_000.0)),
            redis_pending: Some(rng.f64_to(1_000_000.0)),
            inflight: Some(rng.f64_to(5_000.0)),
            shed_rate: Some(rng.f64_to(500.0)),
            circuit_open: rng.bool(),
            ..Default::default()
        };
        let cpu_ratio = rng.f64_to(3.0);
        let memory_ratio = rng.f64_to(1.0);

        let results = engine.evaluate(&signals, cpu_ratio, memory_ratio);
        assert_eq!(results.len(), 1);

        let v = results[0].1;
        assert!(v.is_finite(), "non-finite default: {v}");
        assert!((0.0..=100.0).contains(&v), "out of [0,100]: {v}");
        assert!(
            !signals.circuit_open || v.abs() < f64::EPSILON,
            "circuit open must gate to 0"
        );
    }
}
/// A fixed set of user expressions must compile without load errors and, for random
/// signals, produce one finite value per expression on every evaluation.
#[test]
fn property_user_expressions_never_panic_and_stay_finite() {
    let exprs = [
        "cpu_utilisation_ratio * 100.0",
        "transport_inbound_pressure_ratio * 100.0",
        "circuit_open ? 0.0 : 100.0 * cpu_utilisation_ratio",
        "cpu_utilisation_ratio > transport_inbound_pressure_ratio ? cpu_utilisation_ratio * 100.0 : transport_inbound_pressure_ratio * 100.0",
        "metrics.kafka_assigned_lag / params.lag_target",
        "(cpu_utilisation_ratio / params.cpu_target) * 100.0",
        "memory_ratio * 100.0",
        "transport_outbound_pressure_ratio * 50.0 + cpu_utilisation_ratio * 50.0",
        "custom.ch_backlog / params.ch_target",
    ];

    // Name the pressures p0, p1, ... in declaration order.
    let mut pressures: Vec<PressureExpr> = Vec::with_capacity(exprs.len());
    for (i, source) in exprs.iter().enumerate() {
        pressures.push(expr(&format!("p{i}"), source));
    }
    let n = pressures.len();

    let config = cfg(
        pressures,
        &[
            ("cpu_target", 0.70),
            ("lag_target", 100_000.0),
            ("ch_target", 50_000.0),
        ],
    );
    let (engine, errs) = ScalingEngine::new(
        "prop",
        &config,
        ScalingTransport::Kafka,
        ScalingTransport::Kafka,
    );
    assert!(
        errs.is_empty(),
        "valid expressions should compile: {errs:?}"
    );

    let mut rng = Rng::new(0x1234_5678);
    for _ in 0..20_000 {
        // The custom signal is present in only some of the cases.
        let mut custom = std::collections::BTreeMap::new();
        if rng.bool() {
            let backlog = rng.f64_to(500_000.0);
            custom.insert("ch_backlog".to_string(), backlog);
        }
        let signals = TransportSignals {
            kafka_assigned_lag: Some(rng.f64_to(5_000_000.0)),
            redis_pending: Some(rng.f64_to(1_000_000.0)),
            inflight: Some(rng.f64_to(5_000.0)),
            shed_rate: Some(rng.f64_to(500.0)),
            send_backpressure_rate: Some(rng.f64_to(500.0)),
            refused_rate: Some(rng.f64_to(500.0)),
            produce_queue_depth: Some(rng.f64_to(100_000.0)),
            circuit_open: rng.bool(),
            custom,
        };
        let cpu_ratio = rng.f64_to(3.0);
        let memory_ratio = rng.f64_to(1.0);

        let results = engine.evaluate(&signals, cpu_ratio, memory_ratio);
        assert_eq!(results.len(), n);
        for (name, v) in results {
            assert!(v.is_finite(), "pressure '{name}' produced non-finite {v}");
        }
    }
}
/// For random signals, targets and transport kinds, `inbound_pressure` and
/// `outbound_pressure` must return finite, non-negative values, and kinds that are not
/// horizontally scalable inbound must contribute zero inbound pressure.
#[test]
fn property_compound_pressure_finite_nonnegative() {
    let mut rng = Rng::new(0xABCD_1234);
    for _ in 0..20_000 {
        let signals = TransportSignals {
            kafka_assigned_lag: Some(rng.f64_to(5_000_000.0)),
            redis_pending: Some(rng.f64_to(1_000_000.0)),
            inflight: Some(rng.f64_to(5_000.0)),
            shed_rate: Some(rng.f64_to(500.0)),
            send_backpressure_rate: Some(rng.f64_to(500.0)),
            refused_rate: Some(rng.f64_to(500.0)),
            produce_queue_depth: Some(rng.f64_to(100_000.0)),
            circuit_open: rng.bool(),
            ..Default::default()
        };

        // The two lag targets are only present in some cases; the other two are always
        // set and offset by 1.0 so they are never zero.
        let mut params = std::collections::BTreeMap::new();
        if rng.bool() {
            params.insert("lag_target".to_string(), rng.f64_to(200_000.0));
        }
        if rng.bool() {
            params.insert("redis_lag_target".to_string(), rng.f64_to(100_000.0));
        }
        params.insert(
            "http_concurrency_target".to_string(),
            1.0 + rng.f64_to(500.0),
        );
        params.insert("shed_target".to_string(), 1.0 + rng.f64_to(100.0));

        // Pass the map by reference (the source had a mis-decoded `&params` here).
        let targets = PressureTargets::from_params(&params);
        let kind = rng.kind();
        let inb = inbound_pressure(kind, &signals, &targets);
        let outb = outbound_pressure(&signals, &targets);
        assert!(inb.is_finite() && inb >= 0.0, "inbound {inb} for {kind:?}");
        assert!(outb.is_finite() && outb >= 0.0, "outbound {outb}");
        if !kind.is_horizontally_scalable_inbound() {
            assert!(
                inb.abs() < f64::EPSILON,
                "{kind:?} must contribute 0 inbound"
            );
        }
    }
}
/// A configured constant expression is what the engine reports: one entry equal to 7.0.
#[test]
fn precedence_config_expression_overrides_default() {
    let config = cfg(vec![expr("fixed", "7.0")], &[("cpu_target", 0.70)]);
    let (engine, errs) = ScalingEngine::new(
        "t",
        &config,
        ScalingTransport::Kafka,
        ScalingTransport::Kafka,
    );
    assert!(errs.is_empty());

    let idle = TransportSignals::default();
    let results = engine.evaluate(&idle, 0.70, 0.0);
    assert_eq!(results.len(), 1);

    let deviation = (results[0].1 - 7.0).abs();
    assert!(deviation < 1e-9);
}
/// With a huge Kafka lag and a tiny `lag_target`, an engine built with an HTTP inbound
/// kind must report 0 while an otherwise identical engine with a Kafka inbound kind must
/// report 100.
#[test]
fn context_aware_default_ignores_lag_for_non_kafka_inbound() {
    let signals = TransportSignals {
        kafka_assigned_lag: Some(10_000_000.0),
        ..Default::default()
    };

    let (engine, errs) = ScalingEngine::new(
        "t",
        &cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 1.0)]),
        ScalingTransport::Http,
        ScalingTransport::Kafka,
    );
    // Check load errors here rather than discarding them, so a broken config fails at
    // the cause instead of at a later, less obvious assertion.
    assert!(
        errs.is_empty(),
        "HTTP-inbound engine should load cleanly: {errs:?}"
    );
    let v = engine.evaluate(&signals, 0.0, 0.0);
    // Guard the index below with a clear failure instead of an out-of-bounds panic.
    assert_eq!(v.len(), 1);
    assert!(
        v[0].1.abs() < f64::EPSILON,
        "HTTP inbound must not read kafka lag"
    );

    let (engine_k, errs_k) = ScalingEngine::new(
        "t",
        &cfg(vec![], &[("cpu_target", 0.70), ("lag_target", 1.0)]),
        ScalingTransport::Kafka,
        ScalingTransport::Kafka,
    );
    assert!(
        errs_k.is_empty(),
        "Kafka-inbound engine should load cleanly: {errs_k:?}"
    );
    let v_k = engine_k.evaluate(&signals, 0.0, 0.0);
    assert_eq!(v_k.len(), 1);
    assert!((v_k[0].1 - 100.0).abs() < 1e-9);
}
/// An expression that reads an absent `metrics` key must load without errors, and
/// evaluating it must still yield one entry named "bad" whose value is 100.
#[test]
fn missing_map_key_is_kept_then_falls_back_at_runtime() {
    let config = cfg(
        vec![expr("bad", "metrics.does_not_exist * 2.0")],
        &[("cpu_target", 0.70)],
    );
    let (engine, errs) = ScalingEngine::new(
        "t",
        &config,
        ScalingTransport::File,
        ScalingTransport::Kafka,
    );
    assert!(
        errs.is_empty(),
        "missing map key should be warn-and-kept, not a hard load error: {errs:?}"
    );

    let idle = TransportSignals::default();
    let v = engine.evaluate(&idle, 0.70, 0.0);
    assert_eq!(v.len(), 1);
    assert_eq!(v[0].0, "bad");

    let fallback = v[0].1;
    assert!(
        (fallback - 100.0).abs() < 1e-6,
        "missing metric must fall back to the smart default, got {}",
        fallback
    );
}
/// A misspelt top-level identifier must produce exactly one load error, and that error
/// must mention the pressure name "bad".
#[test]
fn unknown_top_level_identifier_caught_at_load() {
    let config = cfg(vec![expr("bad", "cpu_utilisation_ratoi * 2.0")], &[]);
    let (_engine, errs) = ScalingEngine::new(
        "t",
        &config,
        ScalingTransport::Kafka,
        ScalingTransport::Kafka,
    );

    let error_count = errs.len();
    assert_eq!(
        error_count, 1,
        "unknown top-level identifier should hard-fail at load"
    );
    assert!(errs[0].contains("bad"));
}
/// A `custom.*` signal divided by a `params.*` target must load without errors and
/// evaluate to the plain quotient (5000 / 2000 = 2.5).
#[test]
fn custom_signal_scales_end_to_end() {
    let config = cfg(
        vec![expr("ch", "custom.clickhouse_backlog / params.ch_target")],
        &[("ch_target", 2000.0)],
    );
    let (engine, errs) = ScalingEngine::new(
        "t",
        &config,
        ScalingTransport::File,
        ScalingTransport::Kafka,
    );
    assert!(errs.is_empty(), "custom.* must not hard-reject: {errs:?}");

    let mut signals = TransportSignals::default();
    signals
        .custom
        .insert("clickhouse_backlog".to_string(), 5000.0);

    let v = engine.evaluate(&signals, 0.0, 0.0);
    let quotient = v[0].1;
    assert!(
        (quotient - 2.5).abs() < 1e-9,
        "custom 5000/2000 = 2.5, got {}",
        quotient
    );
}
/// A pressure with `enabled = false` loads without errors and produces no entries when
/// the engine is evaluated.
#[test]
fn disabled_pressure_is_skipped() {
    let disabled = PressureExpr {
        enabled: false,
        ..expr("off", "cpu_utilisation_ratio * 100.0")
    };
    let config = cfg(vec![disabled], &[]);
    let (engine, errs) = ScalingEngine::new(
        "t",
        &config,
        ScalingTransport::Kafka,
        ScalingTransport::Kafka,
    );
    assert!(errs.is_empty());

    let results = engine.evaluate(&TransportSignals::default(), 0.9, 0.0);
    assert!(results.is_empty());
}