1use std::collections::VecDeque;
33use std::time::{Duration, Instant};
34
/// Phases of the congestion-control state machine driven by
/// `BandwidthEstimator`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BbrState {
    /// Initial phase: high pacing and cwnd gains are applied until the
    /// bandwidth estimate stops growing.
    Startup,
    /// Steady-state phase that cycles the pacing gain to probe for bandwidth.
    ProbeBW,
    /// Entered after `Startup`; paces below the estimate until the data in
    /// flight has fallen to one bandwidth-delay product.
    Drain,
    /// Periodic phase in which the congestion window is clamped to a few
    /// packets.
    ProbeRTT,
    /// Entered when a loss is reported; paces at a reduced gain until the
    /// data in flight has drained.
    FastRecovery,
}
49
/// One acknowledged packet, as reported to `BandwidthEstimator::on_ack`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DeliverySample {
    /// Delivered-byte counter associated with this packet.
    ///
    /// NOTE(review): nothing in this file reads this field; presumably it is
    /// the connection's delivered count when the packet was sent — confirm
    /// against the caller.
    pub delivered_bytes: u64,
    /// When the packet was sent.
    pub sent_at: Instant,
    /// When the acknowledgement was received; used as "now" while the sample
    /// is processed.
    pub acked_at: Instant,
    /// Size of the acknowledged packet in bytes.
    pub packet_bytes: u64,
    /// Whether the sender was application-limited for this packet.
    pub is_app_limited: bool,
    /// Delay reported by the peer, in microseconds; subtracted from the
    /// measured round trip before it is fed to the min-RTT filter.
    pub ack_delay_us: u64,
}
68
/// Sliding-window extremum tracker backed by a monotonic deque.
///
/// Entries are `(timestamp, value)` pairs. The front of the deque always
/// holds the current extremum; entries behind it are the candidates that take
/// over once the front expires.
///
/// A given instance must be driven through only one of `update_max` or
/// `update_min`: each keeps the deque ordered for its own comparison, and
/// mixing them would break that ordering.
#[derive(Debug)]
struct WindowFilter {
    /// Candidate extrema, oldest first.
    window: VecDeque<(Instant, u64)>,
    /// Maximum age an entry may reach before it is discarded.
    window_size: Duration,
}

impl WindowFilter {
    /// Creates an empty filter whose entries expire after `window_size`.
    fn new(window_size: Duration) -> Self {
        Self {
            window: VecDeque::new(),
            window_size,
        }
    }

    /// Records `value` at time `now` and returns the maximum over the window.
    fn update_max(&mut self, now: Instant, value: u64) -> u64 {
        self.update_with(now, value, |new, held| held <= new)
    }

    /// Records `value` at time `now` and returns the minimum over the window.
    fn update_min(&mut self, now: Instant, value: u64) -> u64 {
        self.update_with(now, value, |new, held| held >= new)
    }

    /// Shared implementation of both filters.
    ///
    /// `displaces(new, held)` returns `true` when the new value makes an
    /// already-held candidate irrelevant (it is at least as extreme and
    /// newer), so the candidate can be dropped from the back of the deque.
    fn update_with<F>(&mut self, now: Instant, value: u64, displaces: F) -> u64
    where
        F: Fn(u64, u64) -> bool,
    {
        // Expire entries strictly older than the window. The saturating form
        // treats a timestamp that lies after `now` as age zero instead of
        // relying on `duration_since`, which is documented as allowed to
        // panic in that case.
        while matches!(
            self.window.front(),
            Some(&(stamp, _)) if now.saturating_duration_since(stamp) > self.window_size
        ) {
            self.window.pop_front();
        }

        // Drop candidates the new value supersedes, keeping the deque ordered.
        while matches!(self.window.back(), Some(&(_, held)) if displaces(value, held)) {
            self.window.pop_back();
        }

        self.window.push_back((now, value));
        // The deque is non-empty here; `value` is only a formal fallback.
        self.window.front().map_or(value, |&(_, best)| best)
    }
}
125
/// Pacing gains cycled through while in `BbrState::ProbeBW`: one phase above
/// the estimate, one below it, then cruising at the estimate.
const PROBE_BW_GAINS: [f64; 4] = [1.25, 0.75, 1.0, 1.0];

