use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use log::*;
use super::CongestionController;
use super::CongestionStats;
use super::HystartPlusPlus;
use crate::connection::rtt::RttEstimator;
use crate::connection::space::SentPacket;
use crate::RecoveryConfig;
/// Default scaling constant of the cubic window function; copied into
/// `CubicConfig::c` by `CubicConfig::from` and `CubicConfig::default`.
const C: f64 = 0.4;
/// Default multiplicative decrease factor; copied into `CubicConfig::beta`
/// by `CubicConfig::from` and `CubicConfig::default`.
const BETA: f64 = 0.7;
/// Value of `3 * (1 - beta) / (1 + beta)` for the default `BETA`.
/// `Cubic::new` evaluates the same expression from the configured beta.
const ALPHA: f64 = 3.0 * (1.0 - BETA) / (1.0 + BETA);
/// Tunable parameters of the [`Cubic`] congestion controller.
#[derive(Debug)]
pub struct CubicConfig {
/// Scaling constant used by the cubic window function and by the
/// computation of `k`.
c: f64,
/// Multiplicative decrease factor applied to the congestion window when a
/// congestion event is processed.
beta: f64,
/// Lower bound of the congestion window, in bytes.
min_congestion_window: u64,
/// Congestion window a new controller starts with, in bytes.
initial_congestion_window: u64,
/// Slow start threshold a new controller starts with, in bytes.
slow_start_thresh: u64,
/// Maximum datagram size in bytes; the congestion window grows in
/// multiples of this value during congestion avoidance.
max_datagram_size: u64,
/// Passed to `HystartPlusPlus::new` when the controller is created.
hystart_enabled: bool,
/// Enables the fast convergence reduction of `w_max` on a congestion event.
fast_convergence_enabled: bool,
/// RTT used to derive the first pacing rate; `Cubic::new` falls back to
/// `crate::INITIAL_RTT` when this is `None`.
initial_rtt: Option<Duration>,
}
impl CubicConfig {
    /// Derives a CUBIC configuration from the shared recovery settings.
    ///
    /// The minimum window, the initial window and the slow start threshold
    /// read from `conf` are multiplied (saturating) by
    /// `conf.max_datagram_size`. `c` and `beta` take the module defaults, and
    /// both HyStart and fast convergence are switched on.
    pub fn from(conf: &RecoveryConfig) -> Self {
        let datagram_size = conf.max_datagram_size as u64;

        Self {
            c: C,
            beta: BETA,
            min_congestion_window: conf.min_congestion_window.saturating_mul(datagram_size),
            initial_congestion_window: conf
                .initial_congestion_window
                .saturating_mul(datagram_size),
            slow_start_thresh: conf.slow_start_thresh.saturating_mul(datagram_size),
            initial_rtt: Some(conf.initial_rtt),
            max_datagram_size: datagram_size,
            hystart_enabled: true,
            fast_convergence_enabled: true,
        }
    }

    /// Overrides the cubic scaling constant; returns `self` for chaining.
    fn set_c(&mut self, c: f64) -> &mut Self {
        self.c = c;
        self
    }

    /// Overrides the multiplicative decrease factor; returns `self` for
    /// chaining.
    fn set_beta(&mut self, beta: f64) -> &mut Self {
        self.beta = beta;
        self
    }

    /// Turns HyStart on or off; returns `self` for chaining.
    fn enable_hystart(&mut self, enable: bool) -> &mut Self {
        self.hystart_enabled = enable;
        self
    }

    /// Turns fast convergence on or off; returns `self` for chaining.
    fn enable_fast_convergence(&mut self, enable: bool) -> &mut Self {
        self.fast_convergence_enabled = enable;
        self
    }

    /// Sets the minimum congestion window. The value is stored as given
    /// (no scaling by the datagram size); returns `self` for chaining.
    fn set_min_congestion_window(&mut self, min_congestion_window: u64) -> &mut Self {
        self.min_congestion_window = min_congestion_window;
        self
    }

