use crate::minmax::Minmax;
use crate::packet;
use crate::recovery::*;
use std::time::Duration;
use std::time::Instant;
pub static BBR: CongestionControlOps = CongestionControlOps {
on_init,
reset,
on_packet_sent,
on_packets_acked,
congestion_event,
collapse_cwnd,
checkpoint,
rollback,
has_custom_pacing,
debug_fmt,
};
const BTLBW_FILTER_LEN: Duration = Duration::from_secs(10);
const PROBE_RTT_INTERVAL: Duration = Duration::from_secs(10);
const RTPROP_FILTER_LEN: Duration = PROBE_RTT_INTERVAL;
const BBR_HIGH_GAIN: f64 = 2.89;
const BBR_MIN_PIPE_CWND_PKTS: usize = 4;
const BBR_GAIN_CYCLE_LEN: usize = 8;
const PROBE_RTT_DURATION: Duration = Duration::from_millis(200);
const PACING_GAIN_CYCLE: [f64; BBR_GAIN_CYCLE_LEN] =
[5.0 / 4.0, 3.0 / 4.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];
const BTLBW_GROWTH_TARGET: f64 = 1.25;
#[derive(Debug, PartialEq, Eq)]
enum BBRStateMachine {
Startup,
Drain,
ProbeBW,
ProbeRTT,
}
pub struct State {
state: BBRStateMachine,
pacing_rate: u64,
btlbw: u64,
btlbwfilter: Minmax<u64>,
rtprop: Duration,
rtprop_stamp: Instant,
rtprop_expired: bool,
pacing_gain: f64,
cwnd_gain: f64,
filled_pipe: bool,
round_count: u64,
round_start: bool,
next_round_delivered: usize,
probe_rtt_done_stamp: Option<Instant>,
probe_rtt_round_done: bool,
packet_conservation: bool,
prior_cwnd: usize,
idle_restart: bool,
full_bw: u64,
full_bw_count: usize,
cycle_stamp: Instant,
cycle_index: usize,
target_cwnd: usize,
in_recovery: bool,
start_time: Instant,
newly_lost_bytes: usize,
newly_acked_bytes: usize,
prior_bytes_in_flight: usize,
}
impl State {
pub fn new() -> Self {
let now = Instant::now();
State {
state: BBRStateMachine::Startup,
pacing_rate: 0,
btlbw: 0,
btlbwfilter: Minmax::new(0),
rtprop: Duration::ZERO,
rtprop_stamp: now,
rtprop_expired: false,
pacing_gain: 0.0,
cwnd_gain: 0.0,
filled_pipe: false,
round_count: 0,
round_start: false,
next_round_delivered: 0,
probe_rtt_done_stamp: None,
probe_rtt_round_done: false,
packet_conservation: false,
prior_cwnd: 0,
idle_restart: false,
full_bw: 0,
full_bw_count: 0,
cycle_stamp: now,
cycle_index: 0,
target_cwnd: 0,
in_recovery: false,
start_time: now,
newly_lost_bytes: 0,
newly_acked_bytes: 0,
prior_bytes_in_flight: 0,
}
}
}
fn bbr_enter_recovery(r: &mut Recovery, now: Instant) {
r.bbr_state.prior_cwnd = per_ack::bbr_save_cwnd(r);
r.congestion_window = r.bytes_in_flight.max(r.max_datagram_size);
r.congestion_recovery_start_time = Some(now);
r.bbr_state.packet_conservation = true;
r.bbr_state.in_recovery = true;
r.bbr_state.newly_lost_bytes = 0;
r.bbr_state.next_round_delivered = r.delivery_rate.delivered();
}
fn bbr_exit_recovery(r: &mut Recovery) {
r.congestion_recovery_start_time = None;
r.bbr_state.packet_conservation = false;
r.bbr_state.in_recovery = false;
per_ack::bbr_restore_cwnd(r);
}
fn on_init(r: &mut Recovery) {
init::bbr_init(r);
}
fn reset(r: &mut Recovery) {
r.bbr_state = State::new();
init::bbr_init(r);
}
fn on_packet_sent(r: &mut Recovery, sent_bytes: usize, _now: Instant) {
per_transmit::bbr_on_transmit(r);
r.bytes_in_flight += sent_bytes;
}
fn on_packets_acked(
r: &mut Recovery, packets: &mut Vec<Acked>, _epoch: packet::Epoch,
now: Instant,
) {
r.bbr_state.newly_acked_bytes =
packets.drain(..).fold(0, |acked_bytes, p| {
r.bbr_state.prior_bytes_in_flight = r.bytes_in_flight;
per_ack::bbr_update_model_and_state(r, &p, now);
r.bytes_in_flight = r.bytes_in_flight.saturating_sub(p.size);
acked_bytes + p.size
});
if let Some(pkt) = packets.last() {
if !r.in_congestion_recovery(pkt.time_sent) && r.bbr_state.in_recovery {
bbr_exit_recovery(r);
}
}
per_ack::bbr_update_control_parameters(r, now);
r.bbr_state.newly_lost_bytes = 0;
}
fn congestion_event(
r: &mut Recovery, lost_bytes: usize, largest_lost_pkt: &Sent,
_epoch: packet::Epoch, now: Instant,
) {
r.bbr_state.newly_lost_bytes = lost_bytes;
if !r.in_congestion_recovery(largest_lost_pkt.time_sent) {
bbr_enter_recovery(r, now);
}
}
fn collapse_cwnd(r: &mut Recovery) {
r.bbr_state.prior_cwnd = per_ack::bbr_save_cwnd(r);
reno::collapse_cwnd(r);
}
fn checkpoint(_r: &mut Recovery) {}
fn rollback(_r: &mut Recovery) -> bool {
false
}
fn has_custom_pacing() -> bool {
true
}
fn debug_fmt(r: &Recovery, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let bbr = &r.bbr_state;
write!(
f,
"bbr={{ state={:?} btlbw={} rtprop={:?} pacing_rate={} pacing_gain={} cwnd_gain={} target_cwnd={} send_quantum={} filled_pipe={} round_count={} }}",
bbr.state, bbr.btlbw, bbr.rtprop, bbr.pacing_rate, bbr.pacing_gain, bbr.cwnd_gain, bbr.target_cwnd, r.send_quantum(), bbr.filled_pipe, bbr.round_count
)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::recovery;
use smallvec::smallvec;
#[test]
fn bbr_init() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR);
let mut r = Recovery::new(&cfg);
r.on_init();
assert_eq!(
r.cwnd(),
r.max_datagram_size * cfg.initial_congestion_window_packets
);
assert_eq!(r.bytes_in_flight, 0);
assert_eq!(r.bbr_state.state, BBRStateMachine::Startup);
}
#[test]
fn bbr_send() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR);
let mut r = Recovery::new(&cfg);
let now = Instant::now();
r.on_init();
r.on_packet_sent_cc(1000, now);
assert_eq!(r.bytes_in_flight, 1000);
}
#[test]
fn bbr_startup() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR);
let mut r = Recovery::new(&cfg);
let now = Instant::now();
let mss = r.max_datagram_size;
r.on_init();
for pn in 0..5 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: 0,
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
}
let rtt = Duration::from_millis(50);
let now = now + rtt;
let cwnd_prev = r.cwnd();
let mut acked = ranges::RangeSet::default();
acked.insert(0..5);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
&mut Vec::new(),
),
Ok((0, 0)),
);
assert_eq!(r.bbr_state.state, BBRStateMachine::Startup);
assert_eq!(r.cwnd(), cwnd_prev + mss * 5);
assert_eq!(r.bytes_in_flight, 0);
assert_eq!(
r.delivery_rate(),
((mss * 5) as f64 / rtt.as_secs_f64()) as u64
);
assert_eq!(r.bbr_state.btlbw, r.delivery_rate());
}
#[test]
fn bbr_congestion_event() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR);
let mut r = Recovery::new(&cfg);
let now = Instant::now();
let mss = r.max_datagram_size;
r.on_init();
for pn in 0..5 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: 0,
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
}
let rtt = Duration::from_millis(50);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(4..5);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
&mut Vec::new(),
),
Ok((2, 2400)),
);
assert_eq!(r.cwnd(), mss * 4);
assert_eq!(r.bytes_in_flight, mss * 2);
}
#[test]
fn bbr_drain() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR);
let mut r = Recovery::new(&cfg);
let now = Instant::now();
let mss = r.max_datagram_size;
r.on_init();
let mut pn = 0;
for _ in 0..3 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: r.delivery_rate.delivered(),
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
pn += 1;
let rtt = Duration::from_millis(50);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(0..pn);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
&mut Vec::new(),
),
Ok((0, 0)),
);
}
for _ in 0..5 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: r.delivery_rate.delivered(),
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
pn += 1;
}
let rtt = Duration::from_millis(50);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(0..pn - 4);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
&mut Vec::new(),
),
Ok((0, 0)),
);
assert!(r.bbr_state.filled_pipe);
assert_eq!(r.bbr_state.state, BBRStateMachine::Drain);
assert!(r.bbr_state.pacing_gain < 1.0);
}
#[test]
fn bbr_probe_bw() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR);
let mut r = Recovery::new(&cfg);
let now = Instant::now();
let mss = r.max_datagram_size;
r.on_init();
for (pn, _) in (0..4).enumerate() {
let pkt = Sent {
pkt_num: pn as u64,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: r.delivery_rate.delivered(),
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
let rtt = Duration::from_millis(50);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(0..pn as u64 + 1);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
&mut Vec::new(),
),
Ok((0, 0)),
);
}
assert!(r.bbr_state.filled_pipe);
assert_eq!(r.bbr_state.state, BBRStateMachine::ProbeBW);
assert!(r.bbr_state.pacing_gain >= 1.0);
}
#[test]
fn bbr_probe_rtt() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR);
let mut r = Recovery::new(&cfg);
let now = Instant::now();
let mss = r.max_datagram_size;
r.on_init();
let mut pn = 0;
for _ in 0..4 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: r.delivery_rate.delivered(),
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
pn += 1;
let rtt = Duration::from_millis(50);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(0..pn);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
&mut Vec::new(),
),
Ok((0, 0)),
);
}
assert_eq!(r.bbr_state.state, BBRStateMachine::ProbeBW);
let now = now + RTPROP_FILTER_LEN;
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: r.delivery_rate.delivered(),
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
pn += 1;
let rtt = Duration::from_millis(100);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(0..pn);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
&mut Vec::new(),
),
Ok((0, 0)),
);
assert_eq!(r.bbr_state.state, BBRStateMachine::ProbeRTT);
assert_eq!(r.bbr_state.pacing_gain, 1.0);
}
}
mod init;
mod pacing;
mod per_ack;
mod per_transmit;