/// Relative growth of the bandwidth estimate below which a Startup
/// evaluation counts as "no growth".
const STARTUP_GROWTH_THRESHOLD: f64 = 0.25;

/// Number of no-growth evaluations after which Startup hands over to Drain.
const STARTUP_ROUNDS_LIMIT: u32 = 3;

/// Minimum time between two ProbeRTT episodes.
const PROBE_RTT_INTERVAL: Duration = Duration::from_secs(10);

/// Time spent in ProbeRTT before the previous state is restored.
const PROBE_RTT_DURATION: Duration = Duration::from_millis(200);

/// Congestion window, in packets, used during ProbeRTT; together with
/// `MIN_PACKET_SIZE` it also forms the floor of the window in every state.
const PROBE_RTT_CWND_PACKETS: u64 = 4;

/// Packet size in bytes used to convert packet counts into byte limits.
const MIN_PACKET_SIZE: u64 = 1400;

/// Pacing gain applied while in `BbrState::FastRecovery`.
const FAST_RECOVERY_PACING_GAIN: f64 = 0.5;

/// Fraction of the bandwidth-delay product that the data in flight must
/// drop to (or below) before FastRecovery is left.
const FAST_RECOVERY_EXIT_FRACTION: f64 = 1.0;
154
155pub struct BandwidthEstimator {
159 state: BbrState,
161 btl_bw: u64,
163 min_rtt: Duration,
165 bw_filter: WindowFilter,
167 rtt_filter: WindowFilter,
169 delivered_bytes: u64,
171 last_delivery: Instant,
173 pacing_gain: f64,
175 cwnd_gain: f64,
177 round_count: u32,
179 filled_pipe: bool,
181 prev_bw: u64,
183 rounds_without_growth: u32,
185
186 inflight_bytes: u64,
189
190 last_probe_rtt_time: Instant,
193 probe_rtt_entered: Option<Instant>,
195 prior_state: BbrState,
197
198 app_limited: bool,
201 app_limited_at_delivered: u64,
203
204 fast_recovery_entered: Option<Instant>,
207 recovery_lost_bytes: u64,
209}
210
211impl BandwidthEstimator {
212 pub fn new() -> Self {
214 let now = Instant::now();
215 Self {
216 state: BbrState::Startup,
217 btl_bw: 0,
218 min_rtt: Duration::from_millis(100), bw_filter: WindowFilter::new(Duration::from_secs(10)),
220 rtt_filter: WindowFilter::new(Duration::from_secs(10)),
221 delivered_bytes: 0,
222 last_delivery: now,
223 pacing_gain: 2.0, cwnd_gain: 2.0,
225 round_count: 0,
226 filled_pipe: false,
227 prev_bw: 0,
228 rounds_without_growth: 0,
229 inflight_bytes: 0,
230 last_probe_rtt_time: now,
231 probe_rtt_entered: None,
232 prior_state: BbrState::ProbeBW,
233 app_limited: false,
234 app_limited_at_delivered: 0,
235 fast_recovery_entered: None,
236 recovery_lost_bytes: 0,
237 }
238 }
239
240 pub fn on_send(&mut self, bytes: u64) {
244 self.inflight_bytes = self.inflight_bytes.saturating_add(bytes);
245 }
246
247 pub fn on_ack(&mut self, sample: DeliverySample) -> u64 {
251 let now = sample.acked_at;
252
253 self.inflight_bytes = self.inflight_bytes.saturating_sub(sample.packet_bytes);
255
256 self.delivered_bytes += sample.packet_bytes;
258 self.last_delivery = now;
259
260 let send_elapsed = sample.acked_at.duration_since(sample.sent_at);
264 let ack_delay = Duration::from_micros(sample.ack_delay_us);
265 let rtt_propagation = send_elapsed.saturating_sub(ack_delay);
266
267 let rtt_us = rtt_propagation.as_micros() as u64;
269 if rtt_us > 0 {
270 let min_rtt_us = self.rtt_filter.update_min(now, rtt_us);
271 self.min_rtt = Duration::from_micros(min_rtt_us);
272 }
273
274 let delivery_rate = if !send_elapsed.is_zero() {
276 (sample.packet_bytes as f64 / send_elapsed.as_secs_f64()) as u64
277 } else {
278 0
279 };
280
281 if delivery_rate > 0 && !sample.is_app_limited {
285 self.btl_bw = self.bw_filter.update_max(now, delivery_rate);
286 }
287
288 if self.app_limited && self.delivered_bytes > self.app_limited_at_delivered {
290 self.app_limited = false;
291 }
292
293 self.update_state(now);
295
296 self.pacing_rate()
298 }
299
300 pub fn on_loss(&mut self, bytes: u64) {
305 self.inflight_bytes = self.inflight_bytes.saturating_sub(bytes);
306 self.recovery_lost_bytes = self.recovery_lost_bytes.saturating_add(bytes);
307
308 if self.state != BbrState::FastRecovery && self.state != BbrState::ProbeRTT {
310 self.prior_state = self.state;
311 self.fast_recovery_entered = Some(Instant::now());
312 self.transition_to(BbrState::FastRecovery);
313 }
314 }
315
316 pub fn set_app_limited(&mut self) {
322 self.app_limited = true;
323 self.app_limited_at_delivered = self.delivered_bytes;
324 }
325
326 pub fn is_app_limited(&self) -> bool {
328 self.app_limited
329 }
330
331 pub fn pacing_rate(&self) -> u64 {
333 let base = self.btl_bw.max(1);
334 (base as f64 * self.pacing_gain) as u64
335 }
336
337 pub fn cwnd(&self) -> u64 {
339 if self.state == BbrState::ProbeRTT {
340 return PROBE_RTT_CWND_PACKETS * MIN_PACKET_SIZE;
342 }
343 let bdp = self.bdp();
344 (bdp as f64 * self.cwnd_gain).max((PROBE_RTT_CWND_PACKETS * MIN_PACKET_SIZE) as f64) as u64
345 }
346
347 pub fn bdp(&self) -> u64 {
349 (self.btl_bw as f64 * self.min_rtt.as_secs_f64()) as u64
350 }
351
352 pub fn inflight_bytes(&self) -> u64 {
354 self.inflight_bytes
355 }
356
357 pub fn bottleneck_bandwidth(&self) -> u64 {
359 self.btl_bw
360 }
361
362 pub fn min_rtt(&self) -> Duration {
364 self.min_rtt
365 }
366
367 pub fn state(&self) -> BbrState {
369 self.state
370 }
371
372 pub fn delivered_bytes(&self) -> u64 {
374 self.delivered_bytes
375 }
376
377 pub fn round_count(&self) -> u32 {
379 self.round_count
380 }
381
382 fn update_state(&mut self, now: Instant) {
386 if self.state != BbrState::ProbeRTT
388 && self.state != BbrState::Startup
389 && self.state != BbrState::FastRecovery
390 && now.duration_since(self.last_probe_rtt_time) >= PROBE_RTT_INTERVAL
391 {
392 self.prior_state = self.state;
393 self.transition_to(BbrState::ProbeRTT);
394 self.probe_rtt_entered = Some(now);
395 return;
396 }
397
398 match self.state {
399 BbrState::Startup => {
400 self.round_count += 1;
401
402 if self.prev_bw > 0 {
404 let growth = (self.btl_bw as f64 - self.prev_bw as f64) / self.prev_bw as f64;
405
406 if growth < STARTUP_GROWTH_THRESHOLD {
407 self.rounds_without_growth += 1;
408 } else {
409 self.rounds_without_growth = 0;
410 }
411
412 if self.rounds_without_growth >= STARTUP_ROUNDS_LIMIT {
413 self.filled_pipe = true;
414 self.transition_to(BbrState::Drain);
415 }
416 }
417 self.prev_bw = self.btl_bw;
418 }
419 BbrState::Drain => {
420 let bdp = self.bdp();
422 if self.inflight_bytes <= bdp || bdp == 0 {
423 self.transition_to(BbrState::ProbeBW);
424 }
425 }
426 BbrState::ProbeBW => {
427 let cycle_idx = (self.round_count as usize) % PROBE_BW_GAINS.len();
429 self.pacing_gain = PROBE_BW_GAINS[cycle_idx];
430 self.cwnd_gain = 2.0;
431 self.round_count += 1;
432 }
433 BbrState::ProbeRTT => {
434 if let Some(entered) = self.probe_rtt_entered {
436 if now.duration_since(entered) >= PROBE_RTT_DURATION {
437 self.last_probe_rtt_time = now;
438 self.probe_rtt_entered = None;
439 self.transition_to(self.prior_state);
440 }
441 } else {
442 self.transition_to(BbrState::ProbeBW);
444 }
445 }
446 BbrState::FastRecovery => {
447 let bdp = self.bdp();
450 let should_exit = self.inflight_bytes
451 <= (bdp as f64 * FAST_RECOVERY_EXIT_FRACTION) as u64
452 || bdp == 0;
453
454 if should_exit {
455 self.recovery_lost_bytes = 0;
456 self.fast_recovery_entered = None;
457 self.transition_to(self.prior_state);
458 }
459 }
460 }
461 }
462
463 fn transition_to(&mut self, new_state: BbrState) {
465 match new_state {
466 BbrState::Startup => {
467 self.pacing_gain = 2.0;
468 self.cwnd_gain = 2.0;
469 }
470 BbrState::Drain => {
471 self.pacing_gain = 0.75;
472 self.cwnd_gain = 2.0;
473 }
474 BbrState::ProbeBW => {
475 self.pacing_gain = 1.0;
476 self.cwnd_gain = 2.0;
477 }
478 BbrState::ProbeRTT => {
479 self.pacing_gain = 1.0;
480 self.cwnd_gain = 1.0;
481 }
482 BbrState::FastRecovery => {
483 self.pacing_gain = FAST_RECOVERY_PACING_GAIN;
486 self.cwnd_gain = 1.0;
487 }
488 }
489 self.state = new_state;
490 }
491}
492
493impl Default for BandwidthEstimator {
494 fn default() -> Self {
495 Self::new()
496 }
497}
498
499impl std::fmt::Debug for BandwidthEstimator {
500 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
501 f.debug_struct("BandwidthEstimator")
502 .field("state", &self.state)
503 .field("btl_bw_kbps", &(self.btl_bw / 1024))
504 .field("min_rtt_ms", &self.min_rtt.as_millis())
505 .field("pacing_gain", &self.pacing_gain)
506 .field("inflight_bytes", &self.inflight_bytes)
507 .field("delivered_bytes", &self.delivered_bytes)
508 .field("app_limited", &self.app_limited)
509 .finish()
510 }
511}
512
#[cfg(test)]
mod tests {
    use super::*;

