1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26
27use super::config::{ScalingComponent, ScalingPressureConfig};
28
/// Hard gate that can override the weighted component score.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GateType {
    /// The circuit breaker is open; while active the pressure value is forced to `0.0`.
    CircuitBreaker,
    /// The memory ratio reached the configured threshold; while active the
    /// pressure value is forced to `100.0`.
    MemoryPressure,
}

impl std::fmt::Display for GateType {
    /// Writes the stable snake_case label of the gate.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::CircuitBreaker => "circuit_breaker",
            Self::MemoryPressure => "memory_pressure",
        };
        // `write_str` (not `pad`) so width/alignment flags are ignored.
        f.write_str(label)
    }
}
46
/// Point-in-time view of a single scaling component, as produced by
/// `ScalingPressure::snapshot`.
#[derive(Debug, Clone)]
pub struct ComponentSnapshot {
    /// Component name; the same key that `ScalingPressure::set_component` matches on.
    pub name: String,
    /// The raw value most recently stored for this component.
    pub raw_value: f64,
    /// Contribution of this component to the pressure value: `raw_value / saturation`
    /// capped at `1.0`, multiplied by `weight` and by 100. `0.0` when `saturation`
    /// is not positive.
    pub score: f64,
    /// Configured weight of the component.
    pub weight: f64,
    /// Configured raw value at which the component counts as fully saturated.
    pub saturation: f64,
}
61
/// Diagnostic view of the pressure state, as produced by `ScalingPressure::snapshot`.
#[derive(Debug, Clone)]
pub struct PressureSnapshot {
    /// Overall pressure value, following the same rules as `ScalingPressure::calculate`.
    pub value: f64,
    /// The gate currently overriding the component score, or `None` when no gate
    /// applies or the calculator is disabled.
    pub gate_active: Option<GateType>,
    /// Per-component breakdown, one entry per configured component.
    pub components: Vec<ComponentSnapshot>,
    /// Memory used divided by memory limit; `0.0` when the limit is zero.
    pub memory_ratio: f64,
    /// Whether the circuit breaker flag was set when the snapshot was taken.
    pub circuit_open: bool,
}
76
/// Internal per-component state: fixed configuration plus the latest raw value.
struct ComponentEntry {
    /// Name used to look the component up in `set_component`.
    name: String,
    /// Multiplier applied to the component's saturation ratio when scoring.
    weight: f64,
    /// Raw value at which the component counts as fully saturated.
    saturation: f64,
    /// Latest raw value, stored as the bit pattern of an `f64` (`f64::to_bits` /
    /// `f64::from_bits`) so it can be updated through a shared reference.
    value: AtomicU64,
}
85
/// Combines weighted component readings, a circuit-breaker flag and a memory
/// gauge into a single pressure value.
///
/// All mutable state lives in atomics, so the setters and readers take `&self`.
pub struct ScalingPressure {
    /// When `false`, `calculate` always returns `0.0` and no gate is reported.
    enabled: bool,
    /// Memory ratio (used / limit) at or above which the memory gate fires.
    memory_gate_threshold: f64,
    /// Configured components together with their latest raw values.
    components: Vec<ComponentEntry>,
    /// Circuit-breaker flag, updated via `set_circuit_open`.
    circuit_open: AtomicBool,
    /// Memory currently in use, in bytes, updated via `set_memory`.
    memory_used: AtomicU64,
    /// Memory limit, in bytes, updated via `set_memory`; zero disables the ratio.
    memory_limit: AtomicU64,
}
121
122impl ScalingPressure {
123 #[must_use]
127 pub fn new(config: ScalingPressureConfig, components: Vec<ScalingComponent>) -> Self {
128 let entries = components
129 .into_iter()
130 .map(|c| ComponentEntry {
131 name: c.name,
132 weight: c.weight,
133 saturation: c.saturation,
134 value: AtomicU64::new(0_f64.to_bits()),
135 })
136 .collect();
137
138 Self {
139 enabled: config.enabled,
140 memory_gate_threshold: config.memory_gate_threshold,
141 components: entries,
142 circuit_open: AtomicBool::new(false),
143 memory_used: AtomicU64::new(0),
144 memory_limit: AtomicU64::new(0),
145 }
146 }
147
148 pub fn set_component(&self, name: &str, value: f64) {
152 for entry in &self.components {
153 if entry.name == name {
154 entry.value.store(value.to_bits(), Ordering::Relaxed);
155 return;
156 }
157 }
158 }
159
160 pub fn set_circuit_open(&self, open: bool) {
165 self.circuit_open.store(open, Ordering::Relaxed);
166 }
167
168 pub fn set_memory(&self, used_bytes: u64, limit_bytes: u64) {
173 self.memory_used.store(used_bytes, Ordering::Relaxed);
174 self.memory_limit.store(limit_bytes, Ordering::Relaxed);
175 }
176
177 #[must_use]
185 pub fn calculate(&self) -> f64 {
186 if !self.enabled {
187 return 0.0;
188 }
189
190 if self.circuit_open.load(Ordering::Relaxed) {
192 return 0.0;
193 }
194
195 let memory_used = self.memory_used.load(Ordering::Relaxed) as f64;
197 let memory_limit = self.memory_limit.load(Ordering::Relaxed) as f64;
198 let memory_ratio = if memory_limit > 0.0 {
199 memory_used / memory_limit
200 } else {
201 0.0
202 };
203
204 if memory_ratio >= self.memory_gate_threshold {
206 return 100.0;
207 }
208
209 let mut total = 0.0_f64;
211 for entry in &self.components {
212 let value = f64::from_bits(entry.value.load(Ordering::Relaxed));
213 let score = if entry.saturation > 0.0 {
214 (value / entry.saturation).min(1.0) * entry.weight * 100.0
215 } else {
216 0.0
217 };
218 total += score;
219 }
220
221 total.min(100.0)
222 }
223
224 #[must_use]
226 pub fn snapshot(&self) -> PressureSnapshot {
227 let circuit_open = self.circuit_open.load(Ordering::Relaxed);
228
229 let memory_used = self.memory_used.load(Ordering::Relaxed) as f64;
230 let memory_limit = self.memory_limit.load(Ordering::Relaxed) as f64;
231 let memory_ratio = if memory_limit > 0.0 {
232 memory_used / memory_limit
233 } else {
234 0.0
235 };
236
237 let gate_active = if !self.enabled {
239 None
240 } else if circuit_open {
241 Some(GateType::CircuitBreaker)
242 } else if memory_ratio >= self.memory_gate_threshold {
243 Some(GateType::MemoryPressure)
244 } else {
245 None
246 };
247
248 let components: Vec<ComponentSnapshot> = self
249 .components
250 .iter()
251 .map(|entry| {
252 let raw_value = f64::from_bits(entry.value.load(Ordering::Relaxed));
253 let score = if entry.saturation > 0.0 {
254 (raw_value / entry.saturation).min(1.0) * entry.weight * 100.0
255 } else {
256 0.0
257 };
258 ComponentSnapshot {
259 name: entry.name.clone(),
260 raw_value,
261 score,
262 weight: entry.weight,
263 saturation: entry.saturation,
264 }
265 })
266 .collect();
267
268 PressureSnapshot {
269 value: self.calculate(),
270 gate_active,
271 components,
272 memory_ratio,
273 circuit_open,
274 }
275 }
276
277 #[must_use]
279 pub fn is_enabled(&self) -> bool {
280 self.enabled
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scaling::ScalingComponent;

    /// Five components whose weights add up to 1.0.
    fn test_components() -> Vec<ScalingComponent> {
        vec![
            ScalingComponent::new("kafka_lag", 0.35, 100_000.0),
            ScalingComponent::new("buffer_depth", 0.25, 10_000.0),
            ScalingComponent::new("insert_latency", 0.15, 5.0),
            ScalingComponent::new("memory", 0.15, 1.0),
            ScalingComponent::new("errors", 0.10, 100.0),
        ]
    }

    /// Calculator built from the default config and `test_components`.
    // NOTE(review): the memory tests below presume the default gate threshold
    // is 0.8 — confirm against `ScalingPressureConfig::default`.
    fn test_pressure() -> ScalingPressure {
        ScalingPressure::new(ScalingPressureConfig::default(), test_components())
    }

    #[test]
    fn test_zero_load() {
        let p = test_pressure();
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Zero load should produce 0.0, got {value}"
        );
    }

    #[test]
    fn test_single_component_at_saturation() {
        let p = test_pressure();
        p.set_component("kafka_lag", 100_000.0);
        // 1.0 * 0.35 * 100 = 35.0
        let value = p.calculate();
        assert!(
            (value - 35.0).abs() < 0.01,
            "kafka_lag at saturation should contribute 35.0, got {value}"
        );
    }

    #[test]
    fn test_single_component_half_saturation() {
        let p = test_pressure();
        p.set_component("kafka_lag", 50_000.0);
        // 0.5 * 0.35 * 100 = 17.5
        let value = p.calculate();
        assert!(
            (value - 17.5).abs() < 0.01,
            "kafka_lag at half saturation should contribute 17.5, got {value}"
        );
    }

    #[test]
    fn test_all_components_saturated() {
        let p = test_pressure();
        // Every component at twice its saturation point.
        p.set_component("kafka_lag", 200_000.0);
        p.set_component("buffer_depth", 20_000.0);
        p.set_component("insert_latency", 10.0);
        p.set_component("memory", 2.0);
        p.set_component("errors", 200.0);
        let value = p.calculate();
        assert!(
            (value - 100.0).abs() < 0.01,
            "All saturated should produce 100.0, got {value}"
        );
    }

    #[test]
    fn test_capped_at_100() {
        let p = test_pressure();
        p.set_component("kafka_lag", 1_000_000.0);
        p.set_component("buffer_depth", 1_000_000.0);
        p.set_component("insert_latency", 1_000.0);
        p.set_component("memory", 100.0);
        p.set_component("errors", 100_000.0);
        let value = p.calculate();
        assert!(
            (value - 100.0).abs() < f64::EPSILON,
            "Should be capped at 100.0, got {value}"
        );
    }

    #[test]
    fn test_circuit_breaker_gate() {
        let p = test_pressure();
        p.set_component("kafka_lag", 100_000.0);
        p.set_circuit_open(true);
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Circuit breaker open should produce 0.0, got {value}"
        );
    }

    #[test]
    fn test_memory_gate() {
        let p = test_pressure();
        p.set_memory(800, 1000);
        // Ratio 0.8: the gate fires at the threshold itself (`>=`).
        let value = p.calculate();
        assert!(
            (value - 100.0).abs() < f64::EPSILON,
            "Memory at threshold should produce 100.0, got {value}"
        );
    }

    #[test]
    fn test_memory_gate_above_threshold() {
        let p = test_pressure();
        p.set_memory(900, 1000);
        let value = p.calculate();
        assert!(
            (value - 100.0).abs() < f64::EPSILON,
            "Memory above threshold should produce 100.0, got {value}"
        );
    }

    #[test]
    fn test_memory_below_threshold_uses_composite() {
        let p = test_pressure();
        p.set_memory(700, 1000);
        // Ratio 0.7 leaves the gate inactive, so the component score is used.
        p.set_component("kafka_lag", 50_000.0);
        let value = p.calculate();
        assert!(
            value > 0.0 && value < 100.0,
            "Memory below threshold should use composite, got {value}"
        );
    }

    // Renamed: the previous name claimed the memory gate wins, but the assertion
    // (and `calculate`) give precedence to the circuit breaker.
    #[test]
    fn test_circuit_breaker_takes_precedence_over_memory_gate() {
        let p = test_pressure();
        // Both gates would fire; the circuit breaker is evaluated first.
        p.set_memory(900, 1000);
        p.set_circuit_open(true);
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Circuit breaker should take precedence, got {value}"
        );
    }

    #[test]
    fn test_disabled() {
        let config = ScalingPressureConfig {
            enabled: false,
            ..Default::default()
        };
        let p = ScalingPressure::new(config, test_components());
        p.set_component("kafka_lag", 100_000.0);
        p.set_memory(900, 1000);
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Disabled should produce 0.0, got {value}"
        );
    }

    #[test]
    fn test_unknown_component_is_noop() {
        let p = test_pressure();
        p.set_component("nonexistent", 999.0);
        // No component matches, so nothing is stored.
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Unknown component should not affect result, got {value}"
        );
    }

    #[test]
    fn test_zero_memory_limit() {
        let p = test_pressure();
        p.set_memory(100, 0);
        // A zero limit yields ratio 0.0 instead of dividing by zero.
        p.set_component("kafka_lag", 50_000.0);
        let value = p.calculate();
        assert!(
            value > 0.0,
            "Zero memory limit should not trigger gate, got {value}"
        );
    }

    #[test]
    fn test_zero_saturation_component() {
        let p = ScalingPressure::new(
            ScalingPressureConfig::default(),
            vec![ScalingComponent::new("broken", 0.50, 0.0)],
        );
        p.set_component("broken", 100.0);
        let value = p.calculate();
        assert!(
            value.abs() < f64::EPSILON,
            "Zero saturation component should contribute 0.0, got {value}"
        );
    }

    #[test]
    fn test_snapshot() {
        let p = test_pressure();
        p.set_component("kafka_lag", 50_000.0);
        p.set_component("buffer_depth", 5_000.0);
        p.set_memory(500, 1000);

        let snap = p.snapshot();
        assert!(!snap.circuit_open);
        assert!(snap.gate_active.is_none());
        assert!((snap.memory_ratio - 0.5).abs() < f64::EPSILON);
        assert_eq!(snap.components.len(), 5);
        assert!(snap.value > 0.0);

        let lag = snap
            .components
            .iter()
            .find(|c| c.name == "kafka_lag")
            .unwrap();
        assert!((lag.raw_value - 50_000.0).abs() < f64::EPSILON);
        assert!((lag.score - 17.5).abs() < 0.01);
    }

    #[test]
    fn test_snapshot_with_gate() {
        let p = test_pressure();
        p.set_circuit_open(true);

        let snap = p.snapshot();
        assert!(snap.circuit_open);
        assert_eq!(snap.gate_active, Some(GateType::CircuitBreaker));
        assert!(snap.value.abs() < f64::EPSILON);
    }

    #[test]
    fn test_is_enabled() {
        let p = test_pressure();
        assert!(p.is_enabled());

        let disabled = ScalingPressure::new(
            ScalingPressureConfig {
                enabled: false,
                ..Default::default()
            },
            vec![],
        );
        assert!(!disabled.is_enabled());
    }

    #[test]
    fn test_mixed_load() {
        let p = test_pressure();
        p.set_component("kafka_lag", 20_000.0); // 0.20 * 0.35 * 100 = 7.0
        p.set_component("buffer_depth", 3_000.0); // 0.30 * 0.25 * 100 = 7.5
        p.set_component("insert_latency", 1.0); // 0.20 * 0.15 * 100 = 3.0
        p.set_component("memory", 0.4); // 0.40 * 0.15 * 100 = 6.0
        p.set_component("errors", 5.0); // 0.05 * 0.10 * 100 = 0.5
        let value = p.calculate();
        assert!(
            // 7.0 + 7.5 + 3.0 + 6.0 + 0.5 = 24.0
            (value - 24.0).abs() < 0.01,
            "Mixed load should produce ~24.0, got {value}"
        );
    }
}