extern crate rand;
use std::time::{Duration, Instant};
use log::*;
use rand::Rng;
use super::delivery_rate::DeliveryRateEstimator;
use super::minmax::MinMax;
use super::{CongestionController, CongestionStats};
use crate::connection::rtt::RttEstimator;
use crate::connection::space::{RateSamplePacketState, SentPacket};
use crate::RecoveryConfig;
/// Tunable parameters for the BBRv3 congestion controller.
#[derive(Debug)]
pub struct Bbr3Config {
    // Lower bound for the congestion window, in bytes.
    min_cwnd: u64,
    // Congestion window used at connection start, in bytes.
    initial_cwnd: u64,
    // Seed RTT used before any samples arrive; `None` falls back to a default.
    initial_rtt: Option<Duration>,
    // Maximum UDP datagram payload size, in bytes.
    max_datagram_size: u64,
    // Rounds without sufficient bandwidth growth before the pipe is deemed full.
    full_bw_count_threshold: u64,
    // Relative bandwidth growth (e.g. 0.25) that still counts as "growing".
    full_bw_growth_rate: f64,
    // How long to dwell at the reduced cwnd while probing for min RTT.
    probe_rtt_duration: Duration,
    // How often ProbeRTT is entered to refresh the min-RTT estimate.
    probe_rtt_interval: Duration,
    // Loss-rate fraction above which inflight is considered too high.
    loss_threshold: f64,
    // Loss events in one round that can end Startup early.
    full_loss_count: u64,
    // Multiplicative decrease factor applied to the lower bounds on loss.
    beta: f64,
    // Fraction of inflight_hi subtracted as free headroom when cruising.
    headroom: f64,
}
impl Bbr3Config {
    /// Builds a BBRv3 configuration from the transport-level recovery
    /// settings, converting packet-count windows into byte counts.
    /// Unspecified fields keep their defaults.
    pub fn from(conf: &RecoveryConfig) -> Self {
        let mds = conf.max_datagram_size as u64;
        Self {
            min_cwnd: conf.min_congestion_window.saturating_mul(mds),
            initial_cwnd: conf.initial_congestion_window.saturating_mul(mds),
            initial_rtt: Some(conf.initial_rtt),
            max_datagram_size: mds,
            ..Self::default()
        }
    }
}
impl Default for Bbr3Config {
    /// Default configuration mirroring the module-level BBRv3 constants.
    fn default() -> Self {
        Self {
            // Conventional 4-packet minimum window.
            min_cwnd: 4 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
            initial_cwnd: 80 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
            initial_rtt: Some(crate::INITIAL_RTT),
            max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64,
            full_bw_count_threshold: FULL_BW_COUNT_THRESHOLD,
            full_bw_growth_rate: FULL_BW_GROWTH_RATE,
            probe_rtt_duration: PROBE_RTT_DURATION,
            probe_rtt_interval: PROBE_RTT_INTERVAL,
            loss_threshold: LOSS_THRESH,
            full_loss_count: FULL_LOSS_COUNT,
            beta: BETA,
            headroom: HEADROOM,
        }
    }
}
// Pacing gain used during Startup (2/ln(2), per the BBRv3 draft).
const STARTUP_PACING_GAIN: f64 = 2.77;
// Pace slightly below the bandwidth estimate to avoid slowly building a queue.
const PACING_MARGIN_PERCENT: f64 = 0.01;
// Loss rate (as a fraction of tx_in_flight) considered "too high".
const LOSS_THRESH: f64 = 0.02;
// Loss events in one round that can end Startup early.
const FULL_LOSS_COUNT: u64 = 6;
// Multiplicative decrease applied to the loss-based lower bounds.
const BETA: f64 = 0.7;
// Fraction of inflight_hi subtracted as free headroom while cruising.
// BUG FIX: the BBRv3 draft specifies BBRHeadroom = 0.15. The previous value
// (0.85) made inflight_with_headroom() subtract 85% of inflight_hi, leaving
// only 15% of the probed inflight usable in Cruise/ProbeRTT.
const HEADROOM: f64 = 0.15;
// Minimum pipe cwnd, in units of max datagram size (SMSS).
const MIN_PIPE_CWND_IN_SMSS: u64 = 4;
// Window length (in rounds) of the extra-acked max filter.
const EXTRA_ACKED_FILTER_LEN: u64 = 10;
// Window length of the min-RTT filter.
const MIN_RTT_FILTER_LEN: Duration = Duration::from_secs(10);
// How long to dwell at the reduced cwnd during ProbeRTT.
const PROBE_RTT_DURATION: Duration = Duration::from_millis(200);
// How often ProbeRTT is entered to refresh the min-RTT estimate.
const PROBE_RTT_INTERVAL: Duration = Duration::from_secs(5);
// Rounds without sufficient growth before the pipe is deemed full.
const FULL_BW_COUNT_THRESHOLD: u64 = 3;
// Growth below 25% per round counts as a bandwidth plateau.
const FULL_BW_GROWTH_RATE: f64 = 0.25;
// Cap on the Reno-coexistence round count before probing for bandwidth.
const PROBE_BW_MAX_ROUNDS: u64 = 63;
// Randomized 0..2 extra rounds before the next bandwidth probe.
const PROBE_BW_RAND_ROUNDS: u64 = 2;
// Randomized wall-clock wait (2-3 s) between bandwidth probes.
const PROBE_BW_MIN_WAIT_TIME_IN_MSEC: u64 = 2000;
const PROBE_BW_MAX_WAIT_TIME_IN_MSEC: u64 = 3000;
// Pacing rate (bytes/s, i.e. 1.2 Mbps) above which send quanta grow.
const SEND_QUANTUM_THRESHOLD_PACING_RATE: u64 = 1_200_000 / 8;
/// BBRv3 state machine: Startup -> Drain -> (ProbeBW cycle), with ProbeRTT
/// entered periodically to refresh the min-RTT estimate.
#[derive(Debug, PartialEq, Eq)]
enum State {
    // Exponential bandwidth search at connection start.
    Startup,
    // Drain the queue built up during Startup.
    Drain,
    // ProbeBW: pace below the estimate to drain any queue.
    ProbeBwDown,
    // ProbeBW: steady state at the estimated bandwidth.
    ProbeBwCruise,
    // ProbeBW: refill the pipe before probing upward.
    ProbeBwRefill,
    // ProbeBW: push inflight above the estimate to find more bandwidth.
    ProbeBwUp,
    // Temporarily reduce inflight to obtain a fresh min-RTT sample.
    ProbeRTT,
}
/// Tracks packet-timed round trips via delivered-bytes watermarks.
#[derive(Debug, Default)]
struct RoundTripCounter {
    // Number of completed round trips.
    pub round_count: u64,
    // True only while processing the ack that began a new round.
    pub is_round_start: bool,
    // delivered() watermark whose acknowledgement ends the current round.
    pub next_round_delivered: u64,
}
/// Detects the Startup bandwidth plateau ("full pipe").
#[derive(Debug, Default)]
struct FullPipeEstimator {
    // Set once the pipe is judged full; Startup then transitions to Drain.
    is_filled_pipe: bool,
    // Baseline bandwidth that the growth check compares against.
    full_bw: u64,
    // Consecutive round starts without significant bandwidth growth.
    full_bw_count: u64,
}
/// Scratch state describing the ack batch currently being processed.
/// Reset in `begin_ack`, filled by `on_ack` and the loss handlers.
#[derive(Debug)]
struct AckState {
    // Time at which the current batch is being processed.
    now: Instant,
    // Bytes newly declared lost in this batch.
    newly_lost_bytes: u64,
    // Bytes newly acknowledged in this batch.
    newly_acked_bytes: u64,
    // Highest per-packet delivered() counter seen in this batch.
    packet_delivered: u64,
    // Send time of the most recently acknowledged packet.
    last_ack_packet_sent_time: Instant,
    // Bytes in flight before this batch was processed.
    prior_bytes_in_flight: u64,
    // Bytes in flight when the sampled packet was sent (loss-rate basis).
    tx_in_flight: u64,
    // Bytes lost since the sampled packet was sent.
    lost: u64,
}
impl Default for AckState {
fn default() -> Self {
let now = Instant::now();
Self {
now,
newly_lost_bytes: 0,
newly_acked_bytes: 0,
packet_delivered: 0,
last_ack_packet_sent_time: now,
prior_bytes_in_flight: 0,
tx_in_flight: 0,
lost: 0,
}
}
}
/// Two-slot maximum filter for bandwidth samples: slot 1 holds the current
/// filter epoch and slot 0 the previous one.
#[derive(Default, Debug)]
struct SimpleMaxFilter {
    bw: [u64; 2],
}
impl SimpleMaxFilter {
    /// Creates a filter with both epochs zeroed.
    fn new() -> Self {
        Self::default()
    }
    /// Maximum bandwidth sample across both epochs.
    fn max_bw(&self) -> u64 {
        self.bw.iter().copied().max().unwrap_or(0)
    }
}
/// Where acknowledged data stands relative to a bandwidth probe; controls how
/// samples feed the upper bounds and the max-bw filter.
#[derive(Debug, PartialEq, Eq)]
enum AckProbePhase {
    // Not related to a probe.
    Init,
    // Probe stopped; waiting one round for its acks to drain.
    Stopping,
    // Refilling the pipe ahead of probing upward.
    Refilling,
    // Probe started; acks do not yet reflect probe-rate data.
    Starting,
    // Acks now reflect data sent at the probe rate.
    Feedback,
}
/// BBRv3 congestion controller: network model, state machine, and the control
/// outputs (pacing rate, send quantum, cwnd) derived from them.
#[derive(Debug)]
pub struct Bbr3 {
    // Static configuration.
    config: Bbr3Config,
    // Aggregate byte counters shared with the recovery layer.
    stats: CongestionStats,
    // --- Control outputs ---
    pacing_rate: u64,
    send_quantum: u64,
    cwnd: u64,
    pacing_gain: f64,
    cwnd_gain: f64,
    // True right after entering recovery: cwnd may not grow past inflight+acked.
    packet_conservation: bool,
    // --- State machine ---
    state: State,
    round: RoundTripCounter,
    idle_restart: bool,
    // --- Bandwidth model ---
    max_bw: u64,
    // Probe-derived upper bound / loss-derived lower bound on bandwidth.
    bw_hi: u64,
    bw_lo: u64,
    // Bounded estimate actually used: min(max_bw, bw_lo, bw_hi).
    bw: u64,
    min_rtt: Duration,
    // Cached bandwidth-delay product estimate, in bytes.
    bdp: u64,
    extra_acked: u64,
    offload_budget: u64,
    max_inflight: u64,
    // Upper/lower inflight bounds (u64::MAX means "unbounded").
    inflight_hi: u64,
    inflight_lo: u64,
    // Per-loss-round samples used to adapt the lower bounds.
    bw_latest: u64,
    inflight_latest: u64,
    // Two-epoch max filter over delivery-rate samples.
    max_bw_filter: SimpleMaxFilter,
    cycle_count: u64,
    // Time the current ProbeBW phase began.
    cycle_stamp: Instant,
    ack_phase: AckProbePhase,
    // --- Ack aggregation (extra_acked) estimation ---
    extra_acked_interval_start: Option<Instant>,
    extra_acked_delivered: u64,
    extra_acked_filter: MinMax,
    // --- Startup full-pipe detection ---
    full_pipe: FullPipeEstimator,
    // --- Min-RTT / ProbeRTT tracking ---
    min_rtt_stamp: Instant,
    probe_rtt_min_delay: Duration,
    probe_rtt_min_stamp: Instant,
    probe_rtt_expired: bool,
    probe_rtt_done_stamp: Option<Instant>,
    probe_rtt_round_done: bool,
    // Delivery-rate sampling engine.
    delivery_rate_estimator: DeliveryRateEstimator,
    // Scratch state for the ack batch currently being processed.
    ack_state: AckState,
    // --- Bandwidth probing schedule ---
    rounds_since_bw_probe: u64,
    bw_probe_wait: Duration,
    // Bytes of acks required per inflight_hi increment, and progress therein.
    bw_probe_up_cnt: u64,
    bw_probe_up_acks: u64,
    bw_probe_up_rounds: u64,
    // True while samples reflect data sent during a bandwidth probe.
    bw_probe_samples: bool,
    // --- Loss-round accounting ---
    loss_round_start: bool,
    loss_in_round: bool,
    // --- Recovery ---
    prior_cwnd: u64,
    in_recovery: bool,
    loss_round_delivered: u64,
    loss_events_in_round: u64,
    recovery_epoch_start: Option<Instant>,
}
impl Bbr3 {
    /// Creates a controller with the given configuration and runs `init()`.
    pub fn new(config: Bbr3Config) -> Self {
        let now = Instant::now();
        let initial_cwnd = config.initial_cwnd;
        let mut bbr3 = Self {
            config,
            stats: Default::default(),
            pacing_rate: 0,
            send_quantum: 0,
            cwnd: initial_cwnd,
            // Startup gains; kept in sync afterwards by update_gains().
            pacing_gain: 2.77,
            cwnd_gain: 2.0,
            packet_conservation: false,
            state: State::Startup,
            round: Default::default(),
            idle_restart: false,
            max_bw: 0,
            bw_hi: 0,
            bw_lo: 0,
            bw: 0,
            min_rtt: Duration::MAX,
            bdp: 0,
            extra_acked: 0,
            offload_budget: 0,
            max_inflight: 0,
            inflight_hi: 0,
            inflight_lo: 0,
            bw_latest: 0,
            inflight_latest: 0,
            max_bw_filter: SimpleMaxFilter::new(),
            cycle_count: 0,
            cycle_stamp: now,
            ack_phase: AckProbePhase::Init,
            extra_acked_interval_start: Some(now),
            extra_acked_delivered: 0,
            extra_acked_filter: MinMax::new(EXTRA_ACKED_FILTER_LEN),
            full_pipe: Default::default(),
            min_rtt_stamp: now,
            probe_rtt_min_delay: Duration::MAX,
            probe_rtt_min_stamp: now,
            probe_rtt_expired: false,
            probe_rtt_done_stamp: Some(now),
            probe_rtt_round_done: false,
            delivery_rate_estimator: DeliveryRateEstimator::default(),
            ack_state: Default::default(),
            rounds_since_bw_probe: 0,
            bw_probe_wait: Duration::MAX,
            bw_probe_up_cnt: 0,
            bw_probe_up_acks: 0,
            bw_probe_up_rounds: 0,
            bw_probe_samples: false,
            loss_round_start: false,
            loss_in_round: false,
            prior_cwnd: 0,
            in_recovery: false,
            loss_round_delivered: 0,
            loss_events_in_round: 0,
            recovery_epoch_start: Some(now),
        };
        // init() overwrites several of the placeholder values above.
        bbr3.init();
        bbr3
    }
/// (Re)initializes the model, mirroring the BBRv3 draft's BBRInit.
fn init(&mut self) {
    let now = Instant::now();
    // Seed min_rtt from config; clamp to >= 1us so BDP math stays sane.
    self.min_rtt = std::cmp::max(
        self.config.initial_rtt.unwrap_or(crate::INITIAL_RTT),
        Duration::from_micros(1),
    );
    self.min_rtt_stamp = now;
    self.probe_rtt_done_stamp = None;
    self.probe_rtt_round_done = false;
    self.prior_cwnd = 0;
    self.idle_restart = false;
    self.extra_acked_interval_start = Some(now);
    self.extra_acked_delivered = 0;
    self.ack_phase = AckProbePhase::Init;
    // u64::MAX means "unbounded" until probing establishes upper bounds.
    self.bw_hi = u64::MAX;
    self.inflight_hi = u64::MAX;
    self.reset_congestion_signals();
    self.reset_lower_bounds();
    self.init_round_counting();
    self.init_full_pipe();
    self.init_pacing_rate();
    self.enter_startup();
}
/// Resets per-round bookkeeping to the start-of-connection state.
fn init_round_counting(&mut self) {
    self.round.next_round_delivered = 0;
    self.round.round_count = 0;
    self.round.is_round_start = false;
}
/// Records the delivered-bytes watermark that, once acked, ends this round.
fn start_round(&mut self) {
    self.round.next_round_delivered = self.delivery_rate_estimator.delivered();
}
/// Advances the round counter when the current ack covers the watermark.
fn update_round(&mut self) {
    if self.ack_state.packet_delivered >= self.round.next_round_delivered {
        self.start_round();
        self.round.round_count += 1;
        self.rounds_since_bw_probe += 1;
        self.round.is_round_start = true;
        // A full round has elapsed: recovery's conservation no longer applies.
        self.packet_conservation = false;
    } else {
        self.round.is_round_start = false;
    }
}
/// Whether Startup has concluded that the pipe is full.
pub fn is_filled_pipe(&self) -> bool {
    self.full_pipe.is_filled_pipe
}
/// Whether the ack being processed begins a new round trip.
pub fn is_round_start(&self) -> bool {
    self.round.is_round_start
}
/// Remembers the current cwnd so it can be restored after recovery/ProbeRTT.
fn save_cwnd(&mut self) {
    self.prior_cwnd = if self.in_recovery || self.state == State::ProbeRTT {
        // Already inside an episode: keep the largest cwnd seen so far.
        self.cwnd.max(self.prior_cwnd)
    } else {
        self.cwnd
    }
}
/// Restores cwnd to at least the value saved before the episode began.
fn restore_cwnd(&mut self) {
    if self.prior_cwnd > self.cwnd {
        self.cwnd = self.prior_cwnd;
    }
}
/// Current congestion window, never below the configured floor.
fn congestion_window(&self) -> u64 {
    std::cmp::max(self.cwnd, self.config.min_cwnd)
}
/// Current pacing rate in bytes per second.
fn pacing_rate(&self) -> Option<u64> {
    Some(self.pacing_rate)
}
/// Initial congestion window from the configuration.
fn initial_window(&self) -> u64 {
    self.config.initial_cwnd
}
/// Minimum congestion window from the configuration.
fn minimal_window(&self) -> u64 {
    self.config.min_cwnd
}
/// Whether a packet sent at `sent_time` predates the current recovery epoch.
fn in_recovery(&self, sent_time: Instant) -> bool {
    match self.recovery_epoch_start {
        Some(epoch) => sent_time <= epoch,
        None => false,
    }
}
/// BBR's analogue of slow start is the Startup state.
fn in_slow_start(&self) -> bool {
    matches!(self.state, State::Startup)
}
/// Read-only view of the aggregate congestion statistics.
fn stats(&self) -> &CongestionStats {
    &self.stats
}
/// Runs one full model update for the just-completed ack batch; the call
/// order mirrors BBRUpdateModelAndState in the BBRv3 draft.
fn update_model_and_state(&mut self, now: Instant, bytes_in_flight: u64) {
    self.update_latest_delivery_signals();
    self.update_congestion_signals();
    self.update_ack_aggregation(now);
    self.check_startup_done();
    self.check_drain(now, bytes_in_flight);
    self.update_probe_bw_cycle_phase(now, bytes_in_flight);
    self.update_min_rtt(now);
    self.check_probe_rtt(now, bytes_in_flight);
    self.advance_latest_delivery_signals();
    self.bound_bw_for_model();
}
/// Pushes the model into the control outputs (pacing, quantum, cwnd).
fn update_control_parameters(&mut self) {
    self.set_pacing_rate();
    self.set_send_quantum();
    self.set_cwnd();
}
/// Entry point for loss notifications from the recovery layer.
fn update_on_loss(&mut self, now: Instant, packet: &SentPacket) {
    self.handle_lost_packet(now, packet);
}
/// Sets the pacing/cwnd gain pair prescribed for the current state
/// (values follow the BBRv3 draft's per-state gain table).
fn update_gains(&mut self) {
    let (pacing, cwnd) = match self.state {
        State::Startup => (2.77, 2.0),
        State::Drain => (0.5, 2.0),
        State::ProbeBwDown => (0.9, 2.0),
        State::ProbeBwCruise => (1.0, 2.0),
        State::ProbeBwRefill => (1.0, 2.0),
        State::ProbeBwUp => (1.25, 2.25),
        State::ProbeRTT => (1.0, 0.5),
    };
    self.pacing_gain = pacing;
    self.cwnd_gain = cwnd;
}
/// Switches to Startup and applies its gains.
fn enter_startup(&mut self) {
    self.state = State::Startup;
    self.update_gains();
}
/// Leaves Startup for Drain once the pipe is judged full, either by a
/// bandwidth plateau or by excessive loss.
fn check_startup_done(&mut self) {
    self.check_startup_full_bandwidth();
    self.check_startup_high_loss();
    if self.state == State::Startup && self.full_pipe.is_filled_pipe {
        self.enter_drain();
    }
}
/// Resets the full-pipe (bandwidth plateau) estimator.
fn init_full_pipe(&mut self) {
    self.full_pipe.is_filled_pipe = false;
    self.full_pipe.full_bw = 0;
    self.full_pipe.full_bw_count = 0;
}
/// Declares the pipe full after `full_bw_count_threshold` consecutive rounds
/// in which max_bw grew by less than `full_bw_growth_rate`.
fn check_startup_full_bandwidth(&mut self) {
    // Evaluate at most once per round, and never on app-limited samples.
    if self.is_filled_pipe()
        || !self.is_round_start()
        || self.delivery_rate_estimator.is_sample_app_limited()
    {
        return;
    }
    if self.max_bw
        >= (self.full_pipe.full_bw as f64 * (1.0_f64 + self.config.full_bw_growth_rate)) as u64
    {
        // Still growing: record the new baseline and restart the count.
        self.full_pipe.full_bw = self.max_bw;
        self.full_pipe.full_bw_count = 0;
        return;
    }
    self.full_pipe.full_bw_count += 1;
    if self.full_pipe.full_bw_count >= self.config.full_bw_count_threshold {
        self.full_pipe.is_filled_pipe = true;
    }
}
/// Ends Startup because losses show the queue is already too deep, and caps
/// inflight_hi at the current inflight estimate.
fn handle_queue_too_high_in_startup(&mut self) {
    self.full_pipe.is_filled_pipe = true;
    self.inflight_hi = self.inflight(1.0).max(self.inflight_latest);
}
/// Counts loss events and ends Startup early when a loss round shows both
/// enough discrete loss events and a too-high loss rate.
fn check_startup_high_loss(&mut self) {
    // Saturate the per-round event count at 15 (0xf).
    if self.ack_state.lost > 0 && self.loss_events_in_round < 0xf {
        self.loss_events_in_round += 1;
    }
    if self.loss_round_start
        && self.in_recovery
        && self.loss_events_in_round >= self.config.full_loss_count
        && self.is_inflight_too_high()
    {
        self.handle_queue_too_high_in_startup()
    }
    if self.loss_round_start {
        self.loss_events_in_round = 0;
    }
}
/// Switches to Drain (pacing gain below 1.0 empties the Startup queue).
fn enter_drain(&mut self) {
    self.state = State::Drain;
    self.update_gains();
}
/// Leaves Drain for ProbeBW once inflight has fallen to ~one estimated BDP.
fn check_drain(&mut self, now: Instant, bytes_in_flight: u64) {
    if self.state == State::Drain && bytes_in_flight <= self.inflight(1.0) {
        self.enter_probe_bw(now);
    }
}
/// Returns true (and enters Refill) when it is time to probe for bandwidth,
/// either on the randomized timer or for Reno coexistence.
fn check_time_to_probe_bw(&mut self, now: Instant) -> bool {
    if self.has_elapsed_in_phase(now, self.bw_probe_wait)
        || self.is_reno_coexistence_probe_time()
    {
        self.start_probe_bw_refill();
        return true;
    }
    false
}
/// Randomizes the next probe: 0-1 extra rounds plus a 2-3 s wall-clock wait.
fn pick_probe_wait(&mut self) {
    self.rounds_since_bw_probe = rand::thread_rng().gen_range(0..PROBE_BW_RAND_ROUNDS);
    self.bw_probe_wait = Duration::from_millis(
        rand::thread_rng()
            .gen_range(PROBE_BW_MIN_WAIT_TIME_IN_MSEC..PROBE_BW_MAX_WAIT_TIME_IN_MSEC),
    );
}
/// Probe sooner when a competing Reno flow could have grown its window to
/// our target inflight by now (Reno adds ~1 packet per round).
fn is_reno_coexistence_probe_time(&self) -> bool {
    // NOTE(review): target_inflight() is in bytes, while the comparison below
    // treats it as a round count — presumably intended in ~1-MSS units;
    // confirm against the upstream implementation.
    let reno_rounds = self.target_inflight();
    let rounds = reno_rounds.min(PROBE_BW_MAX_ROUNDS);
    self.rounds_since_bw_probe >= rounds
}
/// The inflight level BBR is currently aiming for.
fn target_inflight(&self) -> u64 {
    self.bdp.min(self.cwnd)
}
/// ProbeBW always begins with a Down phase to drain any remaining queue.
fn enter_probe_bw(&mut self, now: Instant) {
    self.start_probe_bw_down(now);
}
/// ProbeBW:Down — pace below the estimate and schedule the next probe.
fn start_probe_bw_down(&mut self, now: Instant) {
    self.reset_congestion_signals();
    // No inflight_hi growth slope until the next Up phase computes one.
    self.bw_probe_up_cnt = u64::MAX;
    self.pick_probe_wait();
    self.cycle_stamp = now;
    self.ack_phase = AckProbePhase::Stopping;
    self.start_round();
    self.state = State::ProbeBwDown;
}
/// ProbeBW:Cruise — steady state at the estimated bandwidth.
fn start_probe_bw_cruise(&mut self) {
    self.state = State::ProbeBwCruise;
}
/// ProbeBW:Refill — refill the pipe without queueing, with the loss-based
/// lower bounds cleared so the refill is not throttled.
fn start_probe_bw_refill(&mut self) {
    self.reset_lower_bounds();
    self.bw_probe_up_rounds = 0;
    self.bw_probe_up_acks = 0;
    self.ack_phase = AckProbePhase::Refilling;
    self.start_round();
    self.state = State::ProbeBwRefill;
}
/// ProbeBW:Up — push inflight above the BDP to look for more bandwidth.
fn start_probe_bw_up(&mut self, now: Instant) {
    self.ack_phase = AckProbePhase::Starting;
    self.start_round();
    self.full_pipe.full_bw = self.delivery_rate_estimator.delivery_rate();
    self.cycle_stamp = now;
    self.state = State::ProbeBwUp;
    self.raise_inflight_hi_slope();
}
/// Drives the ProbeBW Down -> Cruise -> Refill -> Up cycle.
fn update_probe_bw_cycle_phase(&mut self, now: Instant, bytes_in_flight: u64) {
    // The cycle only runs after Startup has completed.
    if !self.filled_pipe() {
        return;
    }
    // Upper-bound adaptation runs in every post-Startup state.
    self.adapt_upper_bounds(now);
    if !self.is_in_a_probe_bw_state() {
        return;
    }
    match self.state {
        State::ProbeBwDown => {
            if self.check_time_to_probe_bw(now) {
                return;
            }
            if self.check_time_to_cruise(bytes_in_flight) {
                self.start_probe_bw_cruise();
            }
        }
        State::ProbeBwCruise => {
            self.check_time_to_probe_bw(now);
        }
        State::ProbeBwRefill => {
            // Refill lasts exactly one round, then the probe goes up.
            if self.is_round_start() {
                self.bw_probe_samples = true;
                self.start_probe_bw_up(now);
            }
        }
        State::ProbeBwUp => {
            // Stop probing up after >= min_rtt with inflight above target.
            if self.has_elapsed_in_phase(now, self.min_rtt)
                && bytes_in_flight > self.inflight(self.pacing_gain)
            {
                self.start_probe_bw_down(now);
            }
        }
        _ => {}
    }
}
/// Whether the state machine is in any of the four ProbeBW phases.
fn is_in_a_probe_bw_state(&self) -> bool {
    matches!(
        self.state,
        State::ProbeBwDown | State::ProbeBwCruise | State::ProbeBwRefill | State::ProbeBwUp
    )
}
/// During ProbeBW:Down, cruise once inflight has drained to one BDP while
/// staying within the headroom bound.
fn check_time_to_cruise(&mut self, bytes_in_flight: u64) -> bool {
    if bytes_in_flight > self.inflight_with_headroom() {
        // Not enough headroom yet: keep draining.
        return false;
    }
    bytes_in_flight <= self.inflight(1.0)
}
/// Whether `interval` has elapsed since the current phase began.
///
/// Uses `checked_add` because `Instant + Duration` panics on overflow and
/// `bw_probe_wait` is initialized to `Duration::MAX`; an unschedulable
/// deadline simply reports "not elapsed".
fn has_elapsed_in_phase(&self, now: Instant, interval: Duration) -> bool {
    self.cycle_stamp
        .checked_add(interval)
        .is_some_and(|deadline| now > deadline)
}
/// inflight_hi minus the configured headroom fraction, used as the ceiling
/// while cruising (leaves room for other flows to probe).
fn inflight_with_headroom(&self) -> u64 {
    if self.inflight_hi == u64::MAX {
        // inflight_hi not yet established: no bound.
        return u64::MAX;
    }
    let headroom = ((self.config.headroom * self.inflight_hi as f64) as u64).max(1);
    self.inflight_hi
        .saturating_sub(headroom)
        .max(self.config.min_cwnd)
}
/// Recomputes how many acked bytes are needed per inflight_hi increment; the
/// growth rate doubles each round of ProbeBW:Up (slow-start-like slope).
fn raise_inflight_hi_slope(&mut self) {
    let growth_this_round = 1 << self.bw_probe_up_rounds;
    // Cap the shift amount so `1 << n` stays well-defined for u64.
    self.bw_probe_up_rounds = self.bw_probe_up_rounds.saturating_add(1).min(30);
    self.bw_probe_up_cnt = (self.cwnd / growth_this_round).max(1);
}
/// Gradually raises inflight_hi while fully using it in ProbeBW:Up.
fn probe_inflight_hi_upward(&mut self, is_cwnd_limited: bool) {
    if !is_cwnd_limited || self.cwnd < self.inflight_hi {
        // Not fully utilizing inflight_hi: no evidence to raise it.
        return;
    }
    self.bw_probe_up_acks += self.ack_state.newly_acked_bytes;
    if self.bw_probe_up_acks >= self.bw_probe_up_cnt {
        let delta = self.bw_probe_up_acks / self.bw_probe_up_cnt;
        self.bw_probe_up_acks = self
            .bw_probe_up_acks
            .saturating_sub(delta * self.bw_probe_up_cnt);
        self.inflight_hi = self
            .inflight_hi
            .saturating_add(delta * self.config.max_datagram_size);
    }
    if self.is_round_start() {
        self.raise_inflight_hi_slope();
    }
}
/// Adapts bw_hi/inflight_hi from the latest ack evidence and advances the
/// max-bw filter epoch once a probe's feedback round completes
/// (BBRAdaptUpperBounds in the draft).
fn adapt_upper_bounds(&mut self, now: Instant) {
    if self.ack_phase == AckProbePhase::Starting && self.is_round_start() {
        // Acks now reflect data sent at the probe rate.
        self.ack_phase = AckProbePhase::Feedback;
    }
    if self.ack_phase == AckProbePhase::Stopping && self.is_round_start() {
        self.bw_probe_samples = false;
        self.ack_phase = AckProbePhase::Init;
        if self.is_in_a_probe_bw_state() && !self.is_app_limited() {
            self.advance_max_bw_filter();
        }
    }
    if !self.check_inflight_too_high(now) {
        // Loss level acceptable: raise the upper bounds toward the samples.
        if self.inflight_hi == u64::MAX {
            // No bound established yet; nothing to raise.
            return;
        }
        if self.ack_state.tx_in_flight > self.inflight_hi {
            self.inflight_hi = self.ack_state.tx_in_flight;
        }
        if self.delivery_rate_estimator.delivery_rate() > self.bw_hi {
            self.bw_hi = self.delivery_rate_estimator.delivery_rate();
        }
        if self.state == State::ProbeBwUp {
            self.probe_inflight_hi_upward(true);
        }
    }
}
/// Whether delivery is limited by the application rather than the network.
fn is_app_limited(&self) -> bool {
    self.delivery_rate_estimator.is_app_limited()
}
/// Tracks the ProbeRTT-windowed minimum delay and the 10 s-filtered min_rtt.
fn update_min_rtt(&mut self, now: Instant) {
    let sample_rtt = self.delivery_rate_estimator.sample_rtt();
    self.probe_rtt_expired = now.saturating_duration_since(self.probe_rtt_min_stamp)
        > self.config.probe_rtt_interval;
    // Adopt a valid sample if it is a new minimum or the window expired.
    if !sample_rtt.is_zero()
        && (sample_rtt <= self.probe_rtt_min_delay || self.probe_rtt_expired)
    {
        self.probe_rtt_min_delay = sample_rtt;
        self.probe_rtt_min_stamp = now;
    }
    let min_rtt_expired =
        now.saturating_duration_since(self.min_rtt_stamp) > MIN_RTT_FILTER_LEN;
    // Take the windowed minimum, or refresh it once the filter expires.
    if self.probe_rtt_min_delay < self.min_rtt || min_rtt_expired {
        self.min_rtt = self.probe_rtt_min_delay;
        self.min_rtt_stamp = self.probe_rtt_min_stamp;
    }
}
/// Enters/advances ProbeRTT when the min-RTT estimate has gone stale.
fn check_probe_rtt(&mut self, now: Instant, bytes_in_flight: u64) {
    if self.state != State::ProbeRTT && self.probe_rtt_expired && !self.idle_restart {
        self.enter_probe_rtt();
        self.save_cwnd();
        self.probe_rtt_done_stamp = None;
        self.ack_phase = AckProbePhase::Stopping;
        self.start_round();
    }
    if self.state == State::ProbeRTT {
        self.handle_probe_rtt(now, bytes_in_flight);
    }
    if self.delivery_rate_estimator.delivered() > 0 {
        self.idle_restart = false;
    }
}
/// Switches the state machine into ProbeRTT and applies its gains.
fn enter_probe_rtt(&mut self) {
    self.state = State::ProbeRTT;
    self.update_gains();
}
/// Runs one ProbeRTT step: waits for inflight to drain to the ProbeRTT cwnd,
/// then dwells there for `probe_rtt_duration` plus one round.
fn handle_probe_rtt(&mut self, now: Instant, bytes_in_flight: u64) {
    // ProbeRTT deliberately under-utilizes the pipe, so mark the samples
    // app-limited to keep them out of the bandwidth model.
    self.mark_connection_app_limited();
    // NOTE: was `if let Some(probe_rtt_done_stamp) = ...` with the binding
    // never used, which triggers an unused-variable warning.
    if self.probe_rtt_done_stamp.is_some() {
        if self.is_round_start() {
            self.probe_rtt_round_done = true;
        }
        if self.probe_rtt_round_done {
            self.check_probe_rtt_done(now);
        }
    } else if bytes_in_flight <= self.probe_rtt_cwnd() {
        // Inflight has drained to the ProbeRTT level: start the dwell timer.
        self.probe_rtt_done_stamp = Some(now + self.config.probe_rtt_duration);
        self.probe_rtt_round_done = false;
        self.start_round();
    }
}
/// Exits ProbeRTT once the dwell deadline has passed.
fn check_probe_rtt_done(&mut self, now: Instant) {
    if let Some(probe_rtt_done_stamp) = self.probe_rtt_done_stamp {
        if now > probe_rtt_done_stamp {
            // Schedule the next ProbeRTT from now and restore the saved cwnd.
            self.probe_rtt_min_stamp = now;
            self.restore_cwnd();
            self.exit_probe_rtt(now);
        }
    }
}
/// Flags the connection app-limited for the rate sampler.
fn mark_connection_app_limited(&mut self) {
    self.delivery_rate_estimator.set_app_limited(true);
}
/// Returns to ProbeBW (cruising) if the pipe was full, otherwise to Startup.
fn exit_probe_rtt(&mut self, now: Instant) {
    self.reset_lower_bounds();
    if self.is_filled_pipe() {
        self.start_probe_bw_down(now);
        self.start_probe_bw_cruise();
    } else {
        self.enter_startup();
    }
}
/// On the first send after an idle period, re-anchors time-based state so the
/// idle gap is not misread by the model.
fn handle_restart_from_idle(&mut self, now: Instant, bytes_in_flight: u64) {
    if bytes_in_flight == 0 && self.is_app_limited() {
        self.idle_restart = true;
        self.extra_acked_interval_start = Some(now);
        if self.is_in_a_probe_bw_state() {
            // Restart pacing at the plain estimate (gain 1.0).
            self.set_pacing_rate_with_gain(1.0);
        } else if self.state == State::ProbeRTT {
            self.check_probe_rtt_done(now);
        }
    }
}
/// Feeds the latest delivery-rate sample into the windowed max-bw filter.
fn update_max_bw(&mut self) {
    let bw = self.delivery_rate_estimator.delivery_rate();
    self.update_round();
    // App-limited samples may only raise the estimate, never define it.
    if bw >= self.max_bw || !self.is_app_limited() {
        self.max_bw_filter.bw[1] = bw.max(self.max_bw_filter.bw[1]);
        self.max_bw = self.max_bw_filter.max_bw();
    }
}
/// Rotates the two-epoch max-bw filter (current epoch becomes previous).
fn advance_max_bw_filter(&mut self) {
    self.cycle_count += 1;
    if self.max_bw_filter.bw[1] == 0 {
        // Nothing measured this epoch; keep the old window.
        return;
    }
    self.max_bw_filter.bw[0] = self.max_bw_filter.bw[1];
    self.max_bw_filter.bw[1] = 0;
}
/// Budget for offload (GSO/TSO-style) bursts: three send quanta.
fn update_offload_budget(&mut self) {
    self.offload_budget = 3 * self.send_quantum;
}
/// Estimates "extra acked" bytes — delivery beyond what the bandwidth
/// estimate predicts over the aggregation interval — to size cwnd for
/// ack aggregation.
fn update_ack_aggregation(&mut self, now: Instant) {
    if let Some(interval_start) = self.extra_acked_interval_start {
        let interval = now.saturating_duration_since(interval_start);
        // Bytes the current bw estimate says should have been delivered over
        // the interval: bw (bytes/s) * interval (us) / 1e6. Compute entirely
        // in u128 and narrow only after the division; the previous code cast
        // the product to u64 *before* dividing (the `as` cast binds tighter
        // than `/`), which could truncate for high bw * long interval.
        let mut expected_delivered = u64::try_from(
            (self.bw as u128).saturating_mul(interval.as_micros()) / 1_000_000,
        )
        .unwrap_or(u64::MAX);
        if self.extra_acked_delivered <= expected_delivered {
            // Delivery fell behind expectation: restart the aggregation epoch.
            self.extra_acked_delivered = 0;
            self.extra_acked_interval_start = Some(now);
            expected_delivered = 0;
        }
        self.extra_acked_delivered = self
            .extra_acked_delivered
            .saturating_add(self.ack_state.newly_acked_bytes);
        // Surplus over expectation, capped at one cwnd.
        let extra = self
            .extra_acked_delivered
            .saturating_sub(expected_delivered)
            .min(self.cwnd);
        self.extra_acked_filter
            .update_max(self.round.round_count, extra);
    } else {
        // No interval in progress: start one now.
        self.extra_acked_delivered = 0;
        self.extra_acked_interval_start = Some(now);
    }
}
/// Checks the loss rate of the current sample and reacts if it is too high.
fn check_inflight_too_high(&mut self, now: Instant) -> bool {
    if self.is_inflight_too_high() {
        // Only react while samples come from an active bandwidth probe.
        if self.bw_probe_samples {
            self.handle_inflight_too_high(now);
        }
        return true;
    }
    false
}
/// Loss exceeded `loss_threshold` of the bytes in flight when the sampled
/// packet was sent.
fn is_inflight_too_high(&self) -> bool {
    self.ack_state.lost
        > (self.ack_state.tx_in_flight as f64 * self.config.loss_threshold) as u64
}
/// Reacts to excessive loss: cap inflight_hi and stop probing upward.
fn handle_inflight_too_high(&mut self, now: Instant) {
    // React at most once per bandwidth probe.
    self.bw_probe_samples = false;
    if !self.is_app_limited() {
        self.inflight_hi = ((self.target_inflight() as f64 * self.config.beta) as u64)
            .max(self.ack_state.tx_in_flight);
    }
    if self.state == State::ProbeBwUp {
        self.start_probe_bw_down(now);
    }
}
/// Incorporates a lost packet's rate-sample snapshot into the loss model.
fn handle_lost_packet(&mut self, now: Instant, packet: &SentPacket) {
    // Loss only matters while probing for bandwidth or still in Startup.
    if !self.bw_probe_samples && !self.in_slow_start() {
        return;
    }
    self.ack_state.tx_in_flight = packet.rate_sample_state.tx_in_flight;
    // Bytes lost since this packet was sent. saturating_sub guards against a
    // debug-build overflow panic if the snapshot ever exceeds the total
    // (the plain `-` previously used would abort in that case).
    self.ack_state.lost = self
        .stats
        .bytes_lost_in_total
        .saturating_sub(packet.rate_sample_state.lost);
    self.delivery_rate_estimator
        .set_app_limited(packet.rate_sample_state.is_app_limited);
    if self.is_inflight_too_high() {
        self.ack_state.tx_in_flight = self.inflight_hi_from_lost_packet(packet);
        self.handle_inflight_too_high(now);
    }
}
/// Infers the highest safe inflight level from the moment the lost packet
/// was sent: the inflight just before this packet plus the amount that could
/// still have been sent before crossing the loss threshold.
fn inflight_hi_from_lost_packet(&mut self, packet: &SentPacket) -> u64 {
    let size = packet.sent_size as u64;
    let inflight_prev = packet.rate_sample_state.tx_in_flight.saturating_sub(size);
    let lost_prev = self.ack_state.lost.saturating_sub(size);
    // Solve lost_prev + (1 - thresh) * x = thresh * inflight_prev for x.
    // A negative result saturates to 0 via the `as u64` float cast.
    let lost_prefix = (inflight_prev as f64 * self.config.loss_threshold - lost_prev as f64)
        / (1.0_f64 - self.config.loss_threshold);
    inflight_prev.saturating_add(lost_prefix as u64)
}
/// Refreshes the per-loss-round bw/inflight samples and detects the start of
/// a new loss round.
fn update_latest_delivery_signals(&mut self) {
    self.loss_round_start = false;
    self.bw_latest = self
        .bw_latest
        .max(self.delivery_rate_estimator.delivery_rate());
    self.inflight_latest = self
        .inflight_latest
        .max(self.delivery_rate_estimator.sample_delivered());
    if self.delivery_rate_estimator.sample_prior_delivered() >= self.loss_round_delivered {
        self.loss_round_delivered = self.delivery_rate_estimator.delivered();
        self.loss_round_start = true;
    }
}
/// At a loss-round boundary, restart the per-round samples from the latest
/// delivery-rate sample.
fn advance_latest_delivery_signals(&mut self) {
    if self.loss_round_start {
        self.bw_latest = self.delivery_rate_estimator.delivery_rate();
        self.inflight_latest = self.delivery_rate_estimator.sample_delivered();
    }
}
/// Clears the per-round congestion samples.
fn reset_congestion_signals(&mut self) {
    self.loss_in_round = false;
    self.bw_latest = 0;
    self.inflight_latest = 0;
}
/// Updates max_bw every ack and, once per loss round, adapts the loss-based
/// lower bounds (BBRUpdateCongestionSignals in the draft).
fn update_congestion_signals(&mut self) {
    self.update_max_bw();
    if self.ack_state.lost > 0 {
        self.loss_in_round = true;
    }
    if !self.loss_round_start {
        // Only adapt the lower bounds once per loss round.
        return;
    }
    self.adapt_lower_bounds_from_congestion();
    // BUG FIX: the loss-in-round flag must be CLEARED at the end of a loss
    // round (the draft ends BBRUpdateCongestionSignals with
    // `BBR.loss_in_round = 0`). It was previously set to `true`, so a single
    // loss kept decaying bw_lo/inflight_lo on every subsequent loss round.
    self.loss_in_round = false;
}
/// True while BBR is actively trying to grow its bandwidth estimate
/// (Startup, or the accelerating ProbeBW phases).
fn is_probing_bw(&self) -> bool {
    self.state == State::Startup
        || self.state == State::ProbeBwRefill
        || self.state == State::ProbeBwUp
}
/// On a loss round, tightens bw_lo/inflight_lo — unless we are deliberately
/// probing, in which case losses are expected and ignored here.
fn adapt_lower_bounds_from_congestion(&mut self) {
    if self.is_probing_bw() || !self.loss_in_round {
        return;
    }
    self.init_lower_bounds();
    self.loss_lower_bounds();
}
/// Seeds the lower bounds from the current model on first use
/// (u64::MAX means "unset").
fn init_lower_bounds(&mut self) {
    if self.bw_lo == u64::MAX {
        self.bw_lo = self.max_bw;
    }
    if self.inflight_lo == u64::MAX {
        self.inflight_lo = self.cwnd;
    }
}
/// Multiplicatively decreases the lower bounds, floored by the most recent
/// per-round samples.
fn loss_lower_bounds(&mut self) {
    let bw_floor = (self.bw_lo as f64 * self.config.beta) as u64;
    self.bw_lo = bw_floor.max(self.bw_latest);
    let inflight_floor = (self.inflight_lo as f64 * self.config.beta) as u64;
    self.inflight_lo = inflight_floor.max(self.inflight_latest);
}
/// Clears the loss-based lower bounds (u64::MAX means "no bound").
fn reset_lower_bounds(&mut self) {
    self.bw_lo = u64::MAX;
    self.inflight_lo = u64::MAX;
}
/// Final bandwidth estimate: max_bw clamped by both loss and probe bounds.
fn bound_bw_for_model(&mut self) {
    self.bw = self.max_bw.min(self.bw_lo).min(self.bw_hi);
}
/// Seeds the pacing rate from initial cwnd / initial RTT (BBRInitPacingRate).
fn init_pacing_rate(&mut self) {
    let srtt = match self.config.initial_rtt {
        Some(rtt) => rtt,
        _ => Duration::from_millis(1),
    };
    // Bytes/second implied by sending one initial cwnd per SRTT.
    let nominal_bandwidth = self.config.initial_cwnd as f64 / srtt.as_secs_f64();
    self.pacing_rate = (self.pacing_gain * nominal_bandwidth) as u64;
}
/// Applies `gain` to the model bandwidth, minus a 1% margin so pacing does
/// not slowly build a queue by sending exactly at the estimate.
fn set_pacing_rate_with_gain(&mut self, pacing_gain: f64) {
    let rate = (pacing_gain * self.bw as f64 * (1.0_f64 - PACING_MARGIN_PERCENT)) as u64;
    // Before the pipe is full, only let the rate ratchet upward.
    if self.is_filled_pipe() || rate > self.pacing_rate {
        self.pacing_rate = rate;
    }
}
/// Recomputes the pacing rate with the current state's gain.
fn set_pacing_rate(&mut self) {
    self.set_pacing_rate_with_gain(self.pacing_gain);
}
/// Sizes per-burst quanta: ~1 ms of data, clamped between 1-2 datagrams and
/// 64 KiB (the floor doubles above the threshold pacing rate).
fn set_send_quantum(&mut self) {
    let floor = if self.pacing_rate < SEND_QUANTUM_THRESHOLD_PACING_RATE {
        self.config.max_datagram_size
    } else {
        2 * self.config.max_datagram_size
    };
    self.send_quantum = (self.pacing_rate / 1000).clamp(floor, 64 * 1024);
}
/// Returns `gain` * BDP for the given bandwidth; also caches the raw BDP.
fn bdp_multiple(&mut self, bw: u64, gain: f64) -> u64 {
    if self.min_rtt == Duration::MAX {
        // No RTT samples yet: fall back to the initial window.
        return self.config.initial_cwnd;
    }
    let bdp = bw as f64 * self.min_rtt.as_secs_f64();
    self.bdp = bdp as u64;
    (gain * bdp) as u64
}
/// Pads an inflight target so quantization and offload effects cannot
/// starve the pipe.
fn quantization_budget(&mut self, bytes_in_flight: u64) -> u64 {
    self.update_offload_budget();
    let mut inflight = bytes_in_flight
        .max(self.offload_budget)
        .max(self.config.min_cwnd);
    if self.state == State::ProbeBwUp {
        // Leave room for the probe to actually add queue.
        inflight = inflight.saturating_add(2 * self.config.max_datagram_size);
    }
    inflight
}
/// Private alias: whether Startup concluded the pipe is full.
fn filled_pipe(&self) -> bool {
    self.full_pipe.is_filled_pipe
}
/// Inflight (bytes) for `gain` * BDP at max_bw, budget-padded.
fn inflight(&mut self, gain: f64) -> u64 {
    let inflight = self.bdp_multiple(self.max_bw, gain);
    self.quantization_budget(inflight)
}
/// Placeholder: ack-aggregation budget is folded in via extra_acked instead.
fn update_aggregation_budget(&mut self) {
}
/// Upper bound for cwnd: gain * BDP plus the max extra-acked estimate.
fn update_max_inflight(&mut self) {
    self.update_aggregation_budget();
    let mut inflight = self.bdp_multiple(self.max_bw, self.cwnd_gain);
    inflight = inflight.saturating_add(self.extra_acked_filter.get());
    self.max_inflight = self.quantization_budget(inflight);
}
/// Enters loss recovery: saves cwnd, collapses it to inflight plus one ack's
/// worth of data, and enables packet conservation for the first round.
fn enter_recovery(&mut self, now: Instant) {
    self.save_cwnd();
    self.recovery_epoch_start = Some(now);
    self.cwnd = self.stats.bytes_in_flight
        + self
            .ack_state
            .newly_acked_bytes
            .max(self.config.max_datagram_size);
    self.packet_conservation = true;
    self.in_recovery = true;
    self.start_round();
}
/// Leaves recovery and restores the pre-recovery cwnd.
fn exit_recovery(&mut self) {
    self.recovery_epoch_start = None;
    self.packet_conservation = false;
    self.in_recovery = false;
    self.restore_cwnd();
}
/// cwnd while in ProbeRTT (cwnd_gain is 0.5 there), floored at min_cwnd.
fn probe_rtt_cwnd(&mut self) -> u64 {
    self.bdp_multiple(self.bw, self.cwnd_gain)
        .max(self.config.min_cwnd)
}
/// Caps cwnd to the ProbeRTT level while draining for a min-RTT sample.
fn bound_cwnd_for_probe_rtt(&mut self) {
    if self.state == State::ProbeRTT {
        self.cwnd = self.cwnd.min(self.probe_rtt_cwnd());
    }
}
/// During recovery: shrink cwnd by freshly lost bytes; under packet
/// conservation never let it drop below inflight plus newly acked data.
fn modulate_cwnd_for_recovery(&mut self, bytes_in_flight: u64) {
    if self.ack_state.newly_lost_bytes > 0 {
        self.cwnd = self
            .cwnd
            .saturating_sub(self.ack_state.newly_lost_bytes)
            .max(self.config.min_cwnd);
    }
    if self.packet_conservation {
        self.cwnd = self
            .cwnd
            .max(bytes_in_flight + self.ack_state.newly_acked_bytes);
    }
}
/// Main cwnd update (BBRSetCwnd): grow toward max_inflight, then apply the
/// ProbeRTT and model caps.
fn set_cwnd(&mut self) {
    let bytes_in_flight = self.stats.bytes_in_flight;
    self.update_max_inflight();
    self.modulate_cwnd_for_recovery(bytes_in_flight);
    if !self.packet_conservation {
        if self.is_filled_pipe() {
            // Steady state: additive growth, capped at max_inflight.
            self.cwnd = self
                .max_inflight
                .min(self.cwnd + self.ack_state.newly_acked_bytes);
        } else if self.cwnd < self.max_inflight
            || self.delivery_rate_estimator.delivered() < self.config.initial_cwnd
        {
            // Startup: grow by the full amount acked.
            self.cwnd = self.cwnd.saturating_add(self.ack_state.newly_acked_bytes);
        }
        self.cwnd = self.cwnd.max(self.config.min_cwnd);
    }
    self.bound_cwnd_for_probe_rtt();
    self.bound_cwnd_for_model();
}
/// Caps cwnd by the state-dependent inflight bound (BBRBoundCwndForModel).
fn bound_cwnd_for_model(&mut self) {
    let mut cap = u64::MAX;
    if self.is_in_a_probe_bw_state() && self.state != State::ProbeBwCruise {
        cap = self.inflight_hi;
    } else if self.state == State::ProbeRTT || self.state == State::ProbeBwCruise {
        // Cruising states leave headroom below inflight_hi.
        cap = self.inflight_with_headroom();
    }
    cap = cap.min(self.inflight_lo);
    cap = cap.max(self.config.min_cwnd);
    self.cwnd = self.cwnd.min(cap);
}
}
impl CongestionController for Bbr3 {
    fn name(&self) -> &str {
        "BBRv3"
    }
    /// Current congestion window in bytes, never below the configured floor.
    fn congestion_window(&self) -> u64 {
        self.cwnd.max(self.config.min_cwnd)
    }
    fn initial_window(&self) -> u64 {
        self.config.initial_cwnd
    }
    fn minimal_window(&self) -> u64 {
        self.config.min_cwnd
    }
    fn stats(&self) -> &CongestionStats {
        &self.stats
    }
    /// Records a transmission: feeds the rate sampler, handles restart from
    /// idle, and accounts the packet as in flight.
    fn on_sent(&mut self, now: Instant, packet: &mut SentPacket, bytes_in_flight: u64) {
        self.delivery_rate_estimator.on_packet_sent(
            packet,
            self.stats.bytes_in_flight,
            self.stats.bytes_lost_in_total,
        );
        self.handle_restart_from_idle(now, self.stats.bytes_in_flight);
        self.stats.bytes_in_flight += packet.sent_size as u64;
    }
    /// Resets the per-batch scratch state before individual on_ack() calls.
    fn begin_ack(&mut self, now: Instant, bytes_in_flight: u64) {
        self.ack_state.newly_acked_bytes = 0;
        self.ack_state.newly_lost_bytes = 0;
        self.ack_state.packet_delivered = 0;
        self.ack_state.last_ack_packet_sent_time = now;
        self.ack_state.prior_bytes_in_flight = self.stats.bytes_in_flight;
        self.ack_state.now = now;
        self.ack_state.tx_in_flight = 0;
        self.ack_state.lost = 0;
    }
    /// Accounts one newly acked packet into the batch and the rate sampler.
    fn on_ack(
        &mut self,
        packet: &mut SentPacket,
        now: Instant,
        _app_limited: bool,
        _rtt: &RttEstimator,
        bytes_in_flight: u64,
    ) {
        self.delivery_rate_estimator.update_rate_sample(packet);
        self.stats.bytes_in_flight = self
            .stats
            .bytes_in_flight
            .saturating_sub(packet.sent_size as u64);
        self.stats.bytes_acked_in_total = self
            .stats
            .bytes_acked_in_total
            .saturating_add(packet.sent_size as u64);
        if self.in_slow_start() {
            self.stats.bytes_acked_in_slow_start = self
                .stats
                .bytes_acked_in_slow_start
                .saturating_add(packet.sent_size as u64);
        }
        self.ack_state.newly_acked_bytes += packet.sent_size as u64;
        self.ack_state.last_ack_packet_sent_time = packet.time_sent;
        self.ack_state.packet_delivered = self
            .ack_state
            .packet_delivered
            .max(packet.rate_sample_state.delivered);
    }
    /// Finishes the ack batch: generates the rate sample, possibly exits
    /// recovery, then runs the BBR model and control updates.
    fn end_ack(&mut self) {
        let bytes_in_flight: u64 = self.stats.bytes_in_flight;
        self.delivery_rate_estimator.generate_rate_sample();
        // A packet sent after the recovery epoch was acked: recovery is over.
        if self.in_recovery && !self.in_recovery(self.ack_state.last_ack_packet_sent_time) {
            self.exit_recovery();
        }
        self.update_model_and_state(self.ack_state.now, bytes_in_flight);
        self.update_gains();
        self.update_control_parameters();
    }
    /// Handles loss: updates counters, feeds the loss into the model, and
    /// enters recovery (or collapses cwnd on persistent congestion).
    fn on_congestion_event(
        &mut self,
        now: Instant,
        packet: &SentPacket,
        in_persistent_congestion: bool,
        lost_bytes: u64,
        bytes_in_flight: u64,
    ) {
        self.stats.bytes_in_flight = self.stats.bytes_in_flight.saturating_sub(lost_bytes);
        self.stats.bytes_lost_in_total = self.stats.bytes_lost_in_total.saturating_add(lost_bytes);
        self.ack_state.newly_lost_bytes =
            self.ack_state.newly_lost_bytes.saturating_add(lost_bytes);
        self.update_on_loss(now, packet);
        match in_persistent_congestion {
            true => {
                // Persistent congestion: collapse to the minimal window.
                self.cwnd = self.config.min_cwnd;
                self.recovery_epoch_start = None;
            }
            false => {
                // Start a recovery epoch only for losses after the last one.
                if !self.in_recovery && !self.in_recovery(packet.time_sent) {
                    self.enter_recovery(now);
                }
            }
        }
    }
    fn pacing_rate(&self) -> Option<u64> {
        Some(self.pacing_rate)
    }
}
#[cfg(test)]
mod tests {}