    /// Size of the packets used throughout these tests, in bytes.
    const PACKET: u64 = 1400;

    /// Smallest congestion window the estimator may report, in bytes.
    const MIN_CWND_BYTES: u64 = PROBE_RTT_CWND_PACKETS * MIN_PACKET_SIZE;

    /// Builds a sample acknowledged `rtt_ms` after `sent_at`, without ack
    /// delay.
    fn build_sample(
        sent_at: Instant,
        rtt_ms: u64,
        packet_bytes: u64,
        is_app_limited: bool,
    ) -> DeliverySample {
        DeliverySample {
            delivered_bytes: 0,
            sent_at,
            acked_at: sent_at + Duration::from_millis(rtt_ms),
            packet_bytes,
            is_app_limited,
            ack_delay_us: 0,
        }
    }

    /// Sample taken while the sender was not application-limited.
    fn make_sample(sent_at: Instant, rtt_ms: u64, packet_bytes: u64) -> DeliverySample {
        build_sample(sent_at, rtt_ms, packet_bytes, false)
    }

    /// Sample taken while the sender was application-limited.
    fn make_app_limited_sample(sent_at: Instant, rtt_ms: u64, packet_bytes: u64) -> DeliverySample {
        build_sample(sent_at, rtt_ms, packet_bytes, true)
    }

