use std::collections::VecDeque;
use std::time::{Duration, Instant};
/// Packet size in bytes used to convert between byte counts and packet
/// counts (see `pacing_interval` and `bdp`).
const MAX_PACKET_SIZE: usize = 1200;
/// Pacing gains stepped through while in `BbrState::ProbeBw`; the index
/// advances once per `rt_prop` interval and wraps around.
const PROBE_BW_GAIN_CYCLE: [f64; 8] = [1.25, 0.75, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];
/// Pacing and cwnd gain applied in `BbrState::Startup` (approximately 2/ln 2).
const STARTUP_GAIN: f64 = 2.885;
/// Pacing gain applied in `BbrState::Drain`: the reciprocal of `STARTUP_GAIN`.
const DRAIN_GAIN: f64 = 1.0 / STARTUP_GAIN;
/// Time added to "now" to compute the earliest instant at which
/// `BbrState::ProbeRtt` may be left.
const PROBE_RTT_DURATION: Duration = Duration::from_millis(200);
/// Window handed to the round-trip-time `MinFilter`.
const RTPROP_FILTER_LEN: Duration = Duration::from_secs(10);
/// Window length handed to the bottleneck-bandwidth `MaxFilter`.
const BTLBW_FILTER_LEN: usize = 10;
/// Lower bound of the congestion window, in packets.
const MIN_CWND: u32 = 4;
/// Congestion window a new controller starts with, in packets.
const INITIAL_CWND: u32 = 10;
/// Phase of the BBR state machine the controller is currently in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BbrState {
    /// Initial phase: pacing and cwnd gains are `STARTUP_GAIN` until the pipe
    /// is judged full.
    Startup,
    /// Follows `Startup`: paces at `DRAIN_GAIN` until the in-flight count is
    /// no larger than the estimated BDP.
    Drain,
    /// Steady state: the pacing gain cycles through `PROBE_BW_GAIN_CYCLE`.
    ProbeBw,
    /// The congestion window is dropped to `MIN_CWND` so a fresh minimum RTT
    /// can be observed.
    ProbeRtt,
}
/// Sliding-window maximum over the most recent `window_len` samples.
///
/// Implemented as a monotonic queue: entries are kept with strictly
/// decreasing values from front to back, so the front is always the maximum
/// of the samples still inside the window. Each entry carries the sequence
/// number of the sample that produced it so that it can be aged out even when
/// the queue itself stays short.
#[derive(Debug)]
struct MaxFilter {
    /// Candidate maxima as `(sequence number, arrival time, value)`.
    samples: VecDeque<(u64, Instant, u64)>,
    /// Number of most-recent samples the maximum is taken over.
    window_len: usize,
    /// Sequence number the next call to `update` will assign.
    next_seq: u64,
}

impl MaxFilter {
    /// Creates an empty filter covering the last `window_len` samples.
    ///
    /// A `window_len` of zero is treated as one (only the latest sample).
    fn new(window_len: usize) -> Self {
        Self {
            samples: VecDeque::with_capacity(window_len),
            window_len,
            next_seq: 0,
        }
    }

    /// Feeds one sample taken at `now` and returns the windowed maximum,
    /// which includes the new sample.
    fn update(&mut self, value: u64, now: Instant) -> u64 {
        let seq = self.next_seq;
        self.next_seq = self.next_seq.wrapping_add(1);
        let window = self.window_len.max(1) as u64;

        // Age out by sample number, not by queue length: dominated entries are
        // removed below, so the queue length says nothing about how old the
        // front entry is.
        while matches!(self.samples.front(), Some(&(s, _, _)) if seq.wrapping_sub(s) >= window) {
            self.samples.pop_front();
        }

        // Anything not larger than the new sample can never be the maximum
        // again, because the new sample outlives it.
        while matches!(self.samples.back(), Some(&(_, _, v)) if v <= value) {
            self.samples.pop_back();
        }

        self.samples.push_back((seq, now, value));
        self.samples.front().map_or(value, |&(_, _, v)| v)
    }