    /// Sets the initial congestion window. The value is stored as given
    /// (no scaling by the datagram size); returns `self` for chaining.
    fn set_initial_congestion_window(&mut self, initial_congestion_window: u64) -> &mut Self {
        self.initial_congestion_window = initial_congestion_window;
        self
    }

    /// Sets the maximum datagram size; returns `self` for chaining.
    fn set_max_datagram_size(&mut self, max_datagram_size: u64) -> &mut Self {
        self.max_datagram_size = max_datagram_size;
        self
    }
}
impl Default for CubicConfig {
    /// Builds the default configuration: module-default `c` and `beta`, a
    /// minimum window of 2 and an initial window of 10 default-sized
    /// datagrams, no slow start threshold limit (`u64::MAX`), and HyStart
    /// and fast convergence enabled.
    fn default() -> Self {
        let datagram_size = crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64;

        Self {
            c: C,
            beta: BETA,
            min_congestion_window: 2 * datagram_size,
            initial_congestion_window: 10 * datagram_size,
            slow_start_thresh: u64::MAX,
            initial_rtt: Some(crate::INITIAL_RTT),
            max_datagram_size: datagram_size,
            hystart_enabled: true,
            fast_convergence_enabled: true,
        }
    }
}
/// State of the CUBIC congestion controller.
#[derive(Debug)]
pub struct Cubic {
/// Configuration the controller was created with.
config: CubicConfig,
/// HyStart++ state consulted while the controller is in slow start.
hystart: HystartPlusPlus,
/// Current congestion window, in bytes.
cwnd: u64,
/// Slow start threshold, in bytes; slow start lasts while `cwnd` is below it.
ssthresh: u64,
/// Window size recorded when the current epoch started (set on a congestion
/// event, or on the first congestion-avoidance ACK without a prior event).
w_max: f64,
/// Time offset, in seconds from the epoch start, at which the cubic window
/// function returns `w_max`.
k: f64,
/// Additive increase factor applied by the `w_est` update.
alpha: f64,
/// Running window estimate that is advanced on every ACK processed in
/// congestion avoidance.
w_est: f64,
/// Window increase accumulated so far but not yet applied; `cwnd` only
/// grows by whole multiples of the maximum datagram size.
cwnd_inc: u64,
/// Start of the current epoch. Packets sent at or before this instant are
/// treated as being in recovery.
recovery_epoch_start: Option<Instant>,
/// Send time of the most recently sent packet.
last_sent_time: Option<Instant>,
/// Byte counters exposed through `stats()`.
stats: CongestionStats,
/// Pacing rate in bytes per second (window divided by RTT).
pacing_rate: u64,
/// Initial RTT, floored at one microsecond by `Cubic::new`.
initial_rtt: Duration,
}
impl Cubic {
    /// Creates a controller from `config`.
    ///
    /// The window and slow start threshold start at the configured values.
    /// The initial pacing rate is the initial window divided by the initial
    /// RTT, where the RTT defaults to `crate::INITIAL_RTT` and is floored at
    /// one microsecond so the division is well defined.
    pub fn new(config: CubicConfig) -> Self {
        let initial_rtt = config
            .initial_rtt
            .unwrap_or(crate::INITIAL_RTT)
            .max(Duration::from_micros(1));
        let cwnd = config.initial_congestion_window;

        Self {
            hystart: HystartPlusPlus::new(config.hystart_enabled),
            cwnd,
            ssthresh: config.slow_start_thresh,
            w_max: 0_f64,
            k: 0_f64,
            alpha: 3.0 * (1.0 - config.beta) / (1.0 + config.beta),
            w_est: 0_f64,
            cwnd_inc: 0_u64,
            recovery_epoch_start: None,
            last_sent_time: None,
            stats: Default::default(),
            pacing_rate: (cwnd as f64 / initial_rtt.as_secs_f64()) as u64,
            initial_rtt,
            // Moved last: the fields above still read from `config`.
            config,
        }
    }

