use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
/// Kind of transport, as distinguished by the scaling logic in this file.
///
/// Serde (de)serializes each variant under its lowercase name because of
/// `rename_all = "lowercase"` (for example `Kafka` <-> `"kafka"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScalingTransport {
/// Kafka; `inbound_pressure` scores it from `kafka_assigned_lag` against `lag_target`.
Kafka,
/// Redis; `inbound_pressure` scores it from `redis_pending` against `redis_lag_target`.
Redis,
/// HTTP; `inbound_pressure` scores it from `inflight` and `shed_rate`.
Http,
/// gRPC; scored like `Http`, but against `grpc_concurrency_target`.
Grpc,
/// File; `inbound_pressure` always reports `0.0` for it.
File,
/// Pipe (also selected by the label `"stdin"`); inbound pressure is always `0.0`.
Pipe,
/// In-memory; inbound pressure is always `0.0`.
Memory,
/// Fallback returned by `from_label` for any unrecognised label; inbound pressure is always `0.0`.
Other,
}
impl ScalingTransport {
#[must_use]
pub fn from_label(s: &str) -> Self {
match s.to_ascii_lowercase().as_str() {
"kafka" => Self::Kafka,
"redis" | "redis_stream" | "redis-streams" | "redisstream" => Self::Redis,
"http" => Self::Http,
"grpc" => Self::Grpc,
"file" => Self::File,
"pipe" | "stdin" => Self::Pipe,
"memory" => Self::Memory,
_ => Self::Other,
}
}
#[must_use]
pub fn is_horizontally_scalable_inbound(self) -> bool {
matches!(self, Self::Kafka | Self::Redis | Self::Http | Self::Grpc)
}
}
/// Point-in-time transport readings consumed by `inbound_pressure` and
/// `outbound_pressure`, and produced by `ScalingSignalsCell::snapshot`.
///
/// For the optional fields, `None` means no value is available; the pressure
/// functions substitute `0.0` in that case.
#[derive(Debug, Clone, Default)]
pub struct TransportSignals {
/// Divided by `PressureTargets::lag_target` for `ScalingTransport::Kafka`.
pub kafka_assigned_lag: Option<f64>,
/// Divided by `PressureTargets::redis_lag_target` for `ScalingTransport::Redis`.
pub redis_pending: Option<f64>,
/// Divided by the HTTP or gRPC concurrency target for those transports.
pub inflight: Option<f64>,
/// Divided by `PressureTargets::shed_target` for HTTP and gRPC inbound pressure.
pub shed_rate: Option<f64>,
/// Divided by `PressureTargets::shed_target` in `outbound_pressure`.
pub send_backpressure_rate: Option<f64>,
/// Divided by `PressureTargets::shed_target` in `outbound_pressure`.
pub refused_rate: Option<f64>,
/// Divided by `PressureTargets::produce_queue_target` in `outbound_pressure`.
pub produce_queue_depth: Option<f64>,
/// Carried through snapshots; not read by the pressure functions in this file.
pub circuit_open: bool,
/// Named values recorded via `ScalingSignalsCell::set_custom`; not read by
/// the pressure functions in this file.
pub custom: std::collections::BTreeMap<String, f64>,
}
/// Target values that signals are divided by to obtain a pressure ratio.
///
/// A ratio of `1.0` means the signal equals its target. A target that is
/// `None`, zero, negative or NaN makes the corresponding ratio `0.0`.
#[derive(Debug, Clone)]
pub struct PressureTargets {
/// Target for `TransportSignals::kafka_assigned_lag`; `None` yields zero Kafka pressure.
pub lag_target: Option<f64>,
/// Target for `TransportSignals::redis_pending`; `None` yields zero Redis pressure.
pub redis_lag_target: Option<f64>,
/// Target for `TransportSignals::inflight` on HTTP; `from_params` defaults it to `100.0`.
pub http_concurrency_target: f64,
/// Target for `TransportSignals::inflight` on gRPC; `from_params` defaults it to `100.0`.
pub grpc_concurrency_target: f64,
/// Target for `shed_rate`, and also for `send_backpressure_rate` and
/// `refused_rate` in `outbound_pressure`; `from_params` defaults it to `10.0`.
pub shed_target: f64,
/// Target for `TransportSignals::produce_queue_depth`; `None` yields a zero queue ratio.
pub produce_queue_target: Option<f64>,
}
impl PressureTargets {
    /// Builds targets from a name-to-value parameter map.
    ///
    /// Keys read: `lag_target`, `redis_lag_target`, `http_concurrency_target`,
    /// `grpc_concurrency_target`, `shed_target` and `produce_queue_target`.
    /// Missing concurrency targets default to `100.0` and a missing
    /// `shed_target` defaults to `10.0`; the remaining targets stay `None`
    /// when absent. Values are copied as-is, without validation.
    #[must_use]
    pub fn from_params(params: &std::collections::BTreeMap<String, f64>) -> Self {
        const DEFAULT_CONCURRENCY_TARGET: f64 = 100.0;
        const DEFAULT_SHED_TARGET: f64 = 10.0;

        /// Copies the value stored under `key`, if any.
        fn lookup(params: &std::collections::BTreeMap<String, f64>, key: &str) -> Option<f64> {
            params.get(key).copied()
        }

        let lag_target = lookup(params, "lag_target");
        let redis_lag_target = lookup(params, "redis_lag_target");
        let http_concurrency_target =
            lookup(params, "http_concurrency_target").unwrap_or(DEFAULT_CONCURRENCY_TARGET);
        let grpc_concurrency_target =
            lookup(params, "grpc_concurrency_target").unwrap_or(DEFAULT_CONCURRENCY_TARGET);
        let shed_target = lookup(params, "shed_target").unwrap_or(DEFAULT_SHED_TARGET);
        let produce_queue_target = lookup(params, "produce_queue_target");

        Self {
            lag_target,
            redis_lag_target,
            http_concurrency_target,
            grpc_concurrency_target,
            shed_target,
            produce_queue_target,
        }
    }
}
/// Returns `value / target` as a finite, non-negative pressure ratio.
///
/// The result is `0.0` when `target` is `None`, zero, negative or NaN, or
/// when `value` is NaN or infinite. Negative quotients are raised to `0.0`.
/// A quotient that overflows to infinity (a huge `value` over a tiny
/// `target`) saturates at `f64::MAX`, so the result is always finite.
fn ratio_opt(value: f64, target: Option<f64>) -> f64 {
    match target {
        // A NaN target fails `t > 0.0`, so it falls through to the `0.0` arm.
        // `.min(f64::MAX)` keeps an overflowing quotient from escaping as `+inf`.
        Some(t) if t > 0.0 && value.is_finite() => (value / t).max(0.0).min(f64::MAX),
        _ => 0.0,
    }
}
/// Returns `value / target` as a finite, non-negative pressure ratio.
///
/// The result is `0.0` when `target` is zero, negative or NaN, or when
/// `value` is NaN or infinite. Negative quotients are raised to `0.0`.
/// A quotient that overflows to infinity (a huge `value` over a tiny
/// `target`) saturates at `f64::MAX`, so the result is always finite.
fn ratio(value: f64, target: f64) -> f64 {
    // A NaN target fails `target > 0.0`, so it takes the `0.0` branch.
    if target > 0.0 && value.is_finite() {
        // `.min(f64::MAX)` keeps an overflowing quotient from escaping as `+inf`.
        (value / target).max(0.0).min(f64::MAX)
    } else {
        0.0
    }
}
/// Computes the inbound pressure ratio for a transport of the given `kind`.
///
/// - `Kafka`: `kafka_assigned_lag` over `lag_target`.
/// - `Redis`: `redis_pending` over `redis_lag_target`.
/// - `Http` / `Grpc`: the larger of `inflight` over the matching concurrency
///   target and `shed_rate` over `shed_target`.
/// - `File`, `Pipe`, `Memory`, `Other`: always `0.0`.
///
/// Missing signals count as `0.0`. The result is not capped at `1.0`.
#[must_use]
pub fn inbound_pressure(kind: ScalingTransport, s: &TransportSignals, t: &PressureTargets) -> f64 {
    // Lag-driven kinds and non-scalable kinds return straight away; only the
    // request-serving kinds fall through with their concurrency target.
    let concurrency_target = match kind {
        ScalingTransport::Kafka => {
            return ratio_opt(s.kafka_assigned_lag.unwrap_or(0.0), t.lag_target);
        }
        ScalingTransport::Redis => {
            return ratio_opt(s.redis_pending.unwrap_or(0.0), t.redis_lag_target);
        }
        ScalingTransport::Http => t.http_concurrency_target,
        ScalingTransport::Grpc => t.grpc_concurrency_target,
        ScalingTransport::File
        | ScalingTransport::Pipe
        | ScalingTransport::Memory
        | ScalingTransport::Other => return 0.0,
    };

    let inflight_ratio = ratio(s.inflight.unwrap_or(0.0), concurrency_target);
    let shed_ratio = ratio(s.shed_rate.unwrap_or(0.0), t.shed_target);
    inflight_ratio.max(shed_ratio)
}
/// Computes the outbound pressure ratio as the largest of three components:
/// `send_backpressure_rate` over `shed_target`, `refused_rate` over
/// `shed_target`, and `produce_queue_depth` over `produce_queue_target`.
///
/// Missing signals count as `0.0`, and a missing `produce_queue_target`
/// makes the queue component `0.0`. The result is not capped at `1.0`.
#[must_use]
pub fn outbound_pressure(s: &TransportSignals, t: &PressureTargets) -> f64 {
    let components = [
        ratio(s.send_backpressure_rate.unwrap_or(0.0), t.shed_target),
        ratio(s.refused_rate.unwrap_or(0.0), t.shed_target),
        ratio_opt(s.produce_queue_depth.unwrap_or(0.0), t.produce_queue_target),
    ];
    // The array is never empty, so the `unwrap_or` fallback is only a formality.
    components.iter().copied().reduce(f64::max).unwrap_or(0.0)
}
/// Holder for the most recently reported transport signals.
///
/// Every setter takes `&self`. Each numeric gauge keeps an `f64` as its raw
/// bit pattern inside an `AtomicU64`; a NaN bit pattern means "no value",
/// which is what `Default` writes initially and what `snapshot` reports as
/// `None`.
#[derive(Debug)]
pub struct ScalingSignalsCell {
// Bits of the value last passed to `set_kafka_assigned_lag`.
kafka_assigned_lag: AtomicU64,
// Bits of the value last passed to `set_redis_pending`.
redis_pending: AtomicU64,
// Bits of the value last passed to `set_inflight`.
inflight: AtomicU64,
// Bits of the value last passed to `set_shed_rate`.
shed_rate: AtomicU64,
// Bits of the value last passed to `set_send_backpressure_rate`.
send_backpressure_rate: AtomicU64,
// Bits of the value last passed to `set_refused_rate`.
refused_rate: AtomicU64,
// Bits of the value last passed to `set_produce_queue_depth`.
produce_queue_depth: AtomicU64,
// Flag last passed to `set_circuit_open`; starts as `false`.
circuit_open: AtomicBool,
// Named values recorded by `set_custom`, guarded by a `parking_lot::Mutex`.
custom: Mutex<std::collections::BTreeMap<String, f64>>,
}
impl Default for ScalingSignalsCell {
    /// Creates a cell in which every numeric gauge is unset, the circuit flag
    /// is `false`, and the custom map is empty.
    fn default() -> Self {
        // NaN's bit pattern is the "never written" marker that `snapshot`
        // decodes as `None`.
        let unset_bits = f64::NAN.to_bits();
        Self {
            kafka_assigned_lag: AtomicU64::new(unset_bits),
            redis_pending: AtomicU64::new(unset_bits),
            inflight: AtomicU64::new(unset_bits),
            shed_rate: AtomicU64::new(unset_bits),
            send_backpressure_rate: AtomicU64::new(unset_bits),
            refused_rate: AtomicU64::new(unset_bits),
            produce_queue_depth: AtomicU64::new(unset_bits),
            circuit_open: AtomicBool::new(false),
            custom: Mutex::new(std::collections::BTreeMap::new()),
        }
    }
}
impl ScalingSignalsCell {
    /// Creates a cell with no signals recorded; same as `Self::default()`.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Writes `v` into `slot` as its raw bit pattern.
    fn store_gauge(slot: &AtomicU64, v: f64) {
        slot.store(v.to_bits(), Ordering::Relaxed);
    }