    /// Returns the current windowed maximum, or 0 when no sample is held.
    #[inline]
    fn get(&self) -> u64 {
        self.samples.front().map_or(0, |&(_, _, v)| v)
    }

    /// Discards every stored sample.
    fn reset(&mut self) {
        self.samples.clear();
    }
}
/// Windowed minimum of round-trip-time samples.
///
/// Holds a single best (lowest) sample together with the time it was taken.
/// Once that sample is older than `window`, the next sample replaces it
/// regardless of its value.
#[derive(Debug)]
struct MinFilter {
    /// Current minimum, or the placeholder estimate before the first sample.
    value: Duration,
    /// When `value` was recorded (construction time before the first sample).
    timestamp: Instant,
    /// Maximum age of `value` before it is considered stale.
    window: Duration,
    /// `false` until the first real sample has been recorded, so that the
    /// placeholder in `value` is never mistaken for a measurement.
    seeded: bool,
}

impl MinFilter {
    /// Estimate reported by `get` before any sample has been seen.
    const PLACEHOLDER: Duration = Duration::from_secs(1);

    /// Creates a filter with the given staleness `window`, time-stamped with
    /// the current instant.
    fn new(window: Duration) -> Self {
        Self {
            value: Self::PLACEHOLDER,
            timestamp: Instant::now(),
            window,
            seeded: false,
        }
    }

    /// Feeds one RTT sample observed at `now` and returns the resulting
    /// minimum.
    ///
    /// The sample is adopted when it is the first one, when the stored value
    /// has expired, or when it is not larger than the stored value.
    fn update(&mut self, rtt: Duration, now: Instant) -> Duration {
        // The first sample must win unconditionally; otherwise any RTT above
        // the placeholder would be ignored until the window ran out.
        if !self.seeded || self.expired(now) || rtt <= self.value {
            self.value = rtt;
            self.timestamp = now;
            self.seeded = true;
        }
        self.value
    }

    /// Returns the current minimum (the placeholder before any sample).
    #[inline]
    fn get(&self) -> Duration {
        self.value
    }

    /// Returns `true` when the stored value is older than the window at `now`.
    fn expired(&self, now: Instant) -> bool {
        now.duration_since(self.timestamp) > self.window
    }
}
/// State of a simplified BBR congestion controller.
///
/// Window and in-flight quantities are counted in packets; delivery counters
/// and rates are in bytes and bytes per second.
#[derive(Debug)]
pub struct BbrCongestionControl {
    /// Current phase of the state machine.
    state: BbrState,
    /// Windowed-maximum filter over delivery-rate samples.
    btl_bw: MaxFilter,
    /// Windowed-minimum filter over RTT samples.
    rt_prop: MinFilter,
    /// Current pacing rate in bytes per second.
    pacing_rate: u64,
    /// Multiplier applied to the bandwidth estimate to get `pacing_rate`.
    pacing_gain: f64,
    /// Multiplier applied to the BDP to get the congestion-window target.
    cwnd_gain: f64,
    /// Congestion window, in packets.
    cwnd: u32,
    /// Position within `PROBE_BW_GAIN_CYCLE`.
    cycle_index: usize,
    /// When the current gain-cycle phase began.
    cycle_start: Instant,
    /// Earliest instant at which ProbeRtt may end; `None` until the in-flight
    /// count has dropped to `MIN_CWND` or below.
    probe_rtt_done_stamp: Option<Instant>,
    /// Whether a round start has been seen since `probe_rtt_done_stamp` was set.
    probe_rtt_round_done: bool,
    /// Total bytes acknowledged so far.
    delivered: u64,
    /// Initialised to the construction time.
    delivered_time: Instant,
    /// Value of `delivered` at the end of the previous ack.
    last_delivered: u64,
    /// Time of the previous ack (construction time before the first ack).
    last_delivered_time: Instant,
    /// Number of round starts counted so far.
    round_count: u64,
    /// Whether the most recent ack started a new round.
    round_start: bool,
    /// `delivered` value that has to be reached for the next round to start.
    next_round_delivered: u64,
    /// Set by `on_loss`, cleared at the end of every `on_ack`.
    loss_in_round: bool,
    /// Latched once bandwidth growth has stalled; triggers the move to Drain.
    filled_pipe: bool,
    /// Bandwidth baseline used to detect whether growth has stalled.
    full_bw: u64,
    /// Consecutive round starts without sufficient bandwidth growth.
    full_bw_count: u32,
    /// Packets sent but not yet acknowledged or declared lost.
    inflight: u32,
    /// When Startup was left for Drain, if that has happened.
    startup_exit_time: Option<Instant>,
}
impl BbrCongestionControl {
    /// Upper bound applied to the congestion window, in packets.
    const MAX_CWND: u32 = 1024;

