extern crate rand;
use std::time::{Duration, Instant};
use log::*;
use rand::Rng;
use super::delivery_rate::DeliveryRateEstimator;
use super::minmax::MinMax;
use super::{CongestionController, CongestionStats};
use crate::connection::rtt::RttEstimator;
use crate::connection::space::{RateSamplePacketState, SentPacket};
use crate::RecoveryConfig;
/// Configuration parameters for the BBR congestion control algorithm.
#[derive(Debug)]
pub struct BbrConfig {
    /// Minimal congestion window, in bytes.
    min_cwnd: u64,
    /// Initial congestion window, in bytes.
    initial_cwnd: u64,
    /// Initial RTT estimate used before any RTT samples arrive.
    initial_rtt: Option<Duration>,
    /// How long a flow dwells at reduced cwnd during ProbeRTT.
    probe_rtt_duration: Duration,
    /// If true, ProbeRTT uses a BDP-scaled cwnd instead of the minimal cwnd.
    probe_rtt_based_on_bdp: bool,
    /// cwnd gain applied in ProbeRTT when `probe_rtt_based_on_bdp` is set.
    probe_rtt_cwnd_gain: f64,
    /// Length of the RTprop min-filter window.
    rtprop_filter_len: Duration,
    /// cwnd gain applied while in the ProbeBW state.
    probe_bw_cwnd_gain: f64,
    /// Maximum datagram (UDP payload) size, in bytes.
    max_datagram_size: u64,
}
impl BbrConfig {
pub fn from(conf: &RecoveryConfig) -> Self {
let max_datagram_size = conf.max_datagram_size as u64;
let min_cwnd = conf.min_congestion_window.saturating_mul(max_datagram_size);
let initial_cwnd = conf
.initial_congestion_window
.saturating_mul(max_datagram_size);
Self {
min_cwnd,
initial_cwnd,
initial_rtt: Some(conf.initial_rtt),
probe_rtt_duration: conf.bbr_probe_rtt_duration,
probe_rtt_based_on_bdp: conf.bbr_probe_rtt_based_on_bdp,
probe_rtt_cwnd_gain: conf.bbr_probe_rtt_cwnd_gain,
rtprop_filter_len: conf.bbr_rtprop_filter_len,
probe_bw_cwnd_gain: conf.bbr_probe_bw_cwnd_gain,
max_datagram_size,
}
}
}
impl Default for BbrConfig {
fn default() -> Self {
Self {
min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
initial_rtt: Some(crate::INITIAL_RTT),
probe_rtt_duration: PROBE_RTT_DURATION,
probe_rtt_based_on_bdp: false,
probe_rtt_cwnd_gain: 0.75,
rtprop_filter_len: RTPROP_FILTER_LEN,
probe_bw_cwnd_gain: 2.0,
max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
}
}
}
/// Number of round trips remembered by the BtlBw max filter.
const BTLBW_FILTER_LEN: u64 = 10;
/// RTprop min-filter window length.
const RTPROP_FILTER_LEN: Duration = Duration::from_secs(10);
/// Startup pacing/cwnd gain (~2/ln2), used to ramp up quickly.
const HIGH_GAIN: f64 = 2.89;
/// Required fractional BtlBw growth per round for Startup to continue.
const BTLBW_GROWTH_RATE: f64 = 0.25;
/// Rounds without sufficient BtlBw growth before the pipe is declared full.
const FULL_BW_COUNT_THRESHOLD: u64 = 3;
/// Number of phases in the ProbeBW pacing-gain cycle.
const GAIN_CYCLE_LEN: usize = 8;
/// ProbeBW gain cycle: probe up (1.25), drain (0.75), then cruise at 1.0.
const PACING_GAIN_CYCLE: [f64; GAIN_CYCLE_LEN] = [1.25, 0.75, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];
// NOTE(review): not referenced in this file chunk — presumably the minimum
// spacing between ProbeRTT episodes; confirm against the rest of the module.
const PROBE_RTT_INTERVAL: Duration = Duration::from_secs(10);
/// Time spent at reduced cwnd during a ProbeRTT episode.
const PROBE_RTT_DURATION: Duration = Duration::from_millis(200);
/// Pacing rate (bytes/s, i.e. 1.2 Mbps) below which the send quantum
/// stays at a single datagram.
const SEND_QUANTUM_THRESHOLD_PACING_RATE: u64 = 1_200_000 / 8;
/// States of the BBR state machine.
#[derive(Debug, PartialEq, Eq)]
enum BbrStateMachine {
    /// Ramp up quickly (high gain) until the bottleneck bandwidth is found.
    Startup,
    /// Pace below unity gain to drain the queue built during Startup.
    Drain,
    /// Steady state: cycle pacing gain to probe for more bandwidth.
    ProbeBW,
    /// Periodically reduce cwnd to refresh the min-RTT (RTprop) estimate.
    ProbeRTT,
}
/// Tracks packet-timed round trips, as defined by the BBR draft.
#[derive(Debug, Default)]
struct RoundTripCounter {
    /// Count of completed packet-timed round trips.
    pub round_count: u64,
    /// True if the most recent ACK started a new round trip.
    pub is_round_start: bool,
    /// `delivered` value that, once reached, marks the end of the current round.
    pub next_round_delivered: u64,
}
/// Estimates whether the bottleneck bandwidth has been fully utilized
/// (i.e. whether Startup has filled the pipe).
#[derive(Debug, Default)]
struct FullPipeEstimator {
    /// True once the pipe is judged full and Startup should end.
    is_filled_pipe: bool,
    /// Baseline bandwidth against which growth is measured.
    full_bw: u64,
    /// Consecutive rounds without sufficient bandwidth growth.
    full_bw_count: u64,
}
/// Per-ACK-batch bookkeeping accumulated between `begin_ack` and `end_ack`.
#[derive(Debug)]
struct AckState {
    /// Time at which the current ACK batch is being processed.
    now: Instant,
    /// Bytes newly declared lost in this batch.
    newly_lost_bytes: u64,
    /// Bytes newly acknowledged in this batch.
    newly_acked_bytes: u64,
    /// Largest `delivered` count among packets ACKed in this batch.
    packet_delivered: u64,
    /// Send time of the last packet acknowledged in this batch.
    last_ack_packet_sent_time: Instant,
    /// Bytes in flight before this batch was processed.
    prior_bytes_in_flight: u64,
}
impl Default for AckState {
fn default() -> Self {
let now = Instant::now();
Self {
now,
newly_lost_bytes: 0,
newly_acked_bytes: 0,
packet_delivered: 0,
last_ack_packet_sent_time: now,
prior_bytes_in_flight: 0,
}
}
}
/// BBR congestion controller state.
#[derive(Debug)]
pub struct Bbr {
    /// Algorithm configuration.
    config: BbrConfig,
    /// Aggregate congestion statistics (bytes in flight, acked, lost, ...).
    stats: CongestionStats,
    /// Current state of the BBR state machine.
    state: BbrStateMachine,
    /// Current pacing rate, in bytes per second.
    pacing_rate: u64,
    /// Maximum burst size allowed by pacing, in bytes.
    send_quantum: u64,
    /// Congestion window, in bytes.
    cwnd: u64,
    /// Current bottleneck bandwidth estimate (BtlBw), in bytes per second.
    btlbw: u64,
    /// Windowed max filter backing the BtlBw estimate.
    btlbwfilter: MinMax,
    /// Delivery rate sampler.
    delivery_rate_estimator: DeliveryRateEstimator,
    /// Round-trip propagation delay estimate (RTprop).
    rtprop: Duration,
    /// Time of the most recent RTprop update.
    rtprop_stamp: Instant,
    /// True when the RTprop sample has aged past the filter window.
    is_rtprop_expired: bool,
    /// Gain applied to BtlBw to derive the pacing rate.
    pacing_gain: f64,
    /// Gain applied to the BDP to derive the target cwnd.
    cwnd_gain: f64,
    /// Packet-timed round-trip tracking.
    round: RoundTripCounter,
    /// Startup full-pipe detection state.
    full_pipe: FullPipeEstimator,
    /// When set, the time at which the current ProbeRTT episode may end.
    probe_rtt_done_stamp: Option<Instant>,
    /// True once a full round has elapsed inside ProbeRTT.
    probe_rtt_round_done: bool,
    /// True while cwnd growth is frozen after entering recovery.
    packet_conservation: bool,
    /// cwnd saved before recovery/ProbeRTT, restored afterwards.
    prior_cwnd: u64,
    /// True if the connection just restarted after an idle period.
    is_idle_restart: bool,
    /// Start time of the current ProbeBW gain-cycle phase.
    cycle_stamp: Instant,
    /// Index into `PACING_GAIN_CYCLE` for the current ProbeBW phase.
    cycle_index: usize,
    /// Target cwnd derived from BDP and `cwnd_gain`.
    target_cwnd: u64,
    /// True while in loss recovery.
    in_recovery: bool,
    /// Per-ACK-batch bookkeeping.
    ack_state: AckState,
    /// Start of the current recovery epoch, if any.
    recovery_epoch_start: Option<Instant>,
}
impl Bbr {
pub fn new(config: BbrConfig) -> Self {
let now = Instant::now();
let initial_cwnd = config.initial_cwnd;
let mut bbr = Self {
config,
stats: Default::default(),
state: BbrStateMachine::Startup,
pacing_rate: 0,
send_quantum: 0,
cwnd: initial_cwnd,
btlbw: 0,
btlbwfilter: MinMax::new(BTLBW_FILTER_LEN),
delivery_rate_estimator: DeliveryRateEstimator::default(),
rtprop: Duration::MAX,
rtprop_stamp: now,
is_rtprop_expired: false,
pacing_gain: HIGH_GAIN,
cwnd_gain: HIGH_GAIN,
round: Default::default(),
full_pipe: Default::default(),
probe_rtt_done_stamp: None,
probe_rtt_round_done: false,
packet_conservation: false,
prior_cwnd: 0,
is_idle_restart: false,
cycle_stamp: now,
cycle_index: 0,
target_cwnd: 0,
in_recovery: false,
ack_state: AckState::default(),
recovery_epoch_start: None,
};
bbr.init();
bbr
}
fn init(&mut self) {
self.rtprop = self.config.initial_rtt.unwrap_or(Duration::MAX);
self.rtprop_stamp = Instant::now();
self.probe_rtt_done_stamp = None;
self.probe_rtt_round_done = false;
self.packet_conservation = false;
self.prior_cwnd = 0;
self.is_idle_restart = false;
self.init_round_counting();
self.init_full_pipe();
self.init_pacing_rate();
self.enter_startup();
}
fn init_round_counting(&mut self) {
self.round.next_round_delivered = 0;
self.round.round_count = 0;
self.round.is_round_start = false;
}
fn init_full_pipe(&mut self) {
self.full_pipe.is_filled_pipe = false;
self.full_pipe.full_bw = 0;
self.full_pipe.full_bw_count = 0;
}
fn init_pacing_rate(&mut self) {
let srtt = match self.config.initial_rtt {
Some(rtt) => rtt,
_ => Duration::from_millis(1),
};
let nominal_bandwidth = self.config.initial_cwnd as f64 / srtt.as_secs_f64();
self.pacing_rate = (self.pacing_gain * nominal_bandwidth) as u64;
}
fn enter_startup(&mut self) {
self.state = BbrStateMachine::Startup;
self.pacing_gain = HIGH_GAIN;
self.cwnd_gain = HIGH_GAIN;
}
fn check_full_pipe(&mut self) {
if self.is_filled_pipe()
|| !self.is_round_start()
|| self.delivery_rate_estimator.is_sample_app_limited()
{
return;
}
if self.btlbw >= (self.full_pipe.full_bw as f64 * (1.0_f64 + BTLBW_GROWTH_RATE)) as u64 {
self.full_pipe.full_bw = self.btlbw;
self.full_pipe.full_bw_count = 0;
return;
}
self.full_pipe.full_bw_count += 1;
if self.full_pipe.full_bw_count >= FULL_BW_COUNT_THRESHOLD {
self.full_pipe.is_filled_pipe = true;
}
}
fn update_round(&mut self) {
if self.ack_state.packet_delivered >= self.round.next_round_delivered {
self.round.next_round_delivered = self.delivery_rate_estimator.delivered();
self.round.round_count += 1;
self.round.is_round_start = true;
self.packet_conservation = false;
} else {
self.round.is_round_start = false;
}
}
pub fn is_filled_pipe(&self) -> bool {
self.full_pipe.is_filled_pipe
}
pub fn is_round_start(&self) -> bool {
self.round.is_round_start
}
fn set_pacing_rate_with_gain(&mut self, pacing_gain: f64) {
let rate = (pacing_gain * self.btlbw as f64) as u64;
if self.is_filled_pipe() || rate > self.pacing_rate {
self.pacing_rate = rate;
}
}
fn enter_drain(&mut self) {
self.state = BbrStateMachine::Drain;
self.pacing_gain = 1.0 / HIGH_GAIN; self.cwnd_gain = HIGH_GAIN; }
fn inflight(&self, gain: f64) -> u64 {
if self.rtprop == Duration::MAX {
return self.config.initial_cwnd;
}
let quanta = 3 * self.send_quantum;
let estimated_bdp = self.btlbw as f64 * self.rtprop.as_secs_f64();
(gain * estimated_bdp) as u64 + quanta
}
fn update_target_cwnd(&mut self) {
self.target_cwnd = self.inflight(self.cwnd_gain);
}
fn check_drain(&mut self, bytes_in_flight: u64, now: Instant) {
if self.state == BbrStateMachine::Startup && self.is_filled_pipe() {
self.enter_drain();
}
if self.state == BbrStateMachine::Drain && bytes_in_flight <= self.inflight(1.0) {
self.enter_probe_bw(now);
}
}
fn enter_probe_bw(&mut self, now: Instant) {
self.state = BbrStateMachine::ProbeBW;
self.pacing_gain = 1.0;
self.cwnd_gain = self.config.probe_bw_cwnd_gain;
self.cycle_index = GAIN_CYCLE_LEN - 1 - rand::thread_rng().gen_range(0..GAIN_CYCLE_LEN - 1);
self.advance_cycle_phase(now);
}
fn check_cycle_phase(&mut self, now: Instant) {
if self.state == BbrStateMachine::ProbeBW && self.is_next_cycle_phase(now) {
self.advance_cycle_phase(now);
}
}
fn advance_cycle_phase(&mut self, now: Instant) {
self.cycle_stamp = now;
self.cycle_index = (self.cycle_index + 1) % GAIN_CYCLE_LEN;
self.pacing_gain = PACING_GAIN_CYCLE[self.cycle_index];
}
fn is_next_cycle_phase(&mut self, now: Instant) -> bool {
let is_full_length = now.saturating_duration_since(self.cycle_stamp) > self.rtprop;
if self.pacing_gain > 1.0 {
return is_full_length
&& (self.ack_state.newly_lost_bytes > 0
|| self.ack_state.prior_bytes_in_flight >= self.inflight(self.pacing_gain));
} else if self.pacing_gain < 1.0 {
return is_full_length || self.ack_state.prior_bytes_in_flight <= self.inflight(1.0);
}
is_full_length
}
fn handle_restart_from_idle(&mut self, bytes_in_flight: u64) {
if bytes_in_flight == 0 && self.delivery_rate_estimator.is_app_limited() {
self.is_idle_restart = true;
if self.state == BbrStateMachine::ProbeBW {
self.set_pacing_rate_with_gain(1.0);
}
}
}
fn save_cwnd(&mut self) {
self.prior_cwnd = if !self.in_recovery && self.state != BbrStateMachine::ProbeRTT {
self.cwnd
} else {
self.cwnd.max(self.prior_cwnd)
}
}
fn restore_cwnd(&mut self) {
self.cwnd = self.cwnd.max(self.prior_cwnd)
}
fn probe_rtt_cwnd(&self) -> u64 {
if self.config.probe_rtt_based_on_bdp {
return self.inflight(self.config.probe_rtt_cwnd_gain);
}
self.config.min_cwnd
}
fn check_probe_rtt(&mut self, now: Instant, bytes_in_flight: u64) {
if self.state != BbrStateMachine::ProbeRTT
&& self.is_rtprop_expired
&& !self.is_idle_restart
{
self.enter_probe_rtt();
self.save_cwnd();
self.probe_rtt_done_stamp = None;
}
if self.state == BbrStateMachine::ProbeRTT {
self.handle_probe_rtt(now, bytes_in_flight);
}
self.is_idle_restart = false;
}
fn enter_probe_rtt(&mut self) {
self.state = BbrStateMachine::ProbeRTT;
self.pacing_gain = 1.0;
self.cwnd_gain = 1.0;
}
fn handle_probe_rtt(&mut self, now: Instant, bytes_in_flight: u64) {
self.delivery_rate_estimator.set_app_limited(true);
if let Some(probe_rtt_done_stamp) = self.probe_rtt_done_stamp {
if self.is_round_start() {
self.probe_rtt_round_done = true;
}
if self.probe_rtt_round_done && now >= probe_rtt_done_stamp {
self.rtprop_stamp = now;
self.restore_cwnd();
self.exit_probe_rtt(now);
}
} else if bytes_in_flight <= self.probe_rtt_cwnd() {
self.probe_rtt_done_stamp = Some(now + self.config.probe_rtt_duration);
self.probe_rtt_round_done = false;
self.round.next_round_delivered = self.delivery_rate_estimator.delivered();
}
}
fn exit_probe_rtt(&mut self, now: Instant) {
if self.is_filled_pipe() {
self.enter_probe_bw(now);
} else {
self.enter_startup();
}
}
fn update_model_and_state(&mut self, now: Instant) {
self.update_btlbw();
self.check_cycle_phase(now);
self.check_full_pipe();
self.check_drain(self.stats.bytes_in_flight, now);
self.update_rtprop(now);
self.check_probe_rtt(now, self.stats.bytes_in_flight);
}
fn update_control_parameters(&mut self) {
self.set_pacing_rate();
self.set_send_quantum();
self.set_cwnd();
}
fn update_btlbw(&mut self) {
self.update_round();
if self.delivery_rate_estimator.delivery_rate() >= self.btlbw
|| !self.delivery_rate_estimator.is_sample_app_limited()
{
self.btlbwfilter.update_max(
self.round.round_count,
self.delivery_rate_estimator.delivery_rate(),
);
self.btlbw = self.btlbwfilter.get();
}
}
fn update_rtprop(&mut self, now: Instant) {
let sample_rtt = self.delivery_rate_estimator.sample_rtt();
self.is_rtprop_expired =
now.saturating_duration_since(self.rtprop_stamp) > self.config.rtprop_filter_len;
if !sample_rtt.is_zero() && (sample_rtt <= self.rtprop || self.is_rtprop_expired) {
self.rtprop = sample_rtt;
self.rtprop_stamp = now;
}
}
fn set_pacing_rate(&mut self) {
self.set_pacing_rate_with_gain(self.pacing_gain);
}
fn set_send_quantum(&mut self) {
let floor = if self.pacing_rate < SEND_QUANTUM_THRESHOLD_PACING_RATE {
self.config.max_datagram_size
} else {
2 * self.config.max_datagram_size
};
self.send_quantum = (self.pacing_rate / 1000).clamp(floor, 64 * 1024);
}
fn modulate_cwnd_for_recovery(&mut self, bytes_in_flight: u64) {
if self.ack_state.newly_lost_bytes > 0 {
self.cwnd = self
.cwnd
.saturating_sub(self.ack_state.newly_lost_bytes)
.max(self.config.min_cwnd);
}
if self.packet_conservation {
self.cwnd = self
.cwnd
.max(bytes_in_flight + self.ack_state.newly_acked_bytes);
}
}
fn modulate_cwnd_for_probe_rtt(&mut self) {
if self.state == BbrStateMachine::ProbeRTT {
self.cwnd = self.probe_rtt_cwnd();
}
}
fn set_cwnd(&mut self) {
let bytes_in_flight = self.stats.bytes_in_flight;
self.update_target_cwnd();
self.modulate_cwnd_for_recovery(bytes_in_flight);
if !self.packet_conservation {
if self.is_filled_pipe() {
self.cwnd = self
.target_cwnd
.min(self.cwnd + self.ack_state.newly_acked_bytes);
} else if self.cwnd < self.target_cwnd
|| self.delivery_rate_estimator.delivered() < self.config.initial_cwnd
{
self.cwnd += self.ack_state.newly_acked_bytes;
}
self.cwnd = self.cwnd.max(self.config.min_cwnd);
}
self.modulate_cwnd_for_probe_rtt();
}
fn enter_recovery(&mut self, now: Instant) {
self.save_cwnd();
self.recovery_epoch_start = Some(now);
self.cwnd = self.stats.bytes_in_flight
+ self
.ack_state
.newly_acked_bytes
.max(self.config.max_datagram_size);
self.packet_conservation = true;
self.in_recovery = true;
self.round.next_round_delivered = self.delivery_rate_estimator.delivered();
}
fn exit_recovery(&mut self) {
self.recovery_epoch_start = None;
self.packet_conservation = false;
self.in_recovery = false;
self.restore_cwnd();
}
}
impl CongestionController for Bbr {
    fn name(&self) -> &str {
        "BBR"
    }