    /// Sends and acknowledges `count` packets, one every 10 ms with a 10 ms
    /// RTT, starting at slot `first` after `start`.
    fn ack_steady_flow(est: &mut BandwidthEstimator, start: Instant, first: u64, count: u64) {
        for slot in first..first + count {
            est.on_send(PACKET);
            let sent = start + Duration::from_millis(slot * 10);
            est.on_ack(make_sample(sent, 10, PACKET));
        }
    }

    #[test]
    fn test_estimator_starts_in_startup() {
        let est = BandwidthEstimator::new();
        assert_eq!(est.state(), BbrState::Startup);
        assert_eq!(est.delivered_bytes(), 0);
        assert_eq!(est.inflight_bytes(), 0);
        assert!(!est.is_app_limited());
    }

    #[test]
    fn test_bandwidth_increases_with_acks() {
        let mut est = BandwidthEstimator::new();
        let now = Instant::now();

        ack_steady_flow(&mut est, now, 0, 10);

        assert!(
            est.bottleneck_bandwidth() > 0,
            "btl_bw = {} should be > 0",
            est.bottleneck_bandwidth()
        );
        assert_eq!(est.delivered_bytes(), 14_000);
    }

    #[test]
    fn test_min_rtt_tracking() {
        let mut est = BandwidthEstimator::new();
        let now = Instant::now();

        est.on_ack(make_sample(now, 100, PACKET));
        assert!(est.min_rtt() <= Duration::from_millis(101));

        est.on_ack(make_sample(now + Duration::from_millis(200), 5, PACKET));
        assert!(
            est.min_rtt() <= Duration::from_millis(6),
            "min_rtt = {:?}",
            est.min_rtt()
        );
    }