    /// Creates a controller in `Startup` with the initial window, no
    /// bandwidth estimate and every timestamp set to the current instant.
    pub fn new() -> Self {
        let now = Instant::now();
        Self {
            state: BbrState::Startup,
            btl_bw: MaxFilter::new(BTLBW_FILTER_LEN),
            rt_prop: MinFilter::new(RTPROP_FILTER_LEN),
            pacing_rate: 0,
            pacing_gain: STARTUP_GAIN,
            cwnd_gain: STARTUP_GAIN,
            cwnd: INITIAL_CWND,
            cycle_index: 0,
            cycle_start: now,
            probe_rtt_done_stamp: None,
            probe_rtt_round_done: false,
            delivered: 0,
            delivered_time: now,
            last_delivered: 0,
            last_delivered_time: now,
            round_count: 0,
            round_start: false,
            next_round_delivered: 0,
            loss_in_round: false,
            filled_pipe: false,
            full_bw: 0,
            full_bw_count: 0,
            inflight: 0,
            startup_exit_time: None,
        }
    }

    /// Records that one packet has been sent.
    ///
    /// In-flight data is counted in packets, so the byte count is not used.
    #[inline]
    pub fn on_send(&mut self, _bytes: usize) {
        self.inflight = self.inflight.saturating_add(1);
    }

    /// Processes the acknowledgement of one packet.
    ///
    /// `bytes_acked` is added to the delivered total and `rtt` is fed to the
    /// minimum-RTT filter. The bandwidth estimate, round tracking, state
    /// machine, pacing rate and congestion window are then brought up to date.
    pub fn on_ack(&mut self, bytes_acked: usize, rtt: Duration) {
        let now = Instant::now();
        self.inflight = self.inflight.saturating_sub(1);
        self.delivered = self.delivered.saturating_add(bytes_acked as u64);

        // Expiry has to be sampled before the filter is updated: an expired
        // filter adopts the new sample and refreshes its timestamp, after
        // which it no longer looks expired and ProbeRtt would never start.
        let rt_prop_expired = self.rt_prop.expired(now);
        self.rt_prop.update(rtt, now);

        let delivery_rate = self.calculate_delivery_rate(now);
        if delivery_rate > 0 {
            self.btl_bw.update(delivery_rate, now);
        }

        self.update_round(now);
        self.check_full_pipe();

        match self.state {
            BbrState::Startup => self.update_startup(now),
            BbrState::Drain => self.update_drain(now),
            BbrState::ProbeBw => self.update_probe_bw(now, rt_prop_expired),
            BbrState::ProbeRtt => self.update_probe_rtt(now),
        }

        self.update_pacing_rate();
        self.update_cwnd();

        self.last_delivered = self.delivered;
        self.last_delivered_time = now;
        self.loss_in_round = false;
    }

    /// Records that one in-flight packet has been declared lost.
    pub fn on_loss(&mut self, _bytes_lost: usize) {
        self.inflight = self.inflight.saturating_sub(1);
        self.loss_in_round = true;
    }

    /// Reacts to a retransmission timeout: halves the window (never below
    /// `MIN_CWND`) and treats nothing as being in flight any more.
    pub fn on_timeout(&mut self) {
        self.cwnd = (self.cwnd / 2).max(MIN_CWND);
        self.inflight = 0;
    }

    /// Number of additional packets that may be sent right now.
    #[inline]
    pub fn available_window(&self) -> usize {
        self.cwnd.saturating_sub(self.inflight) as usize
    }