    /// Registers a sent packet with the delivery-rate sampler, detects
    /// restarts from idle, and accounts the packet's bytes as in flight.
    fn on_sent(&mut self, _now: Instant, packet: &mut SentPacket, _bytes_in_flight: u64) {
        self.delivery_rate_estimator.on_packet_sent(
            packet,
            self.stats.bytes_in_flight,
            self.stats.bytes_lost_in_total,
        );
        // Checked before the new packet is counted: idle means nothing was
        // in flight at send time.
        self.handle_restart_from_idle(self.stats.bytes_in_flight);
        self.stats.bytes_in_flight += packet.sent_size as u64;
    }

    /// Resets the per-batch ACK state at the start of an ACK batch.
    fn begin_ack(&mut self, now: Instant, _bytes_in_flight: u64) {
        self.ack_state.newly_acked_bytes = 0;
        self.ack_state.newly_lost_bytes = 0;
        self.ack_state.packet_delivered = 0;
        self.ack_state.last_ack_packet_sent_time = now;
        self.ack_state.prior_bytes_in_flight = self.stats.bytes_in_flight;
        self.ack_state.now = now;
    }

    /// Processes one acknowledged packet: updates the rate sample, byte
    /// accounting and per-batch ACK state.
    fn on_ack(
        &mut self,
        packet: &mut SentPacket,
        now: Instant,
        _app_limited: bool,
        _rtt: &RttEstimator,
        _bytes_in_flight: u64,
    ) {
        self.delivery_rate_estimator.update_rate_sample(packet);
        self.stats.bytes_in_flight = self
            .stats
            .bytes_in_flight
            .saturating_sub(packet.sent_size as u64);
        self.stats.bytes_acked_in_total = self
            .stats
            .bytes_acked_in_total
            .saturating_add(packet.sent_size as u64);
        if self.in_slow_start() {
            self.stats.bytes_acked_in_slow_start = self
                .stats
                .bytes_acked_in_slow_start
                .saturating_add(packet.sent_size as u64);
        }
        self.ack_state.newly_acked_bytes += packet.sent_size as u64;
        self.ack_state.last_ack_packet_sent_time = packet.time_sent;
        self.ack_state.packet_delivered = self
            .ack_state
            .packet_delivered
            .max(packet.rate_sample_state.delivered);
    }

