Skip to main content

hyperi_rustlib/scaling/
transport_pressure.rs

1// Project:   hyperi-rustlib
2// File:      src/scaling/transport_pressure.rs
3// Purpose:   Compound, per-pod transport scaling pressure (gratis default)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Compound transport scaling pressure -- the "works most of the time" inbound
10//! (and outbound) signal (scaling ACR + spec 5c). rustlib provides the COMPUTOR,
11//! the compound gauge, and the CPU term gratis; the per-pod transport SIGNALS
12//! are fed by the app (or a governed receiver) via `ScalingSignalsCell` -- they
13//! are NOT auto-collected, so an app that pushes nothing gets a CPU-only default.
14//!
15//! rustlib computes ONE normalised inbound pressure (and one outbound) from the
16//! signals a pod can know LOCALLY -- no peer/replica count. The conditional
17//! "which signal for which transport" lives HERE, once, instead of in every
18//! app's CEL expression:
19//!
20//! - **Kafka** -> lag over THIS instance's assigned partitions / `lag_target`
21//!   (inherently per-pod; falls as the group grows).
22//! - **Redis** -> this consumer's pending / `redis_lag_target`.
23//! - **HTTP / gRPC** -> this pod's in-flight / concurrency target, `max`'d with
24//!   its shed rate / `shed_target`.
25//! - **file / pipe / memory** -> 0 (not horizontally scalable).
26//!
27//! Ratios are floored at 0 and left UNCLAMPED above 1.0 so a KEDA Prometheus
28//! scaler (Value/AverageValue target) gets proportional scale-up; the smart
29//! default composite applies the `min(1, ...)` bound when producing the 0-100
30//! `scaling_pressure`.
31
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33
34use serde::{Deserialize, Serialize};
35
/// Inbound/outbound transport kind, for scaling-signal selection.
///
/// Deliberately separate from [`crate::metrics::TransportKind`] -- this carries
/// the *scaling* semantics (is-horizontally-scalable, which signal to read).
///
/// Serialised with lowercase variant names (`rename_all = "lowercase"`), e.g.
/// `Kafka` <-> `"kafka"`. Free-form labels (aliases, mixed case) are handled by
/// [`ScalingTransport::from_label`] rather than by serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScalingTransport {
    /// Apache Kafka consumer (pull, durable backlog).
    /// Inbound pressure: assigned-partition lag over `lag_target`.
    Kafka,
    /// Redis Streams consumer (pull, durable backlog).
    /// Inbound pressure: pending count over `redis_lag_target`.
    Redis,
    /// HTTP server (push originator).
    /// Inbound pressure: max of in-flight and shed-rate ratios.
    Http,
    /// gRPC server (push originator).
    /// Inbound pressure: max of in-flight and shed-rate ratios.
    Grpc,
    /// File source (single sequential reader). Inbound pressure is always 0.
    File,
    /// Pipe / stdin (forward-only). Inbound pressure is always 0.
    Pipe,
    /// In-process memory transport (test / loopback). Inbound pressure is always 0.
    Memory,
    /// Anything else / not classified. Inbound pressure is always 0.
    Other,
}
60
61impl ScalingTransport {
62    /// Parse from a transport label (case-insensitive). Unknown -> [`Self::Other`].
63    #[must_use]
64    pub fn from_label(s: &str) -> Self {
65        match s.to_ascii_lowercase().as_str() {
66            "kafka" => Self::Kafka,
67            "redis" | "redis_stream" | "redis-streams" | "redisstream" => Self::Redis,
68            "http" => Self::Http,
69            "grpc" => Self::Grpc,
70            "file" => Self::File,
71            "pipe" | "stdin" => Self::Pipe,
72            "memory" => Self::Memory,
73            _ => Self::Other,
74        }
75    }
76
77    /// Whether adding pods can relieve load on this inbound transport -- i.e. it
78    /// has a durable external backlog (kafka/redis) or LB-distributed load
79    /// (http/grpc). file/pipe/memory cannot be scaled out.
80    #[must_use]
81    pub fn is_horizontally_scalable_inbound(self) -> bool {
82        matches!(self, Self::Kafka | Self::Redis | Self::Http | Self::Grpc)
83    }
84}
85
/// Per-pod, locally-knowable scaling signals, sampled each scaling tick.
///
/// EVERY field is what THIS instance knows from its own state or its
/// broker/coordinator session -- no peer/replica assumptions (scaling ACR
/// principle 1). `None` means "not applicable / not yet known" and contributes
/// 0 to the pressure (never NaN).
///
/// The derived `Default` yields every signal `None` and `circuit_open = false`.
/// [`inbound_pressure`] reads the lag / pending / in-flight / shed fields
/// (selected by transport kind); [`outbound_pressure`] reads the three
/// outbound fields. Neither pressure function reads `circuit_open`.
#[derive(Debug, Clone, Default)]
pub struct TransportSignals {
    /// Kafka: summed lag over THIS instance's ASSIGNED partitions (messages).
    /// Normalised against `PressureTargets::lag_target`.
    pub kafka_assigned_lag: Option<f64>,
    /// Redis: this consumer's pending / per-consumer group lag (messages).
    /// Normalised against `PressureTargets::redis_lag_target`.
    pub redis_pending: Option<f64>,
    /// HTTP/gRPC: this pod's in-flight request count.
    /// Normalised against the http or grpc concurrency target.
    pub inflight: Option<f64>,
    /// HTTP/gRPC: this pod's shed/reject rate (events per second).
    /// Normalised against `PressureTargets::shed_target`.
    pub shed_rate: Option<f64>,
    /// Outbound: send-backpressure rate (events per second).
    /// Normalised against `PressureTargets::shed_target`.
    pub send_backpressure_rate: Option<f64>,
    /// Outbound: refused/dropped rate (events per second).
    /// Normalised against `PressureTargets::shed_target`.
    pub refused_rate: Option<f64>,
    /// Outbound: producer/sink queue depth (messages).
    /// Normalised against `PressureTargets::produce_queue_target`.
    pub produce_queue_depth: Option<f64>,
    /// Outbound circuit breaker open (sink dead -> the composite gate).
    pub circuit_open: bool,
}
111
/// Per-pod normalisation targets (KEDA `lagThreshold`-style: "what ONE pod
/// tolerates"). Pulled from the config `params` map with researched defaults.
///
/// Each target is the denominator of a `value / target` ratio. A target that
/// is `None` or not strictly positive makes its term contribute 0.
#[derive(Debug, Clone)]
pub struct PressureTargets {
    /// Kafka per-pod lag target. NO universal default (spec) -- absent => the
    /// kafka term contributes 0 (never NaN).
    pub lag_target: Option<f64>,
    /// Redis per-pod lag target. Absent => the redis term contributes 0.
    pub redis_lag_target: Option<f64>,
    /// HTTP per-pod in-flight concurrency target (KEDA http-add-on ref: 100).
    /// `from_params` defaults this to 100.0 when the key is missing.
    pub http_concurrency_target: f64,
    /// gRPC per-pod in-flight concurrency target.
    /// `from_params` defaults this to 100.0 when the key is missing.
    pub grpc_concurrency_target: f64,
    /// Shed/reject rate that counts as full overload (events/sec).
    /// `from_params` defaults this to 10.0 when the key is missing. Also used
    /// by `outbound_pressure` to normalise the send-backpressure and refused
    /// rates.
    pub shed_target: f64,
    /// Outbound producer/sink queue-depth target. Absent => term 0.
    pub produce_queue_target: Option<f64>,
}
130
131impl PressureTargets {
132    /// Build from the config `params` map. Transport targets without a
133    /// universally-safe default are left `None` (their term contributes 0 until
134    /// the operator sizes them); concurrency/shed get researched defaults.
135    #[must_use]
136    pub fn from_params(params: &std::collections::BTreeMap<String, f64>) -> Self {
137        let get = |k: &str| params.get(k).copied();
138        Self {
139            lag_target: get("lag_target"),
140            redis_lag_target: get("redis_lag_target"),
141            http_concurrency_target: get("http_concurrency_target").unwrap_or(100.0),
142            grpc_concurrency_target: get("grpc_concurrency_target").unwrap_or(100.0),
143            shed_target: get("shed_target").unwrap_or(10.0),
144            produce_queue_target: get("produce_queue_target"),
145        }
146    }
147}
148
/// `value / target`, floored at 0 and saturated at `f64::MAX`.
///
/// Returns 0 when the target is unset or not strictly positive (this also
/// rejects a NaN target), or when `value` is non-finite, so neither NaN nor
/// Inf ever reaches the pressure.
///
/// A finite value divided by a very small positive target can overflow the
/// division to `+Inf`. That case saturates to `f64::MAX`, which keeps the
/// "heavily overloaded" meaning without leaking an infinity downstream.
fn ratio_opt(value: f64, target: Option<f64>) -> f64 {
    match target {
        // `clamp` floors negative quotients at 0 and caps an overflowed
        // quotient at the largest finite value.
        Some(t) if t > 0.0 && value.is_finite() => (value / t).clamp(0.0, f64::MAX),
        _ => 0.0,
    }
}
157
/// `value / target`, floored at 0 and saturated at `f64::MAX`, for targets
/// that always have a default.
///
/// Returns 0 when the target is not strictly positive (this also rejects a
/// NaN target) or when `value` is non-finite, so neither NaN nor Inf ever
/// reaches the pressure.
///
/// A finite value divided by a very small positive target can overflow the
/// division to `+Inf`. That case saturates to `f64::MAX` instead of leaking
/// an infinity downstream.
fn ratio(value: f64, target: f64) -> f64 {
    if target > 0.0 && value.is_finite() {
        // Floors negative quotients at 0; caps an overflowed quotient at the
        // largest finite value.
        (value / target).clamp(0.0, f64::MAX)
    } else {
        0.0
    }
}
166
167/// Compound INBOUND pressure ratio (per-pod; >=0, unclamped above 1.0 for
168/// proportional scale-up). Picks the signal by the configured inbound kind.
169#[must_use]
170pub fn inbound_pressure(kind: ScalingTransport, s: &TransportSignals, t: &PressureTargets) -> f64 {
171    match kind {
172        ScalingTransport::Kafka => ratio_opt(s.kafka_assigned_lag.unwrap_or(0.0), t.lag_target),
173        ScalingTransport::Redis => ratio_opt(s.redis_pending.unwrap_or(0.0), t.redis_lag_target),
174        ScalingTransport::Http => {
175            let conc = ratio(s.inflight.unwrap_or(0.0), t.http_concurrency_target);
176            let shed = ratio(s.shed_rate.unwrap_or(0.0), t.shed_target);
177            conc.max(shed)
178        }
179        ScalingTransport::Grpc => {
180            let conc = ratio(s.inflight.unwrap_or(0.0), t.grpc_concurrency_target);
181            let shed = ratio(s.shed_rate.unwrap_or(0.0), t.shed_target);
182            conc.max(shed)
183        }
184        // file/pipe/memory/other: not horizontally scalable -> CPU + gate only.
185        _ => 0.0,
186    }
187}
188
189/// Compound OUTBOUND pressure ratio (per-pod). EMIT-ONLY by default -- NOT in
190/// the smart-default composite (downstream-bound; more pods rarely relieve a
191/// saturated sink -- scaling ACR). A dead sink surfaces as the circuit gate in
192/// the composite, not here.
193#[must_use]
194pub fn outbound_pressure(s: &TransportSignals, t: &PressureTargets) -> f64 {
195    let bp = ratio(s.send_backpressure_rate.unwrap_or(0.0), t.shed_target);
196    let refused = ratio(s.refused_rate.unwrap_or(0.0), t.shed_target);
197    let queue = ratio_opt(s.produce_queue_depth.unwrap_or(0.0), t.produce_queue_target);
198    bp.max(refused).max(queue)
199}
200
/// Lock-free cell the app / transport updates with the current per-pod signals;
/// the engine tick reads a [`snapshot`](Self::snapshot). A NaN bit-pattern means
/// "absent" -> `None` in the snapshot (contributes 0 to the pressure).
///
/// CPU is sampled by the engine tick itself (process cumulative / cores), so it
/// is NOT in this cell -- the cell carries only the transport-side signals an
/// app pushes from its receive/send loops.
///
/// Each `AtomicU64` holds the raw bit pattern of an `f64` (written via
/// `f64::to_bits`, read back via `f64::from_bits`). Every slot starts out as
/// NaN, i.e. absent, and the circuit flag starts out closed.
#[derive(Debug)]
pub struct ScalingSignalsCell {
    // Kafka assigned-partition lag, as f64 bits (NaN = absent).
    kafka_assigned_lag: AtomicU64,
    // Redis per-consumer pending count, as f64 bits (NaN = absent).
    redis_pending: AtomicU64,
    // HTTP/gRPC in-flight request count, as f64 bits (NaN = absent).
    inflight: AtomicU64,
    // HTTP/gRPC shed/reject rate, as f64 bits (NaN = absent).
    shed_rate: AtomicU64,
    // Outbound send-backpressure rate, as f64 bits (NaN = absent).
    send_backpressure_rate: AtomicU64,
    // Outbound refused/dropped rate, as f64 bits (NaN = absent).
    refused_rate: AtomicU64,
    // Outbound producer/sink queue depth, as f64 bits (NaN = absent).
    produce_queue_depth: AtomicU64,
    // Outbound circuit-breaker state; `true` means open.
    circuit_open: AtomicBool,
}
219
220impl Default for ScalingSignalsCell {
221    fn default() -> Self {
222        let absent = || AtomicU64::new(f64::NAN.to_bits());
223        Self {
224            kafka_assigned_lag: absent(),
225            redis_pending: absent(),
226            inflight: absent(),
227            shed_rate: absent(),
228            send_backpressure_rate: absent(),
229            refused_rate: absent(),
230            produce_queue_depth: absent(),
231            circuit_open: AtomicBool::new(false),
232        }
233    }
234}
235
236impl ScalingSignalsCell {
237    /// Create an empty cell (all signals absent, circuit closed).
238    #[must_use]
239    pub fn new() -> Self {
240        Self::default()
241    }
242
243    /// Set Kafka per-pod assigned-partition lag (messages).
244    pub fn set_kafka_assigned_lag(&self, v: f64) {
245        self.kafka_assigned_lag
246            .store(v.to_bits(), Ordering::Relaxed);
247    }
248    /// Set Redis per-consumer pending / lag (messages).
249    pub fn set_redis_pending(&self, v: f64) {
250        self.redis_pending.store(v.to_bits(), Ordering::Relaxed);
251    }
252    /// Set this pod's in-flight request count (http/grpc).
253    pub fn set_inflight(&self, v: f64) {
254        self.inflight.store(v.to_bits(), Ordering::Relaxed);
255    }
256    /// Set this pod's shed/reject rate (events/sec).
257    pub fn set_shed_rate(&self, v: f64) {
258        self.shed_rate.store(v.to_bits(), Ordering::Relaxed);
259    }
260    /// Set outbound send-backpressure rate (events/sec).
261    pub fn set_send_backpressure_rate(&self, v: f64) {
262        self.send_backpressure_rate
263            .store(v.to_bits(), Ordering::Relaxed);
264    }
265    /// Set outbound refused/dropped rate (events/sec).
266    pub fn set_refused_rate(&self, v: f64) {
267        self.refused_rate.store(v.to_bits(), Ordering::Relaxed);
268    }
269    /// Set outbound producer/sink queue depth (messages).
270    pub fn set_produce_queue_depth(&self, v: f64) {
271        self.produce_queue_depth
272            .store(v.to_bits(), Ordering::Relaxed);
273    }
274    /// Set the outbound circuit-breaker state (the only default gate).
275    pub fn set_circuit_open(&self, open: bool) {
276        self.circuit_open.store(open, Ordering::Relaxed);
277    }
278
279    /// Read a consistent-enough snapshot for this tick (Relaxed -- a tick is a
280    /// periodic best-effort sample, not a linearisation point).
281    #[must_use]
282    pub fn snapshot(&self) -> TransportSignals {
283        let read = |a: &AtomicU64| -> Option<f64> {
284            let v = f64::from_bits(a.load(Ordering::Relaxed));
285            if v.is_nan() { None } else { Some(v) }
286        };
287        TransportSignals {
288            kafka_assigned_lag: read(&self.kafka_assigned_lag),
289            redis_pending: read(&self.redis_pending),
290            inflight: read(&self.inflight),
291            shed_rate: read(&self.shed_rate),
292            send_backpressure_rate: read(&self.send_backpressure_rate),
293            refused_rate: read(&self.refused_rate),
294            produce_queue_depth: read(&self.produce_queue_depth),
295            circuit_open: self.circuit_open.load(Ordering::Relaxed),
296        }
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Build `PressureTargets` from `(key, value)` pairs via `from_params`, so
    /// every test exercises the same defaulting path as production config.
    fn targets(pairs: &[(&str, f64)]) -> PressureTargets {
        let mut m = BTreeMap::new();
        for (k, v) in pairs {
            m.insert((*k).to_string(), *v);
        }
        PressureTargets::from_params(&m)
    }

    #[test]
    fn kind_from_label() {
        assert_eq!(
            ScalingTransport::from_label("Kafka"),
            ScalingTransport::Kafka
        );
        assert_eq!(
            ScalingTransport::from_label("redis-streams"),
            ScalingTransport::Redis
        );
        assert_eq!(ScalingTransport::from_label("grpc"), ScalingTransport::Grpc);
        // Alias + case-insensitivity together.
        assert_eq!(
            ScalingTransport::from_label("STDIN"),
            ScalingTransport::Pipe
        );
        assert_eq!(
            ScalingTransport::from_label("nonsense"),
            ScalingTransport::Other
        );
        assert_eq!(ScalingTransport::from_label(""), ScalingTransport::Other);
    }

    #[test]
    fn horizontally_scalable_classification() {
        for k in [
            ScalingTransport::Kafka,
            ScalingTransport::Redis,
            ScalingTransport::Http,
            ScalingTransport::Grpc,
        ] {
            assert!(k.is_horizontally_scalable_inbound(), "{k:?}");
        }
        for k in [
            ScalingTransport::File,
            ScalingTransport::Pipe,
            ScalingTransport::Memory,
            ScalingTransport::Other,
        ] {
            assert!(!k.is_horizontally_scalable_inbound(), "{k:?}");
        }
    }

    #[test]
    fn from_params_applies_researched_defaults() {
        let t = targets(&[]);
        assert!(t.lag_target.is_none());
        assert!(t.redis_lag_target.is_none());
        assert!(t.produce_queue_target.is_none());
        assert!((t.http_concurrency_target - 100.0).abs() < f64::EPSILON);
        assert!((t.grpc_concurrency_target - 100.0).abs() < f64::EPSILON);
        assert!((t.shed_target - 10.0).abs() < f64::EPSILON);
    }

    #[test]
    fn kafka_lag_needs_a_target_else_zero() {
        let s = TransportSignals {
            kafka_assigned_lag: Some(50_000.0),
            ..Default::default()
        };
        // No lag_target -> term is 0 (never NaN), per spec.
        let t = targets(&[]);
        assert!(inbound_pressure(ScalingTransport::Kafka, &s, &t).abs() < f64::EPSILON);
        // With a per-pod target it normalises.
        let t = targets(&[("lag_target", 100_000.0)]);
        assert!((inbound_pressure(ScalingTransport::Kafka, &s, &t) - 0.5).abs() < 1e-9);
    }

    #[test]
    fn kafka_lag_unclamped_above_one() {
        let s = TransportSignals {
            kafka_assigned_lag: Some(250_000.0),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 100_000.0)]);
        // 2.5 -- left unclamped for proportional scale-up.
        assert!((inbound_pressure(ScalingTransport::Kafka, &s, &t) - 2.5).abs() < 1e-9);
    }

    #[test]
    fn redis_pending_needs_its_own_target() {
        let s = TransportSignals {
            redis_pending: Some(30.0),
            ..Default::default()
        };
        // The kafka target must not leak into the redis term.
        let t = targets(&[("lag_target", 10.0)]);
        assert!(inbound_pressure(ScalingTransport::Redis, &s, &t).abs() < f64::EPSILON);
        let t = targets(&[("redis_lag_target", 60.0)]);
        assert!((inbound_pressure(ScalingTransport::Redis, &s, &t) - 0.5).abs() < 1e-9);
    }

    #[test]
    fn http_takes_max_of_inflight_and_shed() {
        // in-flight 50/100 = 0.5; shed 8/10 = 0.8 -> max 0.8 (confirmed overload outvotes).
        let s = TransportSignals {
            inflight: Some(50.0),
            shed_rate: Some(8.0),
            ..Default::default()
        };
        let t = targets(&[("http_concurrency_target", 100.0), ("shed_target", 10.0)]);
        assert!((inbound_pressure(ScalingTransport::Http, &s, &t) - 0.8).abs() < 1e-9);
    }

    #[test]
    fn grpc_uses_its_own_concurrency_target() {
        let s = TransportSignals {
            inflight: Some(50.0),
            ..Default::default()
        };
        let t = targets(&[
            ("http_concurrency_target", 100.0),
            ("grpc_concurrency_target", 200.0),
        ]);
        assert!((inbound_pressure(ScalingTransport::Http, &s, &t) - 0.5).abs() < 1e-9);
        assert!((inbound_pressure(ScalingTransport::Grpc, &s, &t) - 0.25).abs() < 1e-9);
    }

    #[test]
    fn non_scalable_inbound_is_zero() {
        let s = TransportSignals {
            kafka_assigned_lag: Some(999.0),
            inflight: Some(999.0),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 1.0), ("http_concurrency_target", 1.0)]);
        for k in [
            ScalingTransport::File,
            ScalingTransport::Pipe,
            ScalingTransport::Memory,
            ScalingTransport::Other,
        ] {
            assert!(inbound_pressure(k, &s, &t).abs() < f64::EPSILON, "{k:?}");
        }
    }

    #[test]
    fn nan_inputs_never_propagate() {
        let s = TransportSignals {
            kafka_assigned_lag: Some(f64::NAN),
            inflight: Some(f64::INFINITY),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 100.0), ("http_concurrency_target", 100.0)]);
        assert!(inbound_pressure(ScalingTransport::Kafka, &s, &t).abs() < f64::EPSILON);
        assert!(inbound_pressure(ScalingTransport::Http, &s, &t).abs() < f64::EPSILON);
    }

    #[test]
    fn non_positive_targets_contribute_zero() {
        let s = TransportSignals {
            kafka_assigned_lag: Some(10.0),
            inflight: Some(10.0),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 0.0), ("http_concurrency_target", -1.0)]);
        assert!(inbound_pressure(ScalingTransport::Kafka, &s, &t).abs() < f64::EPSILON);
        assert!(inbound_pressure(ScalingTransport::Http, &s, &t).abs() < f64::EPSILON);
    }

    #[test]
    fn outbound_composes_but_defaults_zero() {
        let s = TransportSignals::default();
        let t = targets(&[]);
        assert!(outbound_pressure(&s, &t).abs() < f64::EPSILON);
        let s = TransportSignals {
            refused_rate: Some(20.0),
            ..Default::default()
        };
        let t = targets(&[("shed_target", 10.0)]);
        assert!((outbound_pressure(&s, &t) - 2.0).abs() < 1e-9);
    }

    #[test]
    fn outbound_queue_depth_needs_a_target() {
        let s = TransportSignals {
            produce_queue_depth: Some(30.0),
            ..Default::default()
        };
        // No produce_queue_target -> queue term is 0.
        assert!(outbound_pressure(&s, &targets(&[])).abs() < f64::EPSILON);
        let t = targets(&[("produce_queue_target", 10.0)]);
        assert!((outbound_pressure(&s, &t) - 3.0).abs() < 1e-9);
    }

    #[test]
    fn signals_cell_starts_empty() {
        let snap = ScalingSignalsCell::new().snapshot();
        assert!(snap.kafka_assigned_lag.is_none());
        assert!(snap.redis_pending.is_none());
        assert!(snap.inflight.is_none());
        assert!(snap.shed_rate.is_none());
        assert!(snap.send_backpressure_rate.is_none());
        assert!(snap.refused_rate.is_none());
        assert!(snap.produce_queue_depth.is_none());
        assert!(!snap.circuit_open);
    }

    #[test]
    fn signals_cell_round_trips_and_nan_means_absent() {
        let cell = ScalingSignalsCell::new();
        cell.set_inflight(7.0);
        cell.set_circuit_open(true);
        let snap = cell.snapshot();
        assert!(matches!(snap.inflight, Some(v) if (v - 7.0).abs() < f64::EPSILON));
        assert!(snap.circuit_open);
        // Untouched slots stay absent.
        assert!(snap.shed_rate.is_none());

        // Writing NaN clears the signal back to absent.
        cell.set_inflight(f64::NAN);
        assert!(cell.snapshot().inflight.is_none());
    }
}