1use core::cmp::Ordering;
8
/// One kibibyte (1024 bytes); the unit used for every byte cap and batch size below.
const KIB: u32 = 1 << 10;
/// Number of `BackpressureAction` variants scored on every evaluation.
const ACTION_COUNT: usize = 4;
11
/// Weights that combine per-action risk estimates into one expected loss.
///
/// `FlowControlPolicy` computes
/// `expected_loss = oom * oom_risk + latency * latency_risk + throughput * throughput_loss`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct LossWeights {
    /// Multiplier for an action's OOM risk.
    pub oom: f64,
    /// Multiplier for an action's key-latency risk.
    pub latency: f64,
    /// Multiplier for an action's throughput loss.
    pub throughput: f64,
}
22
23impl Default for LossWeights {
24 fn default() -> Self {
25 Self {
26 oom: 1_000_000.0,
27 latency: 10_000.0,
28 throughput: 100.0,
29 }
30 }
31}
32
/// Tunable thresholds and loss-model parameters for `FlowControlPolicy`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FlowControlConfig {
    /// Input queue depth (bytes) at or above which `evaluate` reports pressure;
    /// overshoot beyond it also raises the latency signal.
    pub input_soft_cap_bytes: u32,
    /// Input queue depth (bytes) at or above which non-interactive input events
    /// are dropped; fill relative to it also feeds the OOM signal.
    pub input_hard_cap_bytes: u32,
    /// Output queue depth (bytes) at or above which `evaluate` reports pressure;
    /// overshoot beyond it also raises the throughput signal.
    pub output_soft_cap_bytes: u32,
    /// Output queue depth (bytes) at or above which PTY reads should pause;
    /// fill relative to it also feeds the OOM signal.
    pub output_hard_cap_bytes: u32,
    /// Minimum acceptable Jain fairness index; below it the policy reports
    /// pressure and shrinks the output batch budget.
    pub fairness_floor: f64,
    /// Key p95 latency budget (ms); exceeding it reports pressure and shrinks
    /// the output batch budget.
    pub key_latency_budget_ms: f64,
    /// Output batch budget (bytes) while the input queue is non-empty.
    pub output_batch_with_input_bytes: u32,
    /// Output batch budget (bytes) while the input queue is empty.
    pub output_batch_idle_bytes: u32,
    /// Upper bound on the output batch budget while fairness is below the
    /// floor or key latency is over budget.
    pub output_batch_recovery_bytes: u32,
    /// Elapsed time (ms) after which `should_replenish` always returns `true`.
    pub replenish_interval_ms: u64,
    /// Time (ms) at the output hard cap after which the session is terminated.
    pub hard_cap_terminate_ms: u64,
    /// Fixed throughput loss assigned to `TerminateSession` in the loss model.
    pub terminate_throughput_loss: f64,
    /// Weights combining the loss-model components into `expected_loss`.
    pub weights: LossWeights,
}
63
64impl Default for FlowControlConfig {
65 fn default() -> Self {
66 Self {
67 input_soft_cap_bytes: 12 * KIB,
68 input_hard_cap_bytes: 16 * KIB,
69 output_soft_cap_bytes: 192 * KIB,
70 output_hard_cap_bytes: 256 * KIB,
71 fairness_floor: 0.80,
72 key_latency_budget_ms: 50.0,
73 output_batch_with_input_bytes: 32 * KIB,
74 output_batch_idle_bytes: 64 * KIB,
75 output_batch_recovery_bytes: 8 * KIB,
76 replenish_interval_ms: 10,
77 hard_cap_terminate_ms: 5_000,
78 terminate_throughput_loss: 6_000.0,
79 weights: LossWeights::default(),
80 }
81 }
82}
83
/// Current queue depths reported in a `FlowControlSnapshot`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueDepthBytes {
    /// Bytes waiting in the input queue; compared against the input soft/hard caps.
    pub input: u32,
    /// Bytes waiting in the output queue; compared against the output soft/hard caps.
    pub output: u32,
    /// Presumably the number of frames pending render (confirm with the
    /// producer); not consulted by `FlowControlPolicy::evaluate`.
    pub render_frames: u8,
}
94
/// Arrival (`lambda_*`) and service (`mu_*`) rates over a measurement window,
/// in bytes per second as the type name indicates.
///
/// The policy treats a utilization `lambda / mu` above 1.0 in either direction
/// as pressure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RateWindowBps {
    /// Input arrival rate.
    pub lambda_in: u32,
    /// Output arrival rate.
    pub lambda_out: u32,
    /// Input service rate.
    pub mu_in: u32,
    /// Output service rate.
    pub mu_out: u32,
}
107
/// Key (presumably keystroke) latency percentiles over a measurement window,
/// in milliseconds.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct LatencyWindowMs {
    /// Median key latency; not consulted by `FlowControlPolicy::evaluate`.
    pub key_p50_ms: f64,
    /// 95th-percentile key latency; compared against `key_latency_budget_ms`.
    pub key_p95_ms: f64,
}
116
/// All inputs `FlowControlPolicy::evaluate` needs to make one decision.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FlowControlSnapshot {
    /// Current input/output queue depths.
    pub queues: QueueDepthBytes,
    /// Arrival and service rates for the window.
    pub rates: RateWindowBps,
    /// Key latency percentiles for the window.
    pub latency: LatencyWindowMs,
    /// Input bytes serviced; paired with `serviced_output_bytes` in the Jain
    /// fairness index.
    pub serviced_input_bytes: u64,
    /// Output bytes serviced; paired with `serviced_input_bytes` in the Jain
    /// fairness index.
    pub serviced_output_bytes: u64,
    /// Time (ms) the output queue has spent at its hard cap, as tracked by the
    /// caller; reaching `hard_cap_terminate_ms` forces `TerminateSession`.
    pub output_hard_cap_duration_ms: u64,
}
133
134impl FlowControlSnapshot {
135 #[must_use]
137 pub fn fairness_index(self) -> f64 {
138 jain_fairness_index(self.serviced_input_bytes, self.serviced_output_bytes)
139 }
140}
141
/// Backpressure mitigations the policy chooses between.
///
/// Every variant is scored on each evaluation; ties on expected loss are
/// broken by `tie_break_rank`, lowest rank first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BackpressureAction {
    /// Coalesce non-interactive input.
    CoalesceNonInteractive,
    /// Throttle output.
    ThrottleOutput,
    /// Drop non-interactive input.
    DropNonInteractive,
    /// Terminate the session; also forced, regardless of scores, once the
    /// output hard cap has been held for `hard_cap_terminate_ms`.
    TerminateSession,
}
154
155impl BackpressureAction {
156 #[must_use]
157 const fn tie_break_rank(self) -> u8 {
158 match self {
159 Self::CoalesceNonInteractive => 0,
160 Self::ThrottleOutput => 1,
161 Self::DropNonInteractive => 2,
162 Self::TerminateSession => 3,
163 }
164 }
165}
166
/// Scored cost of one action for one snapshot.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ActionLoss {
    /// The action being scored.
    pub action: BackpressureAction,
    /// Weighted sum of the three components below, using `LossWeights`.
    pub expected_loss: f64,
    /// Estimated out-of-memory risk of taking this action.
    pub oom_risk: f64,
    /// Estimated key-latency risk of taking this action.
    pub latency_risk: f64,
    /// Estimated throughput cost of taking this action.
    pub throughput_loss: f64,
}
181
/// Why `FlowControlPolicy::evaluate` chose (or did not choose) an action.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecisionReason {
    /// No action was chosen.
    Stable,
    /// An action was chosen while fairness and key latency were within limits,
    /// i.e. queue depth or utilization triggered it.
    QueuePressure,
    /// An action was chosen while the fairness index was below `fairness_floor`
    /// or key p95 latency exceeded `key_latency_budget_ms`.
    ProtectKeyLatencyBudget,
    /// `output_hard_cap_duration_ms` reached `hard_cap_terminate_ms`; the chosen
    /// action is `TerminateSession`.
    HardCapExceeded,
}
194
/// Result of `FlowControlPolicy::evaluate`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FlowControlDecision {
    /// Action to apply, or `None` when no action is needed.
    pub chosen_action: Option<BackpressureAction>,
    /// Why `chosen_action` was (or was not) chosen.
    pub reason: DecisionReason,
    /// Jain fairness index of the snapshot's serviced input/output bytes.
    pub fairness_index: f64,
    /// Maximum output bytes to flush in the next batch.
    pub output_batch_budget_bytes: u32,
    /// `true` when the output queue is at or above `output_hard_cap_bytes`.
    pub should_pause_pty_reads: bool,
    /// Scores for every action, in `BackpressureAction` declaration order.
    pub losses: [ActionLoss; ACTION_COUNT],
}
211
/// Input event classification used by `FlowControlPolicy::should_drop_input_event`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InputEventClass {
    /// Never dropped, whatever the input queue depth.
    Interactive,
    /// Dropped once the input queue reaches `input_hard_cap_bytes`.
    NonInteractive,
}
220
/// Backpressure policy parameterised by a `FlowControlConfig`.
///
/// Holds only its configuration; every method takes `self` by value and
/// mutates nothing, so one policy can evaluate any number of snapshots.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FlowControlPolicy {
    /// Thresholds and loss-model parameters used by every evaluation.
    pub config: FlowControlConfig,
}
227
228impl Default for FlowControlPolicy {
229 fn default() -> Self {
230 Self::new(FlowControlConfig::default())
231 }
232}
233
234impl FlowControlPolicy {
235 #[must_use]
237 pub const fn new(config: FlowControlConfig) -> Self {
238 Self { config }
239 }
240
241 #[must_use]
243 pub fn evaluate(self, snapshot: FlowControlSnapshot) -> FlowControlDecision {
244 let fairness_index = snapshot.fairness_index();
245 let losses = self.score_actions(snapshot, fairness_index);
246 let chosen_action = self.choose_action(snapshot, fairness_index, &losses);
247 let reason = self.reason(snapshot, fairness_index, chosen_action);
248 let output_batch_budget_bytes = self.output_batch_budget(
249 snapshot.queues.input,
250 fairness_index,
251 snapshot.latency.key_p95_ms,
252 );
253 let should_pause_pty_reads = snapshot.queues.output >= self.config.output_hard_cap_bytes;
254 FlowControlDecision {
255 chosen_action,
256 reason,
257 fairness_index,
258 output_batch_budget_bytes,
259 should_pause_pty_reads,
260 losses,
261 }
262 }
263
264 #[must_use]
266 pub fn should_replenish(self, consumed_bytes: u32, window_bytes: u32, elapsed_ms: u64) -> bool {
267 if window_bytes == 0 {
268 return true;
269 }
270 consumed_bytes.saturating_mul(2) >= window_bytes
271 || elapsed_ms >= self.config.replenish_interval_ms
272 }
273
274 #[must_use]
276 pub fn should_drop_input_event(self, queue_bytes: u32, class: InputEventClass) -> bool {
277 match class {
278 InputEventClass::Interactive => false,
279 InputEventClass::NonInteractive => queue_bytes >= self.config.input_hard_cap_bytes,
280 }
281 }
282
283 #[must_use]
284 fn output_batch_budget(
285 self,
286 input_queue_bytes: u32,
287 fairness_index: f64,
288 key_p95_ms: f64,
289 ) -> u32 {
290 let baseline = if input_queue_bytes > 0 {
291 self.config.output_batch_with_input_bytes
292 } else {
293 self.config.output_batch_idle_bytes
294 };
295 if fairness_index < self.config.fairness_floor
296 || key_p95_ms > self.config.key_latency_budget_ms
297 {
298 baseline.min(self.config.output_batch_recovery_bytes)
299 } else {
300 baseline
301 }
302 }
303
304 #[must_use]
305 fn reason(
306 self,
307 snapshot: FlowControlSnapshot,
308 fairness_index: f64,
309 chosen_action: Option<BackpressureAction>,
310 ) -> DecisionReason {
311 if snapshot.output_hard_cap_duration_ms >= self.config.hard_cap_terminate_ms {
312 return DecisionReason::HardCapExceeded;
313 }
314 if chosen_action.is_none() {
315 return DecisionReason::Stable;
316 }
317 if fairness_index < self.config.fairness_floor
318 || snapshot.latency.key_p95_ms > self.config.key_latency_budget_ms
319 {
320 DecisionReason::ProtectKeyLatencyBudget
321 } else {
322 DecisionReason::QueuePressure
323 }
324 }
325
326 #[must_use]
327 fn choose_action(
328 self,
329 snapshot: FlowControlSnapshot,
330 fairness_index: f64,
331 losses: &[ActionLoss; ACTION_COUNT],
332 ) -> Option<BackpressureAction> {
333 if snapshot.output_hard_cap_duration_ms >= self.config.hard_cap_terminate_ms {
334 return Some(BackpressureAction::TerminateSession);
335 }
336 if !self.is_pressured(snapshot, fairness_index) {
337 return None;
338 }
339 Some(select_best_action(losses))
340 }
341
342 #[must_use]
343 fn is_pressured(self, snapshot: FlowControlSnapshot, fairness_index: f64) -> bool {
344 let input_soft = snapshot.queues.input >= self.config.input_soft_cap_bytes;
345 let output_soft = snapshot.queues.output >= self.config.output_soft_cap_bytes;
346 let rho_in = ratio_u32(snapshot.rates.lambda_in, snapshot.rates.mu_in);
347 let rho_out = ratio_u32(snapshot.rates.lambda_out, snapshot.rates.mu_out);
348 input_soft
349 || output_soft
350 || rho_in > 1.0
351 || rho_out > 1.0
352 || fairness_index < self.config.fairness_floor
353 || snapshot.latency.key_p95_ms > self.config.key_latency_budget_ms
354 }
355
356 #[must_use]
357 fn score_actions(
358 self,
359 snapshot: FlowControlSnapshot,
360 fairness_index: f64,
361 ) -> [ActionLoss; ACTION_COUNT] {
362 let signals = self.pressure_signals(snapshot, fairness_index);
363 let actions = [
364 BackpressureAction::CoalesceNonInteractive,
365 BackpressureAction::ThrottleOutput,
366 BackpressureAction::DropNonInteractive,
367 BackpressureAction::TerminateSession,
368 ];
369 actions.map(|action| self.score_action(action, signals))
370 }
371
372 #[must_use]
373 fn score_action(self, action: BackpressureAction, signals: PressureSignals) -> ActionLoss {
374 let (oom_risk, latency_risk, throughput_loss) = match action {
375 BackpressureAction::CoalesceNonInteractive => (
376 0.35 * signals.oom_signal.powi(3),
377 0.50 * signals.latency_signal.powi(2),
378 0.08 + 0.18 * signals.throughput_signal,
379 ),
380 BackpressureAction::ThrottleOutput => (
381 0.22 * signals.oom_signal.powi(3),
382 0.28 * signals.latency_signal.powi(2),
383 0.24 + 0.32 * signals.throughput_signal,
384 ),
385 BackpressureAction::DropNonInteractive => (
386 0.15 * signals.oom_signal.powi(3),
387 0.20 * signals.latency_signal.powi(2),
388 0.42 + 0.45 * signals.throughput_signal,
389 ),
390 BackpressureAction::TerminateSession => {
391 (0.0, 0.0, self.config.terminate_throughput_loss)
392 }
393 };
394 let expected_loss = (self.config.weights.oom * oom_risk)
395 + (self.config.weights.latency * latency_risk)
396 + (self.config.weights.throughput * throughput_loss);
397 ActionLoss {
398 action,
399 expected_loss,
400 oom_risk,
401 latency_risk,
402 throughput_loss,
403 }
404 }
405
406 #[must_use]
407 fn pressure_signals(
408 self,
409 snapshot: FlowControlSnapshot,
410 fairness_index: f64,
411 ) -> PressureSignals {
412 let out_hard_ratio = ratio_u32(snapshot.queues.output, self.config.output_hard_cap_bytes);
413 let in_hard_ratio = ratio_u32(snapshot.queues.input, self.config.input_hard_cap_bytes);
414 let out_soft_ratio = ratio_u32(snapshot.queues.output, self.config.output_soft_cap_bytes);
415 let rho_in = ratio_u32(snapshot.rates.lambda_in, snapshot.rates.mu_in);
416 let rho_out = ratio_u32(snapshot.rates.lambda_out, snapshot.rates.mu_out);
417
418 let queue_pressure = out_hard_ratio.max(in_hard_ratio);
419 let util_pressure = ((rho_in.max(rho_out) - 1.0).max(0.0) / 0.5).min(1.0);
420 let oom_signal = clamp01(((queue_pressure - 0.70).max(0.0) / 0.30).max(util_pressure));
421
422 let latency_over_budget =
423 (snapshot.latency.key_p95_ms / self.config.key_latency_budget_ms - 1.0).max(0.0);
424 let fairness_shortfall = if fairness_index < self.config.fairness_floor {
425 (self.config.fairness_floor - fairness_index) / self.config.fairness_floor
426 } else {
427 0.0
428 };
429 let latency_signal = clamp01(
430 latency_over_budget
431 + fairness_shortfall
432 + ((rho_in - 1.0).max(0.0))
433 + (ratio_u32(snapshot.queues.input, self.config.input_soft_cap_bytes) - 1.0)
434 .max(0.0),
435 );
436
437 let throughput_signal = clamp01((rho_out - 1.0).max(0.0) + (out_soft_ratio - 1.0).max(0.0));
438 PressureSignals {
439 oom_signal,
440 latency_signal,
441 throughput_signal,
442 }
443 }
444}
445
/// Normalised inputs to the action loss model; each signal is clamped to `[0, 1]`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct PressureSignals {
    /// Driven by queue fill relative to the hard caps and by utilization above 1.
    oom_signal: f64,
    /// Driven by p95 latency over budget, fairness shortfall below the floor,
    /// input utilization above 1 and input queue overshoot of its soft cap.
    latency_signal: f64,
    /// Driven by output utilization above 1 and output queue overshoot of its soft cap.
    throughput_signal: f64,
}
452
453#[must_use]
454fn select_best_action(losses: &[ActionLoss; ACTION_COUNT]) -> BackpressureAction {
455 let mut best = losses[0];
456 for candidate in losses.iter().copied().skip(1) {
457 let ordering = candidate.expected_loss.total_cmp(&best.expected_loss);
458 if ordering == Ordering::Less
459 || (ordering == Ordering::Equal
460 && candidate.action.tie_break_rank() < best.action.tie_break_rank())
461 {
462 best = candidate;
463 }
464 }
465 best.action
466}
467
/// `numerator / denominator` as `f64`, or `f64::INFINITY` when `denominator`
/// is zero.
///
/// Both operands convert to `f64` losslessly. A zero denominator maps to
/// infinity regardless of the numerator (including `0 / 0`), so a ratio over a
/// zero capacity always compares as exceeding any finite threshold.
#[must_use]
fn ratio_u32(numerator: u32, denominator: u32) -> f64 {
    match denominator {
        0 => f64::INFINITY,
        nonzero => f64::from(numerator) / f64::from(nonzero),
    }
}
475
/// Clamps `value` into the unit interval `[0, 1]`.
///
/// NaN passes through unchanged, as with `f64::clamp`.
#[must_use]
fn clamp01(value: f64) -> f64 {
    const LOWER: f64 = 0.0;
    const UPPER: f64 = 1.0;
    f64::clamp(value, LOWER, UPPER)
}
480
/// Jain's fairness index for two flows: `(x + y)^2 / (2 * (x^2 + y^2))`.
///
/// Ranges from `0.5` (one flow received everything) to `1.0` (equal service).
/// Two idle flows (both zero) count as perfectly fair and return `1.0`.
/// Byte counts above 2^53 lose precision in the `u64 -> f64` conversion,
/// which is irrelevant at the index's resolution.
#[must_use]
pub fn jain_fairness_index(serviced_input_bytes: u64, serviced_output_bytes: u64) -> f64 {
    // Integer inputs that are not both zero give a denominator of at least 2,
    // so this is the only degenerate case.
    if serviced_input_bytes == 0 && serviced_output_bytes == 0 {
        return 1.0;
    }
    let (x, y) = (serviced_input_bytes as f64, serviced_output_bytes as f64);
    let total = x + y;
    total * total / (2.0 * (x * x + y * y))
}
492
493#[cfg(test)]
494mod tests {
495 use super::*;
496
497 #[test]
498 fn jain_fairness_index_matches_expected_limits() {
499 assert_close(jain_fairness_index(0, 0), 1.0, 1e-9);
500 assert_close(jain_fairness_index(100, 100), 1.0, 1e-9);
501 assert_close(jain_fairness_index(100, 0), 0.5, 1e-9);
502 }
503
504 #[test]
505 fn output_batch_budget_respects_fairness_and_latency() {
506 let policy = FlowControlPolicy::default();
507 let baseline = policy.output_batch_budget(0, 0.95, 10.0);
508 assert_eq!(baseline, 64 * KIB);
509
510 let with_input = policy.output_batch_budget(1, 0.95, 10.0);
511 assert_eq!(with_input, 32 * KIB);
512
513 let fairness_recovery = policy.output_batch_budget(1, 0.60, 10.0);
514 assert_eq!(fairness_recovery, 8 * KIB);
515
516 let latency_recovery = policy.output_batch_budget(0, 0.95, 120.0);
517 assert_eq!(latency_recovery, 8 * KIB);
518 }
519
520 #[test]
521 fn stable_snapshot_emits_no_action() {
522 let policy = FlowControlPolicy::default();
523 let snapshot = FlowControlSnapshot {
524 queues: QueueDepthBytes {
525 input: 1024,
526 output: 4096,
527 render_frames: 0,
528 },
529 rates: RateWindowBps {
530 lambda_in: 1_000,
531 lambda_out: 20_000,
532 mu_in: 10_000,
533 mu_out: 100_000,
534 },
535 latency: LatencyWindowMs {
536 key_p50_ms: 2.0,
537 key_p95_ms: 8.0,
538 },
539 serviced_input_bytes: 40_000,
540 serviced_output_bytes: 42_000,
541 output_hard_cap_duration_ms: 0,
542 };
543 let decision = policy.evaluate(snapshot);
544 assert_eq!(decision.chosen_action, None);
545 assert_eq!(decision.reason, DecisionReason::Stable);
546 assert_eq!(decision.output_batch_budget_bytes, 32 * KIB);
547 }
548
549 #[test]
550 fn tie_break_order_is_deterministic() {
551 let losses = [
552 ActionLoss {
553 action: BackpressureAction::CoalesceNonInteractive,
554 expected_loss: 10.0,
555 oom_risk: 0.0,
556 latency_risk: 0.0,
557 throughput_loss: 0.0,
558 },
559 ActionLoss {
560 action: BackpressureAction::ThrottleOutput,
561 expected_loss: 10.0,
562 oom_risk: 0.0,
563 latency_risk: 0.0,
564 throughput_loss: 0.0,
565 },
566 ActionLoss {
567 action: BackpressureAction::DropNonInteractive,
568 expected_loss: 10.0,
569 oom_risk: 0.0,
570 latency_risk: 0.0,
571 throughput_loss: 0.0,
572 },
573 ActionLoss {
574 action: BackpressureAction::TerminateSession,
575 expected_loss: 10.0,
576 oom_risk: 0.0,
577 latency_risk: 0.0,
578 throughput_loss: 0.0,
579 },
580 ];
581 assert_eq!(
582 select_best_action(&losses),
583 BackpressureAction::CoalesceNonInteractive
584 );
585 }
586
587 #[test]
588 fn hard_cap_duration_forces_terminate() {
589 let policy = FlowControlPolicy::default();
590 let snapshot = FlowControlSnapshot {
591 queues: QueueDepthBytes {
592 input: 0,
593 output: policy.config.output_hard_cap_bytes,
594 render_frames: 1,
595 },
596 rates: RateWindowBps {
597 lambda_in: 0,
598 lambda_out: 1_000_000,
599 mu_in: 1,
600 mu_out: 200_000,
601 },
602 latency: LatencyWindowMs {
603 key_p50_ms: 10.0,
604 key_p95_ms: 60.0,
605 },
606 serviced_input_bytes: 128,
607 serviced_output_bytes: 64_000,
608 output_hard_cap_duration_ms: policy.config.hard_cap_terminate_ms,
609 };
610 let decision = policy.evaluate(snapshot);
611 assert_eq!(
612 decision.chosen_action,
613 Some(BackpressureAction::TerminateSession)
614 );
615 assert_eq!(decision.reason, DecisionReason::HardCapExceeded);
616 assert!(decision.should_pause_pty_reads);
617 }
618
619 #[test]
620 fn deterministic_stress_simulation_keeps_queues_bounded() {
621 let policy = FlowControlPolicy::default();
622 let dt_ms = 10_u64;
623 let steps = 6_000_u32; let rates = RateWindowBps {
626 lambda_in: 4_000,
627 lambda_out: 1_000_000,
628 mu_in: 80_000,
629 mu_out: 300_000,
630 };
631
632 let mut q_in = 0_u32;
633 let mut q_out = 0_u32;
634 let mut hard_cap_duration_ms = 0_u64;
635 let mut max_q_in = 0_u32;
636 let mut max_q_out = 0_u32;
637 let mut saw_intervention = false;
638 let mut terminated = false;
639 let mut key_latencies = Vec::with_capacity(steps as usize);
640
641 for _ in 0..steps {
642 let key_latency_ms = latency_from_queue(q_in, rates.mu_in);
643 let snapshot = FlowControlSnapshot {
644 queues: QueueDepthBytes {
645 input: q_in,
646 output: q_out,
647 render_frames: 1,
648 },
649 rates,
650 latency: LatencyWindowMs {
651 key_p50_ms: (key_latency_ms / 2.0).max(1.0),
652 key_p95_ms: key_latency_ms,
653 },
654 serviced_input_bytes: u64::from(bytes_for_interval(rates.mu_in, dt_ms)),
655 serviced_output_bytes: u64::from(bytes_for_interval(rates.mu_out, dt_ms)),
656 output_hard_cap_duration_ms: hard_cap_duration_ms,
657 };
658
659 let decision = policy.evaluate(snapshot);
660 if decision.chosen_action.is_some() {
661 saw_intervention = true;
662 }
663
664 let mut input_arrival = bytes_for_interval(rates.lambda_in, dt_ms);
665 let mut output_arrival = bytes_for_interval(rates.lambda_out, dt_ms);
666
667 match decision.chosen_action {
668 Some(BackpressureAction::CoalesceNonInteractive) => {
669 input_arrival = input_arrival.saturating_mul(7) / 10;
670 output_arrival = output_arrival.saturating_mul(8) / 10;
671 }
672 Some(BackpressureAction::ThrottleOutput) => {
673 output_arrival = output_arrival.saturating_mul(18) / 100;
674 }
675 Some(BackpressureAction::DropNonInteractive) => {
676 input_arrival /= 2;
677 }
678 Some(BackpressureAction::TerminateSession) => {
679 terminated = true;
680 break;
681 }
682 None => {}
683 }
684
685 if decision.should_pause_pty_reads {
686 output_arrival = 0;
687 }
688
689 q_in = q_in.saturating_add(input_arrival);
690 q_out = q_out.saturating_add(output_arrival);
691
692 if q_in > policy.config.input_hard_cap_bytes {
693 q_in = policy.config.input_hard_cap_bytes;
694 }
695 if q_out > policy.config.output_hard_cap_bytes {
696 q_out = policy.config.output_hard_cap_bytes;
697 }
698
699 let input_service = bytes_for_interval(rates.mu_in, dt_ms).min(q_in);
700 q_in -= input_service;
701
702 let output_budget = bytes_for_interval(rates.mu_out, dt_ms)
703 .min(decision.output_batch_budget_bytes)
704 .min(q_out);
705 q_out -= output_budget;
706
707 max_q_in = max_q_in.max(q_in);
708 max_q_out = max_q_out.max(q_out);
709 hard_cap_duration_ms = if q_out >= policy.config.output_hard_cap_bytes {
710 hard_cap_duration_ms.saturating_add(dt_ms)
711 } else {
712 0
713 };
714 key_latencies.push(latency_from_queue(q_in, rates.mu_in));
715 }
716
717 assert!(
718 saw_intervention,
719 "policy should intervene under output flood"
720 );
721 assert!(
722 !terminated,
723 "policy should recover before termination in this scenario"
724 );
725 assert!(max_q_in <= policy.config.input_hard_cap_bytes);
726 assert!(max_q_out <= policy.config.output_hard_cap_bytes);
727 let key_p95 = percentile(&key_latencies, 95);
728 assert!(
729 key_p95 <= 100.0,
730 "expected p95 <= 100ms, got {key_p95:.2}ms"
731 );
732 }
733
734 #[test]
735 fn interactive_events_are_never_dropped() {
736 let policy = FlowControlPolicy::default();
737 assert!(!policy.should_drop_input_event(
738 policy.config.input_hard_cap_bytes,
739 InputEventClass::Interactive
740 ));
741 assert!(policy.should_drop_input_event(
742 policy.config.input_hard_cap_bytes,
743 InputEventClass::NonInteractive
744 ));
745 }
746
747 fn bytes_for_interval(rate_bps: u32, dt_ms: u64) -> u32 {
748 let bytes = u128::from(rate_bps) * u128::from(dt_ms) / 1_000_u128;
749 u32::try_from(bytes).unwrap_or(u32::MAX)
750 }
751
752 fn latency_from_queue(queue_bytes: u32, service_bps: u32) -> f64 {
753 if service_bps == 0 {
754 return f64::INFINITY;
755 }
756 1_000.0 * (f64::from(queue_bytes) / f64::from(service_bps))
757 }
758
759 fn percentile(values: &[f64], pct: u8) -> f64 {
760 if values.is_empty() {
761 return 0.0;
762 }
763 let mut sorted = values.to_vec();
764 sorted.sort_by(f64::total_cmp);
765 let last = sorted.len() - 1;
766 let index = (last * usize::from(pct)) / 100;
767 sorted[index]
768 }
769
770 fn assert_close(actual: f64, expected: f64, epsilon: f64) {
771 let delta = (actual - expected).abs();
772 assert!(
773 delta <= epsilon,
774 "expected {expected}, got {actual}, delta={delta}"
775 );
776 }
777
778 fn stable_snapshot() -> FlowControlSnapshot {
780 FlowControlSnapshot {
781 queues: QueueDepthBytes {
782 input: 1024,
783 output: 4096,
784 render_frames: 0,
785 },
786 rates: RateWindowBps {
787 lambda_in: 1_000,
788 lambda_out: 20_000,
789 mu_in: 10_000,
790 mu_out: 100_000,
791 },
792 latency: LatencyWindowMs {
793 key_p50_ms: 2.0,
794 key_p95_ms: 8.0,
795 },
796 serviced_input_bytes: 40_000,
797 serviced_output_bytes: 42_000,
798 output_hard_cap_duration_ms: 0,
799 }
800 }
801
802 #[test]
805 fn ratio_u32_normal_division() {
806 assert_close(ratio_u32(100, 200), 0.5, 1e-9);
807 assert_close(ratio_u32(200, 100), 2.0, 1e-9);
808 assert_close(ratio_u32(0, 100), 0.0, 1e-9);
809 }
810
811 #[test]
812 fn ratio_u32_zero_denominator_is_infinity() {
813 assert!(ratio_u32(100, 0).is_infinite());
814 assert!(ratio_u32(0, 0).is_infinite());
815 }
816
817 #[test]
820 fn clamp01_bounds() {
821 assert_close(clamp01(-1.0), 0.0, 1e-9);
822 assert_close(clamp01(0.5), 0.5, 1e-9);
823 assert_close(clamp01(1.5), 1.0, 1e-9);
824 assert_close(clamp01(0.0), 0.0, 1e-9);
825 assert_close(clamp01(1.0), 1.0, 1e-9);
826 }
827
828 #[test]
831 fn jain_fairness_index_asymmetric() {
832 let f = jain_fairness_index(1_000_000, 1);
834 assert!(f < 0.6, "highly asymmetric should be near 0.5: got {f}");
835 assert!(f >= 0.5, "Jain index for 2 flows is always >= 0.5: got {f}");
836 }
837
838 #[test]
839 fn jain_fairness_index_symmetry() {
840 let a = jain_fairness_index(100, 200);
842 let b = jain_fairness_index(200, 100);
843 assert_close(a, b, 1e-9);
844 }
845
846 #[test]
847 fn fairness_index_snapshot_convenience() {
848 let snap = stable_snapshot();
849 let direct = jain_fairness_index(snap.serviced_input_bytes, snap.serviced_output_bytes);
850 assert_close(snap.fairness_index(), direct, 1e-12);
851 }
852
853 #[test]
856 fn should_replenish_always_when_window_zero() {
857 let policy = FlowControlPolicy::default();
858 assert!(policy.should_replenish(0, 0, 0));
859 }
860
861 #[test]
862 fn should_replenish_at_50_percent_consumption() {
863 let policy = FlowControlPolicy::default();
864 assert!(policy.should_replenish(500, 1000, 0));
866 assert!(!policy.should_replenish(400, 1000, 0));
868 }
869
870 #[test]
871 fn should_replenish_on_interval_timeout() {
872 let policy = FlowControlPolicy::default();
873 assert!(policy.should_replenish(0, 10_000, policy.config.replenish_interval_ms));
875 assert!(!policy.should_replenish(0, 10_000, policy.config.replenish_interval_ms - 1));
876 }
877
878 #[test]
881 fn non_interactive_dropped_only_at_hard_cap() {
882 let policy = FlowControlPolicy::default();
883 let below = policy.config.input_hard_cap_bytes - 1;
884 assert!(!policy.should_drop_input_event(below, InputEventClass::NonInteractive));
885 assert!(policy.should_drop_input_event(
886 policy.config.input_hard_cap_bytes,
887 InputEventClass::NonInteractive
888 ));
889 }
890
891 #[test]
892 fn interactive_never_dropped_even_at_max() {
893 let policy = FlowControlPolicy::default();
894 assert!(!policy.should_drop_input_event(u32::MAX, InputEventClass::Interactive));
895 }
896
897 #[test]
900 fn tie_break_ranks_are_ordered() {
901 assert!(
902 BackpressureAction::CoalesceNonInteractive.tie_break_rank()
903 < BackpressureAction::ThrottleOutput.tie_break_rank()
904 );
905 assert!(
906 BackpressureAction::ThrottleOutput.tie_break_rank()
907 < BackpressureAction::DropNonInteractive.tie_break_rank()
908 );
909 assert!(
910 BackpressureAction::DropNonInteractive.tie_break_rank()
911 < BackpressureAction::TerminateSession.tie_break_rank()
912 );
913 }
914
915 #[test]
918 fn select_best_action_picks_lowest_loss() {
919 let losses = [
920 ActionLoss {
921 action: BackpressureAction::CoalesceNonInteractive,
922 expected_loss: 50.0,
923 oom_risk: 0.0,
924 latency_risk: 0.0,
925 throughput_loss: 0.0,
926 },
927 ActionLoss {
928 action: BackpressureAction::ThrottleOutput,
929 expected_loss: 10.0,
930 oom_risk: 0.0,
931 latency_risk: 0.0,
932 throughput_loss: 0.0,
933 },
934 ActionLoss {
935 action: BackpressureAction::DropNonInteractive,
936 expected_loss: 30.0,
937 oom_risk: 0.0,
938 latency_risk: 0.0,
939 throughput_loss: 0.0,
940 },
941 ActionLoss {
942 action: BackpressureAction::TerminateSession,
943 expected_loss: 100.0,
944 oom_risk: 0.0,
945 latency_risk: 0.0,
946 throughput_loss: 0.0,
947 },
948 ];
949 assert_eq!(
950 select_best_action(&losses),
951 BackpressureAction::ThrottleOutput
952 );
953 }
954
955 #[test]
958 fn output_batch_budget_idle_no_pressure() {
959 let policy = FlowControlPolicy::default();
960 let budget = policy.output_batch_budget(0, 1.0, 10.0);
961 assert_eq!(budget, policy.config.output_batch_idle_bytes);
962 }
963
964 #[test]
965 fn output_batch_budget_with_input_no_pressure() {
966 let policy = FlowControlPolicy::default();
967 let budget = policy.output_batch_budget(100, 1.0, 10.0);
968 assert_eq!(budget, policy.config.output_batch_with_input_bytes);
969 }
970
971 #[test]
972 fn output_batch_budget_fairness_recovery_clamps() {
973 let policy = FlowControlPolicy::default();
974 let budget = policy.output_batch_budget(100, 0.5, 10.0);
976 assert_eq!(budget, policy.config.output_batch_recovery_bytes);
977 }
978
979 #[test]
980 fn output_batch_budget_latency_recovery_clamps() {
981 let policy = FlowControlPolicy::default();
982 let budget = policy.output_batch_budget(0, 1.0, 200.0);
984 assert_eq!(budget, policy.config.output_batch_recovery_bytes);
985 }
986
987 #[test]
988 fn output_batch_budget_both_triggers_still_recovery() {
989 let policy = FlowControlPolicy::default();
990 let budget = policy.output_batch_budget(50, 0.3, 200.0);
992 assert_eq!(budget, policy.config.output_batch_recovery_bytes);
993 }
994
995 #[test]
998 fn pressured_when_input_at_soft_cap() {
999 let policy = FlowControlPolicy::default();
1000 let mut snap = stable_snapshot();
1001 snap.queues.input = policy.config.input_soft_cap_bytes;
1002 let decision = policy.evaluate(snap);
1003 assert!(
1004 decision.chosen_action.is_some(),
1005 "should intervene when input queue at soft cap"
1006 );
1007 }
1008
1009 #[test]
1010 fn pressured_when_output_at_soft_cap() {
1011 let policy = FlowControlPolicy::default();
1012 let mut snap = stable_snapshot();
1013 snap.queues.output = policy.config.output_soft_cap_bytes;
1014 let decision = policy.evaluate(snap);
1015 assert!(
1016 decision.chosen_action.is_some(),
1017 "should intervene when output queue at soft cap"
1018 );
1019 }
1020
1021 #[test]
1022 fn pressured_when_input_rate_exceeds_service() {
1023 let policy = FlowControlPolicy::default();
1024 let mut snap = stable_snapshot();
1025 snap.rates.lambda_in = 100_000;
1026 snap.rates.mu_in = 50_000; let decision = policy.evaluate(snap);
1028 assert!(
1029 decision.chosen_action.is_some(),
1030 "should intervene when input arrival > service"
1031 );
1032 }
1033
1034 #[test]
1035 fn pressured_when_output_rate_exceeds_service() {
1036 let policy = FlowControlPolicy::default();
1037 let mut snap = stable_snapshot();
1038 snap.rates.lambda_out = 200_000;
1039 snap.rates.mu_out = 100_000; let decision = policy.evaluate(snap);
1041 assert!(
1042 decision.chosen_action.is_some(),
1043 "should intervene when output arrival > service"
1044 );
1045 }
1046
1047 #[test]
1048 fn pressured_when_latency_budget_breached() {
1049 let policy = FlowControlPolicy::default();
1050 let mut snap = stable_snapshot();
1051 snap.latency.key_p95_ms = policy.config.key_latency_budget_ms + 10.0;
1052 let decision = policy.evaluate(snap);
1053 assert!(
1054 decision.chosen_action.is_some(),
1055 "should intervene when latency exceeds budget"
1056 );
1057 assert_eq!(decision.reason, DecisionReason::ProtectKeyLatencyBudget);
1058 }
1059
1060 #[test]
1061 fn pressured_when_fairness_below_floor() {
1062 let policy = FlowControlPolicy::default();
1063 let mut snap = stable_snapshot();
1064 snap.serviced_input_bytes = 1;
1066 snap.serviced_output_bytes = 1_000_000;
1067 assert!(snap.fairness_index() < policy.config.fairness_floor);
1068 let decision = policy.evaluate(snap);
1069 assert!(
1070 decision.chosen_action.is_some(),
1071 "should intervene when fairness below floor"
1072 );
1073 assert_eq!(decision.reason, DecisionReason::ProtectKeyLatencyBudget);
1074 }
1075
1076 #[test]
1079 fn reason_stable_when_no_pressure() {
1080 let policy = FlowControlPolicy::default();
1081 let decision = policy.evaluate(stable_snapshot());
1082 assert_eq!(decision.reason, DecisionReason::Stable);
1083 }
1084
1085 #[test]
1086 fn reason_queue_pressure_without_latency_fairness_issue() {
1087 let policy = FlowControlPolicy::default();
1088 let mut snap = stable_snapshot();
1089 snap.queues.output = policy.config.output_soft_cap_bytes;
1091 let decision = policy.evaluate(snap);
1092 assert_eq!(decision.reason, DecisionReason::QueuePressure);
1093 }
1094
1095 #[test]
1096 fn reason_hard_cap_exceeded_overrides_everything() {
1097 let policy = FlowControlPolicy::default();
1098 let mut snap = stable_snapshot();
1099 snap.output_hard_cap_duration_ms = policy.config.hard_cap_terminate_ms;
1100 let decision = policy.evaluate(snap);
1101 assert_eq!(decision.reason, DecisionReason::HardCapExceeded);
1102 assert_eq!(
1103 decision.chosen_action,
1104 Some(BackpressureAction::TerminateSession)
1105 );
1106 }
1107
1108 #[test]
1111 fn pause_pty_reads_at_output_hard_cap() {
1112 let policy = FlowControlPolicy::default();
1113 let mut snap = stable_snapshot();
1114 snap.queues.output = policy.config.output_hard_cap_bytes;
1115 let decision = policy.evaluate(snap);
1116 assert!(decision.should_pause_pty_reads);
1117 }
1118
1119 #[test]
1120 fn no_pause_pty_reads_below_hard_cap() {
1121 let policy = FlowControlPolicy::default();
1122 let mut snap = stable_snapshot();
1123 snap.queues.output = policy.config.output_hard_cap_bytes - 1;
1124 let decision = policy.evaluate(snap);
1125 assert!(!decision.should_pause_pty_reads);
1126 }
1127
1128 #[test]
1131 fn terminate_has_fixed_throughput_loss() {
1132 let policy = FlowControlPolicy::default();
1133 let signals = PressureSignals {
1134 oom_signal: 0.5,
1135 latency_signal: 0.5,
1136 throughput_signal: 0.5,
1137 };
1138 let loss = policy.score_action(BackpressureAction::TerminateSession, signals);
1139 assert_close(
1140 loss.throughput_loss,
1141 policy.config.terminate_throughput_loss,
1142 1e-9,
1143 );
1144 assert_close(loss.oom_risk, 0.0, 1e-9);
1145 assert_close(loss.latency_risk, 0.0, 1e-9);
1146 }
1147
1148 #[test]
1149 fn zero_pressure_yields_minimal_non_terminate_losses() {
1150 let policy = FlowControlPolicy::default();
1151 let signals = PressureSignals {
1152 oom_signal: 0.0,
1153 latency_signal: 0.0,
1154 throughput_signal: 0.0,
1155 };
1156 for action in [
1157 BackpressureAction::CoalesceNonInteractive,
1158 BackpressureAction::ThrottleOutput,
1159 BackpressureAction::DropNonInteractive,
1160 ] {
1161 let loss = policy.score_action(action, signals);
1162 assert_close(loss.oom_risk, 0.0, 1e-9);
1163 assert_close(loss.latency_risk, 0.0, 1e-9);
1164 assert!(loss.throughput_loss > 0.0);
1166 }
1167 }
1168
1169 #[test]
1170 fn coalesce_always_cheapest_under_zero_pressure() {
1171 let policy = FlowControlPolicy::default();
1172 let signals = PressureSignals {
1173 oom_signal: 0.0,
1174 latency_signal: 0.0,
1175 throughput_signal: 0.0,
1176 };
1177 let coalesce = policy.score_action(BackpressureAction::CoalesceNonInteractive, signals);
1178 let throttle = policy.score_action(BackpressureAction::ThrottleOutput, signals);
1179 let drop = policy.score_action(BackpressureAction::DropNonInteractive, signals);
1180 assert!(
1181 coalesce.expected_loss <= throttle.expected_loss,
1182 "coalesce should be <= throttle at zero pressure"
1183 );
1184 assert!(
1185 throttle.expected_loss <= drop.expected_loss,
1186 "throttle should be <= drop at zero pressure"
1187 );
1188 }
1189
1190 #[test]
1191 fn high_pressure_increases_oom_and_latency_risk() {
1192 let policy = FlowControlPolicy::default();
1193 let low = PressureSignals {
1194 oom_signal: 0.1,
1195 latency_signal: 0.1,
1196 throughput_signal: 0.1,
1197 };
1198 let high = PressureSignals {
1199 oom_signal: 0.9,
1200 latency_signal: 0.9,
1201 throughput_signal: 0.9,
1202 };
1203 let action = BackpressureAction::CoalesceNonInteractive;
1204 let loss_low = policy.score_action(action, low);
1205 let loss_high = policy.score_action(action, high);
1206 assert!(loss_high.oom_risk > loss_low.oom_risk);
1207 assert!(loss_high.latency_risk > loss_low.latency_risk);
1208 assert!(loss_high.throughput_loss > loss_low.throughput_loss);
1209 }
1210
1211 #[test]
1214 fn pressure_signals_all_zero_when_stable() {
1215 let policy = FlowControlPolicy::default();
1216 let snap = stable_snapshot();
1217 let fi = snap.fairness_index();
1218 let signals = policy.pressure_signals(snap, fi);
1219 assert_close(signals.oom_signal, 0.0, 1e-9);
1220 assert_close(signals.latency_signal, 0.0, 1e-9);
1221 assert_close(signals.throughput_signal, 0.0, 1e-9);
1222 }
1223
1224 #[test]
1225 fn oom_signal_rises_with_queue_depth() {
1226 let policy = FlowControlPolicy::default();
1227 let mut snap = stable_snapshot();
1228 snap.queues.output = (policy.config.output_hard_cap_bytes as f64 * 0.9) as u32;
1230 let fi = snap.fairness_index();
1231 let signals = policy.pressure_signals(snap, fi);
1232 assert!(
1233 signals.oom_signal > 0.0,
1234 "oom_signal should rise at 90% of hard cap: got {}",
1235 signals.oom_signal
1236 );
1237 }
1238
1239 #[test]
1240 fn oom_signal_rises_with_utilization_above_one() {
1241 let policy = FlowControlPolicy::default();
1242 let mut snap = stable_snapshot();
1243 snap.rates.lambda_out = 200_000;
1244 snap.rates.mu_out = 100_000; let fi = snap.fairness_index();
1246 let signals = policy.pressure_signals(snap, fi);
1247 assert!(
1248 signals.oom_signal > 0.0,
1249 "oom_signal should rise when rho > 1: got {}",
1250 signals.oom_signal
1251 );
1252 }
1253
1254 #[test]
1255 fn latency_signal_rises_when_p95_exceeds_budget() {
1256 let policy = FlowControlPolicy::default();
1257 let mut snap = stable_snapshot();
1258 snap.latency.key_p95_ms = policy.config.key_latency_budget_ms * 2.0;
1259 let fi = snap.fairness_index();
1260 let signals = policy.pressure_signals(snap, fi);
1261 assert!(
1262 signals.latency_signal > 0.0,
1263 "latency_signal should rise when p95 > budget: got {}",
1264 signals.latency_signal
1265 );
1266 }
1267
1268 #[test]
1269 fn latency_signal_rises_with_fairness_shortfall() {
1270 let policy = FlowControlPolicy::default();
1271 let mut snap = stable_snapshot();
1272 snap.serviced_input_bytes = 1;
1273 snap.serviced_output_bytes = 1_000_000;
1274 let fi = snap.fairness_index();
1275 assert!(fi < policy.config.fairness_floor);
1276 let signals = policy.pressure_signals(snap, fi);
1277 assert!(
1278 signals.latency_signal > 0.0,
1279 "latency_signal should rise when fairness < floor: got {}",
1280 signals.latency_signal
1281 );
1282 }
1283
1284 #[test]
1285 fn throughput_signal_rises_with_output_utilization() {
1286 let policy = FlowControlPolicy::default();
1287 let mut snap = stable_snapshot();
1288 snap.rates.lambda_out = 200_000;
1289 snap.rates.mu_out = 100_000; let fi = snap.fairness_index();
1291 let signals = policy.pressure_signals(snap, fi);
1292 assert!(
1293 signals.throughput_signal > 0.0,
1294 "throughput_signal should rise when rho_out > 1: got {}",
1295 signals.throughput_signal
1296 );
1297 }
1298
1299 #[test]
1300 fn throughput_signal_rises_with_output_soft_ratio() {
1301 let policy = FlowControlPolicy::default();
1302 let mut snap = stable_snapshot();
1303 snap.queues.output = policy.config.output_soft_cap_bytes * 2;
1304 let fi = snap.fairness_index();
1305 let signals = policy.pressure_signals(snap, fi);
1306 assert!(
1307 signals.throughput_signal > 0.0,
1308 "throughput_signal should rise when output > soft cap: got {}",
1309 signals.throughput_signal
1310 );
1311 }
1312
1313 #[test]
1316 fn evaluate_losses_array_covers_all_actions() {
1317 let policy = FlowControlPolicy::default();
1318 let decision = policy.evaluate(stable_snapshot());
1319 assert_eq!(decision.losses.len(), 4);
1320 let actions: Vec<_> = decision.losses.iter().map(|l| l.action).collect();
1321 assert!(actions.contains(&BackpressureAction::CoalesceNonInteractive));
1322 assert!(actions.contains(&BackpressureAction::ThrottleOutput));
1323 assert!(actions.contains(&BackpressureAction::DropNonInteractive));
1324 assert!(actions.contains(&BackpressureAction::TerminateSession));
1325 }
1326
1327 #[test]
1330 fn hard_cap_just_below_threshold_does_not_terminate() {
1331 let policy = FlowControlPolicy::default();
1332 let mut snap = stable_snapshot();
1333 snap.output_hard_cap_duration_ms = policy.config.hard_cap_terminate_ms - 1;
1334 snap.queues.output = policy.config.output_hard_cap_bytes;
1335 let decision = policy.evaluate(snap);
1336 assert_ne!(decision.reason, DecisionReason::HardCapExceeded);
1337 assert_ne!(
1338 decision.chosen_action,
1339 Some(BackpressureAction::TerminateSession)
1340 );
1341 }
1342
1343 #[test]
1346 fn default_weights_hierarchy() {
1347 let w = LossWeights::default();
1348 assert!(w.oom > w.latency);
1350 assert!(w.latency > w.throughput);
1351 }
1352
1353 #[test]
1354 fn default_config_cap_hierarchy() {
1355 let c = FlowControlConfig::default();
1356 assert!(c.input_soft_cap_bytes < c.input_hard_cap_bytes);
1357 assert!(c.output_soft_cap_bytes < c.output_hard_cap_bytes);
1358 assert!(c.output_batch_recovery_bytes < c.output_batch_with_input_bytes);
1359 assert!(c.output_batch_with_input_bytes < c.output_batch_idle_bytes);
1360 }
1361
1362 #[test]
1365 fn custom_config_changes_behavior() {
1366 let config = FlowControlConfig {
1367 input_soft_cap_bytes: 100,
1368 input_hard_cap_bytes: 200,
1369 output_soft_cap_bytes: 100,
1370 output_hard_cap_bytes: 200,
1371 ..FlowControlConfig::default()
1372 };
1373 let policy = FlowControlPolicy::new(config);
1374 assert!(policy.should_drop_input_event(200, InputEventClass::NonInteractive));
1376 assert!(!policy.should_drop_input_event(199, InputEventClass::NonInteractive));
1377 }
1378}