    /// Finishes an ACK batch: generates the rate sample, ends recovery if the
    /// ACKed packets were sent after recovery began, and runs the BBR update.
    fn end_ack(&mut self) {
        self.delivery_rate_estimator.generate_rate_sample();
        // `in_recovery` field = currently in recovery; `in_recovery(t)` method
        // = whether a packet sent at `t` belongs to the recovery epoch.
        if self.in_recovery && !self.in_recovery(self.ack_state.last_ack_packet_sent_time) {
            self.exit_recovery();
        }
        self.update_model_and_state(self.ack_state.now);
        self.update_control_parameters();
    }

    /// Handles packet loss: updates byte accounting, then either collapses
    /// the window (persistent congestion) or enters recovery.
    fn on_congestion_event(
        &mut self,
        now: Instant,
        packet: &SentPacket,
        in_persistent_congestion: bool,
        lost_bytes: u64,
        _bytes_in_flight: u64,
    ) {
        self.stats.bytes_in_flight = self.stats.bytes_in_flight.saturating_sub(lost_bytes);
        self.stats.bytes_lost_in_total = self.stats.bytes_lost_in_total.saturating_add(lost_bytes);
        self.ack_state.newly_lost_bytes =
            self.ack_state.newly_lost_bytes.saturating_add(lost_bytes);
        if in_persistent_congestion {
            // Persistent congestion collapses the window to the minimum.
            self.cwnd = self.config.min_cwnd;
            self.recovery_epoch_start = None;
        } else if !self.in_recovery && !self.in_recovery(packet.time_sent) {
            // Only losses of packets sent after the previous recovery epoch
            // start a new one.
            self.enter_recovery(now);
        }
    }

    /// Current congestion window, never below the configured minimum.
    fn congestion_window(&self) -> u64 {
        self.cwnd.max(self.config.min_cwnd)
    }

    /// BBR always paces; returns the current pacing rate in bytes per second.
    fn pacing_rate(&self) -> Option<u64> {
        Some(self.pacing_rate)
    }

    fn initial_window(&self) -> u64 {
        self.config.initial_cwnd
    }

    fn minimal_window(&self) -> u64 {
        self.config.min_cwnd
    }

    /// A packet belongs to the current recovery epoch if it was sent at or
    /// before the epoch start.
    fn in_recovery(&self, sent_time: Instant) -> bool {
        self.recovery_epoch_start.is_some_and(|t| sent_time <= t)
    }

    /// BBR treats Startup as its slow-start analogue.
    fn in_slow_start(&self) -> bool {
        self.state == BbrStateMachine::Startup
    }

    fn stats(&self) -> &CongestionStats {
        &self.stats
    }
}
#[cfg(test)]
mod tests {
    // NOTE(review): no unit tests yet — state transitions (Startup -> Drain ->
    // ProbeBW -> ProbeRTT), cwnd modulation and recovery enter/exit would all
    // benefit from coverage.
}