use super::*;
use crate::minmax::Minmax;
use std::time::Duration;
use std::time::Instant;
use super::CongestionControlOps;
/// Callback table that exposes this module's BBR2 implementation as a
/// `CongestionControlOps`.
///
/// Every field uses shorthand initialization, so each entry is bound to the
/// function of the same name defined further down in this file.
pub(crate) static BBR2: CongestionControlOps = CongestionControlOps {
on_init,
on_packet_sent,
on_packets_acked,
congestion_event,
checkpoint,
rollback,
has_custom_pacing,
debug_fmt,
};
// Tuning constants for BBR2. Most of them are not referenced in this file;
// presumably the submodules (`init`, `pacing`, `per_ack`, `per_loss`,
// `per_transmit`) consume them. The names appear to mirror the parameters of
// the BBRv2 IETF draft — TODO confirm the values against the draft revision
// this module targets.

// NOTE(review): the name says "percent" but the value is the fraction 0.01;
// confirm how the pacing code applies it.
const PACING_MARGIN_PERCENT: f64 = 0.01;
// Pacing gains; judging by the names, one per state-machine phase.
const STARTUP_PACING_GAIN: f64 = 2.77;
// 0.75
const PROBE_DOWN_PACING_GAIN: f64 = 3_f64 / 4_f64;
// 1.25
const PROBE_UP_PACING_GAIN: f64 = 5_f64 / 4_f64;
// Unity gain; presumably the default outside of the phases above.
const PACING_GAIN: f64 = 1.0;
// Congestion window gains.
const STARTUP_CWND_GAIN: f64 = 2.77;
const CWND_GAIN: f64 = 2.0;
// Loss-related thresholds. NOTE(review): presumably LOSS_THRESH is a loss
// rate (2%) and BETA a multiplicative decrease factor — confirm in per_loss.
const LOSS_THRESH: f64 = 0.02;
const FULL_LOSS_COUNT: u32 = 8;
const BETA: f64 = 0.7;
const HEADROOM: f64 = 0.85;
// Expressed in packets, per the `_PKTS` suffix.
const MIN_PIPE_CWND_PKTS: usize = 4;
// NOTE(review): unit is not visible here (seconds?) — confirm at the use site.
const MIN_RTT_FILTER_LEN: u32 = 1;
const PROBE_RTT_CWND_GAIN: f64 = 0.5;
// 200 ms.
const PROBE_RTT_DURATION: Duration = Duration::from_millis(200);
// 86400 s = 24 h. The `bbr2_probe_rtt` test advances the clock by this
// interval before expecting the ProbeRTT state.
// NOTE(review): the BBRv2 draft default is, as far as I recall, 5 s; confirm
// that the 24 h value is intentional.
const PROBE_RTT_INTERVAL: Duration = Duration::from_secs(86400);
// NOTE(review): presumably the bandwidth growth factor (1.25x) and the number
// of non-growing rounds (3) used to decide the pipe is full — confirm in
// per_ack.
const MAX_BW_GROWTH_THRESHOLD: f64 = 1.25;
const MAX_BW_COUNT: usize = 3;
/// Phases of the BBR2 state machine, stored in `State::state`.
///
/// A freshly created `State` starts in `Startup`. The transitions between
/// phases are not implemented in this file; presumably they live in the
/// `per_ack` / `per_loss` submodules.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum BBR2StateMachine {
// Initial phase (see `State::new`).
Startup,
// The `bbr2_probe_bw` test reaches this phase with `filled_pipe` set and a
// pacing gain below 1.0.
Drain,
// The four ProbeBW sub-phases. NOTE(review): presumably DOWN -> CRUISE ->
// REFILL -> UP as in the BBRv2 draft — confirm against the transition code.
ProbeBWDOWN,
ProbeBWCRUISE,
ProbeBWREFILL,
ProbeBWUP,
// The `bbr2_probe_rtt` test reaches this phase once `PROBE_RTT_INTERVAL` has
// elapsed, and expects a pacing gain of 1.0 there.
ProbeRTT,
}
/// Ack-processing phase tracked alongside the main state machine, stored in
/// `State::ack_phase` and printed by `debug_fmt`.
///
/// NOTE(review): the variant names suggest this classifies incoming ACKs
/// relative to a bandwidth probe; the code that reads and updates it is not
/// in this file — confirm in the submodules.
#[derive(Debug, PartialEq, Eq)]
enum BBR2AckPhase {
// Initial value (see `State::new`).
Init,
ProbeFeedback,
ProbeStarting,
ProbeStopping,
Refilling,
}
/// Per-connection BBR2 state, reached through `Congestion::bbr2_state`.
///
/// Only a handful of fields are touched by the callbacks in this file; those
/// are documented below from what the code here shows. The remaining fields
/// are presumably maintained by the submodules (`init`, `pacing`, `per_ack`,
/// `per_loss`, `per_transmit`) — their exact meaning should be confirmed
/// there.
pub struct State {
tx_in_flight: usize,
lost: usize,
// Reset to 0 at the start of `on_packets_acked`, then incremented by the
// size of every packet in the acked batch.
newly_acked_bytes: usize,
// Set to the lost byte count in `congestion_event` and cleared at the end of
// `on_packets_acked`.
newly_lost_bytes: usize,
pacing_rate: u64,
init_pacing_rate: u64,
pacing_gain: f64,
cwnd_gain: f64,
// Set when entering recovery, cleared when leaving it.
packet_conservation: bool,
// Current phase of the state machine; starts as `Startup`.
state: BBR2StateMachine,
round_count: u64,
round_start: bool,
// Set to the delivery-rate estimator's delivered count when entering
// recovery.
next_round_delivered: usize,
idle_restart: bool,
// Bandwidth values below are printed by `debug_fmt` in kbps via
// `rate_kbps`, which assumes they are in bytes per second — TODO confirm.
max_bw: u64,
// `bw_hi` / `bw_lo` start at `u64::MAX`, which `debug_fmt` prints as -1.
bw_hi: u64,
bw_lo: u64,
bw: u64,
// Starts at `Duration::MAX`.
min_rtt: Duration,
bdp: usize,
extra_acked: usize,
offload_budget: usize,
max_inflight: usize,
// `inflight_hi` / `inflight_lo` start at `usize::MAX`.
inflight_hi: usize,
inflight_lo: usize,
bw_latest: u64,
inflight_latest: usize,
max_bw_filter: Minmax<u64>,
cycle_count: u64,
extra_acked_interval_start: Instant,
extra_acked_delivered: usize,
extra_acked_filter: Minmax<usize>,
filled_pipe: bool,
full_bw: u64,
full_bw_count: usize,
min_rtt_stamp: Instant,
// Starts at `Duration::MAX`.
probe_rtt_min_delay: Duration,
probe_rtt_min_stamp: Instant,
probe_rtt_expired: bool,
// True between `bbr2_enter_recovery` and `bbr2_exit_recovery`.
in_recovery: bool,
start_time: Instant,
// Value returned by `per_ack::bbr2_save_cwnd` when recovery is entered.
prior_cwnd: usize,
bw_probe_samples: bool,
probe_up_cnt: usize,
// Bytes in flight as they stood just before the most recently processed
// acked packet was subtracted (see `on_packets_acked`).
prior_bytes_in_flight: usize,
probe_rtt_done_stamp: Option<Instant>,
probe_rtt_round_done: bool,
bw_probe_wait: Duration,
rounds_since_probe: usize,
cycle_stamp: Instant,
// Starts as `BBR2AckPhase::Init`.
ack_phase: BBR2AckPhase,
bw_probe_up_rounds: usize,
bw_probe_up_acks: usize,
loss_round_start: bool,
loss_round_delivered: usize,
loss_in_round: bool,
loss_events_in_round: usize,
}
impl State {
    /// Creates the BBR2 state for a new connection.
    ///
    /// The state machine starts in `Startup` with the ack phase `Init`. All
    /// timestamps share a single clock reading taken here, the upper bounds
    /// start at their type's maximum, and every counter, gain and flag starts
    /// at zero / `false`.
    pub fn new() -> Self {
        // One clock reading shared by every timestamp field.
        let created_at = Instant::now();

        Self {
            // State machine position.
            state: BBR2StateMachine::Startup,
            ack_phase: BBR2AckPhase::Init,

            // Timestamps.
            start_time: created_at,
            cycle_stamp: created_at,
            min_rtt_stamp: created_at,
            probe_rtt_min_stamp: created_at,
            extra_acked_interval_start: created_at,
            probe_rtt_done_stamp: None,

            // Values that start at the maximum of their type.
            bw_hi: u64::MAX,
            bw_lo: u64::MAX,
            inflight_hi: usize::MAX,
            inflight_lo: usize::MAX,
            min_rtt: Duration::MAX,
            probe_rtt_min_delay: Duration::MAX,

            // Windowed filters, seeded with zero.
            max_bw_filter: Minmax::new(0),
            extra_acked_filter: Minmax::new(0),

            // Durations.
            bw_probe_wait: Duration::ZERO,

            // Gains.
            pacing_gain: 0.0,
            cwnd_gain: 0.0,

            // Flags, all initially cleared.
            packet_conservation: false,
            round_start: false,
            idle_restart: false,
            filled_pipe: false,
            probe_rtt_expired: false,
            in_recovery: false,
            bw_probe_samples: false,
            probe_rtt_round_done: false,
            loss_round_start: false,
            loss_in_round: false,

            // Counters and estimates, all initially zero.
            tx_in_flight: 0,
            lost: 0,
            newly_acked_bytes: 0,
            newly_lost_bytes: 0,
            pacing_rate: 0,
            init_pacing_rate: 0,
            round_count: 0,
            next_round_delivered: 0,
            max_bw: 0,
            bw: 0,
            bdp: 0,
            extra_acked: 0,
            offload_budget: 0,
            max_inflight: 0,
            bw_latest: 0,
            inflight_latest: 0,
            cycle_count: 0,
            extra_acked_delivered: 0,
            full_bw: 0,
            full_bw_count: 0,
            prior_cwnd: 0,
            probe_up_cnt: 0,
            prior_bytes_in_flight: 0,
            rounds_since_probe: 0,
            bw_probe_up_rounds: 0,
            bw_probe_up_acks: 0,
            loss_round_delivered: 0,
            loss_events_in_round: 0,
        }
    }
}
/// Switches the controller into loss recovery.
///
/// Remembers the congestion window via `per_ack::bbr2_save_cwnd`, then
/// shrinks the window to `in_flight` plus the larger of the newly acked byte
/// count and one maximum-size datagram. `now` is recorded as the start of the
/// recovery episode, and the next round boundary is set to the delivery-rate
/// estimator's current delivered count.
fn bbr2_enter_recovery(r: &mut Congestion, in_flight: usize, now: Instant) {
    // The save must happen before the window and the recovery flags change.
    let saved_cwnd = per_ack::bbr2_save_cwnd(r);
    r.bbr2_state.prior_cwnd = saved_cwnd;

    let headroom =
        std::cmp::max(r.bbr2_state.newly_acked_bytes, r.max_datagram_size);
    r.congestion_window = in_flight + headroom;
    r.congestion_recovery_start_time = Some(now);

    let delivered = r.delivery_rate.delivered();
    let bbr = &mut r.bbr2_state;
    bbr.packet_conservation = true;
    bbr.in_recovery = true;
    bbr.next_round_delivered = delivered;
}
/// Leaves loss recovery: clears the recovery start time and both recovery
/// flags, then hands over to `per_ack::bbr2_restore_cwnd`.
fn bbr2_exit_recovery(r: &mut Congestion) {
    r.congestion_recovery_start_time = None;

    let bbr = &mut r.bbr2_state;
    bbr.in_recovery = false;
    bbr.packet_conservation = false;

    // Restore last, once the recovery flags have been cleared.
    per_ack::bbr2_restore_cwnd(r);
}
/// `on_init` entry of the `BBR2` ops table; delegates to `init::bbr2_init`.
fn on_init(r: &mut Congestion) {
init::bbr2_init(r);
}
/// `on_packet_sent` entry of the `BBR2` ops table.
///
/// Forwards the current bytes in flight and the send time to
/// `per_transmit::bbr2_on_transmit`. The size of the packet being sent
/// (`_sent_bytes`) is not used.
fn on_packet_sent(
r: &mut Congestion, _sent_bytes: usize, bytes_in_flight: usize, now: Instant,
) {
per_transmit::bbr2_on_transmit(r, bytes_in_flight, now);
}
/// `on_packets_acked` entry of the `BBR2` ops table.
///
/// Drains `packets` (leaving the vector empty) and, for each acked packet,
/// updates the model via `per_ack::bbr2_update_model_and_state` using the
/// bytes in flight as they stood before that packet is subtracted. Afterwards
/// recovery is exited if the last packet of the batch was sent outside the
/// recovery period, the control parameters are refreshed with the remaining
/// bytes in flight, and the per-event lost byte counter is cleared.
/// `_rtt_stats` is unused.
fn on_packets_acked(
    r: &mut Congestion, bytes_in_flight: usize, packets: &mut Vec<Acked>,
    now: Instant, _rtt_stats: &RttStats,
) {
    r.bbr2_state.newly_acked_bytes = 0;

    // Must be sampled up front: the drain below empties the vector.
    let last_time_sent = packets.last().map(|pkt| pkt.time_sent);

    r.bbr2_state.prior_bytes_in_flight = bytes_in_flight;
    let mut remaining_in_flight = bytes_in_flight;

    for acked in packets.drain(..) {
        per_ack::bbr2_update_model_and_state(
            r,
            &acked,
            remaining_in_flight,
            now,
        );

        r.bbr2_state.prior_bytes_in_flight = remaining_in_flight;
        remaining_in_flight -= acked.size;

        r.bbr2_state.newly_acked_bytes += acked.size;
    }

    // An ack for a packet sent outside the recovery period ends recovery.
    match last_time_sent {
        Some(ts) if !r.in_congestion_recovery(ts) => bbr2_exit_recovery(r),
        _ => {},
    }

    per_ack::bbr2_update_control_parameters(r, remaining_in_flight, now);

    r.bbr2_state.newly_lost_bytes = 0;
}
/// `congestion_event` entry of the `BBR2` ops table.
///
/// Records the lost byte count, lets `per_loss::bbr2_update_on_loss` process
/// the loss, and enters recovery unless the largest lost packet was itself
/// sent during the current recovery period. Recovery is entered with the
/// bytes in flight reduced by the bytes just declared lost.
fn congestion_event(
    r: &mut Congestion, bytes_in_flight: usize, lost_bytes: usize,
    largest_lost_pkt: &Sent, now: Instant,
) {
    r.bbr2_state.newly_lost_bytes = lost_bytes;

    per_loss::bbr2_update_on_loss(r, largest_lost_pkt, lost_bytes, now);

    // Losses of packets sent during recovery do not start a new episode.
    let already_recovering =
        r.in_congestion_recovery(largest_lost_pkt.time_sent);
    if already_recovering {
        return;
    }

    bbr2_enter_recovery(r, bytes_in_flight - lost_bytes, now);
}
/// `checkpoint` entry of the `BBR2` ops table; does nothing for BBR2.
fn checkpoint(_r: &mut Congestion) {}
/// `rollback` entry of the `BBR2` ops table; leaves the state untouched and
/// always returns `false`.
///
/// NOTE(review): presumably `false` tells the caller that nothing was rolled
/// back — confirm against the `CongestionControlOps` contract.
fn rollback(_r: &mut Congestion) -> bool {
false
}
/// `has_custom_pacing` entry of the `BBR2` ops table; always returns `true`.
///
/// NOTE(review): presumably this signals that BBR2 computes its own pacing
/// rate — confirm against the `CongestionControlOps` contract.
fn has_custom_pacing() -> bool {
true
}
/// Converts a rate in bytes per second to kilobits per second for debug
/// output.
///
/// `u64::MAX` is the "unset" sentinel (the initial value of `bw_hi` and
/// `bw_lo`) and is reported as `-1`.
///
/// The multiplication is carried out in `u128` so that very large rates
/// cannot overflow (`rate * 8` in `u64` would panic in debug builds and wrap
/// in release builds for any rate above `u64::MAX / 8`). The result is
/// clamped to `isize::MAX` so the final cast never wraps on targets with a
/// narrow `isize`.
fn rate_kbps(rate: u64) -> isize {
    if rate == u64::MAX {
        return -1;
    }

    let kbps = u128::from(rate) * 8 / 1000;

    kbps.min(isize::MAX as u128) as isize
}
/// `debug_fmt` entry of the `BBR2` ops table.
///
/// Writes a single-line `bbr2={ ... }` summary of the BBR2 state: state
/// machine and recovery status, send quantum and RTT figures, the bandwidth
/// values in kbps (via `rate_kbps`), the inflight bounds, and the
/// probing counters.
fn debug_fmt(r: &Congestion, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    let s = &r.bbr2_state;

    f.write_str("bbr2={ ")?;

    // State machine and recovery status.
    write!(
        f,
        "state={:?} in_recovery={} ack_phase={:?} filled_pipe={} full_bw_count={} loss_events_in_round={} ",
        s.state,
        s.in_recovery,
        s.ack_phase,
        s.filled_pipe,
        s.full_bw_count,
        s.loss_events_in_round
    )?;

    // Send quantum, ack aggregation and RTT.
    write!(
        f,
        "send_quantum={} extra_acked={} min_rtt={:?} round_start={} ",
        r.send_quantum, s.extra_acked, s.min_rtt, s.round_start
    )?;

    // Bandwidth values, converted for display.
    let max_bw = rate_kbps(s.max_bw);
    let bw_lo = rate_kbps(s.bw_lo);
    let bw = rate_kbps(s.bw);
    let bw_hi = rate_kbps(s.bw_hi);
    let full_bw = rate_kbps(s.full_bw);
    write!(
        f,
        "max_bw={}kbps bw_lo={}kbps bw={}kbps bw_hi={}kbps full_bw={}kbps ",
        max_bw, bw_lo, bw, bw_hi, full_bw
    )?;

    // Inflight bounds.
    write!(
        f,
        "inflight_lo={} inflight_hi={} max_inflight={} ",
        s.inflight_lo, s.inflight_hi, s.max_inflight
    )?;

    // Bandwidth probing counters.
    write!(
        f,
        "probe_up_cnt={} bw_probe_samples={} ",
        s.probe_up_cnt, s.bw_probe_samples
    )?;

    f.write_str("}")
}
#[cfg(test)]
mod tests {
    use super::*;