    /// Reads `slot` back as an `f64`, mapping NaN (the unset marker) to `None`.
    fn load_gauge(slot: &AtomicU64) -> Option<f64> {
        Some(f64::from_bits(slot.load(Ordering::Relaxed))).filter(|v| !v.is_nan())
    }

    /// Records the Kafka assigned lag. Passing NaN makes the gauge read as unset.
    pub fn set_kafka_assigned_lag(&self, v: f64) {
        Self::store_gauge(&self.kafka_assigned_lag, v);
    }

    /// Records the Redis pending count. Passing NaN makes the gauge read as unset.
    pub fn set_redis_pending(&self, v: f64) {
        Self::store_gauge(&self.redis_pending, v);
    }

    /// Records the in-flight value. Passing NaN makes the gauge read as unset.
    pub fn set_inflight(&self, v: f64) {
        Self::store_gauge(&self.inflight, v);
    }

    /// Records the shed rate. Passing NaN makes the gauge read as unset.
    pub fn set_shed_rate(&self, v: f64) {
        Self::store_gauge(&self.shed_rate, v);
    }

    /// Records the send-backpressure rate. Passing NaN makes the gauge read as unset.
    pub fn set_send_backpressure_rate(&self, v: f64) {
        Self::store_gauge(&self.send_backpressure_rate, v);
    }