    /// Evaluates the cubic window function, in bytes, `t` after the epoch
    /// start: `c * (t - k)^3 * max_datagram_size + w_max`.
    fn w_cubic(&self, t: Duration, max_datagram_size: u64) -> f64 {
        let offset = t.as_secs_f64() - self.k;
        self.config.c * offset.powi(3) * max_datagram_size as f64 + self.w_max
    }

    /// Returns the window estimate after `acked_bytes` more bytes have been
    /// acknowledged: the current estimate plus
    /// `alpha * acked_bytes / cwnd * max_datagram_size`.
    fn w_est(&self, acked_bytes: u64, max_datagram_size: u64) -> f64 {
        let growth =
            self.alpha * acked_bytes as f64 / self.cwnd as f64 * max_datagram_size as f64;
        self.w_est + growth
    }

    /// Computes `k`, the time in seconds the cubic function needs to climb
    /// from `cwnd` back to `w_max`. Returns `0.0` when `w_max` does not
    /// exceed `cwnd`.
    fn cubic_k(&self, cwnd: u64, max_datagram_size: u64) -> f64 {
        let window = cwnd as f64;
        if self.w_max > window {
            let deficit_in_datagrams = (self.w_max - window) / max_datagram_size as f64;
            (deficit_in_datagrams / self.config.c).cbrt()
        } else {
            0.0
        }
    }
}
impl CongestionController for Cubic {
    /// Returns the name of the algorithm.
    fn name(&self) -> &str {
        "CUBIC"
    }

    /// Records a sent packet.
    ///
    /// When the packet is sent with nothing else in flight, the epoch start
    /// is pushed forward by the time elapsed since the previous send, so
    /// the idle period does not count as window growth time.
    fn on_sent(&mut self, now: Instant, packet: &mut SentPacket, bytes_in_flight: u64) {
        if bytes_in_flight == 0 {
            if let (Some(last_sent_time), Some(epoch_start)) =
                (self.last_sent_time, self.recovery_epoch_start)
            {
                let idle = packet.time_sent.saturating_duration_since(last_sent_time);
                self.recovery_epoch_start = Some(epoch_start + idle);
            }
        }
        self.last_sent_time = Some(packet.time_sent);
        self.hystart.on_sent(packet.pkt_num);

        let sent_bytes = packet.sent_size as u64;
        self.stats.bytes_in_flight = bytes_in_flight;
        self.stats.bytes_sent_in_total = self.stats.bytes_sent_in_total.saturating_add(sent_bytes);
        if self.in_slow_start() {
            self.stats.bytes_sent_in_slow_start = self
                .stats
                .bytes_sent_in_slow_start
                .saturating_add(sent_bytes);
        }
    }

    /// Called before a batch of ACKs is processed; CUBIC keeps no per-batch
    /// state here.
    fn begin_ack(&mut self, now: Instant, bytes_in_flight: u64) {}