    #[test]
    fn test_pacing_rate_positive() {
        let mut est = BandwidthEstimator::new();
        let now = Instant::now();

        est.on_ack(make_sample(now, 20, PACKET));

        assert!(est.pacing_rate() > 0);
    }

    #[test]
    fn test_cwnd_at_least_minimum() {
        let est = BandwidthEstimator::new();
        let cwnd = est.cwnd();
        assert!(
            cwnd >= MIN_CWND_BYTES,
            "cwnd = {} should be >= {}",
            cwnd,
            MIN_CWND_BYTES
        );
    }

    #[test]
    fn test_startup_to_drain_transition() {
        let mut est = BandwidthEstimator::new();
        let now = Instant::now();

        // A perfectly flat delivery rate is a plateau, so Startup must end.
        ack_steady_flow(&mut est, now, 0, 20);

        assert_ne!(
            est.state(),
            BbrState::Startup,
            "expected startup exit, rounds = {}",
            est.round_count
        );
        assert!(est.filled_pipe, "plateau should mark the pipe as filled");
    }

    #[test]
    fn test_inflight_tracking() {
        let mut est = BandwidthEstimator::new();

        est.on_send(PACKET);
        est.on_send(PACKET);
        est.on_send(PACKET);
        assert_eq!(est.inflight_bytes(), 4200);

        let now = Instant::now();
        est.on_ack(make_sample(now, 10, PACKET));
        assert_eq!(est.inflight_bytes(), 2800);

        est.on_loss(PACKET);
        assert_eq!(est.inflight_bytes(), 1400);

        est.on_ack(make_sample(now + Duration::from_millis(10), 10, PACKET));
        assert_eq!(est.inflight_bytes(), 0);
    }

    #[test]
    fn test_inflight_cant_go_negative() {
        let mut est = BandwidthEstimator::new();
        est.on_loss(5000);
        assert_eq!(est.inflight_bytes(), 0);
    }

    #[test]
    fn test_app_limited_filtering() {
        let mut est = BandwidthEstimator::new();
        let now = Instant::now();

        ack_steady_flow(&mut est, now, 0, 5);
        let real_bw = est.bottleneck_bandwidth();
        assert!(real_bw > 0);

        est.set_app_limited();
        assert!(est.is_app_limited());

        // Slow, tiny, app-limited samples must not drag the estimate down.
        for i in 5..10 {
            let sent = now + Duration::from_millis(i * 1000);
            est.on_ack(make_app_limited_sample(sent, 1000, 100));
        }

        assert!(
            est.bottleneck_bandwidth() >= real_bw,
            "BW should not decrease from app-limited samples: {} < {}",
            est.bottleneck_bandwidth(),
            real_bw
        );
    }

