use crate::bbr::{AckSample, BbrConfig, BbrController, BbrState};
/// Number of phases in one ProbeBw gain cycle; fixes the length of [`PROBE_BW_GAINS`].
pub const PROBE_BW_CYCLE_LEN: usize = 8;
/// Pacing-gain table for one ProbeBw cycle: one phase at 1.25, one at 0.75, then six at 1.0.
///
/// Indices 0 and 1 line up with the `Probing` / `Draining` mapping in
/// [`CongestionState::from_bbr`]; every other index maps to `Stable`.
/// NOTE(review): the table itself is not read by the code reviewed here; presumably the core
/// controller (or a caller) consumes it. Confirm.
pub const PROBE_BW_GAINS: [f64; PROBE_BW_CYCLE_LEN] = [1.25, 0.75, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];
/// Presumably the number of rounds each ProbeBw phase lasts.
/// NOTE(review): not referenced in the code reviewed here; confirm against the consumer.
pub const PROBE_BW_PHASE_DURATION_ROUNDS: u64 = 1;
/// Presumably how long ProbeRtt is held, in milliseconds.
/// NOTE(review): not referenced in the code reviewed here; confirm against the consumer.
pub const PROBE_RTT_DURATION_MS: u64 = 200;
/// Smallest byte budget handed out by `BbrCongestionController::pacing_budget`, and the
/// threshold `PacingBudget::can_send` compares against.
/// NOTE(review): 1460 looks like a typical TCP MSS on Ethernet; confirm the intent.
pub const MIN_PACING_BURST_BYTES: u64 = 1460;
/// Notifications returned by [`BbrCongestionController::on_ack`].
///
/// Bandwidth fields carry the core controller's `btlbw()` value unchanged. Despite the `_bps`
/// suffix, `BbrCongestionController::bdp_bytes` multiplies that value by seconds to obtain
/// bytes, so it is treated as bytes per second in this file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BandwidthProbeEvent {
/// The ProbeBw phase classification changed to "probing" (phase index 0).
ProbeUp {
/// Bottleneck-bandwidth estimate reported by the core when the event was emitted.
btlbw_bps: f64,
/// Round-trip propagation estimate reported by the core, in seconds.
rtprop_secs: f64,
},
/// The core entered its `Drain` state, or the ProbeBw phase classification changed to
/// "draining" (phase index 1).
Drain {
/// `btlbw * rtprop` truncated to whole bytes at the time of the event.
bdp_bytes: u64,
},
/// The core entered `ProbeBw` from a state other than `ProbeRtt`, or the ProbeBw phase
/// classification changed to a unity-gain phase.
Steady,
/// The core entered its `ProbeRtt` state.
ProbeRtt,
/// The core moved from `ProbeRtt` to `ProbeBw`.
ProbeRttDone {
/// Round-trip propagation estimate reported by the core after the transition, in seconds.
rtprop_secs: f64,
},
/// The bandwidth estimate exceeded the previously recorded peak by more than 0.1 %.
NewBandwidthRecord {
/// The new peak bandwidth estimate.
btlbw_bps: f64,
},
}
/// Simplified view of the controller state, derived from a `BbrState` plus a ProbeBw phase
/// index by [`CongestionState::from_bbr`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CongestionState {
/// The core is in `BbrState::Startup`.
FillingPipe,
/// The core is in `BbrState::ProbeBw` with a phase index other than 0 or 1.
Stable,
/// The core is in `BbrState::ProbeBw` with phase index 0.
Probing,
/// The core is in `BbrState::Drain`, or in `BbrState::ProbeBw` with phase index 1.
Draining,
/// The core is in `BbrState::ProbeRtt`.
MeasuringRtt,
}
impl CongestionState {
    /// Collapses a core `BbrState` and a ProbeBw phase index into a [`CongestionState`].
    ///
    /// `cycle_idx` only matters while `state` is `BbrState::ProbeBw`: index 0 is reported as
    /// `Probing`, index 1 as `Draining`, and any other index as `Stable`. For every other
    /// state the index is ignored.
    #[must_use]
    pub fn from_bbr(state: BbrState, cycle_idx: usize) -> Self {
        match (state, cycle_idx) {
            (BbrState::Startup, _) => Self::FillingPipe,
            (BbrState::ProbeRtt, _) => Self::MeasuringRtt,
            // The dedicated Drain state and the ProbeBw drain phase look the same from outside.
            (BbrState::Drain, _) | (BbrState::ProbeBw, 1) => Self::Draining,
            (BbrState::ProbeBw, 0) => Self::Probing,
            (BbrState::ProbeBw, _) => Self::Stable,
        }
    }
}
impl std::fmt::Display for CongestionState {
    /// Writes the snake_case name of the state (`"filling_pipe"`, `"stable"`, ...).
    ///
    /// The name is emitted through [`std::fmt::Formatter::pad`], so width, fill and alignment
    /// flags supplied by the caller (for example `{:>14}` in a log table) are honoured. With
    /// no flags the output is exactly the bare name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::FillingPipe => "filling_pipe",
            Self::Stable => "stable",
            Self::Probing => "probing",
            Self::Draining => "draining",
            Self::MeasuringRtt => "measuring_rtt",
        };
        // `write!(f, "{label}")` would start a fresh format spec and drop the caller's flags.
        f.pad(label)
    }
}
/// Byte allowance for one pacing interval, as computed by
/// `BbrCongestionController::pacing_budget`.
#[derive(Debug, Clone, Copy)]
pub struct PacingBudget {
/// Bytes that may be sent during the interval; `pacing_budget` never sets this below
/// [`MIN_PACING_BURST_BYTES`].
pub bytes_available: u64,
/// The interval length, in seconds, that the budget was computed for.
pub interval_secs: f64,
/// The core's pacing rate at the time the budget was computed. `pacing_budget` multiplies
/// it by seconds to obtain bytes, so despite the suffix it is treated as bytes per second.
pub pacing_rate_bps: f64,
}
impl PacingBudget {
    /// Reports whether the budget covers at least one minimum-sized burst, i.e. whether
    /// `bytes_available` has reached [`MIN_PACING_BURST_BYTES`].
    #[must_use]
    pub fn can_send(&self) -> bool {
        let Self { bytes_available, .. } = *self;
        MIN_PACING_BURST_BYTES <= bytes_available
    }
}
/// Running counters maintained by [`BbrCongestionController`].
///
/// Note that `Default` zero-initialises every field, including `min_rtt_secs`;
/// `BbrCongestionController::new` overrides that field with `f64::MAX`.
#[derive(Debug, Clone, Default)]
pub struct BbrStats {
/// Number of ACK samples that passed validation in `on_ack`.
pub ack_count: u64,
/// Sum of the `delivered` field of every accepted sample.
pub bytes_delivered: u64,
/// Number of `ProbeUp` events emitted.
pub probe_up_count: u64,
/// Number of observed `ProbeRtt` -> `ProbeBw` transitions (counted on exit, not on entry).
pub probe_rtt_count: u64,
/// Bandwidth estimate carried by the most recent `NewBandwidthRecord` event.
pub peak_bandwidth_bps: f64,
/// Smallest `rtt_secs` among accepted samples; `f64::MAX` until the first one when the
/// stats were created by `BbrCongestionController::new`.
pub min_rtt_secs: f64,
/// Total number of events pushed to the caller across all `on_ack` calls.
pub events_emitted: u64,
}
/// Wraps a core [`BbrController`], adding event notifications ([`BandwidthProbeEvent`]),
/// a simplified state view ([`CongestionState`]) and running statistics ([`BbrStats`]).
pub struct BbrCongestionController {
// The wrapped controller that does the actual BBR computation.
core: BbrController,
// Core state observed at the end of the previous accepted ACK; used to detect transitions.
prev_state: BbrState,
// ProbeBw phase index (0, 1 or 2) most recently recorded by `on_ack`; starts at 0.
prev_cycle_idx: usize,
// Value of `round_count` when the last phase-change event was emitted.
// NOTE(review): written but never read in the code reviewed here.
cycle_phase_start_round: u64,
// Incremented once per accepted ACK sample, so despite the name it counts ACKs, not rounds.
round_count: u64,
// Bandwidth estimate that last triggered a `NewBandwidthRecord` event (0.0 initially).
prev_peak_bw: f64,
// Events produced by the current `on_ack` call; cleared at the start of each call.
pending_events: Vec<BandwidthProbeEvent>,
// Running counters exposed through `stats()`.
stats: BbrStats,
}
impl BbrCongestionController {
/// Builds a controller around a fresh [`BbrController`] created from `config`.
///
/// The tracked previous state is seeded from the core's initial state, every counter starts
/// at zero, and `stats.min_rtt_secs` starts at `f64::MAX` so the first accepted sample always
/// becomes the minimum.
#[must_use]
pub fn new(config: BbrConfig) -> Self {
    let core = BbrController::new(config);
    let prev_state = *core.state();
    let stats = BbrStats {
        min_rtt_secs: f64::MAX,
        ..BbrStats::default()
    };
    Self {
        core,
        prev_state,
        prev_cycle_idx: 0,
        cycle_phase_start_round: 0,
        round_count: 0,
        prev_peak_bw: 0.0,
        // One ACK produces only a handful of events; reserve room up front.
        pending_events: Vec::with_capacity(4),
        stats,
    }
}
/// Feeds one ACK sample to the core controller and reports what changed.
///
/// Samples whose `elapsed_secs` or `rtt_secs` is not a finite, strictly positive number are
/// ignored: nothing is counted, the core is not called, and an empty slice is returned.
///
/// For an accepted sample the statistics are updated, the core is driven, and events are
/// collected in this order:
/// 1. `NewBandwidthRecord` if the bandwidth estimate beat the previous peak by more than 0.1 %;
/// 2. the event for a core state transition, if one happened;
/// 3. while in `ProbeBw`, the event for a change of gain phase.
///
/// The returned slice borrows an internal buffer that is cleared by the next call.
pub fn on_ack(&mut self, sample: AckSample) -> &[BandwidthProbeEvent] {
    // Phase index `probe_bw_cycle_idx` reports for the unity-gain phases.
    const STEADY_PHASE_IDX: usize = 2;

    self.pending_events.clear();

    // Phrased as "finite and > 0" rather than `<= 0.0` so that NaN (for which every
    // comparison is false) and infinities are rejected as well.
    let is_valid = |secs: f64| secs.is_finite() && secs > 0.0;
    if !is_valid(sample.elapsed_secs) || !is_valid(sample.rtt_secs) {
        return &self.pending_events;
    }

    self.stats.ack_count += 1;
    // Saturate instead of overflowing if a sample reports an absurd byte count.
    self.stats.bytes_delivered = self.stats.bytes_delivered.saturating_add(sample.delivered);
    if sample.rtt_secs < self.stats.min_rtt_secs {
        self.stats.min_rtt_secs = sample.rtt_secs;
    }
    self.round_count += 1;

    self.core.on_ack(sample);
    let new_state = *self.core.state();
    let new_bw = self.core.btlbw();

    // The 0.1 % margin keeps floating-point jitter from producing a stream of "records".
    if new_bw > self.prev_peak_bw * 1.001 {
        self.pending_events
            .push(BandwidthProbeEvent::NewBandwidthRecord { btlbw_bps: new_bw });
        self.prev_peak_bw = new_bw;
        self.stats.peak_bandwidth_bps = new_bw;
        self.stats.events_emitted += 1;
    }

    if new_state != self.prev_state {
        self.emit_state_transition_event(new_state);
        self.prev_state = new_state;
        if new_state == BbrState::ProbeBw {
            // The remembered phase belongs to construction or to an earlier ProbeBw stint.
            // Restart from the steady baseline so that entering in a probing or draining
            // phase is reported (the initial index 0 used to hide the first ProbeUp) and
            // entering in a steady phase does not emit a second `Steady` in this call.
            self.prev_cycle_idx = STEADY_PHASE_IDX;
        }
    }

    if new_state == BbrState::ProbeBw {
        let cycle_idx = self.probe_bw_cycle_idx();
        if cycle_idx != self.prev_cycle_idx {
            self.emit_cycle_change_event(cycle_idx);
            self.prev_cycle_idx = cycle_idx;
        }
    }

    &self.pending_events
}
/// Computes how many bytes may be sent over the next `interval_secs` seconds.
///
/// The allowance is the core's pacing rate multiplied by the interval, truncated to whole
/// bytes, and then raised to [`MIN_PACING_BURST_BYTES`] if it falls below that floor. A zero
/// interval therefore still yields one minimum burst.
#[must_use]
pub fn pacing_budget(&self, interval_secs: f64) -> PacingBudget {
    let pacing_rate_bps = self.core.pacing_rate();
    // The float-to-int cast saturates, so negative or NaN products become 0 before the floor.
    let uncapped = (pacing_rate_bps * interval_secs) as u64;
    let bytes_available = std::cmp::max(uncapped, MIN_PACING_BURST_BYTES);
    PacingBudget {
        bytes_available,
        interval_secs,
        pacing_rate_bps,
    }
}
/// Returns the simplified [`CongestionState`] for the core's current state, using the
/// ProbeBw phase index inferred from the effective pacing gain.
#[must_use]
pub fn congestion_state(&self) -> CongestionState {
    let core_state = *self.core.state();
    let phase_idx = self.probe_bw_cycle_idx();
    CongestionState::from_bbr(core_state, phase_idx)
}
/// Returns the pacing rate currently reported by the wrapped [`BbrController`].
///
/// `pacing_budget` multiplies this value by a duration in seconds to obtain bytes, so it is
/// treated as bytes per second in this file.
#[must_use]
pub fn pacing_rate(&self) -> f64 {
self.core.pacing_rate()
}
/// Returns the congestion window reported by the wrapped [`BbrController`].
// NOTE(review): the unit is presumably bytes; confirm against `BbrController::cwnd`.
#[must_use]
pub fn cwnd(&self) -> u64 {
self.core.cwnd()
}
/// Returns the bottleneck-bandwidth estimate reported by the wrapped [`BbrController`].
///
/// `bdp_bytes` multiplies this value by `rtprop()` to obtain bytes, so it is treated as
/// bytes per second in this file.
#[must_use]
pub fn btlbw(&self) -> f64 {
self.core.btlbw()
}
/// Returns the round-trip propagation estimate reported by the wrapped [`BbrController`].
///
/// Events expose this value in fields named `rtprop_secs`, i.e. it is treated as seconds.
#[must_use]
pub fn rtprop(&self) -> f64 {
self.core.rtprop()
}
/// Returns the bandwidth-delay product, `btlbw * rtprop`, truncated to whole bytes.
///
/// The float-to-integer cast saturates: a negative or NaN product yields 0 and an
/// over-large one yields `u64::MAX`.
#[must_use]
pub fn bdp_bytes(&self) -> u64 {
    (self.core.btlbw() * self.core.rtprop()) as u64
}
/// Returns the in-flight data target reported by the wrapped [`BbrController`].
// NOTE(review): the unit is presumably bytes (the tests compare it with `bdp_bytes()`); confirm.
#[must_use]
pub fn inflight_target(&self) -> u64 {
self.core.inflight_target()
}
/// Returns a shared reference to the running counters maintained by `on_ack`.
#[must_use]
pub fn stats(&self) -> &BbrStats {
&self.stats
}
/// Returns `true` while [`Self::congestion_state`] reports `CongestionState::Probing`.
#[must_use]
pub fn is_probing_bandwidth(&self) -> bool {
    self.congestion_state() == CongestionState::Probing
}
/// Returns `true` while the core controller is in `BbrState::ProbeRtt`.
#[must_use]
pub fn is_measuring_rtt(&self) -> bool {
    *self.core.state() == BbrState::ProbeRtt
}
/// Returns the ratio of the core's pacing rate to its bandwidth estimate.
///
/// Falls back to `1.0` when the bandwidth estimate is not strictly positive (including
/// NaN), which avoids dividing by zero before any bandwidth has been measured.
#[must_use]
pub fn effective_pacing_gain(&self) -> f64 {
    match self.core.btlbw() {
        bandwidth if bandwidth > 0.0 => self.core.pacing_rate() / bandwidth,
        _ => 1.0,
    }
}
/// Classifies the effective pacing gain into a ProbeBw phase index.
///
/// Returns 0 when the gain is above 1.1 (probing), 1 when it is below 0.9 (draining) and
/// 2 otherwise. The index is inferred from the gain rather than read from the core.
fn probe_bw_cycle_idx(&self) -> usize {
    match self.effective_pacing_gain() {
        gain if gain > 1.1 => 0,
        gain if gain < 0.9 => 1,
        _ => 2,
    }
}
/// Queues the event that corresponds to the core entering `new_state`.
///
/// Must be called before `prev_state` is overwritten, because the `ProbeBw` case looks at
/// the state being left: coming from `ProbeRtt` it counts a finished RTT probe and queues
/// `ProbeRttDone`, otherwise it queues `Steady`. Entering `Startup` queues nothing.
fn emit_state_transition_event(&mut self, new_state: BbrState) {
    let event = match new_state {
        BbrState::Startup => return,
        BbrState::Drain => BandwidthProbeEvent::Drain {
            bdp_bytes: self.bdp_bytes(),
        },
        BbrState::ProbeRtt => BandwidthProbeEvent::ProbeRtt,
        BbrState::ProbeBw if self.prev_state == BbrState::ProbeRtt => {
            self.stats.probe_rtt_count += 1;
            BandwidthProbeEvent::ProbeRttDone {
                rtprop_secs: self.core.rtprop(),
            }
        }
        BbrState::ProbeBw => BandwidthProbeEvent::Steady,
    };
    self.pending_events.push(event);
    self.stats.events_emitted += 1;
}
/// Queues the event for a change of ProbeBw phase and records when the phase began.
///
/// Index 0 counts a bandwidth probe and queues `ProbeUp` with the core's current estimates,
/// index 1 queues `Drain` with the current bandwidth-delay product, and any other index
/// queues `Steady`.
fn emit_cycle_change_event(&mut self, new_cycle_idx: usize) {
    self.cycle_phase_start_round = self.round_count;
    let event = match new_cycle_idx {
        0 => {
            self.stats.probe_up_count += 1;
            BandwidthProbeEvent::ProbeUp {
                btlbw_bps: self.core.btlbw(),
                rtprop_secs: self.core.rtprop(),
            }
        }
        1 => BandwidthProbeEvent::Drain {
            bdp_bytes: self.bdp_bytes(),
        },
        _ => BandwidthProbeEvent::Steady,
    };
    self.pending_events.push(event);
    self.stats.events_emitted += 1;
}
}
/// Bitrate suggestion derived from a [`BbrCongestionController`] snapshot by
/// `PacingRateAdvisory::from_controller`.
#[derive(Debug, Clone, Copy)]
pub struct PacingRateAdvisory {
/// Suggested bitrate: the controller's pacing rate scaled by `headroom` and multiplied by 8,
/// truncated to an integer.
pub target_bitrate_bps: u64,
/// Fraction of the pacing rate that was used; chosen from the congestion state.
pub headroom: f64,
/// Congestion state the advisory was computed for.
pub state: CongestionState,
}
impl PacingRateAdvisory {
    /// Derives a bitrate suggestion from the controller's current state and pacing rate.
    ///
    /// The pacing rate is scaled by a state-dependent headroom factor (1.0 while probing,
    /// 0.9 while stable or filling the pipe, 0.7 while draining, 0.5 while measuring RTT)
    /// and then multiplied by 8 to produce `target_bitrate_bps`.
    #[must_use]
    pub fn from_controller(ctrl: &BbrCongestionController) -> Self {
        let state = ctrl.congestion_state();
        let headroom = match state {
            CongestionState::Probing => 1.0,
            CongestionState::FillingPipe | CongestionState::Stable => 0.9,
            CongestionState::Draining => 0.7,
            CongestionState::MeasuringRtt => 0.5,
        };
        let scaled_rate = ctrl.pacing_rate() * headroom;
        Self {
            // The cast saturates, so a negative or NaN rate becomes 0.
            target_bitrate_bps: (scaled_rate * 8.0) as u64,
            headroom,
            state,
        }
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the state mapping, pacing budget, statistics and advisory helpers.

    use super::*;

    /// Builds a non-app-limited ACK sample.
    fn sample(delivered: u64, elapsed: f64, rtt: f64) -> AckSample {
        AckSample {
            delivered,
            elapsed_secs: elapsed,
            rtt_secs: rtt,
            is_app_limited: false,
        }
    }

    /// Builds a controller with the default configuration.
    fn default_ctrl() -> BbrCongestionController {
        BbrCongestionController::new(BbrConfig::default())
    }

    #[test]
    fn test_congestion_state_from_startup() {
        let state = CongestionState::from_bbr(BbrState::Startup, 0);
        assert_eq!(state, CongestionState::FillingPipe);
    }

    #[test]
    fn test_congestion_state_from_probe_bw_cycle_0() {
        let state = CongestionState::from_bbr(BbrState::ProbeBw, 0);
        assert_eq!(state, CongestionState::Probing);
    }

    #[test]
    fn test_congestion_state_from_probe_bw_cycle_1() {
        let state = CongestionState::from_bbr(BbrState::ProbeBw, 1);
        assert_eq!(state, CongestionState::Draining);
    }

    #[test]
    fn test_congestion_state_from_probe_bw_steady() {
        for idx in 2..8 {
            let state = CongestionState::from_bbr(BbrState::ProbeBw, idx);
            assert_eq!(state, CongestionState::Stable, "phase index {idx}");
        }
    }

    #[test]
    fn test_congestion_state_display() {
        assert_eq!(CongestionState::Stable.to_string(), "stable");
        assert_eq!(CongestionState::Probing.to_string(), "probing");
        assert_eq!(CongestionState::MeasuringRtt.to_string(), "measuring_rtt");
    }

    #[test]
    fn test_initial_pacing_rate_positive() {
        let ctrl = default_ctrl();
        assert!(ctrl.pacing_rate() > 0.0);
    }

    #[test]
    fn test_initial_cwnd_positive() {
        let ctrl = default_ctrl();
        assert!(ctrl.cwnd() > 0);
    }

    #[test]
    fn test_initial_congestion_state_filling_pipe() {
        let ctrl = default_ctrl();
        assert_eq!(ctrl.congestion_state(), CongestionState::FillingPipe);
    }

    #[test]
    fn test_pacing_budget_zero_interval_gets_minimum() {
        let ctrl = default_ctrl();
        let budget = ctrl.pacing_budget(0.0);
        assert!(budget.bytes_available >= MIN_PACING_BURST_BYTES);
    }

    #[test]
    fn test_pacing_budget_can_send() {
        let ctrl = default_ctrl();
        let budget = ctrl.pacing_budget(0.001);
        assert!(budget.can_send());
    }

    #[test]
    fn test_pacing_budget_larger_interval_more_bytes() {
        let ctrl = default_ctrl();
        let b1 = ctrl.pacing_budget(0.001);
        let b2 = ctrl.pacing_budget(0.010);
        assert!(b2.bytes_available >= b1.bytes_available);
    }

    #[test]
    fn test_ack_updates_stats() {
        let mut ctrl = default_ctrl();
        ctrl.on_ack(sample(10_000, 0.001, 0.01));
        assert_eq!(ctrl.stats().ack_count, 1);
        assert_eq!(ctrl.stats().bytes_delivered, 10_000);
    }

    #[test]
    fn test_invalid_ack_ignored() {
        let mut ctrl = default_ctrl();
        // Zero or negative durations on either field must make the sample a no-op.
        let rejected = [
            (0.0, 0.0),
            (0.0, 0.01),
            (0.001, 0.0),
            (-0.001, 0.01),
            (0.001, -0.01),
        ];
        for (elapsed, rtt) in rejected {
            assert!(
                ctrl.on_ack(sample(0, elapsed, rtt)).is_empty(),
                "sample with elapsed={elapsed}, rtt={rtt} should produce no events"
            );
        }
        assert_eq!(ctrl.stats().ack_count, 0);
        assert_eq!(ctrl.stats().bytes_delivered, 0);
    }

    #[test]
    fn test_min_rtt_tracked() {
        let mut ctrl = default_ctrl();
        ctrl.on_ack(sample(1000, 0.001, 0.050));
        ctrl.on_ack(sample(1000, 0.001, 0.020));
        ctrl.on_ack(sample(1000, 0.001, 0.080));
        // Two-sided check: an upper bound alone would also accept a minimum that is too small.
        let min_rtt = ctrl.stats().min_rtt_secs;
        assert!(
            (min_rtt - 0.020).abs() < 1e-9,
            "expected min RTT of 0.020 s, got {min_rtt}"
        );
    }

    #[test]
    fn test_bdp_bytes_non_zero_after_acks() {
        let mut ctrl = default_ctrl();
        for _ in 0..5 {
            ctrl.on_ack(sample(50_000, 0.001, 0.01));
        }
        assert!(ctrl.bdp_bytes() > 0);
    }

    #[test]
    fn test_inflight_target_at_least_bdp() {
        let mut ctrl = default_ctrl();
        for _ in 0..10 {
            ctrl.on_ack(sample(50_000, 0.001, 0.01));
        }
        assert!(ctrl.inflight_target() >= ctrl.bdp_bytes());
    }

    #[test]
    fn test_advisory_bitrate_positive() {
        let mut ctrl = default_ctrl();
        ctrl.on_ack(sample(50_000, 0.001, 0.01));
        let advice = PacingRateAdvisory::from_controller(&ctrl);
        assert!(advice.target_bitrate_bps > 0);
    }

    #[test]
    fn test_advisory_headroom_in_valid_range() {
        let ctrl = default_ctrl();
        let advice = PacingRateAdvisory::from_controller(&ctrl);
        assert!(advice.headroom > 0.0 && advice.headroom <= 1.0);
    }

    #[test]
    fn test_new_bw_record_event_emitted() {
        let mut ctrl = default_ctrl();
        let events = ctrl.on_ack(sample(1_000_000, 0.001, 0.01));
        assert!(
            events
                .iter()
                .any(|e| matches!(e, BandwidthProbeEvent::NewBandwidthRecord { .. })),
            "expected a NewBandwidthRecord event on first high-bw sample"
        );
    }

    #[test]
    fn test_is_probing_bandwidth_false_in_startup() {
        let ctrl = default_ctrl();
        assert!(!ctrl.is_probing_bandwidth());
    }

    #[test]
    fn test_effective_pacing_gain_startup() {
        let ctrl = default_ctrl();
        let gain = ctrl.effective_pacing_gain();
        assert!(
            gain > 1.5,
            "startup gain should be well above 1.0, got {gain}"
        );
    }

    #[test]
    fn test_many_acks_stable() {
        let mut ctrl = default_ctrl();
        for i in 0..500u64 {
            let rtt = 0.005 + (i % 10) as f64 * 0.001;
            let delivered = 10_000 + (i % 3) * 1_000;
            ctrl.on_ack(sample(delivered, 0.001, rtt));
        }
        assert!(ctrl.pacing_rate() > 0.0);
        assert!(ctrl.cwnd() > 0);
        // `assert_eq!` prints both values on failure, unlike `assert!(a == b)`.
        assert_eq!(ctrl.stats().ack_count, 500);
    }
}