    /// Processes the acknowledgement of `packet`.
    ///
    /// Statistics are always updated. The window is left untouched when the
    /// sender is application limited, when the packet was sent during
    /// recovery, or when no RTT sample is available yet. Otherwise the
    /// window grows according to HyStart++ in slow start, or according to
    /// the cubic function / window estimate in congestion avoidance, and the
    /// pacing rate is refreshed from the new window.
    fn on_ack(
        &mut self,
        packet: &mut SentPacket,
        now: Instant,
        app_limited: bool,
        rtt: &RttEstimator,
        bytes_in_flight: u64,
    ) {
        let sent_time = packet.time_sent;
        let acked_bytes = packet.sent_size as u64;
        let max_datagram_size = self.config.max_datagram_size;

        self.stats.bytes_in_flight = bytes_in_flight;
        self.stats.bytes_acked_in_total =
            self.stats.bytes_acked_in_total.saturating_add(acked_bytes);
        if self.in_slow_start() {
            self.stats.bytes_acked_in_slow_start = self
                .stats
                .bytes_acked_in_slow_start
                .saturating_add(acked_bytes);
        }

        let smoothed_rtt = rtt.smoothed_rtt();
        if app_limited || self.in_recovery(sent_time) || smoothed_rtt.is_zero() {
            return;
        }

        if self.in_slow_start() {
            self.cwnd = self
                .cwnd
                .saturating_add(self.hystart.cwnd_increment(acked_bytes, max_datagram_size));
            self.hystart
                .on_ack(packet.pkt_num, acked_bytes, rtt.latest_rtt());
        } else {
            let since_epoch_start = match self.recovery_epoch_start {
                Some(epoch_start) => now.saturating_duration_since(epoch_start),
                None => {
                    // Congestion avoidance was entered without a congestion
                    // event: start a fresh epoch at the current window.
                    self.recovery_epoch_start = Some(now);
                    self.w_max = self.cwnd as f64;
                    self.k = 0_f64;
                    self.w_est = self.cwnd as f64;
                    // Derive alpha from the configured beta, exactly as
                    // `Cubic::new` does, instead of the default constant.
                    self.alpha = 3.0 * (1.0 - self.config.beta) / (1.0 + self.config.beta);
                    Duration::ZERO
                }
            };

            // Window the cubic function predicts one RTT from now, limited to
            // [cwnd, 1.5 * cwnd].
            let target = self
                .w_cubic(
                    since_epoch_start.saturating_add(smoothed_rtt),
                    max_datagram_size,
                )
                .clamp(self.cwnd as f64, 1.5 * self.cwnd as f64);

            self.w_est = self.w_est(acked_bytes, max_datagram_size);
            if self.w_est >= self.w_max {
                self.alpha = 1.0_f64;
            }

            let mut cwnd = self.cwnd;
            if self.w_cubic(since_epoch_start, max_datagram_size) < self.w_est {
                // The cubic function is below the window estimate: follow
                // the estimate instead.
                cwnd = cwnd.max(self.w_est as u64);
            } else {
                let cubic_inc = (target - cwnd as f64) / cwnd as f64 * max_datagram_size as f64;
                cwnd += cubic_inc as u64;
            }

            // Accumulate the increase and apply it in whole datagrams only;
            // the remainder is carried over to the next ACK.
            self.cwnd_inc += cwnd - self.cwnd;
            self.cwnd += self.cwnd_inc / max_datagram_size * max_datagram_size;
            self.cwnd_inc %= max_datagram_size;
        }

        // The smoothed RTT is non-zero here because of the early return
        // above; the initial RTT fallback is kept as a defensive default.
        let pacing_interval = if smoothed_rtt.is_zero() {
            self.initial_rtt
        } else {
            smoothed_rtt
        };
        // A non-zero RTT below one microsecond truncates to 0 us, so floor
        // the divisor at 1 to avoid a division-by-zero panic.
        let interval_micros = pacing_interval.as_micros().max(1);
        let rate = self.cwnd as u128 * 1_000_000 / interval_micros;
        // Saturate instead of silently truncating the 128-bit result.
        self.pacing_rate = rate.min(u64::MAX as u128) as u64;
    }

    /// Called after a batch of ACKs; if HyStart++ left slow start during
    /// this batch, the slow start threshold is set to the current window.
    fn end_ack(&mut self) {
        if self.hystart.has_exited() {
            return;
        }
        self.hystart.end_ack();
        if self.hystart.has_exited() {
            self.ssthresh = self.cwnd;
        }
    }

