1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33
34use parking_lot::Mutex;
35use serde::{Deserialize, Serialize};
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(rename_all = "lowercase")]
43pub enum ScalingTransport {
44 Kafka,
46 Redis,
48 Http,
50 Grpc,
52 File,
54 Pipe,
56 Memory,
58 Other,
60}
61
62impl ScalingTransport {
63 #[must_use]
65 pub fn from_label(s: &str) -> Self {
66 match s.to_ascii_lowercase().as_str() {
67 "kafka" => Self::Kafka,
68 "redis" | "redis_stream" | "redis-streams" | "redisstream" => Self::Redis,
69 "http" => Self::Http,
70 "grpc" => Self::Grpc,
71 "file" => Self::File,
72 "pipe" | "stdin" => Self::Pipe,
73 "memory" => Self::Memory,
74 _ => Self::Other,
75 }
76 }
77
78 #[must_use]
82 pub fn is_horizontally_scalable_inbound(self) -> bool {
83 matches!(self, Self::Kafka | Self::Redis | Self::Http | Self::Grpc)
84 }
85}
86
/// Point-in-time readings of the load gauges for one transport.
///
/// Each `Option` gauge is `None` when no reading is available; `inbound_pressure` and
/// `outbound_pressure` substitute `0.0` for a missing reading. `Default` produces a value
/// with every gauge absent, `circuit_open == false` and an empty `custom` map.
#[derive(Debug, Clone, Default)]
pub struct TransportSignals {
    // --- inbound gauges -------------------------------------------------------------

    /// Kafka lag reading; measured against `PressureTargets::lag_target`.
    pub kafka_assigned_lag: Option<f64>,
    /// Redis pending reading; measured against `PressureTargets::redis_lag_target`.
    pub redis_pending: Option<f64>,
    /// In-flight reading; measured against the HTTP or gRPC concurrency target.
    pub inflight: Option<f64>,
    /// Shed-rate reading; measured against `PressureTargets::shed_target`.
    pub shed_rate: Option<f64>,

    // --- outbound gauges ------------------------------------------------------------

    /// Send-backpressure rate; measured against `PressureTargets::shed_target`.
    pub send_backpressure_rate: Option<f64>,
    /// Refused rate; measured against `PressureTargets::shed_target`.
    pub refused_rate: Option<f64>,
    /// Produce-queue depth; measured against `PressureTargets::produce_queue_target`.
    pub produce_queue_depth: Option<f64>,

    // --- carried along, not used by the pressure functions in this file --------------

    /// Circuit-breaker flag.
    pub circuit_open: bool,
    /// Additional named gauges, keyed by gauge name.
    pub custom: std::collections::BTreeMap<String, f64>,
}
116
/// Denominators that turn raw gauge readings into pressure ratios.
///
/// A reading equal to its target gives a ratio of `1.0`. Optional targets that are `None`
/// make the corresponding ratio `0.0`.
#[derive(Debug, Clone)]
pub struct PressureTargets {
    /// Target for `TransportSignals::kafka_assigned_lag`; `None` gives zero Kafka pressure.
    pub lag_target: Option<f64>,
    /// Target for `TransportSignals::redis_pending`; `None` gives zero Redis pressure.
    pub redis_lag_target: Option<f64>,
    /// Target for `TransportSignals::inflight` on HTTP transports.
    pub http_concurrency_target: f64,
    /// Target for `TransportSignals::inflight` on gRPC transports.
    pub grpc_concurrency_target: f64,
    /// Target shared by the shed, send-backpressure and refused rates.
    pub shed_target: f64,
    /// Target for `TransportSignals::produce_queue_depth`; `None` gives a zero queue ratio.
    pub produce_queue_target: Option<f64>,
}
135
136impl PressureTargets {
137 #[must_use]
141 pub fn from_params(params: &std::collections::BTreeMap<String, f64>) -> Self {
142 let get = |k: &str| params.get(k).copied();
143 Self {
144 lag_target: get("lag_target"),
145 redis_lag_target: get("redis_lag_target"),
146 http_concurrency_target: get("http_concurrency_target").unwrap_or(100.0),
147 grpc_concurrency_target: get("grpc_concurrency_target").unwrap_or(100.0),
148 shed_target: get("shed_target").unwrap_or(10.0),
149 produce_queue_target: get("produce_queue_target"),
150 }
151 }
152}
153
/// [`ratio`] with an optional target.
///
/// `None` means "no target configured" and yields `0.0`; `Some(t)` behaves exactly like
/// `ratio(value, t)`.
fn ratio_opt(value: f64, target: Option<f64>) -> f64 {
    target.map_or(0.0, |t| ratio(value, t))
}