    /// Current congestion window, in packets.
    #[inline]
    pub fn cwnd(&self) -> u32 {
        self.cwnd
    }

    /// Current pacing rate, in bytes per second.
    #[inline]
    pub fn pacing_rate(&self) -> u64 {
        self.pacing_rate
    }

    /// Current bottleneck-bandwidth estimate in bytes per second (0 before
    /// the first delivery-rate sample).
    #[inline]
    pub fn bandwidth(&self) -> u64 {
        self.btl_bw.get()
    }

    /// Current value of the minimum-RTT filter.
    #[inline]
    pub fn min_rtt(&self) -> Duration {
        self.rt_prop.get()
    }

    /// Current phase of the state machine.
    #[inline]
    pub fn state(&self) -> BbrState {
        self.state
    }

    /// Time to wait between two `MAX_PACKET_SIZE` packets at the current
    /// pacing rate; 100 microseconds while no pacing rate is known. Never zero.
    #[inline]
    pub fn pacing_interval(&self) -> Duration {
        if self.pacing_rate == 0 {
            Duration::from_micros(100)
        } else {
            let interval_ns = (MAX_PACKET_SIZE as u64 * 1_000_000_000) / self.pacing_rate;
            Duration::from_nanos(interval_ns.max(1))
        }
    }

    /// Bytes delivered since the previous ack divided by the time since the
    /// previous ack, in bytes per second. Returns 0 when either is zero.
    fn calculate_delivery_rate(&self, now: Instant) -> u64 {
        let bytes = self.delivered.saturating_sub(self.last_delivered);
        let elapsed_ns = now.duration_since(self.last_delivered_time).as_nanos();
        if elapsed_ns == 0 || bytes == 0 {
            return 0;
        }
        // Widen before scaling so the multiplication cannot overflow, and
        // saturate instead of truncating on the way back down.
        let rate = u128::from(bytes) * 1_000_000_000 / elapsed_ns;
        rate.min(u128::from(u64::MAX)) as u64
    }

    /// `delivered` value at which everything currently in flight will have
    /// been acknowledged, estimating each in-flight packet at
    /// `MAX_PACKET_SIZE` bytes.
    fn round_end_target(&self) -> u64 {
        let in_flight_bytes = u64::from(self.inflight) * MAX_PACKET_SIZE as u64;
        self.delivered.saturating_add(in_flight_bytes)
    }

    /// Advances round tracking: a new round starts once the data that was in
    /// flight at the previous round start has been delivered.
    fn update_round(&mut self, _now: Instant) {
        if self.delivered >= self.next_round_delivered {
            self.round_count += 1;
            self.round_start = true;
            // Without the in-flight term the target would equal `delivered`
            // and every single ack would count as a whole round.
            self.next_round_delivered = self.round_end_target();
        } else {
            self.round_start = false;
        }
    }

    /// Latches `filled_pipe` after three consecutive round starts in which
    /// the bandwidth estimate failed to grow by at least 25%.
    fn check_full_pipe(&mut self) {
        if self.filled_pipe || !self.round_start {
            return;
        }
        let bw = self.btl_bw.get();
        let growth_threshold = self.full_bw.saturating_add(self.full_bw / 4);
        if bw >= growth_threshold {
            // Still growing: move the baseline and restart the count.
            self.full_bw = bw;
            self.full_bw_count = 0;
            return;
        }
        self.full_bw_count += 1;
        if self.full_bw_count >= 3 {
            self.filled_pipe = true;
        }
    }

    /// Startup step: move on to Drain once the pipe is judged full.
    fn update_startup(&mut self, now: Instant) {
        if self.filled_pipe {
            self.enter_drain(now);
        }
    }

    /// Switches to Drain: pace at `DRAIN_GAIN`, keep the Startup cwnd gain and
    /// remember when Startup ended.
    fn enter_drain(&mut self, now: Instant) {
        self.state = BbrState::Drain;
        self.pacing_gain = DRAIN_GAIN;
        self.cwnd_gain = STARTUP_GAIN;
        self.startup_exit_time = Some(now);
    }