    use smallvec::smallvec;

    use crate::packet;
    use crate::ranges;
    use crate::recovery::congestion::recovery::LegacyRecovery;
    use crate::recovery::HandshakeStatus;
    use crate::recovery::RecoveryOps;
    use crate::CongestionControlAlgorithm;
    use crate::OnAckReceivedOutcome;

    /// Delay between sending and acknowledging used by most tests.
    const TEST_RTT: Duration = Duration::from_millis(50);

    /// Builds an ack-eliciting, in-flight packet of `size` bytes whose send,
    /// delivered and first-sent timestamps are all `now`.
    fn sent_packet(
        pkt_num: u64, size: usize, delivered: usize, now: Instant,
    ) -> Sent {
        Sent {
            pkt_num,
            frames: smallvec![],
            time_sent: now,
            time_acked: None,
            time_lost: None,
            size,
            ack_eliciting: true,
            in_flight: true,
            delivered,
            delivered_time: now,
            first_sent_time: now,
            is_app_limited: false,
            tx_in_flight: 0,
            lost: 0,
            has_data: false,
            is_pmtud_probe: false,
        }
    }

    /// Hands `pkt` to the recovery in the application epoch at time `now`.
    fn send(r: &mut LegacyRecovery, pkt: Sent, now: Instant) {
        r.on_packet_sent(
            pkt,
            packet::Epoch::Application,
            HandshakeStatus::default(),
            now,
            "",
        );
    }