    /// Reacts to a loss / congestion signal for `packet`.
    ///
    /// Statistics are always updated. Unless the packet was sent during the
    /// current recovery period, a new epoch starts: `w_max` is recorded, the
    /// window and slow start threshold are reduced by `beta` (never below
    /// two datagrams), and `k` is recomputed. On persistent congestion the
    /// window collapses to the configured minimum.
    fn on_congestion_event(
        &mut self,
        now: Instant,
        packet: &SentPacket,
        is_persistent_congestion: bool,
        lost_bytes: u64,
        bytes_in_flight: u64,
    ) {
        self.stats.bytes_lost_in_total = self.stats.bytes_lost_in_total.saturating_add(lost_bytes);
        self.stats.bytes_in_flight = bytes_in_flight;
        if self.in_slow_start() {
            self.stats.bytes_lost_in_slow_start = self
                .stats
                .bytes_lost_in_slow_start
                .saturating_add(lost_bytes);
        }

        // Only one window reduction per recovery period.
        if self.in_recovery(packet.time_sent) {
            return;
        }
        self.recovery_epoch_start = Some(now);

        // Always remember the window at which congestion occurred. With fast
        // convergence, a window that is still below the previous `w_max` is
        // remembered at a further reduced value. Without this unconditional
        // update, disabling fast convergence left `w_max` (and therefore
        // `k`) stale.
        let window = self.cwnd as f64;
        self.w_max = if self.config.fast_convergence_enabled && window < self.w_max {
            window * (1.0 + self.config.beta) / 2.0
        } else {
            window
        };

        let window_floor = 2 * self.config.max_datagram_size;
        self.ssthresh = ((window * self.config.beta) as u64).max(window_floor);
        self.cwnd = self.ssthresh;
        self.k = self.cubic_k(self.cwnd, self.config.max_datagram_size);
        self.cwnd_inc = (self.cwnd_inc as f64 * self.config.beta) as u64;
        self.w_est = self.cwnd as f64;
        // Derive alpha from the configured beta, exactly as `Cubic::new`
        // does, instead of the default constant.
        self.alpha = 3.0 * (1.0 - self.config.beta) / (1.0 + self.config.beta);
        self.hystart.on_congestion_event();

        if is_persistent_congestion {
            self.recovery_epoch_start = None;
            self.w_max = self.cwnd as f64;
            self.ssthresh = self
                .config
                .min_congestion_window
                .max((self.cwnd as f64 * self.config.beta) as u64);
            self.cwnd_inc = 0;
            self.cwnd = self.config.min_congestion_window;
        }
    }

    /// Returns `true` while the window is below the slow start threshold.
    fn in_slow_start(&self) -> bool {
        self.cwnd < self.ssthresh
    }

    /// Returns `true` if a packet sent at `sent_time` belongs to the current
    /// recovery period, i.e. it was sent at or before the epoch start.
    fn in_recovery(&self, sent_time: Instant) -> bool {
        self.recovery_epoch_start.is_some_and(|t| sent_time <= t)
    }

    /// Returns the congestion window in bytes, never less than the
    /// configured minimum.
    fn congestion_window(&self) -> u64 {
        self.cwnd.max(self.config.min_congestion_window)
    }

    /// Returns the configured initial congestion window in bytes.
    fn initial_window(&self) -> u64 {
        self.config.initial_congestion_window
    }

    /// Returns the configured minimum congestion window in bytes.
    fn minimal_window(&self) -> u64 {
        self.config.min_congestion_window
    }

    /// Returns the accumulated congestion control statistics.
    fn stats(&self) -> &CongestionStats {
        &self.stats
    }

