Skip to main content

frankenterm_core/
flow_control.rs

1//! Deterministic flow-control and backpressure policy for FrankenTerm remote I/O.
2//!
3//! This module encodes the queueing and fairness policy described in
4//! `docs/spec/frankenterm-websocket-protocol.md` section 4 so remote transport
5//! implementations can share one deterministic decision engine.
6
7use core::cmp::Ordering;
8
/// Bytes per kibibyte; the default queue caps and batch sizes are expressed in it.
const KIB: u32 = 1 << 10;
/// Number of candidate backpressure actions scored for every decision.
const ACTION_COUNT: usize = 4;
11
/// Relative penalties applied to each risk term when scoring a backpressure action.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct LossWeights {
    /// Weight on the catastrophic memory-growth (OOM) risk term.
    pub oom: f64,
    /// Weight on the keystroke-latency risk term.
    pub latency: f64,
    /// Weight on the throughput-degradation term.
    pub throughput: f64,
}
22
23impl Default for LossWeights {
24    fn default() -> Self {
25        Self {
26            oom: 1_000_000.0,
27            latency: 10_000.0,
28            throughput: 100.0,
29        }
30    }
31}
32
33/// Runtime policy configuration for flow control decisions.
34#[derive(Debug, Clone, Copy, PartialEq)]
35pub struct FlowControlConfig {
36    /// Input queue soft cap.
37    pub input_soft_cap_bytes: u32,
38    /// Input queue hard cap.
39    pub input_hard_cap_bytes: u32,
40    /// Output queue soft cap.
41    pub output_soft_cap_bytes: u32,
42    /// Output queue hard cap.
43    pub output_hard_cap_bytes: u32,
44    /// Fairness lower bound (Jain index).
45    pub fairness_floor: f64,
46    /// Keystroke p95 latency budget.
47    pub key_latency_budget_ms: f64,
48    /// Output batch when input queue is non-empty.
49    pub output_batch_with_input_bytes: u32,
50    /// Output batch when input queue is empty.
51    pub output_batch_idle_bytes: u32,
52    /// Output batch while recovering fairness/latency.
53    pub output_batch_recovery_bytes: u32,
54    /// Trigger window-based replenish at this elapsed interval.
55    pub replenish_interval_ms: u64,
56    /// Terminate if output queue stays at hard cap longer than this.
57    pub hard_cap_terminate_ms: u64,
58    /// Cost assigned to hard disconnect (`terminate_session`).
59    pub terminate_throughput_loss: f64,
60    /// Loss function weights.
61    pub weights: LossWeights,
62}
63
64impl Default for FlowControlConfig {
65    fn default() -> Self {
66        Self {
67            input_soft_cap_bytes: 12 * KIB,
68            input_hard_cap_bytes: 16 * KIB,
69            output_soft_cap_bytes: 192 * KIB,
70            output_hard_cap_bytes: 256 * KIB,
71            fairness_floor: 0.80,
72            key_latency_budget_ms: 50.0,
73            output_batch_with_input_bytes: 32 * KIB,
74            output_batch_idle_bytes: 64 * KIB,
75            output_batch_recovery_bytes: 8 * KIB,
76            replenish_interval_ms: 10,
77            hard_cap_terminate_ms: 5_000,
78            terminate_throughput_loss: 6_000.0,
79            weights: LossWeights::default(),
80        }
81    }
82}
83
/// Queue depths observed at the moment a decision is made.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueDepthBytes {
    /// Bytes of input waiting to be written to the PTY.
    pub input: u32,
    /// Bytes of output waiting to be sent over the websocket.
    pub output: u32,
    /// Depth of the client render queue, in frames.
    pub render_frames: u8,
}
94
/// Arrival and service rates over a sliding window, in bytes per second.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RateWindowBps {
    /// Rate at which bytes arrive in the input queue.
    pub lambda_in: u32,
    /// Rate at which bytes arrive in the output queue.
    pub lambda_out: u32,
    /// Rate at which PTY writes drain the input queue.
    pub mu_in: u32,
    /// Rate at which websocket sends drain the output queue.
    pub mu_out: u32,
}
107
/// Summary of keystroke latency, in milliseconds.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct LatencyWindowMs {
    /// Median (p50) key latency.
    pub key_p50_ms: f64,
    /// 95th-percentile (p95) key latency.
    pub key_p95_ms: f64,
}
116
117/// Deterministic snapshot consumed by the policy evaluator.
118#[derive(Debug, Clone, Copy, PartialEq)]
119pub struct FlowControlSnapshot {
120    /// Current queue depths.
121    pub queues: QueueDepthBytes,
122    /// Sliding-window rates.
123    pub rates: RateWindowBps,
124    /// Sliding-window latency summary.
125    pub latency: LatencyWindowMs,
126    /// Bytes serviced from input lane in the fairness window.
127    pub serviced_input_bytes: u64,
128    /// Bytes serviced from output lane in the fairness window.
129    pub serviced_output_bytes: u64,
130    /// How long output queue has continuously stayed at hard cap.
131    pub output_hard_cap_duration_ms: u64,
132}
133
134impl FlowControlSnapshot {
135    /// Jain fairness index over serviced input/output bytes in this window.
136    #[must_use]
137    pub fn fairness_index(self) -> f64 {
138        jain_fairness_index(self.serviced_input_bytes, self.serviced_output_bytes)
139    }
140}
141
/// Interventions the policy can choose between when it detects pressure.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BackpressureAction {
    /// Merge bursts of non-interactive events.
    CoalesceNonInteractive,
    /// Slow down output emission/production.
    ThrottleOutput,
    /// Discard non-interactive input events.
    DropNonInteractive,
    /// Hard-stop the session so queues cannot grow without bound.
    TerminateSession,
}
154
155impl BackpressureAction {
156    #[must_use]
157    const fn tie_break_rank(self) -> u8 {
158        match self {
159            Self::CoalesceNonInteractive => 0,
160            Self::ThrottleOutput => 1,
161            Self::DropNonInteractive => 2,
162            Self::TerminateSession => 3,
163        }
164    }
165}
166
167/// Scored loss entry for one backpressure action.
168#[derive(Debug, Clone, Copy, PartialEq)]
169pub struct ActionLoss {
170    /// Candidate action.
171    pub action: BackpressureAction,
172    /// Estimated expected loss.
173    pub expected_loss: f64,
174    /// Estimated OOM probability.
175    pub oom_risk: f64,
176    /// Estimated latency-budget violation probability.
177    pub latency_risk: f64,
178    /// Estimated throughput-loss term.
179    pub throughput_loss: f64,
180}
181
/// Why the policy arrived at its outcome.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecisionReason {
    /// Nothing needs to change.
    Stable,
    /// Queue depths or rates call for intervention.
    QueuePressure,
    /// Fairness or key latency is outside its budget.
    ProtectKeyLatencyBudget,
    /// The output queue stayed at its hard cap for too long.
    HardCapExceeded,
}
194
195/// Evaluated policy decision.
196#[derive(Debug, Clone, Copy, PartialEq)]
197pub struct FlowControlDecision {
198    /// Selected action; `None` means maintain current behavior.
199    pub chosen_action: Option<BackpressureAction>,
200    /// Deterministic reason code for the decision.
201    pub reason: DecisionReason,
202    /// Jain fairness index for the current decision window.
203    pub fairness_index: f64,
204    /// Output-service budget to apply this loop.
205    pub output_batch_budget_bytes: u32,
206    /// Whether PTY reads should be paused due output hard-cap.
207    pub should_pause_pty_reads: bool,
208    /// Loss estimates for all candidate actions.
209    pub losses: [ActionLoss; ACTION_COUNT],
210}
211
/// Classification of an input event for the purposes of the drop policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InputEventClass {
    /// Keystrokes, paste and focus transitions; these must never be dropped.
    Interactive,
    /// Mouse move/drag and other signals that may be coalesced.
    NonInteractive,
}
220
221/// Deterministic policy evaluator for websocket remote flow control.
222#[derive(Debug, Clone, Copy, PartialEq)]
223pub struct FlowControlPolicy {
224    /// Runtime policy configuration.
225    pub config: FlowControlConfig,
226}
227
228impl Default for FlowControlPolicy {
229    fn default() -> Self {
230        Self::new(FlowControlConfig::default())
231    }
232}
233
234impl FlowControlPolicy {
235    /// Construct a policy with explicit configuration.
236    #[must_use]
237    pub const fn new(config: FlowControlConfig) -> Self {
238        Self { config }
239    }
240
241    /// Evaluate one deterministic backpressure decision.
242    #[must_use]
243    pub fn evaluate(self, snapshot: FlowControlSnapshot) -> FlowControlDecision {
244        let fairness_index = snapshot.fairness_index();
245        let losses = self.score_actions(snapshot, fairness_index);
246        let chosen_action = self.choose_action(snapshot, fairness_index, &losses);
247        let reason = self.reason(snapshot, fairness_index, chosen_action);
248        let output_batch_budget_bytes = self.output_batch_budget(
249            snapshot.queues.input,
250            fairness_index,
251            snapshot.latency.key_p95_ms,
252        );
253        let should_pause_pty_reads = snapshot.queues.output >= self.config.output_hard_cap_bytes;
254        FlowControlDecision {
255            chosen_action,
256            reason,
257            fairness_index,
258            output_batch_budget_bytes,
259            should_pause_pty_reads,
260            losses,
261        }
262    }
263
264    /// Replenish flow-control window at 50% consumption or interval timeout.
265    #[must_use]
266    pub fn should_replenish(self, consumed_bytes: u32, window_bytes: u32, elapsed_ms: u64) -> bool {
267        if window_bytes == 0 {
268            return true;
269        }
270        consumed_bytes.saturating_mul(2) >= window_bytes
271            || elapsed_ms >= self.config.replenish_interval_ms
272    }
273
274    /// Non-negotiable input drop rule: only non-interactive events are droppable.
275    #[must_use]
276    pub fn should_drop_input_event(self, queue_bytes: u32, class: InputEventClass) -> bool {
277        match class {
278            InputEventClass::Interactive => false,
279            InputEventClass::NonInteractive => queue_bytes >= self.config.input_hard_cap_bytes,
280        }
281    }
282
283    #[must_use]
284    fn output_batch_budget(
285        self,
286        input_queue_bytes: u32,
287        fairness_index: f64,
288        key_p95_ms: f64,
289    ) -> u32 {
290        let baseline = if input_queue_bytes > 0 {
291            self.config.output_batch_with_input_bytes
292        } else {
293            self.config.output_batch_idle_bytes
294        };
295        if fairness_index < self.config.fairness_floor
296            || key_p95_ms > self.config.key_latency_budget_ms
297        {
298            baseline.min(self.config.output_batch_recovery_bytes)
299        } else {
300            baseline
301        }
302    }
303
304    #[must_use]
305    fn reason(
306        self,
307        snapshot: FlowControlSnapshot,
308        fairness_index: f64,
309        chosen_action: Option<BackpressureAction>,
310    ) -> DecisionReason {
311        if snapshot.output_hard_cap_duration_ms >= self.config.hard_cap_terminate_ms {
312            return DecisionReason::HardCapExceeded;
313        }
314        if chosen_action.is_none() {
315            return DecisionReason::Stable;
316        }
317        if fairness_index < self.config.fairness_floor
318            || snapshot.latency.key_p95_ms > self.config.key_latency_budget_ms
319        {
320            DecisionReason::ProtectKeyLatencyBudget
321        } else {
322            DecisionReason::QueuePressure
323        }
324    }
325
326    #[must_use]
327    fn choose_action(
328        self,
329        snapshot: FlowControlSnapshot,
330        fairness_index: f64,
331        losses: &[ActionLoss; ACTION_COUNT],
332    ) -> Option<BackpressureAction> {
333        if snapshot.output_hard_cap_duration_ms >= self.config.hard_cap_terminate_ms {
334            return Some(BackpressureAction::TerminateSession);
335        }
336        if !self.is_pressured(snapshot, fairness_index) {
337            return None;
338        }
339        Some(select_best_action(losses))
340    }
341
342    #[must_use]
343    fn is_pressured(self, snapshot: FlowControlSnapshot, fairness_index: f64) -> bool {
344        let input_soft = snapshot.queues.input >= self.config.input_soft_cap_bytes;
345        let output_soft = snapshot.queues.output >= self.config.output_soft_cap_bytes;
346        let rho_in = ratio_u32(snapshot.rates.lambda_in, snapshot.rates.mu_in);
347        let rho_out = ratio_u32(snapshot.rates.lambda_out, snapshot.rates.mu_out);
348        input_soft
349            || output_soft
350            || rho_in > 1.0
351            || rho_out > 1.0
352            || fairness_index < self.config.fairness_floor
353            || snapshot.latency.key_p95_ms > self.config.key_latency_budget_ms
354    }
355
356    #[must_use]
357    fn score_actions(
358        self,
359        snapshot: FlowControlSnapshot,
360        fairness_index: f64,
361    ) -> [ActionLoss; ACTION_COUNT] {
362        let signals = self.pressure_signals(snapshot, fairness_index);
363        let actions = [
364            BackpressureAction::CoalesceNonInteractive,
365            BackpressureAction::ThrottleOutput,
366            BackpressureAction::DropNonInteractive,
367            BackpressureAction::TerminateSession,
368        ];
369        actions.map(|action| self.score_action(action, signals))
370    }
371
372    #[must_use]
373    fn score_action(self, action: BackpressureAction, signals: PressureSignals) -> ActionLoss {
374        let (oom_risk, latency_risk, throughput_loss) = match action {
375            BackpressureAction::CoalesceNonInteractive => (
376                0.35 * signals.oom_signal.powi(3),
377                0.50 * signals.latency_signal.powi(2),
378                0.08 + 0.18 * signals.throughput_signal,
379            ),
380            BackpressureAction::ThrottleOutput => (
381                0.22 * signals.oom_signal.powi(3),
382                0.28 * signals.latency_signal.powi(2),
383                0.24 + 0.32 * signals.throughput_signal,
384            ),
385            BackpressureAction::DropNonInteractive => (
386                0.15 * signals.oom_signal.powi(3),
387                0.20 * signals.latency_signal.powi(2),
388                0.42 + 0.45 * signals.throughput_signal,
389            ),
390            BackpressureAction::TerminateSession => {
391                (0.0, 0.0, self.config.terminate_throughput_loss)
392            }
393        };
394        let expected_loss = (self.config.weights.oom * oom_risk)
395            + (self.config.weights.latency * latency_risk)
396            + (self.config.weights.throughput * throughput_loss);
397        ActionLoss {
398            action,
399            expected_loss,
400            oom_risk,
401            latency_risk,
402            throughput_loss,
403        }
404    }
405
406    #[must_use]
407    fn pressure_signals(
408        self,
409        snapshot: FlowControlSnapshot,
410        fairness_index: f64,
411    ) -> PressureSignals {
412        let out_hard_ratio = ratio_u32(snapshot.queues.output, self.config.output_hard_cap_bytes);
413        let in_hard_ratio = ratio_u32(snapshot.queues.input, self.config.input_hard_cap_bytes);
414        let out_soft_ratio = ratio_u32(snapshot.queues.output, self.config.output_soft_cap_bytes);
415        let rho_in = ratio_u32(snapshot.rates.lambda_in, snapshot.rates.mu_in);
416        let rho_out = ratio_u32(snapshot.rates.lambda_out, snapshot.rates.mu_out);
417
418        let queue_pressure = out_hard_ratio.max(in_hard_ratio);
419        let util_pressure = ((rho_in.max(rho_out) - 1.0).max(0.0) / 0.5).min(1.0);
420        let oom_signal = clamp01(((queue_pressure - 0.70).max(0.0) / 0.30).max(util_pressure));
421
422        let latency_over_budget =
423            (snapshot.latency.key_p95_ms / self.config.key_latency_budget_ms - 1.0).max(0.0);
424        let fairness_shortfall = if fairness_index < self.config.fairness_floor {
425            (self.config.fairness_floor - fairness_index) / self.config.fairness_floor
426        } else {
427            0.0
428        };
429        let latency_signal = clamp01(
430            latency_over_budget
431                + fairness_shortfall
432                + ((rho_in - 1.0).max(0.0))
433                + (ratio_u32(snapshot.queues.input, self.config.input_soft_cap_bytes) - 1.0)
434                    .max(0.0),
435        );
436
437        let throughput_signal = clamp01((rho_out - 1.0).max(0.0) + (out_soft_ratio - 1.0).max(0.0));
438        PressureSignals {
439            oom_signal,
440            latency_signal,
441            throughput_signal,
442        }
443    }
444}
445
/// Pressure signals fed into action scoring; the policy clamps each to `[0, 1]`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct PressureSignals {
    /// Signal driving the OOM risk term.
    oom_signal: f64,
    /// Signal driving the latency risk term.
    latency_signal: f64,
    /// Signal driving the throughput-loss term.
    throughput_signal: f64,
}
452
453#[must_use]
454fn select_best_action(losses: &[ActionLoss; ACTION_COUNT]) -> BackpressureAction {
455    let mut best = losses[0];
456    for candidate in losses.iter().copied().skip(1) {
457        let ordering = candidate.expected_loss.total_cmp(&best.expected_loss);
458        if ordering == Ordering::Less
459            || (ordering == Ordering::Equal
460                && candidate.action.tie_break_rank() < best.action.tie_break_rank())
461        {
462            best = candidate;
463        }
464    }
465    best.action
466}
467
/// `numerator / denominator` as `f64`; a zero denominator yields positive
/// infinity, including for `0 / 0`.
#[must_use]
fn ratio_u32(numerator: u32, denominator: u32) -> f64 {
    match denominator {
        0 => f64::INFINITY,
        nonzero => f64::from(numerator) / f64::from(nonzero),
    }
}
475
/// Restrict `value` to the closed interval `[0, 1]`; NaN is passed through.
#[must_use]
fn clamp01(value: f64) -> f64 {
    if value < 0.0 {
        0.0
    } else if value > 1.0 {
        1.0
    } else {
        value
    }
}
480
/// Jain fairness index for two flows: `(a + b)^2 / (2 * (a^2 + b^2))`.
///
/// Returns `1.0` when both flows serviced zero bytes.
#[must_use]
pub fn jain_fairness_index(serviced_input_bytes: u64, serviced_output_bytes: u64) -> f64 {
    let [input, output] = [serviced_input_bytes, serviced_output_bytes].map(|bytes| bytes as f64);
    let sum_of_squares = input * input + output * output;
    let denominator = 2.0 * sum_of_squares;
    if denominator <= f64::EPSILON {
        // No traffic on either lane: treat as perfectly fair rather than dividing by zero.
        return 1.0;
    }
    let total = input + output;
    (total * total) / denominator
}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496
497    #[test]
498    fn jain_fairness_index_matches_expected_limits() {
499        assert_close(jain_fairness_index(0, 0), 1.0, 1e-9);
500        assert_close(jain_fairness_index(100, 100), 1.0, 1e-9);
501        assert_close(jain_fairness_index(100, 0), 0.5, 1e-9);
502    }
503
504    #[test]
505    fn output_batch_budget_respects_fairness_and_latency() {
506        let policy = FlowControlPolicy::default();
507        let baseline = policy.output_batch_budget(0, 0.95, 10.0);
508        assert_eq!(baseline, 64 * KIB);
509
510        let with_input = policy.output_batch_budget(1, 0.95, 10.0);
511        assert_eq!(with_input, 32 * KIB);
512
513        let fairness_recovery = policy.output_batch_budget(1, 0.60, 10.0);
514        assert_eq!(fairness_recovery, 8 * KIB);
515
516        let latency_recovery = policy.output_batch_budget(0, 0.95, 120.0);
517        assert_eq!(latency_recovery, 8 * KIB);
518    }
519
520    #[test]
521    fn stable_snapshot_emits_no_action() {
522        let policy = FlowControlPolicy::default();
523        let snapshot = FlowControlSnapshot {
524            queues: QueueDepthBytes {
525                input: 1024,
526                output: 4096,
527                render_frames: 0,
528            },
529            rates: RateWindowBps {
530                lambda_in: 1_000,
531                lambda_out: 20_000,
532                mu_in: 10_000,
533                mu_out: 100_000,
534            },
535            latency: LatencyWindowMs {
536                key_p50_ms: 2.0,
537                key_p95_ms: 8.0,
538            },
539            serviced_input_bytes: 40_000,
540            serviced_output_bytes: 42_000,
541            output_hard_cap_duration_ms: 0,
542        };
543        let decision = policy.evaluate(snapshot);
544        assert_eq!(decision.chosen_action, None);
545        assert_eq!(decision.reason, DecisionReason::Stable);
546        assert_eq!(decision.output_batch_budget_bytes, 32 * KIB);
547    }
548
549    #[test]
550    fn tie_break_order_is_deterministic() {
551        let losses = [
552            ActionLoss {
553                action: BackpressureAction::CoalesceNonInteractive,
554                expected_loss: 10.0,
555                oom_risk: 0.0,
556                latency_risk: 0.0,
557                throughput_loss: 0.0,
558            },
559            ActionLoss {
560                action: BackpressureAction::ThrottleOutput,
561                expected_loss: 10.0,
562                oom_risk: 0.0,
563                latency_risk: 0.0,
564                throughput_loss: 0.0,
565            },
566            ActionLoss {
567                action: BackpressureAction::DropNonInteractive,
568                expected_loss: 10.0,
569                oom_risk: 0.0,
570                latency_risk: 0.0,
571                throughput_loss: 0.0,
572            },
573            ActionLoss {
574                action: BackpressureAction::TerminateSession,
575                expected_loss: 10.0,
576                oom_risk: 0.0,
577                latency_risk: 0.0,
578                throughput_loss: 0.0,
579            },
580        ];
581        assert_eq!(
582            select_best_action(&losses),
583            BackpressureAction::CoalesceNonInteractive
584        );
585    }
586
587    #[test]
588    fn hard_cap_duration_forces_terminate() {
589        let policy = FlowControlPolicy::default();
590        let snapshot = FlowControlSnapshot {
591            queues: QueueDepthBytes {
592                input: 0,
593                output: policy.config.output_hard_cap_bytes,
594                render_frames: 1,
595            },
596            rates: RateWindowBps {
597                lambda_in: 0,
598                lambda_out: 1_000_000,
599                mu_in: 1,
600                mu_out: 200_000,
601            },
602            latency: LatencyWindowMs {
603                key_p50_ms: 10.0,
604                key_p95_ms: 60.0,
605            },
606            serviced_input_bytes: 128,
607            serviced_output_bytes: 64_000,
608            output_hard_cap_duration_ms: policy.config.hard_cap_terminate_ms,
609        };
610        let decision = policy.evaluate(snapshot);
611        assert_eq!(
612            decision.chosen_action,
613            Some(BackpressureAction::TerminateSession)
614        );
615        assert_eq!(decision.reason, DecisionReason::HardCapExceeded);
616        assert!(decision.should_pause_pty_reads);
617    }
618
619    #[test]
620    fn deterministic_stress_simulation_keeps_queues_bounded() {
621        let policy = FlowControlPolicy::default();
622        let dt_ms = 10_u64;
623        let steps = 6_000_u32; // 60s
624
625        let rates = RateWindowBps {
626            lambda_in: 4_000,
627            lambda_out: 1_000_000,
628            mu_in: 80_000,
629            mu_out: 300_000,
630        };
631
632        let mut q_in = 0_u32;
633        let mut q_out = 0_u32;
634        let mut hard_cap_duration_ms = 0_u64;
635        let mut max_q_in = 0_u32;
636        let mut max_q_out = 0_u32;
637        let mut saw_intervention = false;
638        let mut terminated = false;
639        let mut key_latencies = Vec::with_capacity(steps as usize);
640
641        for _ in 0..steps {
642            let key_latency_ms = latency_from_queue(q_in, rates.mu_in);
643            let snapshot = FlowControlSnapshot {
644                queues: QueueDepthBytes {
645                    input: q_in,
646                    output: q_out,
647                    render_frames: 1,
648                },
649                rates,
650                latency: LatencyWindowMs {
651                    key_p50_ms: (key_latency_ms / 2.0).max(1.0),
652                    key_p95_ms: key_latency_ms,
653                },
654                serviced_input_bytes: u64::from(bytes_for_interval(rates.mu_in, dt_ms)),
655                serviced_output_bytes: u64::from(bytes_for_interval(rates.mu_out, dt_ms)),
656                output_hard_cap_duration_ms: hard_cap_duration_ms,
657            };
658
659            let decision = policy.evaluate(snapshot);
660            if decision.chosen_action.is_some() {
661                saw_intervention = true;
662            }
663
664            let mut input_arrival = bytes_for_interval(rates.lambda_in, dt_ms);
665            let mut output_arrival = bytes_for_interval(rates.lambda_out, dt_ms);
666
667            match decision.chosen_action {
668                Some(BackpressureAction::CoalesceNonInteractive) => {
669                    input_arrival = input_arrival.saturating_mul(7) / 10;
670                    output_arrival = output_arrival.saturating_mul(8) / 10;
671                }
672                Some(BackpressureAction::ThrottleOutput) => {
673                    output_arrival = output_arrival.saturating_mul(18) / 100;
674                }
675                Some(BackpressureAction::DropNonInteractive) => {
676                    input_arrival /= 2;
677                }
678                Some(BackpressureAction::TerminateSession) => {
679                    terminated = true;
680                    break;
681                }
682                None => {}
683            }
684
685            if decision.should_pause_pty_reads {
686                output_arrival = 0;
687            }
688
689            q_in = q_in.saturating_add(input_arrival);
690            q_out = q_out.saturating_add(output_arrival);
691
692            if q_in > policy.config.input_hard_cap_bytes {
693                q_in = policy.config.input_hard_cap_bytes;
694            }
695            if q_out > policy.config.output_hard_cap_bytes {
696                q_out = policy.config.output_hard_cap_bytes;
697            }
698
699            let input_service = bytes_for_interval(rates.mu_in, dt_ms).min(q_in);
700            q_in -= input_service;
701
702            let output_budget = bytes_for_interval(rates.mu_out, dt_ms)
703                .min(decision.output_batch_budget_bytes)
704                .min(q_out);
705            q_out -= output_budget;
706
707            max_q_in = max_q_in.max(q_in);
708            max_q_out = max_q_out.max(q_out);
709            hard_cap_duration_ms = if q_out >= policy.config.output_hard_cap_bytes {
710                hard_cap_duration_ms.saturating_add(dt_ms)
711            } else {
712                0
713            };
714            key_latencies.push(latency_from_queue(q_in, rates.mu_in));
715        }
716
717        assert!(
718            saw_intervention,
719            "policy should intervene under output flood"
720        );
721        assert!(
722            !terminated,
723            "policy should recover before termination in this scenario"
724        );
725        assert!(max_q_in <= policy.config.input_hard_cap_bytes);
726        assert!(max_q_out <= policy.config.output_hard_cap_bytes);
727        let key_p95 = percentile(&key_latencies, 95);
728        assert!(
729            key_p95 <= 100.0,
730            "expected p95 <= 100ms, got {key_p95:.2}ms"
731        );
732    }
733
734    #[test]
735    fn interactive_events_are_never_dropped() {
736        let policy = FlowControlPolicy::default();
737        assert!(!policy.should_drop_input_event(
738            policy.config.input_hard_cap_bytes,
739            InputEventClass::Interactive
740        ));
741        assert!(policy.should_drop_input_event(
742            policy.config.input_hard_cap_bytes,
743            InputEventClass::NonInteractive
744        ));
745    }
746
747    fn bytes_for_interval(rate_bps: u32, dt_ms: u64) -> u32 {
748        let bytes = u128::from(rate_bps) * u128::from(dt_ms) / 1_000_u128;
749        u32::try_from(bytes).unwrap_or(u32::MAX)
750    }
751
752    fn latency_from_queue(queue_bytes: u32, service_bps: u32) -> f64 {
753        if service_bps == 0 {
754            return f64::INFINITY;
755        }
756        1_000.0 * (f64::from(queue_bytes) / f64::from(service_bps))
757    }
758
759    fn percentile(values: &[f64], pct: u8) -> f64 {
760        if values.is_empty() {
761            return 0.0;
762        }
763        let mut sorted = values.to_vec();
764        sorted.sort_by(f64::total_cmp);
765        let last = sorted.len() - 1;
766        let index = (last * usize::from(pct)) / 100;
767        sorted[index]
768    }
769
770    fn assert_close(actual: f64, expected: f64, epsilon: f64) {
771        let delta = (actual - expected).abs();
772        assert!(
773            delta <= epsilon,
774            "expected {expected}, got {actual}, delta={delta}"
775        );
776    }
777
778    // --- Helper: build a stable (no-pressure) snapshot ---
779    fn stable_snapshot() -> FlowControlSnapshot {
780        FlowControlSnapshot {
781            queues: QueueDepthBytes {
782                input: 1024,
783                output: 4096,
784                render_frames: 0,
785            },
786            rates: RateWindowBps {
787                lambda_in: 1_000,
788                lambda_out: 20_000,
789                mu_in: 10_000,
790                mu_out: 100_000,
791            },
792            latency: LatencyWindowMs {
793                key_p50_ms: 2.0,
794                key_p95_ms: 8.0,
795            },
796            serviced_input_bytes: 40_000,
797            serviced_output_bytes: 42_000,
798            output_hard_cap_duration_ms: 0,
799        }
800    }
801
802    // ---- ratio_u32 ----
803
804    #[test]
805    fn ratio_u32_normal_division() {
806        assert_close(ratio_u32(100, 200), 0.5, 1e-9);
807        assert_close(ratio_u32(200, 100), 2.0, 1e-9);
808        assert_close(ratio_u32(0, 100), 0.0, 1e-9);
809    }
810
811    #[test]
812    fn ratio_u32_zero_denominator_is_infinity() {
813        assert!(ratio_u32(100, 0).is_infinite());
814        assert!(ratio_u32(0, 0).is_infinite());
815    }
816
817    // ---- clamp01 ----
818
819    #[test]
820    fn clamp01_bounds() {
821        assert_close(clamp01(-1.0), 0.0, 1e-9);
822        assert_close(clamp01(0.5), 0.5, 1e-9);
823        assert_close(clamp01(1.5), 1.0, 1e-9);
824        assert_close(clamp01(0.0), 0.0, 1e-9);
825        assert_close(clamp01(1.0), 1.0, 1e-9);
826    }
827
828    // ---- jain_fairness_index additional cases ----
829
830    #[test]
831    fn jain_fairness_index_asymmetric() {
832        // When one side dominates, fairness should be low
833        let f = jain_fairness_index(1_000_000, 1);
834        assert!(f < 0.6, "highly asymmetric should be near 0.5: got {f}");
835        assert!(f >= 0.5, "Jain index for 2 flows is always >= 0.5: got {f}");
836    }
837
838    #[test]
839    fn jain_fairness_index_symmetry() {
840        // Order should not matter
841        let a = jain_fairness_index(100, 200);
842        let b = jain_fairness_index(200, 100);
843        assert_close(a, b, 1e-9);
844    }
845
846    #[test]
847    fn fairness_index_snapshot_convenience() {
848        let snap = stable_snapshot();
849        let direct = jain_fairness_index(snap.serviced_input_bytes, snap.serviced_output_bytes);
850        assert_close(snap.fairness_index(), direct, 1e-12);
851    }
852
853    // ---- should_replenish ----
854
855    #[test]
856    fn should_replenish_always_when_window_zero() {
857        let policy = FlowControlPolicy::default();
858        assert!(policy.should_replenish(0, 0, 0));
859    }
860
861    #[test]
862    fn should_replenish_at_50_percent_consumption() {
863        let policy = FlowControlPolicy::default();
864        // consumed*2 >= window → true
865        assert!(policy.should_replenish(500, 1000, 0));
866        // consumed*2 < window → false (if elapsed < interval)
867        assert!(!policy.should_replenish(400, 1000, 0));
868    }
869
870    #[test]
871    fn should_replenish_on_interval_timeout() {
872        let policy = FlowControlPolicy::default();
873        // Low consumption but elapsed >= replenish_interval_ms
874        assert!(policy.should_replenish(0, 10_000, policy.config.replenish_interval_ms));
875        assert!(!policy.should_replenish(0, 10_000, policy.config.replenish_interval_ms - 1));
876    }
877
878    // ---- should_drop_input_event ----
879
880    #[test]
881    fn non_interactive_dropped_only_at_hard_cap() {
882        let policy = FlowControlPolicy::default();
883        let below = policy.config.input_hard_cap_bytes - 1;
884        assert!(!policy.should_drop_input_event(below, InputEventClass::NonInteractive));
885        assert!(policy.should_drop_input_event(
886            policy.config.input_hard_cap_bytes,
887            InputEventClass::NonInteractive
888        ));
889    }
890
891    #[test]
892    fn interactive_never_dropped_even_at_max() {
893        let policy = FlowControlPolicy::default();
894        assert!(!policy.should_drop_input_event(u32::MAX, InputEventClass::Interactive));
895    }
896
897    // ---- BackpressureAction tie_break_rank ordering ----
898
899    #[test]
900    fn tie_break_ranks_are_ordered() {
901        assert!(
902            BackpressureAction::CoalesceNonInteractive.tie_break_rank()
903                < BackpressureAction::ThrottleOutput.tie_break_rank()
904        );
905        assert!(
906            BackpressureAction::ThrottleOutput.tie_break_rank()
907                < BackpressureAction::DropNonInteractive.tie_break_rank()
908        );
909        assert!(
910            BackpressureAction::DropNonInteractive.tie_break_rank()
911                < BackpressureAction::TerminateSession.tie_break_rank()
912        );
913    }
914
915    // ---- select_best_action ----
916
917    #[test]
918    fn select_best_action_picks_lowest_loss() {
919        let losses = [
920            ActionLoss {
921                action: BackpressureAction::CoalesceNonInteractive,
922                expected_loss: 50.0,
923                oom_risk: 0.0,
924                latency_risk: 0.0,
925                throughput_loss: 0.0,
926            },
927            ActionLoss {
928                action: BackpressureAction::ThrottleOutput,
929                expected_loss: 10.0,
930                oom_risk: 0.0,
931                latency_risk: 0.0,
932                throughput_loss: 0.0,
933            },
934            ActionLoss {
935                action: BackpressureAction::DropNonInteractive,
936                expected_loss: 30.0,
937                oom_risk: 0.0,
938                latency_risk: 0.0,
939                throughput_loss: 0.0,
940            },
941            ActionLoss {
942                action: BackpressureAction::TerminateSession,
943                expected_loss: 100.0,
944                oom_risk: 0.0,
945                latency_risk: 0.0,
946                throughput_loss: 0.0,
947            },
948        ];
949        assert_eq!(
950            select_best_action(&losses),
951            BackpressureAction::ThrottleOutput
952        );
953    }
954
955    // ---- output_batch_budget edge cases ----
956
957    #[test]
958    fn output_batch_budget_idle_no_pressure() {
959        let policy = FlowControlPolicy::default();
960        let budget = policy.output_batch_budget(0, 1.0, 10.0);
961        assert_eq!(budget, policy.config.output_batch_idle_bytes);
962    }
963
964    #[test]
965    fn output_batch_budget_with_input_no_pressure() {
966        let policy = FlowControlPolicy::default();
967        let budget = policy.output_batch_budget(100, 1.0, 10.0);
968        assert_eq!(budget, policy.config.output_batch_with_input_bytes);
969    }
970
971    #[test]
972    fn output_batch_budget_fairness_recovery_clamps() {
973        let policy = FlowControlPolicy::default();
974        // Fairness below floor triggers recovery
975        let budget = policy.output_batch_budget(100, 0.5, 10.0);
976        assert_eq!(budget, policy.config.output_batch_recovery_bytes);
977    }
978
979    #[test]
980    fn output_batch_budget_latency_recovery_clamps() {
981        let policy = FlowControlPolicy::default();
982        // p95 above budget triggers recovery
983        let budget = policy.output_batch_budget(0, 1.0, 200.0);
984        assert_eq!(budget, policy.config.output_batch_recovery_bytes);
985    }
986
987    #[test]
988    fn output_batch_budget_both_triggers_still_recovery() {
989        let policy = FlowControlPolicy::default();
990        // Both fairness and latency in violation
991        let budget = policy.output_batch_budget(50, 0.3, 200.0);
992        assert_eq!(budget, policy.config.output_batch_recovery_bytes);
993    }
994
995    // ---- is_pressured (via evaluate) ----
996
997    #[test]
998    fn pressured_when_input_at_soft_cap() {
999        let policy = FlowControlPolicy::default();
1000        let mut snap = stable_snapshot();
1001        snap.queues.input = policy.config.input_soft_cap_bytes;
1002        let decision = policy.evaluate(snap);
1003        assert!(
1004            decision.chosen_action.is_some(),
1005            "should intervene when input queue at soft cap"
1006        );
1007    }
1008
1009    #[test]
1010    fn pressured_when_output_at_soft_cap() {
1011        let policy = FlowControlPolicy::default();
1012        let mut snap = stable_snapshot();
1013        snap.queues.output = policy.config.output_soft_cap_bytes;
1014        let decision = policy.evaluate(snap);
1015        assert!(
1016            decision.chosen_action.is_some(),
1017            "should intervene when output queue at soft cap"
1018        );
1019    }
1020
1021    #[test]
1022    fn pressured_when_input_rate_exceeds_service() {
1023        let policy = FlowControlPolicy::default();
1024        let mut snap = stable_snapshot();
1025        snap.rates.lambda_in = 100_000;
1026        snap.rates.mu_in = 50_000; // rho > 1
1027        let decision = policy.evaluate(snap);
1028        assert!(
1029            decision.chosen_action.is_some(),
1030            "should intervene when input arrival > service"
1031        );
1032    }
1033
1034    #[test]
1035    fn pressured_when_output_rate_exceeds_service() {
1036        let policy = FlowControlPolicy::default();
1037        let mut snap = stable_snapshot();
1038        snap.rates.lambda_out = 200_000;
1039        snap.rates.mu_out = 100_000; // rho > 1
1040        let decision = policy.evaluate(snap);
1041        assert!(
1042            decision.chosen_action.is_some(),
1043            "should intervene when output arrival > service"
1044        );
1045    }
1046
1047    #[test]
1048    fn pressured_when_latency_budget_breached() {
1049        let policy = FlowControlPolicy::default();
1050        let mut snap = stable_snapshot();
1051        snap.latency.key_p95_ms = policy.config.key_latency_budget_ms + 10.0;
1052        let decision = policy.evaluate(snap);
1053        assert!(
1054            decision.chosen_action.is_some(),
1055            "should intervene when latency exceeds budget"
1056        );
1057        assert_eq!(decision.reason, DecisionReason::ProtectKeyLatencyBudget);
1058    }
1059
1060    #[test]
1061    fn pressured_when_fairness_below_floor() {
1062        let policy = FlowControlPolicy::default();
1063        let mut snap = stable_snapshot();
1064        // Make fairness very low: one side gets almost everything
1065        snap.serviced_input_bytes = 1;
1066        snap.serviced_output_bytes = 1_000_000;
1067        assert!(snap.fairness_index() < policy.config.fairness_floor);
1068        let decision = policy.evaluate(snap);
1069        assert!(
1070            decision.chosen_action.is_some(),
1071            "should intervene when fairness below floor"
1072        );
1073        assert_eq!(decision.reason, DecisionReason::ProtectKeyLatencyBudget);
1074    }
1075
1076    // ---- reason codes ----
1077
1078    #[test]
1079    fn reason_stable_when_no_pressure() {
1080        let policy = FlowControlPolicy::default();
1081        let decision = policy.evaluate(stable_snapshot());
1082        assert_eq!(decision.reason, DecisionReason::Stable);
1083    }
1084
1085    #[test]
1086    fn reason_queue_pressure_without_latency_fairness_issue() {
1087        let policy = FlowControlPolicy::default();
1088        let mut snap = stable_snapshot();
1089        // Queue pressure without latency/fairness issues
1090        snap.queues.output = policy.config.output_soft_cap_bytes;
1091        let decision = policy.evaluate(snap);
1092        assert_eq!(decision.reason, DecisionReason::QueuePressure);
1093    }
1094
1095    #[test]
1096    fn reason_hard_cap_exceeded_overrides_everything() {
1097        let policy = FlowControlPolicy::default();
1098        let mut snap = stable_snapshot();
1099        snap.output_hard_cap_duration_ms = policy.config.hard_cap_terminate_ms;
1100        let decision = policy.evaluate(snap);
1101        assert_eq!(decision.reason, DecisionReason::HardCapExceeded);
1102        assert_eq!(
1103            decision.chosen_action,
1104            Some(BackpressureAction::TerminateSession)
1105        );
1106    }
1107
1108    // ---- should_pause_pty_reads ----
1109
1110    #[test]
1111    fn pause_pty_reads_at_output_hard_cap() {
1112        let policy = FlowControlPolicy::default();
1113        let mut snap = stable_snapshot();
1114        snap.queues.output = policy.config.output_hard_cap_bytes;
1115        let decision = policy.evaluate(snap);
1116        assert!(decision.should_pause_pty_reads);
1117    }
1118
1119    #[test]
1120    fn no_pause_pty_reads_below_hard_cap() {
1121        let policy = FlowControlPolicy::default();
1122        let mut snap = stable_snapshot();
1123        snap.queues.output = policy.config.output_hard_cap_bytes - 1;
1124        let decision = policy.evaluate(snap);
1125        assert!(!decision.should_pause_pty_reads);
1126    }
1127
1128    // ---- score_action properties ----
1129
1130    #[test]
1131    fn terminate_has_fixed_throughput_loss() {
1132        let policy = FlowControlPolicy::default();
1133        let signals = PressureSignals {
1134            oom_signal: 0.5,
1135            latency_signal: 0.5,
1136            throughput_signal: 0.5,
1137        };
1138        let loss = policy.score_action(BackpressureAction::TerminateSession, signals);
1139        assert_close(
1140            loss.throughput_loss,
1141            policy.config.terminate_throughput_loss,
1142            1e-9,
1143        );
1144        assert_close(loss.oom_risk, 0.0, 1e-9);
1145        assert_close(loss.latency_risk, 0.0, 1e-9);
1146    }
1147
1148    #[test]
1149    fn zero_pressure_yields_minimal_non_terminate_losses() {
1150        let policy = FlowControlPolicy::default();
1151        let signals = PressureSignals {
1152            oom_signal: 0.0,
1153            latency_signal: 0.0,
1154            throughput_signal: 0.0,
1155        };
1156        for action in [
1157            BackpressureAction::CoalesceNonInteractive,
1158            BackpressureAction::ThrottleOutput,
1159            BackpressureAction::DropNonInteractive,
1160        ] {
1161            let loss = policy.score_action(action, signals);
1162            assert_close(loss.oom_risk, 0.0, 1e-9);
1163            assert_close(loss.latency_risk, 0.0, 1e-9);
1164            // throughput_loss has a baseline > 0 for non-terminate actions
1165            assert!(loss.throughput_loss > 0.0);
1166        }
1167    }
1168
1169    #[test]
1170    fn coalesce_always_cheapest_under_zero_pressure() {
1171        let policy = FlowControlPolicy::default();
1172        let signals = PressureSignals {
1173            oom_signal: 0.0,
1174            latency_signal: 0.0,
1175            throughput_signal: 0.0,
1176        };
1177        let coalesce = policy.score_action(BackpressureAction::CoalesceNonInteractive, signals);
1178        let throttle = policy.score_action(BackpressureAction::ThrottleOutput, signals);
1179        let drop = policy.score_action(BackpressureAction::DropNonInteractive, signals);
1180        assert!(
1181            coalesce.expected_loss <= throttle.expected_loss,
1182            "coalesce should be <= throttle at zero pressure"
1183        );
1184        assert!(
1185            throttle.expected_loss <= drop.expected_loss,
1186            "throttle should be <= drop at zero pressure"
1187        );
1188    }
1189
1190    #[test]
1191    fn high_pressure_increases_oom_and_latency_risk() {
1192        let policy = FlowControlPolicy::default();
1193        let low = PressureSignals {
1194            oom_signal: 0.1,
1195            latency_signal: 0.1,
1196            throughput_signal: 0.1,
1197        };
1198        let high = PressureSignals {
1199            oom_signal: 0.9,
1200            latency_signal: 0.9,
1201            throughput_signal: 0.9,
1202        };
1203        let action = BackpressureAction::CoalesceNonInteractive;
1204        let loss_low = policy.score_action(action, low);
1205        let loss_high = policy.score_action(action, high);
1206        assert!(loss_high.oom_risk > loss_low.oom_risk);
1207        assert!(loss_high.latency_risk > loss_low.latency_risk);
1208        assert!(loss_high.throughput_loss > loss_low.throughput_loss);
1209    }
1210
1211    // ---- pressure_signals ----
1212
1213    #[test]
1214    fn pressure_signals_all_zero_when_stable() {
1215        let policy = FlowControlPolicy::default();
1216        let snap = stable_snapshot();
1217        let fi = snap.fairness_index();
1218        let signals = policy.pressure_signals(snap, fi);
1219        assert_close(signals.oom_signal, 0.0, 1e-9);
1220        assert_close(signals.latency_signal, 0.0, 1e-9);
1221        assert_close(signals.throughput_signal, 0.0, 1e-9);
1222    }
1223
1224    #[test]
1225    fn oom_signal_rises_with_queue_depth() {
1226        let policy = FlowControlPolicy::default();
1227        let mut snap = stable_snapshot();
1228        // Put output queue near hard cap (>70%)
1229        snap.queues.output = (policy.config.output_hard_cap_bytes as f64 * 0.9) as u32;
1230        let fi = snap.fairness_index();
1231        let signals = policy.pressure_signals(snap, fi);
1232        assert!(
1233            signals.oom_signal > 0.0,
1234            "oom_signal should rise at 90% of hard cap: got {}",
1235            signals.oom_signal
1236        );
1237    }
1238
1239    #[test]
1240    fn oom_signal_rises_with_utilization_above_one() {
1241        let policy = FlowControlPolicy::default();
1242        let mut snap = stable_snapshot();
1243        snap.rates.lambda_out = 200_000;
1244        snap.rates.mu_out = 100_000; // rho = 2.0
1245        let fi = snap.fairness_index();
1246        let signals = policy.pressure_signals(snap, fi);
1247        assert!(
1248            signals.oom_signal > 0.0,
1249            "oom_signal should rise when rho > 1: got {}",
1250            signals.oom_signal
1251        );
1252    }
1253
1254    #[test]
1255    fn latency_signal_rises_when_p95_exceeds_budget() {
1256        let policy = FlowControlPolicy::default();
1257        let mut snap = stable_snapshot();
1258        snap.latency.key_p95_ms = policy.config.key_latency_budget_ms * 2.0;
1259        let fi = snap.fairness_index();
1260        let signals = policy.pressure_signals(snap, fi);
1261        assert!(
1262            signals.latency_signal > 0.0,
1263            "latency_signal should rise when p95 > budget: got {}",
1264            signals.latency_signal
1265        );
1266    }
1267
1268    #[test]
1269    fn latency_signal_rises_with_fairness_shortfall() {
1270        let policy = FlowControlPolicy::default();
1271        let mut snap = stable_snapshot();
1272        snap.serviced_input_bytes = 1;
1273        snap.serviced_output_bytes = 1_000_000;
1274        let fi = snap.fairness_index();
1275        assert!(fi < policy.config.fairness_floor);
1276        let signals = policy.pressure_signals(snap, fi);
1277        assert!(
1278            signals.latency_signal > 0.0,
1279            "latency_signal should rise when fairness < floor: got {}",
1280            signals.latency_signal
1281        );
1282    }
1283
1284    #[test]
1285    fn throughput_signal_rises_with_output_utilization() {
1286        let policy = FlowControlPolicy::default();
1287        let mut snap = stable_snapshot();
1288        snap.rates.lambda_out = 200_000;
1289        snap.rates.mu_out = 100_000; // rho_out = 2.0
1290        let fi = snap.fairness_index();
1291        let signals = policy.pressure_signals(snap, fi);
1292        assert!(
1293            signals.throughput_signal > 0.0,
1294            "throughput_signal should rise when rho_out > 1: got {}",
1295            signals.throughput_signal
1296        );
1297    }
1298
1299    #[test]
1300    fn throughput_signal_rises_with_output_soft_ratio() {
1301        let policy = FlowControlPolicy::default();
1302        let mut snap = stable_snapshot();
1303        snap.queues.output = policy.config.output_soft_cap_bytes * 2;
1304        let fi = snap.fairness_index();
1305        let signals = policy.pressure_signals(snap, fi);
1306        assert!(
1307            signals.throughput_signal > 0.0,
1308            "throughput_signal should rise when output > soft cap: got {}",
1309            signals.throughput_signal
1310        );
1311    }
1312
1313    // ---- evaluate integration: losses array always has 4 entries ----
1314
1315    #[test]
1316    fn evaluate_losses_array_covers_all_actions() {
1317        let policy = FlowControlPolicy::default();
1318        let decision = policy.evaluate(stable_snapshot());
1319        assert_eq!(decision.losses.len(), 4);
1320        let actions: Vec<_> = decision.losses.iter().map(|l| l.action).collect();
1321        assert!(actions.contains(&BackpressureAction::CoalesceNonInteractive));
1322        assert!(actions.contains(&BackpressureAction::ThrottleOutput));
1323        assert!(actions.contains(&BackpressureAction::DropNonInteractive));
1324        assert!(actions.contains(&BackpressureAction::TerminateSession));
1325    }
1326
1327    // ---- evaluate integration: hard-cap just below threshold ----
1328
1329    #[test]
1330    fn hard_cap_just_below_threshold_does_not_terminate() {
1331        let policy = FlowControlPolicy::default();
1332        let mut snap = stable_snapshot();
1333        snap.output_hard_cap_duration_ms = policy.config.hard_cap_terminate_ms - 1;
1334        snap.queues.output = policy.config.output_hard_cap_bytes;
1335        let decision = policy.evaluate(snap);
1336        assert_ne!(decision.reason, DecisionReason::HardCapExceeded);
1337        assert_ne!(
1338            decision.chosen_action,
1339            Some(BackpressureAction::TerminateSession)
1340        );
1341    }
1342
1343    // ---- LossWeights and FlowControlConfig defaults ----
1344
1345    #[test]
1346    fn default_weights_hierarchy() {
1347        let w = LossWeights::default();
1348        // OOM >> latency >> throughput
1349        assert!(w.oom > w.latency);
1350        assert!(w.latency > w.throughput);
1351    }
1352
1353    #[test]
1354    fn default_config_cap_hierarchy() {
1355        let c = FlowControlConfig::default();
1356        assert!(c.input_soft_cap_bytes < c.input_hard_cap_bytes);
1357        assert!(c.output_soft_cap_bytes < c.output_hard_cap_bytes);
1358        assert!(c.output_batch_recovery_bytes < c.output_batch_with_input_bytes);
1359        assert!(c.output_batch_with_input_bytes < c.output_batch_idle_bytes);
1360    }
1361
1362    // ---- custom config propagation ----
1363
1364    #[test]
1365    fn custom_config_changes_behavior() {
1366        let config = FlowControlConfig {
1367            input_soft_cap_bytes: 100,
1368            input_hard_cap_bytes: 200,
1369            output_soft_cap_bytes: 100,
1370            output_hard_cap_bytes: 200,
1371            ..FlowControlConfig::default()
1372        };
1373        let policy = FlowControlPolicy::new(config);
1374        // Drop non-interactive at custom hard cap
1375        assert!(policy.should_drop_input_event(200, InputEventClass::NonInteractive));
1376        assert!(!policy.should_drop_input_event(199, InputEventClass::NonInteractive));
1377    }
1378}