    /// Drain step: move on to ProbeBw once the in-flight count is no larger
    /// than the estimated BDP.
    fn update_drain(&mut self, now: Instant) {
        if u64::from(self.inflight) <= self.bdp() {
            self.enter_probe_bw(now);
        }
    }

    /// Switches to ProbeBw at the first phase of the gain cycle.
    fn enter_probe_bw(&mut self, now: Instant) {
        self.state = BbrState::ProbeBw;
        self.cycle_index = 0;
        self.cycle_start = now;
        self.pacing_gain = PROBE_BW_GAIN_CYCLE[0];
        self.cwnd_gain = 2.0;
    }

    /// ProbeBw step: advance the gain cycle once per minimum RTT and switch
    /// to ProbeRtt when the minimum-RTT estimate had expired on this ack.
    ///
    /// `rt_prop_expired` must have been sampled before the RTT filter was
    /// updated for the current ack.
    fn update_probe_bw(&mut self, now: Instant, rt_prop_expired: bool) {
        let rt_prop = self.rt_prop.get();
        if now.duration_since(self.cycle_start) >= rt_prop {
            self.cycle_index = (self.cycle_index + 1) % PROBE_BW_GAIN_CYCLE.len();
            self.cycle_start = now;
            self.pacing_gain = PROBE_BW_GAIN_CYCLE[self.cycle_index];
        }
        if rt_prop_expired {
            self.enter_probe_rtt(now);
        }
    }

    /// Switches to ProbeRtt: unity gains and the minimum congestion window.
    ///
    /// The RTT filter is left untouched. It has already adopted the sample
    /// from the ack that triggered this transition, and lower samples seen
    /// while the queue drains will replace it.
    fn enter_probe_rtt(&mut self, _now: Instant) {
        self.state = BbrState::ProbeRtt;
        self.pacing_gain = 1.0;
        self.cwnd_gain = 1.0;
        self.probe_rtt_done_stamp = None;
        self.probe_rtt_round_done = false;
        self.cwnd = MIN_CWND;
    }

    /// ProbeRtt step.
    ///
    /// Waits for the in-flight count to fall to `MIN_CWND`, then holds for
    /// `PROBE_RTT_DURATION` and at least one round start before restoring the
    /// previous mode.
    fn update_probe_rtt(&mut self, now: Instant) {
        match self.probe_rtt_done_stamp {
            None => {
                if self.inflight <= MIN_CWND {
                    self.probe_rtt_done_stamp = Some(now + PROBE_RTT_DURATION);
                    self.probe_rtt_round_done = false;
                    self.next_round_delivered = self.round_end_target();
                }
            }
            Some(done_stamp) => {
                if self.round_start {
                    self.probe_rtt_round_done = true;
                }
                if self.probe_rtt_round_done && now >= done_stamp {
                    self.restore_after_probe_rtt(now);
                }
            }
        }
    }

    /// Leaves ProbeRtt: back to ProbeBw if the pipe was already full,
    /// otherwise back to Startup.
    fn restore_after_probe_rtt(&mut self, now: Instant) {
        if self.filled_pipe {
            self.enter_probe_bw(now);
        } else {
            self.state = BbrState::Startup;
            self.pacing_gain = STARTUP_GAIN;
            self.cwnd_gain = STARTUP_GAIN;
        }
    }

    /// Recomputes the pacing rate as bandwidth estimate times pacing gain,
    /// assuming 1,000,000 bytes per second while no estimate exists.
    fn update_pacing_rate(&mut self) {
        let bw = self.btl_bw.get();
        let base = if bw > 0 { bw as f64 } else { 1_000_000.0 };
        self.pacing_rate = (base * self.pacing_gain) as u64;
    }

    /// Moves the congestion window one packet per ack towards
    /// `bdp * cwnd_gain`, bounded to `MIN_CWND..=MAX_CWND`.
    fn update_cwnd(&mut self) {
        if self.state == BbrState::ProbeRtt {
            // Growing here would undo the reduction ProbeRtt relies on to
            // drain the queue and observe the true minimum RTT.
            self.cwnd = MIN_CWND;
            return;
        }
        let target = ((self.bdp() as f64 * self.cwnd_gain) as u32).max(MIN_CWND);
        if self.cwnd < target {
            self.cwnd = target.min(self.cwnd.saturating_add(1));
        }
        self.cwnd = self.cwnd.clamp(MIN_CWND, Self::MAX_CWND);
    }