    #[test]
    fn test_drain_waits_for_bdp() {
        let mut est = BandwidthEstimator::new();
        let now = Instant::now();

        // Establish bandwidth and RTT estimates, then force Drain so the
        // test does not depend on how quickly Startup happens to end.
        ack_steady_flow(&mut est, now, 0, 3);
        est.transition_to(BbrState::Drain);
        assert!(est.bdp() > 0);

        est.inflight_bytes = est.bdp() * 3;
        est.on_ack(make_sample(now + Duration::from_millis(30), 10, PACKET));
        assert!(est.inflight_bytes > est.bdp());
        assert_eq!(
            est.state(),
            BbrState::Drain,
            "should stay in Drain while inflight ({}) > BDP ({})",
            est.inflight_bytes,
            est.bdp()
        );

        // Once the last packet in flight is acknowledged, Drain is over.
        est.inflight_bytes = PACKET;
        est.on_ack(make_sample(now + Duration::from_millis(40), 10, PACKET));
        assert_eq!(est.inflight_bytes(), 0);
        assert_eq!(est.state(), BbrState::ProbeBW);
    }

    #[test]
    fn test_bdp_calculation() {
        let mut est = BandwidthEstimator::new();
        let now = Instant::now();

        ack_steady_flow(&mut est, now, 0, 5);

        // 1400 bytes per 10 ms over a 10 ms RTT is one packet in flight.
        let bdp = est.bdp();
        assert!(bdp > 0, "BDP should be positive, got {}", bdp);
        assert!(
            (1399..=1401).contains(&bdp),
            "BDP should be about one packet, got {}",
            bdp
        );
    }

    #[test]
    fn test_cwnd_minimum_in_probe_rtt() {
        let mut est = BandwidthEstimator::new();
        est.state = BbrState::ProbeRTT;
        let cwnd = est.cwnd();
        assert_eq!(
            cwnd,
            PROBE_RTT_CWND_PACKETS * MIN_PACKET_SIZE,
            "ProbeRTT CWND should be {} (4 packets), got {}",
            PROBE_RTT_CWND_PACKETS * MIN_PACKET_SIZE,
            cwnd
        );
    }

    #[test]
    fn test_loss_enters_fast_recovery_and_ack_restores_state() {
        let mut est = BandwidthEstimator::new();
        let now = Instant::now();

        est.on_send(PACKET);
        est.on_send(PACKET);
        est.on_ack(make_sample(now, 10, PACKET));
        assert_eq!(est.state(), BbrState::Startup);

        est.on_loss(PACKET);
        assert_eq!(est.state(), BbrState::FastRecovery);
        assert_eq!(est.inflight_bytes(), 0);

        // Nothing is left in flight, so the next ACK ends recovery and the
        // state that was interrupted comes back.
        est.on_ack(make_sample(now + Duration::from_millis(10), 10, PACKET));
        assert_eq!(est.state(), BbrState::Startup);
    }

    #[test]
    fn test_probe_rtt_round_trip() {
        let mut est = BandwidthEstimator::new();
        let now = Instant::now();

        ack_steady_flow(&mut est, now, 0, 5);
        assert_eq!(est.state(), BbrState::ProbeBW);

        // More than PROBE_RTT_INTERVAL later the next ACK enters ProbeRTT.
        est.on_ack(make_sample(now + Duration::from_secs(11), 10, PACKET));
        assert_eq!(est.state(), BbrState::ProbeRTT);
        assert_eq!(est.cwnd(), MIN_CWND_BYTES);

        // Still inside PROBE_RTT_DURATION: stay.
        est.on_ack(make_sample(now + Duration::from_millis(11_100), 10, PACKET));
        assert_eq!(est.state(), BbrState::ProbeRTT);

        // Past PROBE_RTT_DURATION: the interrupted state is restored.
        est.on_ack(make_sample(now + Duration::from_millis(11_300), 10, PACKET));
        assert_eq!(est.state(), BbrState::ProbeBW);
    }
}