    /// Returns the current pacing rate in bytes per second.
    fn pacing_rate(&self) -> Option<u64> {
        Some(self.pacing_rate)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::packet;

    /// `cubic_k` returns the cube root of the window deficit (in datagrams)
    /// divided by `c`, and zero when the window is already above `w_max`.
    #[test]
    fn cubic_calc_k() {
        let cubic_cfg = CubicConfig::default();
        let mut cubic = Cubic::new(cubic_cfg);
        let max_datagram_size = 1000;
        cubic.w_max = 10240.0;

        assert_eq!(
            cubic.minimal_window(),
            2 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64
        );
        assert_eq!(
            cubic.initial_window(),
            10 * crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64
        );

        // (10240 - 7040) / 1000 / 0.4 = 8, whose cube root is 2.
        let cwnd = 7040;
        assert_eq!(cubic.cubic_k(cwnd, max_datagram_size), 2.0);

        // The window exceeds w_max, so k is zero.
        let cwnd = 20000;
        assert_eq!(cubic.cubic_k(cwnd, max_datagram_size), 0.0);
    }

    /// `on_sent` updates the send statistics and shifts the epoch start by
    /// the idle time when sending resumes with nothing in flight.
    #[test]
    fn cubic_on_sent() {
        let cubic_cfg = CubicConfig::default();
        let mut cubic = Cubic::new(cubic_cfg);
        let now = Instant::now();
        let mut pkts: Vec<SentPacket> = Vec::new();
        let mut bytes_in_flight = 0;
        let pkt_size: u64 = 240;
        let n_pkts = 6;

        // The first half of the packets is sent at `now`, the second half
        // 10ms later.
        for n in 0..n_pkts {
            pkts.push(SentPacket {
                pkt_num: n,
                frames: Vec::new(),
                time_sent: now + Duration::from_millis(10 * (n / (n_pkts / 2))),
                time_acked: Some(now + Duration::from_millis(20 + n)),
                time_lost: None,
                ack_eliciting: true,
                in_flight: true,
                has_data: false,
                sent_size: pkt_size as usize,
                rate_sample_state: Default::default(),
                ..SentPacket::default()
            });
        }

        for i in 0..(n_pkts / 2) {
            bytes_in_flight += pkt_size;
            cubic.on_sent(now, &mut pkts[i as usize], bytes_in_flight);
        }
        assert_eq!(cubic.last_sent_time.unwrap(), now);
        assert!(cubic.in_slow_start());
        assert_eq!(
            cubic.stats().bytes_sent_in_slow_start,
            n_pkts / 2 * pkt_size
        );
        assert_eq!(cubic.stats().bytes_sent_in_total, n_pkts / 2 * pkt_size);
        assert_eq!(cubic.stats().bytes_in_flight, n_pkts / 2 * pkt_size);

        // Everything has been acknowledged; the next send happens after an
        // idle period of 10ms.
        bytes_in_flight = 0;
        cubic.recovery_epoch_start = Some(now + Duration::from_millis(5));
        for i in (n_pkts / 2)..n_pkts {
            cubic.on_sent(now, &mut pkts[i as usize], bytes_in_flight);
            bytes_in_flight += pkt_size;
        }
        assert_eq!(
            cubic.last_sent_time.unwrap(),
            now + Duration::from_millis(10)
        );
        assert!(cubic.in_slow_start());
        assert_eq!(cubic.stats().bytes_sent_in_slow_start, n_pkts * pkt_size);
        assert_eq!(cubic.stats().bytes_sent_in_total, n_pkts * pkt_size);
        assert_eq!(
            cubic.recovery_epoch_start,
            Some(now + Duration::from_millis(5) + Duration::from_millis(10))
        );
    }

    /// Walks the controller through slow start, two congestion events and an
    /// ACK in congestion avoidance.
    #[test]
    fn cubic_ack() {
        let cubic_cfg = CubicConfig::default();
        let mut cubic = Cubic::new(cubic_cfg);
        let now = Instant::now();
        let mut pkts: Vec<SentPacket> = Vec::new();
        let rtt = RttEstimator::new(Duration::from_millis(20));
        let pkt_size: u64 = 240;
        let n_pkts = 6;

        for n in 0..n_pkts {
            pkts.push(SentPacket {
                pkt_num: n,
                frames: Vec::new(),
                time_sent: now + Duration::from_millis(n),
                time_acked: Some(now + Duration::from_millis(20)),
                time_lost: None,
                ack_eliciting: true,
                in_flight: true,
                has_data: false,
                sent_size: pkt_size as usize,
                rate_sample_state: Default::default(),
                ..SentPacket::default()
            });
        }

        // Slow start: the window grows by the acknowledged bytes.
        let mut time_acked = now + Duration::from_millis(20);
        let mut cwnd = cubic.congestion_window();
        cubic.begin_ack(time_acked, pkt_size);
        for i in 0..(n_pkts - 3) {
            cubic.on_ack(&mut pkts[i as usize], time_acked, false, &rtt, 0);
        }
        cubic.end_ack();
        assert!(!cubic.hystart.has_exited());
        assert!(cubic.in_slow_start());
        assert_eq!(cubic.congestion_window(), cwnd + (n_pkts - 3) * pkt_size);

        // First congestion event: w_max records the window, which is then
        // reduced by beta.
        cwnd = cubic.congestion_window();
        cubic.on_congestion_event(time_acked, &pkts[(n_pkts - 3) as usize], false, pkt_size, 0);
        assert_eq!(cubic.w_max, cwnd as f64);
        assert_eq!(cubic.ssthresh, (cwnd as f64 * cubic.config.beta) as u64);
        assert_eq!(cubic.cwnd, cubic.ssthresh);

        // Second congestion event below the previous w_max: fast convergence
        // reduces w_max further.
        cwnd = cubic.congestion_window();
        pkts[(n_pkts - 2) as usize].time_sent = time_acked + Duration::from_millis(5);
        cubic.on_congestion_event(
            time_acked + Duration::from_millis(20),
            &pkts[(n_pkts - 2) as usize],
            false,
            pkt_size,
            0,
        );
        assert_eq!(cubic.w_max, cwnd as f64 * (1.0 + cubic.config.beta) / 2.0);
        assert_eq!(cubic.ssthresh, (cwnd as f64 * cubic.config.beta) as u64);
        assert_eq!(cubic.cwnd, cubic.ssthresh);
        assert!(!cubic.in_slow_start());
        assert!(cubic.hystart.has_exited());

        // An ACK for a packet sent after the recovery period is processed in
        // congestion avoidance and never shrinks the window.
        pkts[(n_pkts - 1) as usize].time_sent = time_acked + Duration::from_millis(25);
        time_acked += Duration::from_millis(30);
        cubic.begin_ack(time_acked, 0);
        cubic.on_ack(&mut pkts[(n_pkts - 1) as usize], time_acked, false, &rtt, 0);
        cubic.end_ack();
        assert!(cubic.cwnd >= cubic.ssthresh);
    }

    /// Packets sent at or before the epoch start are in recovery; later ones
    /// are not.
    #[test]
    fn cubic_in_recovery() {
        let cubic_cfg = CubicConfig::default();
        let mut cubic = Cubic::new(cubic_cfg);
        let now = Instant::now();

        cubic.recovery_epoch_start = Some(now + Duration::from_millis(10));
        assert!(cubic.in_recovery(now));
        assert!(!cubic.in_recovery(now + Duration::from_millis(15)));
    }

    /// Every setter stores the value it is given.
    #[test]
    fn cubic_new_config() {
        let max_datagram_size: u64 = 1000;
        let min_cwnd: u64 = 4 * max_datagram_size;
        let initial_cwnd: u64 = 10 * max_datagram_size;
        let mut cubic_config = CubicConfig::default();

        cubic_config.set_c(0.7);
        assert_eq!(cubic_config.c, 0.7);

        cubic_config.set_beta(0.4);
        assert_eq!(cubic_config.beta, 0.4);

        cubic_config.enable_hystart(true);
        assert!(cubic_config.hystart_enabled);

        cubic_config.enable_fast_convergence(true);
        assert!(cubic_config.fast_convergence_enabled);

        cubic_config.set_initial_congestion_window(initial_cwnd);
        assert_eq!(cubic_config.initial_congestion_window, initial_cwnd);

        cubic_config.set_min_congestion_window(min_cwnd);
        assert_eq!(cubic_config.min_congestion_window, min_cwnd);

        cubic_config.set_max_datagram_size(max_datagram_size);
        assert_eq!(cubic_config.max_datagram_size, max_datagram_size);
    }
}