    /// Records the refused rate. Passing NaN makes the gauge read as unset.
    pub fn set_refused_rate(&self, v: f64) {
        Self::store_gauge(&self.refused_rate, v);
    }

    /// Records the produce-queue depth. Passing NaN makes the gauge read as unset.
    pub fn set_produce_queue_depth(&self, v: f64) {
        Self::store_gauge(&self.produce_queue_depth, v);
    }

    /// Records whether the circuit is open.
    pub fn set_circuit_open(&self, open: bool) {
        self.circuit_open.store(open, Ordering::Relaxed);
    }

    /// Stores `value` under `name` in the custom map, replacing any previous
    /// value for that name. The value is stored as given, NaN included.
    pub fn set_custom(&self, name: &str, value: f64) {
        let mut custom = self.custom.lock();
        custom.insert(name.to_owned(), value);
    }

    /// Copies the current values into a `TransportSignals`.
    ///
    /// Each field is read independently with relaxed ordering, and the custom
    /// map is cloned under its lock; unset gauges come back as `None`.
    #[must_use]
    pub fn snapshot(&self) -> TransportSignals {
        let kafka_assigned_lag = Self::load_gauge(&self.kafka_assigned_lag);
        let redis_pending = Self::load_gauge(&self.redis_pending);
        let inflight = Self::load_gauge(&self.inflight);
        let shed_rate = Self::load_gauge(&self.shed_rate);
        let send_backpressure_rate = Self::load_gauge(&self.send_backpressure_rate);
        let refused_rate = Self::load_gauge(&self.refused_rate);
        let produce_queue_depth = Self::load_gauge(&self.produce_queue_depth);
        let circuit_open = self.circuit_open.load(Ordering::Relaxed);
        let custom = self.custom.lock().clone();

        TransportSignals {
            kafka_assigned_lag,
            redis_pending,
            inflight,
            shed_rate,
            send_backpressure_rate,
            refused_rate,
            produce_queue_depth,
            circuit_open,
            custom,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Builds `PressureTargets` from `(name, value)` pairs via `from_params`.
    fn targets(pairs: &[(&str, f64)]) -> PressureTargets {
        let params: BTreeMap<String, f64> = pairs
            .iter()
            .map(|&(name, value)| (name.to_string(), value))
            .collect();
        PressureTargets::from_params(&params)
    }

    /// True when `x` is within `f64::EPSILON` of zero.
    fn is_zero(x: f64) -> bool {
        x.abs() < f64::EPSILON
    }

    /// True when `actual` is within `1e-9` of `expected`.
    fn is_close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < 1e-9
    }

    // Labels are matched case-insensitively; unknown labels fall back to `Other`.
    #[test]
    fn kind_from_label() {
        let cases = [
            ("Kafka", ScalingTransport::Kafka),
            ("redis-streams", ScalingTransport::Redis),
            ("grpc", ScalingTransport::Grpc),
            ("nonsense", ScalingTransport::Other),
        ];
        for (label, expected) in cases {
            assert_eq!(ScalingTransport::from_label(label), expected);
        }
    }

    // Kafka, Redis, Http and Grpc are scalable inbound; the rest are not.
    #[test]
    fn horizontally_scalable_classification() {
        let scalable = [
            ScalingTransport::Kafka,
            ScalingTransport::Redis,
            ScalingTransport::Http,
            ScalingTransport::Grpc,
        ];
        let not_scalable = [
            ScalingTransport::File,
            ScalingTransport::Pipe,
            ScalingTransport::Memory,
            ScalingTransport::Other,
        ];
        for k in scalable {
            assert!(k.is_horizontally_scalable_inbound(), "{k:?}");
        }
        for k in not_scalable {
            assert!(!k.is_horizontally_scalable_inbound(), "{k:?}");
        }
    }

    // Without a lag target the Kafka pressure is zero; with one it is lag / target.
    #[test]
    fn kafka_lag_needs_a_target_else_zero() {
        let s = TransportSignals {
            kafka_assigned_lag: Some(50_000.0),
            ..Default::default()
        };

        let without_target = targets(&[]);
        assert!(is_zero(inbound_pressure(
            ScalingTransport::Kafka,
            &s,
            &without_target
        )));

        let with_target = targets(&[("lag_target", 100_000.0)]);
        assert!(is_close(
            inbound_pressure(ScalingTransport::Kafka, &s, &with_target),
            0.5
        ));
    }

    // The ratio is not capped at 1.0.
    #[test]
    fn kafka_lag_unclamped_above_one() {
        let s = TransportSignals {
            kafka_assigned_lag: Some(250_000.0),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 100_000.0)]);
        let pressure = inbound_pressure(ScalingTransport::Kafka, &s, &t);
        assert!(is_close(pressure, 2.5));
    }

    // HTTP pressure is the larger of the in-flight ratio and the shed ratio.
    #[test]
    fn http_takes_max_of_inflight_and_shed() {
        let s = TransportSignals {
            inflight: Some(50.0),
            shed_rate: Some(8.0),
            ..Default::default()
        };
        let t = targets(&[("http_concurrency_target", 100.0), ("shed_target", 10.0)]);
        let pressure = inbound_pressure(ScalingTransport::Http, &s, &t);
        assert!(is_close(pressure, 0.8));
    }

    // File, Pipe and Memory never report inbound pressure, whatever the signals say.
    #[test]
    fn non_scalable_inbound_is_zero() {
        let s = TransportSignals {
            kafka_assigned_lag: Some(999.0),
            inflight: Some(999.0),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 1.0), ("http_concurrency_target", 1.0)]);
        let kinds = [
            ScalingTransport::File,
            ScalingTransport::Pipe,
            ScalingTransport::Memory,
        ];
        for k in kinds {
            assert!(is_zero(inbound_pressure(k, &s, &t)), "{k:?}");
        }
    }

    // NaN and infinite signal values collapse to zero pressure.
    #[test]
    fn nan_inputs_never_propagate() {
        let s = TransportSignals {
            kafka_assigned_lag: Some(f64::NAN),
            inflight: Some(f64::INFINITY),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 100.0), ("http_concurrency_target", 100.0)]);
        let kafka = inbound_pressure(ScalingTransport::Kafka, &s, &t);
        let http = inbound_pressure(ScalingTransport::Http, &s, &t);
        assert!(is_zero(kafka));
        assert!(is_zero(http));
    }

    // Custom values appear in snapshots, and later writes replace earlier ones.
    #[test]
    fn set_custom_flows_into_snapshot() {
        let cell = ScalingSignalsCell::new();
        assert!(cell.snapshot().custom.is_empty());

        cell.set_custom("clickhouse_backlog", 42.0);
        cell.set_custom("api_throttle", 0.7);
        let snap = cell.snapshot();
        assert!(is_close(snap.custom["clickhouse_backlog"], 42.0));
        assert!(is_close(snap.custom["api_throttle"], 0.7));

        cell.set_custom("clickhouse_backlog", 99.0);
        let updated = cell.snapshot();
        assert!(is_close(updated.custom["clickhouse_backlog"], 99.0));
    }

    // Outbound pressure is zero with no signals and follows the largest component otherwise.
    #[test]
    fn outbound_composes_but_defaults_zero() {
        let idle = TransportSignals::default();
        let default_targets = targets(&[]);
        assert!(is_zero(outbound_pressure(&idle, &default_targets)));

        let refusing = TransportSignals {
            refused_rate: Some(20.0),
            ..Default::default()
        };
        let t = targets(&[("shed_target", 10.0)]);
        assert!(is_close(outbound_pressure(&refusing, &t), 2.0));
    }
}