    /// Estimated bandwidth-delay product in packets, never below `MIN_CWND`.
    fn bdp(&self) -> u64 {
        let bw = self.btl_bw.get();
        let rtt = self.rt_prop.get();
        if bw == 0 || rtt.is_zero() {
            return u64::from(MIN_CWND);
        }
        let bdp_bytes = (u128::from(bw) * rtt.as_nanos()) / 1_000_000_000;
        let bdp_bytes = bdp_bytes.min(u128::from(u64::MAX)) as u64;
        (bdp_bytes / MAX_PACKET_SIZE as u64).max(u64::from(MIN_CWND))
    }
}
impl Default for BbrCongestionControl {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A new controller starts in Startup with the initial window.
    #[test]
    fn test_bbr_creation() {
        let controller = BbrCongestionControl::new();
        assert_eq!(controller.state, BbrState::Startup);
        assert_eq!(controller.cwnd, INITIAL_CWND);
    }

    /// After a run of send/ack pairs the pipe is reported full or Startup
    /// has been left.
    #[test]
    fn test_bbr_startup_to_drain() {
        let mut controller = BbrCongestionControl::new();
        let sample_rtt = Duration::from_millis(10);
        for iteration in 0..50 {
            controller.on_send(1200);
            controller.on_ack(1200, sample_rtt);
            let warmed_up = iteration > 20;
            if warmed_up && controller.filled_pipe {
                break;
            }
        }
        assert!(controller.filled_pipe || controller.state != BbrState::Startup);
    }

    /// Acked traffic yields a non-zero pacing rate.
    #[test]
    fn test_bbr_pacing_rate() {
        let mut controller = BbrCongestionControl::new();
        let sample_rtt = Duration::from_millis(5);
        let gap = Duration::from_micros(100);
        for _ in 0..10 {
            controller.on_send(1200);
            std::thread::sleep(gap);
            controller.on_ack(1200, sample_rtt);
        }
        assert!(controller.pacing_rate > 0);
    }

    /// The smallest RTT sample seen is the one reported.
    #[test]
    fn test_bbr_min_rtt() {
        let mut controller = BbrCongestionControl::new();
        for &millis in [10u64, 5, 8].iter() {
            controller.on_send(1200);
            controller.on_ack(1200, Duration::from_millis(millis));
        }
        assert!(controller.min_rtt() <= Duration::from_millis(6));
    }

    /// Sending shrinks the available window and an ack opens it up again.
    #[test]
    fn test_bbr_available_window() {
        let mut controller = BbrCongestionControl::new();
        let initial_window = controller.available_window();
        assert!(initial_window > 0);

        controller.on_send(1200);
        controller.on_send(1200);
        assert!(controller.available_window() < initial_window);

        controller.on_ack(1200, Duration::from_millis(10));
        assert!(controller.available_window() > initial_window - 2);
    }

    /// Reported losses never push the window below its minimum.
    #[test]
    fn test_bbr_loss_handling() {
        let mut controller = BbrCongestionControl::new();
        for _ in 0..5 {
            controller.on_send(1200);
        }
        for _ in 0..2 {
            controller.on_loss(1200);
        }
        assert!(controller.cwnd >= MIN_CWND);
    }

    /// The pacing interval is positive both before and after traffic.
    #[test]
    fn test_pacing_interval() {
        let mut controller = BbrCongestionControl::new();
        let before_traffic = controller.pacing_interval();
        assert!(before_traffic > Duration::ZERO);

        let sample_rtt = Duration::from_millis(1);
        let gap = Duration::from_micros(50);
        for _ in 0..10 {
            controller.on_send(1200);
            std::thread::sleep(gap);
            controller.on_ack(1200, sample_rtt);
        }
        let after_traffic = controller.pacing_interval();
        assert!(after_traffic > Duration::ZERO);
    }
}