Skip to main content

hyperi_rustlib/scaling/
transport_pressure.rs

1// Project:   hyperi-rustlib
2// File:      src/scaling/transport_pressure.rs
3// Purpose:   Compound, per-pod transport scaling pressure (gratis default)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Compound transport scaling pressure -- the "works most of the time" inbound
10//! (and outbound) signal (scaling ACR + spec 5c). rustlib provides the COMPUTOR,
11//! the compound gauge, and the CPU term gratis; the per-pod transport SIGNALS
12//! are fed by the app (or a governed receiver) via `ScalingSignalsCell` -- they
13//! are NOT auto-collected, so an app that pushes nothing gets a CPU-only default.
14//!
15//! rustlib computes ONE normalised inbound pressure (and one outbound) from the
16//! signals a pod can know LOCALLY -- no peer/replica count. The conditional
17//! "which signal for which transport" lives HERE, once, instead of in every
18//! app's CEL expression:
19//!
20//! - **Kafka** -> lag over THIS instance's assigned partitions / `lag_target`
21//!   (inherently per-pod; falls as the group grows).
22//! - **Redis** -> this consumer's pending / `redis_lag_target`.
23//! - **HTTP / gRPC** -> this pod's in-flight / concurrency target, `max`'d with
24//!   its shed rate / `shed_target`.
25//! - **file / pipe / memory** -> 0 (not horizontally scalable).
26//!
27//! Ratios are floored at 0 and left UNCLAMPED above 1.0 so a KEDA Prometheus
28//! scaler (Value/AverageValue target) gets proportional scale-up; the smart
29//! default composite applies the `min(1, ...)` bound when producing the 0-100
30//! `scaling_pressure`.
31
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33
34use parking_lot::Mutex;
35use serde::{Deserialize, Serialize};
36
/// Inbound/outbound transport kind, for scaling-signal selection.
///
/// Deliberately separate from [`crate::metrics::TransportKind`] -- this carries
/// the *scaling* semantics (is-horizontally-scalable, which signal to read).
///
/// Serialised with lowercase variant names (`"kafka"`, `"redis"`, ...) because
/// of `#[serde(rename_all = "lowercase")]`. The type is `Copy`, so it is passed
/// by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScalingTransport {
    /// Apache Kafka consumer (pull, durable backlog).
    Kafka,
    /// Redis Streams consumer (pull, durable backlog).
    Redis,
    /// HTTP server (push originator).
    Http,
    /// gRPC server (push originator).
    Grpc,
    /// File source (single sequential reader).
    File,
    /// Pipe / stdin (forward-only).
    Pipe,
    /// In-process memory transport (test / loopback).
    Memory,
    /// Anything else / not classified. Also the result of parsing an
    /// unrecognised label.
    Other,
}
61
62impl ScalingTransport {
63    /// Parse from a transport label (case-insensitive). Unknown -> [`Self::Other`].
64    #[must_use]
65    pub fn from_label(s: &str) -> Self {
66        match s.to_ascii_lowercase().as_str() {
67            "kafka" => Self::Kafka,
68            "redis" | "redis_stream" | "redis-streams" | "redisstream" => Self::Redis,
69            "http" => Self::Http,
70            "grpc" => Self::Grpc,
71            "file" => Self::File,
72            "pipe" | "stdin" => Self::Pipe,
73            "memory" => Self::Memory,
74            _ => Self::Other,
75        }
76    }
77
78    /// Whether adding pods can relieve load on this inbound transport -- i.e. it
79    /// has a durable external backlog (kafka/redis) or LB-distributed load
80    /// (http/grpc). file/pipe/memory cannot be scaled out.
81    #[must_use]
82    pub fn is_horizontally_scalable_inbound(self) -> bool {
83        matches!(self, Self::Kafka | Self::Redis | Self::Http | Self::Grpc)
84    }
85}
86
/// Per-pod, locally-knowable scaling signals, sampled each scaling tick.
///
/// EVERY field is what THIS instance knows from its own state or its
/// broker/coordinator session -- no peer/replica assumptions (scaling ACR
/// principle 1). `None` means "not applicable / not yet known" and contributes
/// 0 to the pressure (never NaN).
///
/// The derived `Default` is the "nothing reported" state: every optional signal
/// is `None`, the circuit is closed and there are no custom signals.
#[derive(Debug, Clone, Default)]
pub struct TransportSignals {
    /// Kafka: summed lag over THIS instance's ASSIGNED partitions (messages).
    /// Normalised by [`PressureTargets::lag_target`].
    pub kafka_assigned_lag: Option<f64>,
    /// Redis: this consumer's pending / per-consumer group lag (messages).
    /// Normalised by [`PressureTargets::redis_lag_target`].
    pub redis_pending: Option<f64>,
    /// HTTP/gRPC: this pod's in-flight request count. Normalised by the HTTP or
    /// gRPC concurrency target, depending on the inbound kind.
    pub inflight: Option<f64>,
    /// HTTP/gRPC: this pod's shed/reject rate (events per second). Normalised by
    /// [`PressureTargets::shed_target`].
    pub shed_rate: Option<f64>,
    /// Outbound: send-backpressure rate (events per second). Normalised by
    /// [`PressureTargets::shed_target`].
    pub send_backpressure_rate: Option<f64>,
    /// Outbound: refused/dropped rate (events per second). Normalised by
    /// [`PressureTargets::shed_target`].
    pub refused_rate: Option<f64>,
    /// Outbound: producer/sink queue depth (messages). Normalised by
    /// [`PressureTargets::produce_queue_target`].
    pub produce_queue_depth: Option<f64>,
    /// Outbound circuit breaker open (sink dead -> the composite gate). Not read
    /// by the pressure functions in this file.
    pub circuit_open: bool,
    /// App-pushed DOMAIN signals, by name (e.g. cloud-API pending-fetch backlog,
    /// ClickHouse insert backlog). Empty by default; exposed to CEL under the
    /// `custom.<name>` map, SEPARATE from the fixed transport `metrics` map.
    pub custom: std::collections::BTreeMap<String, f64>,
}
116
/// Per-pod normalisation targets (KEDA `lagThreshold`-style: "what ONE pod
/// tolerates"). Pulled from the config `params` map with researched defaults.
///
/// Each signal is divided by its target to give a ratio; a target that is
/// `None` or not strictly positive makes its term contribute 0.
#[derive(Debug, Clone)]
pub struct PressureTargets {
    /// Kafka per-pod lag target. NO universal default (spec) -- absent => the
    /// kafka term contributes 0 (never NaN).
    pub lag_target: Option<f64>,
    /// Redis per-pod lag target. Absent => the redis term contributes 0.
    pub redis_lag_target: Option<f64>,
    /// HTTP per-pod in-flight concurrency target (KEDA http-add-on ref: 100).
    /// `from_params` falls back to 100.0 when the key is missing.
    pub http_concurrency_target: f64,
    /// gRPC per-pod in-flight concurrency target. `from_params` falls back to
    /// 100.0 when the key is missing.
    pub grpc_concurrency_target: f64,
    /// Shed/reject rate that counts as full overload (events/sec).
    /// `from_params` falls back to 10.0 when the key is missing. Also used to
    /// normalise the outbound send-backpressure and refused rates.
    pub shed_target: f64,
    /// Outbound producer/sink queue-depth target. Absent => term 0.
    pub produce_queue_target: Option<f64>,
}
135
136impl PressureTargets {
137    /// Build from the config `params` map. Transport targets without a
138    /// universally-safe default are left `None` (their term contributes 0 until
139    /// the operator sizes them); concurrency/shed get researched defaults.
140    #[must_use]
141    pub fn from_params(params: &std::collections::BTreeMap<String, f64>) -> Self {
142        let get = |k: &str| params.get(k).copied();
143        Self {
144            lag_target: get("lag_target"),
145            redis_lag_target: get("redis_lag_target"),
146            http_concurrency_target: get("http_concurrency_target").unwrap_or(100.0),
147            grpc_concurrency_target: get("grpc_concurrency_target").unwrap_or(100.0),
148            shed_target: get("shed_target").unwrap_or(10.0),
149            produce_queue_target: get("produce_queue_target"),
150        }
151    }
152}
153
/// `value / target` floored at 0 and saturated at `f64::MAX`; 0 when the target
/// is unset, NaN or <= 0, or the value is non-finite (never NaN/Inf into the
/// pressure).
///
/// A finite value over a very small positive target can overflow the division
/// to `+Inf`. That case saturates at `f64::MAX` rather than dropping to 0, so
/// the result stays finite and still reads as "very high pressure".
fn ratio_opt(value: f64, target: Option<f64>) -> f64 {
    match target {
        // `t > 0.0` also rejects a NaN target. With a finite `value` the
        // quotient is then never NaN, so `clamp` cannot propagate one.
        Some(t) if t > 0.0 && value.is_finite() => (value / t).clamp(0.0, f64::MAX),
        _ => 0.0,
    }
}
162
/// `value / target` floored at 0 and saturated at `f64::MAX`, for targets that
/// always have a default. Returns 0 when the target is NaN or <= 0, or the value
/// is non-finite, so neither NaN nor Inf reaches the pressure.
///
/// A finite value over a very small positive target can overflow the division
/// to `+Inf`. That case saturates at `f64::MAX` rather than dropping to 0, so
/// the result stays finite and still reads as "very high pressure".
fn ratio(value: f64, target: f64) -> f64 {
    // `target > 0.0` also rejects a NaN target. With a finite `value` the
    // quotient is then never NaN, so `clamp` cannot propagate one.
    if target > 0.0 && value.is_finite() {
        (value / target).clamp(0.0, f64::MAX)
    } else {
        0.0
    }
}
171
172/// Compound INBOUND pressure ratio (per-pod; >=0, unclamped above 1.0 for
173/// proportional scale-up). Picks the signal by the configured inbound kind.
174#[must_use]
175pub fn inbound_pressure(kind: ScalingTransport, s: &TransportSignals, t: &PressureTargets) -> f64 {
176    match kind {
177        ScalingTransport::Kafka => ratio_opt(s.kafka_assigned_lag.unwrap_or(0.0), t.lag_target),
178        ScalingTransport::Redis => ratio_opt(s.redis_pending.unwrap_or(0.0), t.redis_lag_target),
179        ScalingTransport::Http => {
180            let conc = ratio(s.inflight.unwrap_or(0.0), t.http_concurrency_target);
181            let shed = ratio(s.shed_rate.unwrap_or(0.0), t.shed_target);
182            conc.max(shed)
183        }
184        ScalingTransport::Grpc => {
185            let conc = ratio(s.inflight.unwrap_or(0.0), t.grpc_concurrency_target);
186            let shed = ratio(s.shed_rate.unwrap_or(0.0), t.shed_target);
187            conc.max(shed)
188        }
189        // file/pipe/memory/other: not horizontally scalable -> CPU + gate only.
190        _ => 0.0,
191    }
192}
193
194/// Compound OUTBOUND pressure ratio (per-pod). EMIT-ONLY by default -- NOT in
195/// the smart-default composite (downstream-bound; more pods rarely relieve a
196/// saturated sink -- scaling ACR). A dead sink surfaces as the circuit gate in
197/// the composite, not here.
198#[must_use]
199pub fn outbound_pressure(s: &TransportSignals, t: &PressureTargets) -> f64 {
200    let bp = ratio(s.send_backpressure_rate.unwrap_or(0.0), t.shed_target);
201    let refused = ratio(s.refused_rate.unwrap_or(0.0), t.shed_target);
202    let queue = ratio_opt(s.produce_queue_depth.unwrap_or(0.0), t.produce_queue_target);
203    bp.max(refused).max(queue)
204}
205
/// Cell the app / transport updates with the current per-pod signals; the
/// engine tick reads a [`snapshot`](Self::snapshot).
///
/// The fixed numeric signals are lock-free: each is an `AtomicU64` holding the
/// `f64::to_bits` pattern of the latest value. A NaN bit-pattern means "absent"
/// -> `None` in the snapshot (contributes 0 to the pressure). Only the
/// open-ended `custom` map sits behind a mutex.
///
/// CPU is sampled by the engine tick itself (process cumulative / cores), so it
/// is NOT in this cell -- the cell carries only the transport-side signals an
/// app pushes from its receive/send loops.
#[derive(Debug)]
pub struct ScalingSignalsCell {
    // Each numeric slot stores an `f64` as raw bits; NaN bits mean "absent".
    kafka_assigned_lag: AtomicU64,
    redis_pending: AtomicU64,
    inflight: AtomicU64,
    shed_rate: AtomicU64,
    send_backpressure_rate: AtomicU64,
    refused_rate: AtomicU64,
    produce_queue_depth: AtomicU64,
    // Outbound circuit-breaker state; `false` (closed) by default.
    circuit_open: AtomicBool,
    /// App-pushed DOMAIN signals, keyed by name. A `BTreeMap` (not per-field
    /// atomics) because the keys are open-ended and only set on the scaling
    /// tick, not the data hot-path -- a short `parking_lot` lock is cheap here.
    custom: Mutex<std::collections::BTreeMap<String, f64>>,
}
228
229impl Default for ScalingSignalsCell {
230    fn default() -> Self {
231        let absent = || AtomicU64::new(f64::NAN.to_bits());
232        Self {
233            kafka_assigned_lag: absent(),
234            redis_pending: absent(),
235            inflight: absent(),
236            shed_rate: absent(),
237            send_backpressure_rate: absent(),
238            refused_rate: absent(),
239            produce_queue_depth: absent(),
240            circuit_open: AtomicBool::new(false),
241            custom: Mutex::new(std::collections::BTreeMap::new()),
242        }
243    }
244}
245
246impl ScalingSignalsCell {
247    /// Create an empty cell (all signals absent, circuit closed).
248    #[must_use]
249    pub fn new() -> Self {
250        Self::default()
251    }
252
253    /// Set Kafka per-pod assigned-partition lag (messages).
254    pub fn set_kafka_assigned_lag(&self, v: f64) {
255        self.kafka_assigned_lag
256            .store(v.to_bits(), Ordering::Relaxed);
257    }
258    /// Set Redis per-consumer pending / lag (messages).
259    pub fn set_redis_pending(&self, v: f64) {
260        self.redis_pending.store(v.to_bits(), Ordering::Relaxed);
261    }
262    /// Set this pod's in-flight request count (http/grpc).
263    pub fn set_inflight(&self, v: f64) {
264        self.inflight.store(v.to_bits(), Ordering::Relaxed);
265    }
266    /// Set this pod's shed/reject rate (events/sec).
267    pub fn set_shed_rate(&self, v: f64) {
268        self.shed_rate.store(v.to_bits(), Ordering::Relaxed);
269    }
270    /// Set outbound send-backpressure rate (events/sec).
271    pub fn set_send_backpressure_rate(&self, v: f64) {
272        self.send_backpressure_rate
273            .store(v.to_bits(), Ordering::Relaxed);
274    }
275    /// Set outbound refused/dropped rate (events/sec).
276    pub fn set_refused_rate(&self, v: f64) {
277        self.refused_rate.store(v.to_bits(), Ordering::Relaxed);
278    }
279    /// Set outbound producer/sink queue depth (messages).
280    pub fn set_produce_queue_depth(&self, v: f64) {
281        self.produce_queue_depth
282            .store(v.to_bits(), Ordering::Relaxed);
283    }
284    /// Set the outbound circuit-breaker state (the only default gate).
285    pub fn set_circuit_open(&self, open: bool) {
286        self.circuit_open.store(open, Ordering::Relaxed);
287    }
288
289    /// Set (insert or overwrite) an app-pushed DOMAIN signal by name. These are
290    /// NOT known at config-load (apps push them at runtime) and are exposed to
291    /// CEL under the `custom.<name>` map -- e.g. `set_custom("ch_insert_backlog",
292    /// n)` then a pressure expression `custom.ch_insert_backlog / params.ch_target`.
293    pub fn set_custom(&self, name: &str, value: f64) {
294        self.custom.lock().insert(name.to_string(), value);
295    }
296
297    /// Read a consistent-enough snapshot for this tick (Relaxed -- a tick is a
298    /// periodic best-effort sample, not a linearisation point).
299    #[must_use]
300    pub fn snapshot(&self) -> TransportSignals {
301        let read = |a: &AtomicU64| -> Option<f64> {
302            let v = f64::from_bits(a.load(Ordering::Relaxed));
303            if v.is_nan() { None } else { Some(v) }
304        };
305        TransportSignals {
306            kafka_assigned_lag: read(&self.kafka_assigned_lag),
307            redis_pending: read(&self.redis_pending),
308            inflight: read(&self.inflight),
309            shed_rate: read(&self.shed_rate),
310            send_backpressure_rate: read(&self.send_backpressure_rate),
311            refused_rate: read(&self.refused_rate),
312            produce_queue_depth: read(&self.produce_queue_depth),
313            circuit_open: self.circuit_open.load(Ordering::Relaxed),
314            custom: self.custom.lock().clone(),
315        }
316    }
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Tolerance for ratios that should land on a specific expected value.
    const TOLERANCE: f64 = 1e-9;

    /// Build `PressureTargets` from `(key, value)` pairs via `from_params`.
    fn targets(pairs: &[(&str, f64)]) -> PressureTargets {
        let params: BTreeMap<String, f64> = pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value))
            .collect();
        PressureTargets::from_params(&params)
    }

    /// True when `x` is zero to within machine epsilon.
    fn is_zero(x: f64) -> bool {
        x.abs() < f64::EPSILON
    }

    /// True when `actual` is within `TOLERANCE` of `expected`.
    fn is_close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < TOLERANCE
    }

    #[test]
    fn kind_from_label() {
        let cases = [
            ("Kafka", ScalingTransport::Kafka),
            ("redis-streams", ScalingTransport::Redis),
            ("grpc", ScalingTransport::Grpc),
            ("nonsense", ScalingTransport::Other),
        ];
        for (label, expected) in cases {
            assert_eq!(ScalingTransport::from_label(label), expected);
        }
    }

    #[test]
    fn horizontally_scalable_classification() {
        let scalable = [
            ScalingTransport::Kafka,
            ScalingTransport::Redis,
            ScalingTransport::Http,
            ScalingTransport::Grpc,
        ];
        let fixed = [
            ScalingTransport::File,
            ScalingTransport::Pipe,
            ScalingTransport::Memory,
            ScalingTransport::Other,
        ];
        for k in scalable {
            assert!(k.is_horizontally_scalable_inbound(), "{k:?}");
        }
        for k in fixed {
            assert!(!k.is_horizontally_scalable_inbound(), "{k:?}");
        }
    }

    #[test]
    fn kafka_lag_needs_a_target_else_zero() {
        let signals = TransportSignals {
            kafka_assigned_lag: Some(50_000.0),
            ..Default::default()
        };

        // No lag_target -> term is 0 (never NaN), per spec.
        let unsized_targets = targets(&[]);
        assert!(is_zero(inbound_pressure(
            ScalingTransport::Kafka,
            &signals,
            &unsized_targets
        )));

        // With a per-pod target it normalises.
        let sized_targets = targets(&[("lag_target", 100_000.0)]);
        assert!(is_close(
            inbound_pressure(ScalingTransport::Kafka, &signals, &sized_targets),
            0.5
        ));
    }

    #[test]
    fn kafka_lag_unclamped_above_one() {
        let signals = TransportSignals {
            kafka_assigned_lag: Some(250_000.0),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 100_000.0)]);
        // 2.5 -- left unclamped for proportional scale-up.
        let pressure = inbound_pressure(ScalingTransport::Kafka, &signals, &t);
        assert!(is_close(pressure, 2.5));
    }

    #[test]
    fn http_takes_max_of_inflight_and_shed() {
        // in-flight 50/100 = 0.5; shed 8/10 = 0.8 -> max 0.8 (confirmed overload outvotes).
        let signals = TransportSignals {
            inflight: Some(50.0),
            shed_rate: Some(8.0),
            ..Default::default()
        };
        let t = targets(&[("http_concurrency_target", 100.0), ("shed_target", 10.0)]);
        let pressure = inbound_pressure(ScalingTransport::Http, &signals, &t);
        assert!(is_close(pressure, 0.8));
    }

    #[test]
    fn non_scalable_inbound_is_zero() {
        let signals = TransportSignals {
            kafka_assigned_lag: Some(999.0),
            inflight: Some(999.0),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 1.0), ("http_concurrency_target", 1.0)]);
        let non_scalable = [
            ScalingTransport::File,
            ScalingTransport::Pipe,
            ScalingTransport::Memory,
        ];
        for k in non_scalable {
            assert!(is_zero(inbound_pressure(k, &signals, &t)), "{k:?}");
        }
    }

    #[test]
    fn nan_inputs_never_propagate() {
        let signals = TransportSignals {
            kafka_assigned_lag: Some(f64::NAN),
            inflight: Some(f64::INFINITY),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 100.0), ("http_concurrency_target", 100.0)]);
        for kind in [ScalingTransport::Kafka, ScalingTransport::Http] {
            assert!(is_zero(inbound_pressure(kind, &signals, &t)));
        }
    }

    #[test]
    fn set_custom_flows_into_snapshot() {
        let cell = ScalingSignalsCell::new();
        assert!(cell.snapshot().custom.is_empty());

        cell.set_custom("clickhouse_backlog", 42.0);
        cell.set_custom("api_throttle", 0.7);
        let first = cell.snapshot();
        assert!(is_close(first.custom["clickhouse_backlog"], 42.0));
        assert!(is_close(first.custom["api_throttle"], 0.7));

        // Overwrite semantics.
        cell.set_custom("clickhouse_backlog", 99.0);
        let second = cell.snapshot();
        assert!(is_close(second.custom["clickhouse_backlog"], 99.0));
    }

    #[test]
    fn outbound_composes_but_defaults_zero() {
        let idle = TransportSignals::default();
        assert!(is_zero(outbound_pressure(&idle, &targets(&[]))));

        let refusing = TransportSignals {
            refused_rate: Some(20.0),
            ..Default::default()
        };
        let t = targets(&[("shed_target", 10.0)]);
        assert!(is_close(outbound_pressure(&refusing, &t), 2.0));
    }
}