/// Returns `value / target` as a non-negative, always-finite pressure ratio.
///
/// The result is `0.0` when:
///
/// - `target` is zero, negative or NaN (no meaningful denominator), or
/// - `value` is NaN or infinite (no meaningful reading), or
/// - the quotient is negative.
///
/// The ratio is not clamped at `1.0`: a reading above target reports a value above one.
/// If a finite `value` divided by a very small `target` overflows, the result saturates
/// at `f64::MAX` rather than becoming `+inf`, so callers never see a non-finite pressure.
fn ratio(value: f64, target: f64) -> f64 {
    // `target > 0.0` is false for NaN, so a NaN target lands in the zero branch too.
    if target > 0.0 && value.is_finite() {
        // With a finite numerator and a positive denominator the quotient cannot be NaN;
        // the only non-finite outcome is overflow to +inf, which `min` caps.
        (value / target).max(0.0).min(f64::MAX)
    } else {
        0.0
    }
}
171
172#[must_use]
175pub fn inbound_pressure(kind: ScalingTransport, s: &TransportSignals, t: &PressureTargets) -> f64 {
176 match kind {
177 ScalingTransport::Kafka => ratio_opt(s.kafka_assigned_lag.unwrap_or(0.0), t.lag_target),
178 ScalingTransport::Redis => ratio_opt(s.redis_pending.unwrap_or(0.0), t.redis_lag_target),
179 ScalingTransport::Http => {
180 let conc = ratio(s.inflight.unwrap_or(0.0), t.http_concurrency_target);
181 let shed = ratio(s.shed_rate.unwrap_or(0.0), t.shed_target);
182 conc.max(shed)
183 }
184 ScalingTransport::Grpc => {
185 let conc = ratio(s.inflight.unwrap_or(0.0), t.grpc_concurrency_target);
186 let shed = ratio(s.shed_rate.unwrap_or(0.0), t.shed_target);
187 conc.max(shed)
188 }
189 _ => 0.0,
191 }
192}
193
194#[must_use]
199pub fn outbound_pressure(s: &TransportSignals, t: &PressureTargets) -> f64 {
200 let bp = ratio(s.send_backpressure_rate.unwrap_or(0.0), t.shed_target);
201 let refused = ratio(s.refused_rate.unwrap_or(0.0), t.shed_target);
202 let queue = ratio_opt(s.produce_queue_depth.unwrap_or(0.0), t.produce_queue_target);
203 bp.max(refused).max(queue)
204}
205
206#[derive(Debug)]
214pub struct ScalingSignalsCell {
215 kafka_assigned_lag: AtomicU64,
216 redis_pending: AtomicU64,
217 inflight: AtomicU64,
218 shed_rate: AtomicU64,
219 send_backpressure_rate: AtomicU64,
220 refused_rate: AtomicU64,
221 produce_queue_depth: AtomicU64,
222 circuit_open: AtomicBool,
223 custom: Mutex<std::collections::BTreeMap<String, f64>>,
227}
228
229impl Default for ScalingSignalsCell {
230 fn default() -> Self {
231 let absent = || AtomicU64::new(f64::NAN.to_bits());
232 Self {
233 kafka_assigned_lag: absent(),
234 redis_pending: absent(),
235 inflight: absent(),
236 shed_rate: absent(),
237 send_backpressure_rate: absent(),
238 refused_rate: absent(),
239 produce_queue_depth: absent(),
240 circuit_open: AtomicBool::new(false),
241 custom: Mutex::new(std::collections::BTreeMap::new()),
242 }
243 }
244}
245
246impl ScalingSignalsCell {
247 #[must_use]
249 pub fn new() -> Self {
250 Self::default()
251 }
252
253 pub fn set_kafka_assigned_lag(&self, v: f64) {
255 self.kafka_assigned_lag
256 .store(v.to_bits(), Ordering::Relaxed);
257 }
258 pub fn set_redis_pending(&self, v: f64) {
260 self.redis_pending.store(v.to_bits(), Ordering::Relaxed);
261 }
262 pub fn set_inflight(&self, v: f64) {
264 self.inflight.store(v.to_bits(), Ordering::Relaxed);
265 }
266 pub fn set_shed_rate(&self, v: f64) {
268 self.shed_rate.store(v.to_bits(), Ordering::Relaxed);
269 }
270 pub fn set_send_backpressure_rate(&self, v: f64) {
272 self.send_backpressure_rate
273 .store(v.to_bits(), Ordering::Relaxed);
274 }
275 pub fn set_refused_rate(&self, v: f64) {
277 self.refused_rate.store(v.to_bits(), Ordering::Relaxed);
278 }
279 pub fn set_produce_queue_depth(&self, v: f64) {
281 self.produce_queue_depth
282 .store(v.to_bits(), Ordering::Relaxed);
283 }
284 pub fn set_circuit_open(&self, open: bool) {
286 self.circuit_open.store(open, Ordering::Relaxed);
287 }
288
289 pub fn set_custom(&self, name: &str, value: f64) {
294 self.custom.lock().insert(name.to_string(), value);
295 }
296
297 #[must_use]
300 pub fn snapshot(&self) -> TransportSignals {
301 let read = |a: &AtomicU64| -> Option<f64> {
302 let v = f64::from_bits(a.load(Ordering::Relaxed));
303 if v.is_nan() { None } else { Some(v) }
304 };
305 TransportSignals {
306 kafka_assigned_lag: read(&self.kafka_assigned_lag),
307 redis_pending: read(&self.redis_pending),
308 inflight: read(&self.inflight),
309 shed_rate: read(&self.shed_rate),
310 send_backpressure_rate: read(&self.send_backpressure_rate),
311 refused_rate: read(&self.refused_rate),
312 produce_queue_depth: read(&self.produce_queue_depth),
313 circuit_open: self.circuit_open.load(Ordering::Relaxed),
314 custom: self.custom.lock().clone(),
315 }
316 }
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Builds [`PressureTargets`] from literal `(name, value)` pairs via `from_params`.
    fn targets(pairs: &[(&str, f64)]) -> PressureTargets {
        let params: BTreeMap<String, f64> = pairs
            .iter()
            .map(|&(name, value)| (name.to_string(), value))
            .collect();
        PressureTargets::from_params(&params)
    }

    /// True when `actual` is closer to zero than `f64::EPSILON`.
    fn is_zero(actual: f64) -> bool {
        actual.abs() < f64::EPSILON
    }

    /// True when `actual` is within `1e-9` of `expected`.
    fn is_close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < 1e-9
    }

    #[test]
    fn kind_from_label() {
        let cases = [
            ("Kafka", ScalingTransport::Kafka),
            ("redis-streams", ScalingTransport::Redis),
            ("grpc", ScalingTransport::Grpc),
            ("nonsense", ScalingTransport::Other),
        ];
        for (label, expected) in cases {
            assert_eq!(ScalingTransport::from_label(label), expected);
        }
    }

    #[test]
    fn horizontally_scalable_classification() {
        let scalable = [
            ScalingTransport::Kafka,
            ScalingTransport::Redis,
            ScalingTransport::Http,
            ScalingTransport::Grpc,
        ];
        for k in scalable {
            assert!(k.is_horizontally_scalable_inbound(), "{k:?}");
        }

        let not_scalable = [
            ScalingTransport::File,
            ScalingTransport::Pipe,
            ScalingTransport::Memory,
            ScalingTransport::Other,
        ];
        for k in not_scalable {
            assert!(!k.is_horizontally_scalable_inbound(), "{k:?}");
        }
    }

    #[test]
    fn kafka_lag_needs_a_target_else_zero() {
        let signals = TransportSignals {
            kafka_assigned_lag: Some(50_000.0),
            ..Default::default()
        };

        // No `lag_target` configured: the lag cannot be scored.
        let without_target = targets(&[]);
        assert!(is_zero(inbound_pressure(
            ScalingTransport::Kafka,
            &signals,
            &without_target
        )));

        // Half the target lag scores 0.5.
        let with_target = targets(&[("lag_target", 100_000.0)]);
        assert!(is_close(
            inbound_pressure(ScalingTransport::Kafka, &signals, &with_target),
            0.5
        ));
    }

    #[test]
    fn kafka_lag_unclamped_above_one() {
        let signals = TransportSignals {
            kafka_assigned_lag: Some(250_000.0),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 100_000.0)]);
        assert!(is_close(
            inbound_pressure(ScalingTransport::Kafka, &signals, &t),
            2.5
        ));
    }

    #[test]
    fn http_takes_max_of_inflight_and_shed() {
        // inflight 50/100 = 0.5, shed 8/10 = 0.8; the larger one wins.
        let signals = TransportSignals {
            inflight: Some(50.0),
            shed_rate: Some(8.0),
            ..Default::default()
        };
        let t = targets(&[("http_concurrency_target", 100.0), ("shed_target", 10.0)]);
        assert!(is_close(
            inbound_pressure(ScalingTransport::Http, &signals, &t),
            0.8
        ));
    }

    #[test]
    fn non_scalable_inbound_is_zero() {
        let signals = TransportSignals {
            kafka_assigned_lag: Some(999.0),
            inflight: Some(999.0),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 1.0), ("http_concurrency_target", 1.0)]);
        let kinds = [
            ScalingTransport::File,
            ScalingTransport::Pipe,
            ScalingTransport::Memory,
        ];
        for k in kinds {
            assert!(is_zero(inbound_pressure(k, &signals, &t)), "{k:?}");
        }
    }

    #[test]
    fn nan_inputs_never_propagate() {
        let signals = TransportSignals {
            kafka_assigned_lag: Some(f64::NAN),
            inflight: Some(f64::INFINITY),
            ..Default::default()
        };
        let t = targets(&[("lag_target", 100.0), ("http_concurrency_target", 100.0)]);
        assert!(is_zero(inbound_pressure(
            ScalingTransport::Kafka,
            &signals,
            &t
        )));
        assert!(is_zero(inbound_pressure(
            ScalingTransport::Http,
            &signals,
            &t
        )));
    }

    #[test]
    fn set_custom_flows_into_snapshot() {
        let cell = ScalingSignalsCell::new();
        assert!(cell.snapshot().custom.is_empty());

        cell.set_custom("clickhouse_backlog", 42.0);
        cell.set_custom("api_throttle", 0.7);
        let snap = cell.snapshot();
        assert!(is_close(snap.custom["clickhouse_backlog"], 42.0));
        assert!(is_close(snap.custom["api_throttle"], 0.7));

        // Setting the same name again overwrites the earlier value.
        cell.set_custom("clickhouse_backlog", 99.0);
        assert!(is_close(cell.snapshot().custom["clickhouse_backlog"], 99.0));
    }

    #[test]
    fn outbound_composes_but_defaults_zero() {
        let idle = TransportSignals::default();
        assert!(is_zero(outbound_pressure(&idle, &targets(&[]))));

        let refusing = TransportSignals {
            refused_rate: Some(20.0),
            ..Default::default()
        };
        let t = targets(&[("shed_target", 10.0)]);
        assert!(is_close(outbound_pressure(&refusing, &t), 2.0));
    }
}