    /// Acknowledges the packet numbers in `pkt_nums` at time `now` and
    /// returns the outcome reported by the recovery.
    fn ack(
        r: &mut LegacyRecovery, pkt_nums: std::ops::Range<u64>, now: Instant,
    ) -> OnAckReceivedOutcome {
        let mut acked = ranges::RangeSet::default();
        acked.insert(pkt_nums);

        r.on_ack_received(
            &acked,
            25,
            packet::Epoch::Application,
            HandshakeStatus::default(),
            now,
            None,
            "",
        )
        .unwrap()
    }

    #[test]
    fn bbr_init() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(CongestionControlAlgorithm::BBR2);

        let r = LegacyRecovery::new(&cfg);

        let initial_cwnd = r.max_datagram_size *
            r.congestion.initial_congestion_window_packets;
        assert_eq!(r.cwnd(), initial_cwnd);
        assert_eq!(r.bytes_in_flight(), 0);

        assert_eq!(r.congestion.bbr2_state.state, BBR2StateMachine::Startup);
    }

    #[test]
    fn bbr2_startup() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(CongestionControlAlgorithm::BBR2);

        let mut r = LegacyRecovery::new(&cfg);
        let sent_at = Instant::now();
        let mss = r.max_datagram_size;

        // Send 5 packets.
        for pn in 0..5 {
            send(&mut r, sent_packet(pn, mss, 0, sent_at), sent_at);
        }

        let acked_at = sent_at + TEST_RTT;
        let cwnd_prev = r.cwnd();

        // Acknowledge all of them one RTT later.
        assert_eq!(
            ack(&mut r, 0..5, acked_at),
            OnAckReceivedOutcome {
                lost_packets: 0,
                lost_bytes: 0,
                acked_bytes: mss * 5,
                spurious_losses: 0,
            }
        );

        assert_eq!(r.congestion.bbr2_state.state, BBR2StateMachine::Startup);
        assert_eq!(r.cwnd(), cwnd_prev + mss * 5);
        assert_eq!(r.bytes_in_flight(), 0);
        assert_eq!(
            r.delivery_rate().to_bytes_per_second(),
            ((mss * 5) as f64 / TEST_RTT.as_secs_f64()) as u64
        );
        assert_eq!(
            r.congestion.bbr2_state.full_bw,
            r.delivery_rate().to_bytes_per_second()
        );
    }

    #[test]
    fn bbr2_congestion_event() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(CongestionControlAlgorithm::BBR2);

        let mut r = LegacyRecovery::new(&cfg);
        let sent_at = Instant::now();
        let mss = r.max_datagram_size;

        // Send 5 packets.
        for pn in 0..5 {
            send(&mut r, sent_packet(pn, mss, 0, sent_at), sent_at);
        }

        let acked_at = sent_at + TEST_RTT;

        // Acknowledging only the last packet declares the first two lost.
        assert_eq!(
            ack(&mut r, 4..5, acked_at),
            OnAckReceivedOutcome {
                lost_packets: 2,
                lost_bytes: 2 * mss,
                acked_bytes: mss,
                spurious_losses: 0,
            }
        );

        assert!(r.congestion.bbr2_state.in_recovery);

        // Two packets are still in flight.
        assert_eq!(r.bytes_in_flight(), mss * 2);

        assert_eq!(r.congestion.bbr2_state.newly_acked_bytes, mss);

        assert_eq!(r.cwnd(), mss * 3);
    }

    #[test]
    fn bbr2_probe_bw() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(CongestionControlAlgorithm::BBR2);

        let mut r = LegacyRecovery::new(&cfg);
        let now = Instant::now();
        let mss = r.max_datagram_size;

        let mut pn = 0;

        // Three send/ack rounds; every packet is sent at `now` and
        // acknowledged one RTT after `now`.
        for _ in 0..3 {
            let pkt = sent_packet(
                pn,
                mss,
                r.congestion.delivery_rate.delivered(),
                now,
            );
            send(&mut r, pkt, now);

            pn += 1;

            assert_eq!(
                ack(&mut r, 0..pn, now + TEST_RTT),
                OnAckReceivedOutcome {
                    lost_packets: 0,
                    lost_bytes: 0,
                    acked_bytes: mss,
                    spurious_losses: 0,
                }
            );
        }

        // Send 5 more packets, then acknowledge only the first of them.
        for _ in 0..5 {
            let pkt = sent_packet(
                pn,
                mss,
                r.congestion.delivery_rate.delivered(),
                now,
            );
            send(&mut r, pkt, now);

            pn += 1;
        }

        assert_eq!(
            ack(&mut r, 0..pn - 4, now + TEST_RTT),
            OnAckReceivedOutcome {
                lost_packets: 0,
                lost_bytes: 0,
                acked_bytes: mss,
                spurious_losses: 0,
            }
        );

        assert_eq!(r.congestion.bbr2_state.state, BBR2StateMachine::Drain);
        assert!(r.congestion.bbr2_state.filled_pipe);
        assert!(r.congestion.bbr2_state.pacing_gain < 1.0);
    }

    #[test]
    fn bbr2_probe_rtt() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(CongestionControlAlgorithm::BBR2);

        let mut r = LegacyRecovery::new(&cfg);
        let now = Instant::now();
        let mss = r.max_datagram_size;

        let mut pn = 0;

        // Four send/ack rounds; every packet is sent at `now` and
        // acknowledged one RTT after `now`.
        for _ in 0..4 {
            let pkt = sent_packet(
                pn,
                mss,
                r.congestion.delivery_rate.delivered(),
                now,
            );
            send(&mut r, pkt, now);

            pn += 1;

            assert_eq!(
                ack(&mut r, 0..pn, now + TEST_RTT),
                OnAckReceivedOutcome {
                    lost_packets: 0,
                    lost_bytes: 0,
                    acked_bytes: mss,
                    spurious_losses: 0,
                }
            );
        }

        assert_eq!(
            r.congestion.bbr2_state.state,
            BBR2StateMachine::ProbeBWCRUISE
        );

        // Jump ahead by the ProbeRTT interval and send one more packet.
        let now = now + PROBE_RTT_INTERVAL;

        let pkt =
            sent_packet(pn, mss, r.congestion.delivery_rate.delivered(), now);
        send(&mut r, pkt, now);

        pn += 1;

        // Acknowledge it after a longer RTT than before.
        let rtt = Duration::from_millis(100);
        let now = now + rtt;

        assert_eq!(
            ack(&mut r, 0..pn, now),
            OnAckReceivedOutcome {
                lost_packets: 0,
                lost_bytes: 0,
                acked_bytes: mss,
                spurious_losses: 0,
            }
        );

        assert_eq!(r.congestion.bbr2_state.state, BBR2StateMachine::ProbeRTT);
        assert_eq!(r.congestion.bbr2_state.pacing_gain, 1.0);
    }
}
mod init;
mod pacing;
mod per_ack;
mod per_loss;
mod per_transmit;