1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33
34use serde::{Deserialize, Serialize};
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
41#[serde(rename_all = "lowercase")]
42pub enum ScalingTransport {
43 Kafka,
45 Redis,
47 Http,
49 Grpc,
51 File,
53 Pipe,
55 Memory,
57 Other,
59}
60
61impl ScalingTransport {
62 #[must_use]
64 pub fn from_label(s: &str) -> Self {
65 match s.to_ascii_lowercase().as_str() {
66 "kafka" => Self::Kafka,
67 "redis" | "redis_stream" | "redis-streams" | "redisstream" => Self::Redis,
68 "http" => Self::Http,
69 "grpc" => Self::Grpc,
70 "file" => Self::File,
71 "pipe" | "stdin" => Self::Pipe,
72 "memory" => Self::Memory,
73 _ => Self::Other,
74 }
75 }
76
77 #[must_use]
81 pub fn is_horizontally_scalable_inbound(self) -> bool {
82 matches!(self, Self::Kafka | Self::Redis | Self::Http | Self::Grpc)
83 }
84}
85
86#[derive(Debug, Clone, Default)]
93pub struct TransportSignals {
94 pub kafka_assigned_lag: Option<f64>,
96 pub redis_pending: Option<f64>,
98 pub inflight: Option<f64>,
100 pub shed_rate: Option<f64>,
102 pub send_backpressure_rate: Option<f64>,
104 pub refused_rate: Option<f64>,
106 pub produce_queue_depth: Option<f64>,
108 pub circuit_open: bool,
110}
111
112#[derive(Debug, Clone)]
115pub struct PressureTargets {
116 pub lag_target: Option<f64>,
119 pub redis_lag_target: Option<f64>,
121 pub http_concurrency_target: f64,
123 pub grpc_concurrency_target: f64,
125 pub shed_target: f64,
127 pub produce_queue_target: Option<f64>,
129}
130
131impl PressureTargets {
132 #[must_use]
136 pub fn from_params(params: &std::collections::BTreeMap<String, f64>) -> Self {
137 let get = |k: &str| params.get(k).copied();
138 Self {
139 lag_target: get("lag_target"),
140 redis_lag_target: get("redis_lag_target"),
141 http_concurrency_target: get("http_concurrency_target").unwrap_or(100.0),
142 grpc_concurrency_target: get("grpc_concurrency_target").unwrap_or(100.0),
143 shed_target: get("shed_target").unwrap_or(10.0),
144 produce_queue_target: get("produce_queue_target"),
145 }
146 }
147}
148
149fn ratio_opt(value: f64, target: Option<f64>) -> f64 {
152 match target {
153 Some(t) if t > 0.0 && value.is_finite() => (value / t).max(0.0),
154 _ => 0.0,
155 }
156}
157
158fn ratio(value: f64, target: f64) -> f64 {
160 if target > 0.0 && value.is_finite() {
161 (value / target).max(0.0)
162 } else {
163 0.0
164 }
165}
166
167#[must_use]
170pub fn inbound_pressure(kind: ScalingTransport, s: &TransportSignals, t: &PressureTargets) -> f64 {
171 match kind {
172 ScalingTransport::Kafka => ratio_opt(s.kafka_assigned_lag.unwrap_or(0.0), t.lag_target),
173 ScalingTransport::Redis => ratio_opt(s.redis_pending.unwrap_or(0.0), t.redis_lag_target),
174 ScalingTransport::Http => {
175 let conc = ratio(s.inflight.unwrap_or(0.0), t.http_concurrency_target);
176 let shed = ratio(s.shed_rate.unwrap_or(0.0), t.shed_target);
177 conc.max(shed)
178 }
179 ScalingTransport::Grpc => {
180 let conc = ratio(s.inflight.unwrap_or(0.0), t.grpc_concurrency_target);
181 let shed = ratio(s.shed_rate.unwrap_or(0.0), t.shed_target);
182 conc.max(shed)
183 }
184 _ => 0.0,
186 }
187}
188
189#[must_use]
194pub fn outbound_pressure(s: &TransportSignals, t: &PressureTargets) -> f64 {
195 let bp = ratio(s.send_backpressure_rate.unwrap_or(0.0), t.shed_target);
196 let refused = ratio(s.refused_rate.unwrap_or(0.0), t.shed_target);
197 let queue = ratio_opt(s.produce_queue_depth.unwrap_or(0.0), t.produce_queue_target);
198 bp.max(refused).max(queue)
199}
200
201#[derive(Debug)]
209pub struct ScalingSignalsCell {
210 kafka_assigned_lag: AtomicU64,
211 redis_pending: AtomicU64,
212 inflight: AtomicU64,
213 shed_rate: AtomicU64,
214 send_backpressure_rate: AtomicU64,
215 refused_rate: AtomicU64,
216 produce_queue_depth: AtomicU64,
217 circuit_open: AtomicBool,
218}
219
220impl Default for ScalingSignalsCell {
221 fn default() -> Self {
222 let absent = || AtomicU64::new(f64::NAN.to_bits());
223 Self {
224 kafka_assigned_lag: absent(),
225 redis_pending: absent(),
226 inflight: absent(),
227 shed_rate: absent(),
228 send_backpressure_rate: absent(),
229 refused_rate: absent(),
230 produce_queue_depth: absent(),
231 circuit_open: AtomicBool::new(false),
232 }
233 }
234}
235
236impl ScalingSignalsCell {
237 #[must_use]
239 pub fn new() -> Self {
240 Self::default()
241 }
242
243 pub fn set_kafka_assigned_lag(&self, v: f64) {
245 self.kafka_assigned_lag
246 .store(v.to_bits(), Ordering::Relaxed);
247 }
248 pub fn set_redis_pending(&self, v: f64) {
250 self.redis_pending.store(v.to_bits(), Ordering::Relaxed);
251 }
252 pub fn set_inflight(&self, v: f64) {
254 self.inflight.store(v.to_bits(), Ordering::Relaxed);
255 }
256 pub fn set_shed_rate(&self, v: f64) {
258 self.shed_rate.store(v.to_bits(), Ordering::Relaxed);
259 }
260 pub fn set_send_backpressure_rate(&self, v: f64) {
262 self.send_backpressure_rate
263 .store(v.to_bits(), Ordering::Relaxed);
264 }
265 pub fn set_refused_rate(&self, v: f64) {
267 self.refused_rate.store(v.to_bits(), Ordering::Relaxed);
268 }
269 pub fn set_produce_queue_depth(&self, v: f64) {
271 self.produce_queue_depth
272 .store(v.to_bits(), Ordering::Relaxed);
273 }
274 pub fn set_circuit_open(&self, open: bool) {
276 self.circuit_open.store(open, Ordering::Relaxed);
277 }
278
279 #[must_use]
282 pub fn snapshot(&self) -> TransportSignals {
283 let read = |a: &AtomicU64| -> Option<f64> {
284 let v = f64::from_bits(a.load(Ordering::Relaxed));
285 if v.is_nan() { None } else { Some(v) }
286 };
287 TransportSignals {
288 kafka_assigned_lag: read(&self.kafka_assigned_lag),
289 redis_pending: read(&self.redis_pending),
290 inflight: read(&self.inflight),
291 shed_rate: read(&self.shed_rate),
292 send_backpressure_rate: read(&self.send_backpressure_rate),
293 refused_rate: read(&self.refused_rate),
294 produce_queue_depth: read(&self.produce_queue_depth),
295 circuit_open: self.circuit_open.load(Ordering::Relaxed),
296 }
297 }
298}
299
300#[cfg(test)]
301mod tests {
302 use super::*;
303 use std::collections::BTreeMap;
304
305 fn targets(pairs: &[(&str, f64)]) -> PressureTargets {
306 let mut m = BTreeMap::new();
307 for (k, v) in pairs {
308 m.insert((*k).to_string(), *v);
309 }
310 PressureTargets::from_params(&m)
311 }
312
313 #[test]
314 fn kind_from_label() {
315 assert_eq!(
316 ScalingTransport::from_label("Kafka"),
317 ScalingTransport::Kafka
318 );
319 assert_eq!(
320 ScalingTransport::from_label("redis-streams"),
321 ScalingTransport::Redis
322 );
323 assert_eq!(ScalingTransport::from_label("grpc"), ScalingTransport::Grpc);
324 assert_eq!(
325 ScalingTransport::from_label("nonsense"),
326 ScalingTransport::Other
327 );
328 }
329
330 #[test]
331 fn horizontally_scalable_classification() {
332 for k in [
333 ScalingTransport::Kafka,
334 ScalingTransport::Redis,
335 ScalingTransport::Http,
336 ScalingTransport::Grpc,
337 ] {
338 assert!(k.is_horizontally_scalable_inbound(), "{k:?}");
339 }
340 for k in [
341 ScalingTransport::File,
342 ScalingTransport::Pipe,
343 ScalingTransport::Memory,
344 ScalingTransport::Other,
345 ] {
346 assert!(!k.is_horizontally_scalable_inbound(), "{k:?}");
347 }
348 }
349
350 #[test]
351 fn kafka_lag_needs_a_target_else_zero() {
352 let s = TransportSignals {
353 kafka_assigned_lag: Some(50_000.0),
354 ..Default::default()
355 };
356 let t = targets(&[]);
358 assert!(inbound_pressure(ScalingTransport::Kafka, &s, &t).abs() < f64::EPSILON);
359 let t = targets(&[("lag_target", 100_000.0)]);
361 assert!((inbound_pressure(ScalingTransport::Kafka, &s, &t) - 0.5).abs() < 1e-9);
362 }
363
364 #[test]
365 fn kafka_lag_unclamped_above_one() {
366 let s = TransportSignals {
367 kafka_assigned_lag: Some(250_000.0),
368 ..Default::default()
369 };
370 let t = targets(&[("lag_target", 100_000.0)]);
371 assert!((inbound_pressure(ScalingTransport::Kafka, &s, &t) - 2.5).abs() < 1e-9);
373 }
374
375 #[test]
376 fn http_takes_max_of_inflight_and_shed() {
377 let s = TransportSignals {
379 inflight: Some(50.0),
380 shed_rate: Some(8.0),
381 ..Default::default()
382 };
383 let t = targets(&[("http_concurrency_target", 100.0), ("shed_target", 10.0)]);
384 assert!((inbound_pressure(ScalingTransport::Http, &s, &t) - 0.8).abs() < 1e-9);
385 }
386
387 #[test]
388 fn non_scalable_inbound_is_zero() {
389 let s = TransportSignals {
390 kafka_assigned_lag: Some(999.0),
391 inflight: Some(999.0),
392 ..Default::default()
393 };
394 let t = targets(&[("lag_target", 1.0), ("http_concurrency_target", 1.0)]);
395 for k in [
396 ScalingTransport::File,
397 ScalingTransport::Pipe,
398 ScalingTransport::Memory,
399 ] {
400 assert!(inbound_pressure(k, &s, &t).abs() < f64::EPSILON, "{k:?}");
401 }
402 }
403
404 #[test]
405 fn nan_inputs_never_propagate() {
406 let s = TransportSignals {
407 kafka_assigned_lag: Some(f64::NAN),
408 inflight: Some(f64::INFINITY),
409 ..Default::default()
410 };
411 let t = targets(&[("lag_target", 100.0), ("http_concurrency_target", 100.0)]);
412 assert!(inbound_pressure(ScalingTransport::Kafka, &s, &t).abs() < f64::EPSILON);
413 assert!(inbound_pressure(ScalingTransport::Http, &s, &t).abs() < f64::EPSILON);
414 }
415
416 #[test]
417 fn outbound_composes_but_defaults_zero() {
418 let s = TransportSignals::default();
419 let t = targets(&[]);
420 assert!(outbound_pressure(&s, &t).abs() < f64::EPSILON);
421 let s = TransportSignals {
422 refused_rate: Some(20.0),
423 ..Default::default()
424 };
425 let t = targets(&[("shed_target", 10.0)]);
426 assert!((outbound_pressure(&s, &t) - 2.0).abs() < 1e-9);
427 }
428}