Skip to main content

phantom_protocol/transport/
bandwidth_estimator.rs

1//! BBR-like Bandwidth Estimator
2//!
3//! Implements a simplified BBR (Bottleneck Bandwidth and Round-trip propagation time)
4//! state machine on top of KCP's ACK feedback.
5//!
6//! # BBR States
7//!
8//! ```text
9//!   ┌─────────┐        ┌──────────┐        ┌────────┐
10//!   │ Startup │───────▶│  Drain   │───────▶│ ProbeBW│
11//!   └─────────┘        └──────────┘        └────────┘
12//!                         ▲    │              │
13//!                         │    └──────────────┘
14//!                         │       ▲
15//!                      ┌──┴───────┴──┐
16//!                      │  ProbeRTT   │  (every 10s for 200ms)
17//!                      └─────────────┘
18//! ```
19//!
20//! - **Startup:** Double sending rate exponentially until bottleneck bandwidth is found
21//! - **Drain:** Reduce rate until inflight ≤ BDP (drain queues built during Startup)
22//! - **ProbeBW:** Cycle through pacing gains (1.25, 0.75, 1.0, 1.0) to probe bandwidth
23//! - **ProbeRTT:** Every 10s, reduce CWND to 4 packets for 200ms to measure true min RTT
24//!
25//! # Integration
26//!
27//! The estimator feeds the `Pacer` with target rates:
28//! ```text
29//!   BandwidthEstimator ──rate──▶ Pacer ──paced_send──▶ UdpTransport
30//! ```
31
32use std::collections::VecDeque;
33use std::time::{Duration, Instant};
34
35/// BBR state machine states
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum BbrState {
38    /// Exponentially probe for bandwidth
39    Startup,
40    /// Probe for more bandwidth (cycle through pacing gains)
41    ProbeBW,
42    /// Drain queues that built up during Startup
43    Drain,
44    /// Probe for shorter RTT (reduce CWND to 4 packets for 200ms)
45    ProbeRTT,
46    /// Explicit packet loss detected — reduce rate and CWND proportionally
47    FastRecovery,
48}
49
50/// A single delivery sample (attached to each ACKed packet)
51#[derive(Debug, Clone, Copy)]
52pub struct DeliverySample {
53    /// Bytes delivered at time of sending
54    pub delivered_bytes: u64,
55    /// Timestamp when packet was sent
56    pub sent_at: Instant,
57    /// Timestamp when ACK was received
58    pub acked_at: Instant,
59    /// Bytes in this packet
60    pub packet_bytes: u64,
61    /// Whether the sender was application-limited when this packet was sent
62    pub is_app_limited: bool,
63    /// ACK delay reported by the receiver (microseconds).
64    /// The receiver measures time between packet receipt and ACK send;
65    /// subtracting this from the observed RTT gives the propagation delay.
66    pub ack_delay_us: u64,
67}
68
69/// Sliding window to track min/max of a value
70#[derive(Debug)]
71struct WindowFilter {
72    window: VecDeque<(Instant, u64)>,
73    window_size: Duration,
74}
75
76impl WindowFilter {
77    fn new(window_size: Duration) -> Self {
78        Self {
79            window: VecDeque::new(),
80            window_size,
81        }
82    }
83
84    fn update_max(&mut self, now: Instant, value: u64) -> u64 {
85        // Remove expired entries
86        while let Some(&(ts, _)) = self.window.front() {
87            if now.duration_since(ts) > self.window_size {
88                self.window.pop_front();
89            } else {
90                break;
91            }
92        }
93        // Remove entries smaller than the new value (they're dominated)
94        while let Some(&(_, v)) = self.window.back() {
95            if v <= value {
96                self.window.pop_back();
97            } else {
98                break;
99            }
100        }
101        self.window.push_back((now, value));
102        // The maximum is always at the front
103        self.window.front().map(|&(_, v)| v).unwrap_or(value)
104    }
105
106    fn update_min(&mut self, now: Instant, value: u64) -> u64 {
107        while let Some(&(ts, _)) = self.window.front() {
108            if now.duration_since(ts) > self.window_size {
109                self.window.pop_front();
110            } else {
111                break;
112            }
113        }
114        while let Some(&(_, v)) = self.window.back() {
115            if v >= value {
116                self.window.pop_back();
117            } else {
118                break;
119            }
120        }
121        self.window.push_back((now, value));
122        self.window.front().map(|&(_, v)| v).unwrap_or(value)
123    }
124}
125
126// ─── Constants ──────────────────────────────────────────────────────────────
127
128/// Probe cycle gains for ProbeBW phase (BBR cycle: 1.25, 0.75, 1.0, 1.0)
129const PROBE_BW_GAINS: [f64; 4] = [1.25, 0.75, 1.0, 1.0];
130
131/// Startup growth threshold — if BW growth < 25%, consider pipe filled
132const STARTUP_GROWTH_THRESHOLD: f64 = 0.25;
133
134/// Rounds without growth before exiting Startup
135const STARTUP_ROUNDS_LIMIT: u32 = 3;
136
137/// ProbeRTT interval — enter ProbeRTT every 10 seconds
138const PROBE_RTT_INTERVAL: Duration = Duration::from_secs(10);
139
140/// ProbeRTT duration — stay in ProbeRTT for 200ms
141const PROBE_RTT_DURATION: Duration = Duration::from_millis(200);
142
143/// Minimum CWND in ProbeRTT mode (4 packets)
144const PROBE_RTT_CWND_PACKETS: u64 = 4;
145
146/// Minimum packet size assumption (bytes)
147const MIN_PACKET_SIZE: u64 = 1400;
148
149/// FastRecovery: pacing gain during loss recovery (BBRv3: backs off to 0.5)
150const FAST_RECOVERY_PACING_GAIN: f64 = 0.5;
151
152/// FastRecovery: exit when inflight < BDP * this fraction  
153const FAST_RECOVERY_EXIT_FRACTION: f64 = 1.0;
154
155// ─── Estimator ──────────────────────────────────────────────────────────────
156
157/// BBR-like Bandwidth Estimator
158pub struct BandwidthEstimator {
159    /// Current BBR state
160    state: BbrState,
161    /// Estimated bottleneck bandwidth (bytes/sec)
162    btl_bw: u64,
163    /// Minimum observed RTT
164    min_rtt: Duration,
165    /// Sliding window for max bandwidth (10 round-trips)
166    bw_filter: WindowFilter,
167    /// Sliding window for min RTT (10 seconds)
168    rtt_filter: WindowFilter,
169    /// Total bytes delivered (monotonically increasing)
170    delivered_bytes: u64,
171    /// Timestamp of last delivery
172    last_delivery: Instant,
173    /// Pacing gain multiplier (1.0 = 100%, 1.25 = probe, 0.75 = drain)
174    pacing_gain: f64,
175    /// CWND gain multiplier
176    cwnd_gain: f64,
177    /// Round counter (ticks on each ACK in Startup, cycles in ProbeBW)
178    round_count: u32,
179    /// Whether we've found the bottleneck bandwidth
180    filled_pipe: bool,
181    /// Previous bandwidth sample for startup exit decision
182    prev_bw: u64,
183    /// Number of rounds with insufficient BW increase (startup exit condition)
184    rounds_without_growth: u32,
185
186    // ── Inflight tracking ──
187    /// Current bytes in flight
188    inflight_bytes: u64,
189
190    // ── ProbeRTT timer ──
191    /// Timestamp of last ProbeRTT exit (or Startup start)
192    last_probe_rtt_time: Instant,
193    /// When we entered ProbeRTT (for duration tracking)
194    probe_rtt_entered: Option<Instant>,
195    /// State to return to after ProbeRTT
196    prior_state: BbrState,
197
198    // ── App-limited detection ──
199    /// Whether the sender is currently application-limited
200    app_limited: bool,
201    /// Delivered bytes at the time app-limited was last set
202    app_limited_at_delivered: u64,
203
204    // ── FastRecovery ──
205    /// When we entered FastRecovery (for duration-based exit)
206    fast_recovery_entered: Option<Instant>,
207    /// Total bytes lost during this recovery window
208    recovery_lost_bytes: u64,
209}
210
211impl BandwidthEstimator {
212    /// Create a new estimator starting in Startup state.
213    pub fn new() -> Self {
214        let now = Instant::now();
215        Self {
216            state: BbrState::Startup,
217            btl_bw: 0,
218            min_rtt: Duration::from_millis(100), // Conservative initial RTT
219            bw_filter: WindowFilter::new(Duration::from_secs(10)),
220            rtt_filter: WindowFilter::new(Duration::from_secs(10)),
221            delivered_bytes: 0,
222            last_delivery: now,
223            pacing_gain: 2.0, // Startup: double the rate
224            cwnd_gain: 2.0,
225            round_count: 0,
226            filled_pipe: false,
227            prev_bw: 0,
228            rounds_without_growth: 0,
229            inflight_bytes: 0,
230            last_probe_rtt_time: now,
231            probe_rtt_entered: None,
232            prior_state: BbrState::ProbeBW,
233            app_limited: false,
234            app_limited_at_delivered: 0,
235            fast_recovery_entered: None,
236            recovery_lost_bytes: 0,
237        }
238    }
239
240    // ── Public API ──────────────────────────────────────────────────────────
241
242    /// Notify the estimator that `bytes` were sent (increases inflight).
243    pub fn on_send(&mut self, bytes: u64) {
244        self.inflight_bytes = self.inflight_bytes.saturating_add(bytes);
245    }
246
247    /// Process an ACK and update bandwidth estimates.
248    ///
249    /// Returns the new recommended pacing rate (bytes/sec).
250    pub fn on_ack(&mut self, sample: DeliverySample) -> u64 {
251        let now = sample.acked_at;
252
253        // Update inflight tracking
254        self.inflight_bytes = self.inflight_bytes.saturating_sub(sample.packet_bytes);
255
256        // Update delivered bytes counter
257        self.delivered_bytes += sample.packet_bytes;
258        self.last_delivery = now;
259
260        // Calculate delivery rate for this sample.
261        // `send_elapsed` is the full observed RTT. We subtract the receiver's ack_delay
262        // to get the true propagation delay (RTprop), matching QUIC RFC 9002 §5.3.
263        let send_elapsed = sample.acked_at.duration_since(sample.sent_at);
264        let ack_delay = Duration::from_micros(sample.ack_delay_us);
265        let rtt_propagation = send_elapsed.saturating_sub(ack_delay);
266
267        // Update min RTT using the propagation delay (RTprop)
268        let rtt_us = rtt_propagation.as_micros() as u64;
269        if rtt_us > 0 {
270            let min_rtt_us = self.rtt_filter.update_min(now, rtt_us);
271            self.min_rtt = Duration::from_micros(min_rtt_us);
272        }
273
274        // Calculate bandwidth: delivered_bytes / time
275        let delivery_rate = if !send_elapsed.is_zero() {
276            (sample.packet_bytes as f64 / send_elapsed.as_secs_f64()) as u64
277        } else {
278            0
279        };
280
281        // App-limited filtering: only update BW filter with non-app-limited samples.
282        // App-limited samples underestimate the true available bandwidth because
283        // the sender wasn't sending at line rate.
284        if delivery_rate > 0 && !sample.is_app_limited {
285            self.btl_bw = self.bw_filter.update_max(now, delivery_rate);
286        }
287
288        // Check if we've exited app-limited phase
289        if self.app_limited && self.delivered_bytes > self.app_limited_at_delivered {
290            self.app_limited = false;
291        }
292
293        // Run state machine
294        self.update_state(now);
295
296        // Return pacing rate
297        self.pacing_rate()
298    }
299
300    /// Notify a packet loss — triggers BBRv3 Fast Recovery.
301    ///
302    /// Unlike earlier BBR versions that ignored loss, BBRv3 immediately
303    /// backs off pacing rate and CWND relative to the lost bytes fraction.
304    pub fn on_loss(&mut self, bytes: u64) {
305        self.inflight_bytes = self.inflight_bytes.saturating_sub(bytes);
306        self.recovery_lost_bytes = self.recovery_lost_bytes.saturating_add(bytes);
307
308        // Only enter FastRecovery if not already in it or ProbeRTT
309        if self.state != BbrState::FastRecovery && self.state != BbrState::ProbeRTT {
310            self.prior_state = self.state;
311            self.fast_recovery_entered = Some(Instant::now());
312            self.transition_to(BbrState::FastRecovery);
313        }
314    }
315
316    /// Mark the sender as application-limited (not sending at line rate).
317    ///
318    /// Call this when there is no data to send but the CWND has room.
319    /// Samples produced during app-limited periods won't update the BW filter,
320    /// preventing bandwidth underestimation.
321    pub fn set_app_limited(&mut self) {
322        self.app_limited = true;
323        self.app_limited_at_delivered = self.delivered_bytes;
324    }
325
326    /// Whether the sender is currently considered application-limited.
327    pub fn is_app_limited(&self) -> bool {
328        self.app_limited
329    }
330
331    /// Get current recommended pacing rate (bytes/sec).
332    pub fn pacing_rate(&self) -> u64 {
333        let base = self.btl_bw.max(1);
334        (base as f64 * self.pacing_gain) as u64
335    }
336
337    /// Get recommended congestion window size (bytes).
338    pub fn cwnd(&self) -> u64 {
339        if self.state == BbrState::ProbeRTT {
340            // During ProbeRTT, reduce CWND to minimum
341            return PROBE_RTT_CWND_PACKETS * MIN_PACKET_SIZE;
342        }
343        let bdp = self.bdp();
344        (bdp as f64 * self.cwnd_gain).max((PROBE_RTT_CWND_PACKETS * MIN_PACKET_SIZE) as f64) as u64
345    }
346
347    /// Get the Bandwidth-Delay Product (BDP) in bytes.
348    pub fn bdp(&self) -> u64 {
349        (self.btl_bw as f64 * self.min_rtt.as_secs_f64()) as u64
350    }
351
352    /// Get current bytes in flight.
353    pub fn inflight_bytes(&self) -> u64 {
354        self.inflight_bytes
355    }
356
357    /// Get estimated bottleneck bandwidth (bytes/sec).
358    pub fn bottleneck_bandwidth(&self) -> u64 {
359        self.btl_bw
360    }
361
362    /// Get minimum observed RTT.
363    pub fn min_rtt(&self) -> Duration {
364        self.min_rtt
365    }
366
367    /// Get current BBR state.
368    pub fn state(&self) -> BbrState {
369        self.state
370    }
371
372    /// Get total bytes delivered.
373    pub fn delivered_bytes(&self) -> u64 {
374        self.delivered_bytes
375    }
376
377    /// Get the round count.
378    pub fn round_count(&self) -> u32 {
379        self.round_count
380    }
381
382    // ── State Machine ───────────────────────────────────────────────────────
383
384    /// Run BBR state machine transitions.
385    fn update_state(&mut self, now: Instant) {
386        // ── ProbeRTT check: global timer, any state can enter except Startup and FastRecovery ──
387        if self.state != BbrState::ProbeRTT
388            && self.state != BbrState::Startup
389            && self.state != BbrState::FastRecovery
390            && now.duration_since(self.last_probe_rtt_time) >= PROBE_RTT_INTERVAL
391        {
392            self.prior_state = self.state;
393            self.transition_to(BbrState::ProbeRTT);
394            self.probe_rtt_entered = Some(now);
395            return;
396        }
397
398        match self.state {
399            BbrState::Startup => {
400                self.round_count += 1;
401
402                // Check if pipe is filled (BW growth < threshold)
403                if self.prev_bw > 0 {
404                    let growth = (self.btl_bw as f64 - self.prev_bw as f64) / self.prev_bw as f64;
405
406                    if growth < STARTUP_GROWTH_THRESHOLD {
407                        self.rounds_without_growth += 1;
408                    } else {
409                        self.rounds_without_growth = 0;
410                    }
411
412                    if self.rounds_without_growth >= STARTUP_ROUNDS_LIMIT {
413                        self.filled_pipe = true;
414                        self.transition_to(BbrState::Drain);
415                    }
416                }
417                self.prev_bw = self.btl_bw;
418            }
419            BbrState::Drain => {
420                // Stay in Drain until inflight ≤ BDP
421                let bdp = self.bdp();
422                if self.inflight_bytes <= bdp || bdp == 0 {
423                    self.transition_to(BbrState::ProbeBW);
424                }
425            }
426            BbrState::ProbeBW => {
427                // Cycle through pacing gains
428                let cycle_idx = (self.round_count as usize) % PROBE_BW_GAINS.len();
429                self.pacing_gain = PROBE_BW_GAINS[cycle_idx];
430                self.cwnd_gain = 2.0;
431                self.round_count += 1;
432            }
433            BbrState::ProbeRTT => {
434                // Stay for PROBE_RTT_DURATION, then exit
435                if let Some(entered) = self.probe_rtt_entered {
436                    if now.duration_since(entered) >= PROBE_RTT_DURATION {
437                        self.last_probe_rtt_time = now;
438                        self.probe_rtt_entered = None;
439                        self.transition_to(self.prior_state);
440                    }
441                } else {
442                    // Safety: shouldn't happen, but exit gracefully
443                    self.transition_to(BbrState::ProbeBW);
444                }
445            }
446            BbrState::FastRecovery => {
447                // Exit FastRecovery when inflight drops below BDP (pipe drained)
448                // or after one min_rtt has elapsed with no new losses
449                let bdp = self.bdp();
450                let should_exit = self.inflight_bytes
451                    <= (bdp as f64 * FAST_RECOVERY_EXIT_FRACTION) as u64
452                    || bdp == 0;
453
454                if should_exit {
455                    self.recovery_lost_bytes = 0;
456                    self.fast_recovery_entered = None;
457                    self.transition_to(self.prior_state);
458                }
459            }
460        }
461    }
462
463    /// Transition to a new BBR state.
464    fn transition_to(&mut self, new_state: BbrState) {
465        match new_state {
466            BbrState::Startup => {
467                self.pacing_gain = 2.0;
468                self.cwnd_gain = 2.0;
469            }
470            BbrState::Drain => {
471                self.pacing_gain = 0.75;
472                self.cwnd_gain = 2.0;
473            }
474            BbrState::ProbeBW => {
475                self.pacing_gain = 1.0;
476                self.cwnd_gain = 2.0;
477            }
478            BbrState::ProbeRTT => {
479                self.pacing_gain = 1.0;
480                self.cwnd_gain = 1.0;
481            }
482            BbrState::FastRecovery => {
483                // BBRv3 recovery: drop pacing to 50% of bottleneck bandwidth,
484                // and tighten CWND to 1x BDP instead of 2x (no inflating).
485                self.pacing_gain = FAST_RECOVERY_PACING_GAIN;
486                self.cwnd_gain = 1.0;
487            }
488        }
489        self.state = new_state;
490    }
491}
492
493impl Default for BandwidthEstimator {
494    fn default() -> Self {
495        Self::new()
496    }
497}
498
499impl std::fmt::Debug for BandwidthEstimator {
500    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
501        f.debug_struct("BandwidthEstimator")
502            .field("state", &self.state)
503            .field("btl_bw_kbps", &(self.btl_bw / 1024))
504            .field("min_rtt_ms", &self.min_rtt.as_millis())
505            .field("pacing_gain", &self.pacing_gain)
506            .field("inflight_bytes", &self.inflight_bytes)
507            .field("delivered_bytes", &self.delivered_bytes)
508            .field("app_limited", &self.app_limited)
509            .finish()
510    }
511}
512
513#[cfg(test)]
514mod tests {
515    use super::*;
516
517    fn make_sample(sent_at: Instant, rtt_ms: u64, packet_bytes: u64) -> DeliverySample {
518        DeliverySample {
519            delivered_bytes: 0,
520            sent_at,
521            acked_at: sent_at + Duration::from_millis(rtt_ms),
522            packet_bytes,
523            is_app_limited: false,
524            ack_delay_us: 0, // No ACK delay in tests
525        }
526    }
527
528    fn make_app_limited_sample(sent_at: Instant, rtt_ms: u64, packet_bytes: u64) -> DeliverySample {
529        DeliverySample {
530            delivered_bytes: 0,
531            sent_at,
532            acked_at: sent_at + Duration::from_millis(rtt_ms),
533            packet_bytes,
534            is_app_limited: true,
535            ack_delay_us: 0,
536        }
537    }
538
539    #[test]
540    fn test_estimator_starts_in_startup() {
541        let est = BandwidthEstimator::new();
542        assert_eq!(est.state(), BbrState::Startup);
543        assert_eq!(est.delivered_bytes(), 0);
544        assert_eq!(est.inflight_bytes(), 0);
545        assert!(!est.is_app_limited());
546    }
547
548    #[test]
549    fn test_bandwidth_increases_with_acks() {
550        let mut est = BandwidthEstimator::new();
551        let now = Instant::now();
552
553        // Simulate several ACKs at 10ms RTT, 1400 byte packets
554        for i in 0..10 {
555            let sent = now + Duration::from_millis(i * 10);
556            est.on_send(1400);
557            let sample = make_sample(sent, 10, 1400);
558            est.on_ack(sample);
559        }
560
561        // Should have positive bandwidth estimate
562        assert!(
563            est.bottleneck_bandwidth() > 0,
564            "btl_bw = {} should be > 0",
565            est.bottleneck_bandwidth()
566        );
567        assert_eq!(est.delivered_bytes(), 14_000);
568    }
569
570    #[test]
571    fn test_min_rtt_tracking() {
572        let mut est = BandwidthEstimator::new();
573        let now = Instant::now();
574
575        // RTT high first, then low
576        let s1 = make_sample(now, 100, 1400);
577        est.on_ack(s1);
578        assert!(est.min_rtt() <= Duration::from_millis(101));
579
580        let s2 = make_sample(now + Duration::from_millis(200), 5, 1400);
581        est.on_ack(s2);
582        assert!(
583            est.min_rtt() <= Duration::from_millis(6),
584            "min_rtt = {:?}",
585            est.min_rtt()
586        );
587    }
588
589    #[test]
590    fn test_pacing_rate_positive() {
591        let mut est = BandwidthEstimator::new();
592        let now = Instant::now();
593
594        let sample = make_sample(now, 20, 1400);
595        est.on_ack(sample);
596
597        // Pacing rate should be positive
598        assert!(est.pacing_rate() > 0);
599    }
600
601    #[test]
602    fn test_cwnd_at_least_minimum() {
603        let est = BandwidthEstimator::new();
604        // Even with zero BW, CWND should have a minimum floor
605        let cwnd = est.cwnd();
606        assert!(
607            cwnd >= 4 * 1400,
608            "cwnd = {} should be >= {}",
609            cwnd,
610            4 * 1400
611        );
612    }
613
614    #[test]
615    fn test_startup_to_drain_transition() {
616        let mut est = BandwidthEstimator::new();
617        let now = Instant::now();
618
619        // Send many ACKs with constant bandwidth to trigger pipe-filled detection
620        for i in 0..20 {
621            let sent = now + Duration::from_millis(i * 10);
622            est.on_send(1400);
623            let sample = make_sample(sent, 10, 1400);
624            est.on_ack(sample);
625        }
626
627        // After enough rounds with no BW growth, should exit startup
628        assert!(
629            est.state() != BbrState::Startup || est.round_count < 20,
630            "expected startup exit, state = {:?}, rounds = {}",
631            est.state(),
632            est.round_count
633        );
634    }
635
636    // ── New tests for Phase 5 improvements ──
637
638    #[test]
639    fn test_inflight_tracking() {
640        let mut est = BandwidthEstimator::new();
641
642        // Send 3 packets
643        est.on_send(1400);
644        est.on_send(1400);
645        est.on_send(1400);
646        assert_eq!(est.inflight_bytes(), 4200);
647
648        // ACK 1
649        let now = Instant::now();
650        est.on_ack(make_sample(now, 10, 1400));
651        assert_eq!(est.inflight_bytes(), 2800);
652
653        // Loss 1
654        est.on_loss(1400);
655        assert_eq!(est.inflight_bytes(), 1400);
656
657        // ACK last
658        est.on_ack(make_sample(now + Duration::from_millis(10), 10, 1400));
659        assert_eq!(est.inflight_bytes(), 0);
660    }
661
662    #[test]
663    fn test_inflight_cant_go_negative() {
664        let mut est = BandwidthEstimator::new();
665        est.on_loss(5000);
666        assert_eq!(est.inflight_bytes(), 0); // saturating_sub
667    }
668
669    #[test]
670    fn test_app_limited_filtering() {
671        let mut est = BandwidthEstimator::new();
672        let now = Instant::now();
673
674        // Feed real bandwidth samples first (1Mbps)
675        for i in 0..5 {
676            let sent = now + Duration::from_millis(i * 10);
677            est.on_send(1400);
678            est.on_ack(make_sample(sent, 10, 1400));
679        }
680        let real_bw = est.bottleneck_bandwidth();
681        assert!(real_bw > 0);
682
683        // Now feed app-limited samples with very low bandwidth
684        // These should NOT reduce the BW estimate
685        est.set_app_limited();
686        assert!(est.is_app_limited());
687
688        for i in 5..10 {
689            let sent = now + Duration::from_millis(i * 1000);
690            est.on_ack(make_app_limited_sample(sent, 1000, 100)); // very slow
691        }
692
693        // BW should NOT have decreased
694        assert!(
695            est.bottleneck_bandwidth() >= real_bw,
696            "BW should not decrease from app-limited samples: {} < {}",
697            est.bottleneck_bandwidth(),
698            real_bw
699        );
700    }
701
702    #[test]
703    fn test_drain_waits_for_bdp() {
704        let mut est = BandwidthEstimator::new();
705        let now = Instant::now();
706
707        // Drive into Drain state
708        for i in 0..20 {
709            let sent = now + Duration::from_millis(i * 10);
710            est.on_send(1400);
711            est.on_ack(make_sample(sent, 10, 1400));
712        }
713
714        // Artificially set high inflight
715        if est.state() == BbrState::Drain {
716            // Add lots of inflight
717            est.inflight_bytes = est.bdp() * 3;
718            let sent = now + Duration::from_millis(300);
719            est.on_ack(make_sample(sent, 10, 1400));
720            // Should still be in Drain (inflight > BDP)
721            if est.inflight_bytes > est.bdp() {
722                assert_eq!(
723                    est.state(),
724                    BbrState::Drain,
725                    "should stay in Drain while inflight ({}) > BDP ({})",
726                    est.inflight_bytes,
727                    est.bdp()
728                );
729            }
730        }
731    }
732
733    #[test]
734    fn test_bdp_calculation() {
735        let mut est = BandwidthEstimator::new();
736        let now = Instant::now();
737
738        // Feed samples: 1400 bytes / 10ms = 140,000 bytes/sec
739        for i in 0..5 {
740            let sent = now + Duration::from_millis(i * 10);
741            est.on_send(1400);
742            est.on_ack(make_sample(sent, 10, 1400));
743        }
744
745        let bdp = est.bdp();
746        // BDP = btl_bw * min_rtt
747        // btl_bw ≈ 140,000 B/s, min_rtt ≈ 10ms
748        // BDP ≈ 140,000 * 0.01 = 1,400 bytes
749        assert!(bdp > 0, "BDP should be positive, got {}", bdp);
750    }
751
752    #[test]
753    fn test_cwnd_minimum_in_probe_rtt() {
754        let mut est = BandwidthEstimator::new();
755        // Force ProbeRTT state
756        est.state = BbrState::ProbeRTT;
757        let cwnd = est.cwnd();
758        assert_eq!(
759            cwnd,
760            PROBE_RTT_CWND_PACKETS * MIN_PACKET_SIZE,
761            "ProbeRTT CWND should be {} (4 packets), got {}",
762            PROBE_RTT_CWND_PACKETS * MIN_PACKET_SIZE,
763            cwnd
764        );
765    }
766}