use super::*;
use crate::minmax::Minmax;
use std::time::Duration;
use super::CongestionControlOps;
pub(crate) static BBR: CongestionControlOps = CongestionControlOps {
on_init,
on_packet_sent,
on_packets_acked,
congestion_event,
checkpoint,
rollback,
has_custom_pacing,
debug_fmt,
};
const BTLBW_FILTER_LEN: Duration = Duration::from_secs(10);
const PROBE_RTT_INTERVAL: Duration = Duration::from_secs(10);
const RTPROP_FILTER_LEN: Duration = PROBE_RTT_INTERVAL;
const BBR_HIGH_GAIN: f64 = 2.89;
const BBR_MIN_PIPE_CWND_PKTS: usize = 4;
const BBR_GAIN_CYCLE_LEN: usize = 8;
const PROBE_RTT_DURATION: Duration = Duration::from_millis(200);
const PACING_GAIN_CYCLE: [f64; BBR_GAIN_CYCLE_LEN] =
[5.0 / 4.0, 3.0 / 4.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];
const BTLBW_GROWTH_TARGET: f64 = 1.25;
#[derive(Debug, PartialEq, Eq)]
enum BBRStateMachine {
Startup,
Drain,
ProbeBW,
ProbeRTT,
}
pub struct State {
state: BBRStateMachine,
pacing_rate: u64,
btlbw: u64,
btlbwfilter: Minmax<u64>,
rtprop: Duration,
rtprop_stamp: Instant,
rtprop_expired: bool,
pacing_gain: f64,
cwnd_gain: f64,
filled_pipe: bool,
round_count: u64,
round_start: bool,
next_round_delivered: usize,
probe_rtt_done_stamp: Option<Instant>,
probe_rtt_round_done: bool,
packet_conservation: bool,
prior_cwnd: usize,
idle_restart: bool,
full_bw: u64,
full_bw_count: usize,
cycle_stamp: Instant,
cycle_index: usize,
target_cwnd: usize,
in_recovery: bool,
start_time: Instant,
newly_lost_bytes: usize,
newly_acked_bytes: usize,
prior_bytes_in_flight: usize,
}
impl State {
pub fn new() -> Self {
let now = Instant::now();
State {
state: BBRStateMachine::Startup,
pacing_rate: 0,
btlbw: 0,
btlbwfilter: Minmax::new(0),
rtprop: Duration::ZERO,
rtprop_stamp: now,
rtprop_expired: false,
pacing_gain: 0.0,
cwnd_gain: 0.0,
filled_pipe: false,
round_count: 0,
round_start: false,
next_round_delivered: 0,
probe_rtt_done_stamp: None,
probe_rtt_round_done: false,
packet_conservation: false,
prior_cwnd: 0,
idle_restart: false,
full_bw: 0,
full_bw_count: 0,
cycle_stamp: now,
cycle_index: 0,
target_cwnd: 0,
in_recovery: false,
start_time: now,
newly_lost_bytes: 0,
newly_acked_bytes: 0,
prior_bytes_in_flight: 0,
}
}
}
fn bbr_enter_recovery(r: &mut Congestion, in_flight: usize, now: Instant) {
r.bbr_state.prior_cwnd = per_ack::bbr_save_cwnd(r);
r.congestion_window = in_flight.max(r.max_datagram_size);
r.congestion_recovery_start_time = Some(now);
r.bbr_state.packet_conservation = true;
r.bbr_state.in_recovery = true;
r.bbr_state.newly_lost_bytes = 0;
r.bbr_state.next_round_delivered = r.delivery_rate.delivered();
}
fn bbr_exit_recovery(r: &mut Congestion) {
r.congestion_recovery_start_time = None;
r.bbr_state.packet_conservation = false;
r.bbr_state.in_recovery = false;
per_ack::bbr_restore_cwnd(r);
}
fn on_init(r: &mut Congestion) {
init::bbr_init(r);
}
fn on_packet_sent(
r: &mut Congestion, _sent_bytes: usize, bytes_in_flight: usize, _now: Instant,
) {
per_transmit::bbr_on_transmit(r, bytes_in_flight);
}
fn on_packets_acked(
r: &mut Congestion, bytes_in_flight: usize, packets: &mut Vec<Acked>,
now: Instant, _rtt_stats: &RttStats,
) {
r.bbr_state.prior_bytes_in_flight = bytes_in_flight;
r.bbr_state.newly_acked_bytes =
packets.drain(..).fold(0, |acked_bytes, p| {
r.bbr_state.prior_bytes_in_flight -= p.size;
per_ack::bbr_update_model_and_state(r, &p, bytes_in_flight, now);
acked_bytes + p.size
});
if let Some(pkt) = packets.last() {
if !r.in_congestion_recovery(pkt.time_sent) && r.bbr_state.in_recovery {
bbr_exit_recovery(r);
}
}
per_ack::bbr_update_control_parameters(r, bytes_in_flight, now);
r.bbr_state.newly_lost_bytes = 0;
}
fn congestion_event(
r: &mut Congestion, bytes_in_flight: usize, lost_bytes: usize,
largest_lost_pkt: &Sent, now: Instant,
) {
r.bbr_state.newly_lost_bytes = lost_bytes;
if !r.in_congestion_recovery(largest_lost_pkt.time_sent) {
bbr_enter_recovery(r, bytes_in_flight - lost_bytes, now);
}
}
fn checkpoint(_r: &mut Congestion) {}
fn rollback(_r: &mut Congestion) -> bool {
false
}
fn has_custom_pacing() -> bool {
true
}
fn debug_fmt(r: &Congestion, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let bbr = &r.bbr_state;
write!(
f,
"bbr={{ state={:?} btlbw={} rtprop={:?} pacing_rate={} pacing_gain={} cwnd_gain={} target_cwnd={} send_quantum={} filled_pipe={} round_count={} }}",
bbr.state, bbr.btlbw, bbr.rtprop, bbr.pacing_rate, bbr.pacing_gain, bbr.cwnd_gain, bbr.target_cwnd, r.send_quantum(), bbr.filled_pipe, bbr.round_count
)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::packet;
use crate::ranges;
use crate::recovery::congestion::recovery::LegacyRecovery;
use crate::recovery::congestion::test_sender::TestSender;
use crate::recovery::HandshakeStatus;
use crate::recovery::RecoveryOps;
use crate::OnAckReceivedOutcome;
use smallvec::smallvec;
fn test_sender() -> TestSender {
TestSender::new(CongestionControlAlgorithm::BBR, false)
}
#[test]
fn bbr_init() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(CongestionControlAlgorithm::BBR);
let r = LegacyRecovery::new(&cfg);
assert_eq!(
r.cwnd(),
r.max_datagram_size * cfg.initial_congestion_window_packets
);
assert_eq!(r.bytes_in_flight(), 0);
assert_eq!(r.congestion.bbr_state.state, BBRStateMachine::Startup);
}
#[test]
fn bbr_startup() {
let mut sender = test_sender();
let mss = sender.max_datagram_size;
let rtt = Duration::from_millis(50);
sender.update_rtt(rtt);
sender.advance_time(rtt);
for _ in 0..5 {
sender.send_packet(mss);
}
sender.advance_time(rtt);
let cwnd_prev = sender.congestion_window;
sender.ack_n_packets(5, mss);
assert_eq!(sender.bbr_state.state, BBRStateMachine::Startup);
assert_eq!(sender.congestion_window, cwnd_prev + mss * 5);
assert_eq!(sender.bytes_in_flight, 0);
assert_eq!(
sender.delivery_rate().to_bytes_per_second(),
((mss * 5) as f64 / rtt.as_secs_f64()) as u64
);
assert_eq!(
sender.bbr_state.btlbw,
sender.delivery_rate().to_bytes_per_second()
);
}
#[test]
fn bbr_congestion_event() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(CongestionControlAlgorithm::BBR);
let mut r = LegacyRecovery::new(&cfg);
let now = Instant::now();
let mss = r.max_datagram_size;
for pn in 0..5 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: 0,
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
is_pmtud_probe: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
}
let rtt = Duration::from_millis(50);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(4..5);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
None,
"",
)
.unwrap(),
OnAckReceivedOutcome {
lost_packets: 2,
lost_bytes: 2 * mss,
acked_bytes: mss,
spurious_losses: 0,
},
);
assert_eq!(r.cwnd(), mss * 4);
assert_eq!(r.bytes_in_flight(), mss * 2);
}
#[test]
fn bbr_drain() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(CongestionControlAlgorithm::BBR);
let mut r = LegacyRecovery::new(&cfg);
let now = Instant::now();
let mss = r.max_datagram_size;
let mut pn = 0;
for _ in 0..3 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: r.congestion.delivery_rate.delivered(),
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
is_pmtud_probe: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
pn += 1;
let rtt = Duration::from_millis(50);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(0..pn);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
None,
"",
)
.unwrap(),
OnAckReceivedOutcome {
lost_packets: 0,
lost_bytes: 0,
acked_bytes: mss,
spurious_losses: 0,
},
);
}
for _ in 0..7 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: r.congestion.delivery_rate.delivered(),
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
is_pmtud_probe: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
pn += 1;
}
let rtt = Duration::from_millis(50);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(0..pn - 6);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
None,
"",
)
.unwrap(),
OnAckReceivedOutcome {
lost_packets: 0,
lost_bytes: 0,
acked_bytes: mss,
spurious_losses: 0,
},
);
assert!(r.congestion.bbr_state.filled_pipe);
assert_eq!(r.congestion.bbr_state.state, BBRStateMachine::Drain);
assert!(r.congestion.bbr_state.pacing_gain < 1.0);
}
#[test]
fn bbr_probe_bw() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(CongestionControlAlgorithm::BBR);
let mut r = LegacyRecovery::new(&cfg);
let now = Instant::now();
let mss = r.max_datagram_size;
for (pn, _) in (0..4).enumerate() {
let pkt = Sent {
pkt_num: pn as u64,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: r.congestion.delivery_rate.delivered(),
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
is_pmtud_probe: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
let rtt = Duration::from_millis(50);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(0..pn as u64 + 1);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
None,
"",
)
.unwrap(),
OnAckReceivedOutcome {
lost_packets: 0,
lost_bytes: 0,
acked_bytes: mss,
spurious_losses: 0,
},
);
}
assert!(r.congestion.bbr_state.filled_pipe);
assert_eq!(r.congestion.bbr_state.state, BBRStateMachine::ProbeBW);
assert!(r.congestion.bbr_state.pacing_gain >= 1.0);
}
#[test]
fn bbr_probe_rtt() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(CongestionControlAlgorithm::BBR);
let mut r = LegacyRecovery::new(&cfg);
let now = Instant::now();
let mss = r.max_datagram_size;
let mut pn = 0;
for _ in 0..4 {
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: r.congestion.delivery_rate.delivered(),
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
is_pmtud_probe: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
pn += 1;
let rtt = Duration::from_millis(50);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(0..pn);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
None,
"",
)
.unwrap(),
OnAckReceivedOutcome {
lost_packets: 0,
lost_bytes: 0,
acked_bytes: mss,
spurious_losses: 0,
},
);
}
assert_eq!(r.congestion.bbr_state.state, BBRStateMachine::ProbeBW);
let now = now + RTPROP_FILTER_LEN;
let pkt = Sent {
pkt_num: pn,
frames: smallvec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: mss,
ack_eliciting: true,
in_flight: true,
delivered: r.congestion.delivery_rate.delivered(),
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: false,
is_pmtud_probe: false,
};
r.on_packet_sent(
pkt,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
"",
);
pn += 1;
let rtt = Duration::from_millis(100);
let now = now + rtt;
let mut acked = ranges::RangeSet::default();
acked.insert(0..pn);
assert_eq!(
r.on_ack_received(
&acked,
25,
packet::Epoch::Application,
HandshakeStatus::default(),
now,
None,
"",
)
.unwrap(),
OnAckReceivedOutcome {
lost_packets: 0,
lost_bytes: 0,
acked_bytes: mss,
spurious_losses: 0,
},
);
assert_eq!(r.congestion.bbr_state.state, BBRStateMachine::ProbeRTT);
assert_eq!(r.congestion.bbr_state.pacing_gain, 1.0);
}
}
mod init;
mod pacing;
mod per_ack;
mod per_transmit;