use std::time::Duration;
use std::time::Instant;
use log::*;
use super::minmax::MinMax;
use super::CongestionController;
use super::CongestionStats;
use crate::connection::rtt::RttEstimator;
use crate::connection::space::RateSamplePacketState;
use crate::connection::space::SentPacket;
use crate::RecoveryConfig;
/// Default Copa delta; `CopaConfig::default()` uses it for both the
/// slow-start delta and the steady-state delta.
pub const COPA_DELTA: f64 = 0.04;
/// Number of consecutive same-direction round checks after which the
/// velocity is doubled on every further round (see `update_velocity`).
const SPEED_UP_THRESHOLD: u64 = 3;
/// Initial window of the standing-RTT min filter. `update_model` re-sizes the
/// window from the smoothed RTT on every ACK event.
const STANDING_RTT_FILTER_WINDOW: Duration = Duration::from_millis(100);
/// Window of the long-term min-RTT filter.
const MIN_RTT_FILTER_WINDOW: Duration = Duration::from_secs(10);
/// Multiplier applied to `cwnd / standing_rtt` to obtain the pacing rate.
const PACING_GAIN: u64 = 2;
/// NOTE(review): not referenced anywhere in the visible part of this file.
const DELAY_OSCILLATION_THRESHOLD: f64 = 0.1;
/// Per-round loss rate at or above which `update_mode` switches to
/// `CompetingMode::Competitive`.
const LOSS_RATE_THRESHOLD: f64 = 0.1;
/// Static configuration of the Copa congestion controller.
#[derive(Debug)]
pub struct CopaConfig {
/// Minimum congestion window (`min_congestion_window * max_datagram_size`
/// when built via `CopaConfig::from`).
min_cwnd: u64,
/// Initial congestion window (`initial_congestion_window * max_datagram_size`
/// when built via `CopaConfig::from`).
initial_cwnd: u64,
/// RTT used as a fallback while no RTT sample is available.
initial_rtt: Option<Duration>,
/// Maximum datagram size; scales the window limits and the target rate.
max_datagram_size: u64,
/// Delta used while in slow start (default competing mode).
slow_start_delta: f64,
/// Delta used after slow start (default competing mode).
steady_delta: f64,
/// Selects which window length `update_model` applies to the standing-RTT
/// filter.
use_standing_rtt: bool,
}
impl CopaConfig {
    /// Builds a Copa configuration from the generic recovery configuration.
    ///
    /// The window limits in `conf` are multiplied by the maximum datagram
    /// size (saturating on overflow); the Copa-specific knobs are copied
    /// verbatim.
    pub fn from(conf: &RecoveryConfig) -> Self {
        let datagram_size = conf.max_datagram_size as u64;

        Self {
            min_cwnd: conf.min_congestion_window.saturating_mul(datagram_size),
            initial_cwnd: conf
                .initial_congestion_window
                .saturating_mul(datagram_size),
            initial_rtt: Some(conf.initial_rtt),
            max_datagram_size: datagram_size,
            slow_start_delta: conf.copa_slow_start_delta,
            steady_delta: conf.copa_steady_delta,
            use_standing_rtt: conf.copa_use_standing_rtt,
        }
    }
}
impl Default for CopaConfig {
    /// Default configuration: a 4-datagram minimum window, an 80-datagram
    /// initial window, the crate-wide initial RTT, `COPA_DELTA` for both
    /// deltas, and the standing-RTT option enabled.
    fn default() -> Self {
        let datagram_size = crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE as u64;

        Self {
            min_cwnd: 4 * datagram_size,
            initial_cwnd: 80 * datagram_size,
            initial_rtt: Some(crate::INITIAL_RTT),
            max_datagram_size: datagram_size,
            slow_start_delta: COPA_DELTA,
            steady_delta: COPA_DELTA,
            use_standing_rtt: true,
        }
    }
}
/// Direction in which the congestion window moved between two round checks.
#[derive(Eq, PartialEq, Debug)]
enum Direction {
/// The window is larger than at the previous check.
Up,
/// The window is equal to or smaller than at the previous check.
Down,
}
/// Operating mode, selected from the per-round loss rate in `update_mode`.
#[derive(Eq, PartialEq, Debug)]
enum CompetingMode {
/// Loss rate below `LOSS_RATE_THRESHOLD`: delta comes from the configuration.
Default,
/// Loss rate at or above `LOSS_RATE_THRESHOLD`: delta is doubled on each
/// update, capped at 0.5.
Competitive,
}
/// Velocity state: a multiplier on the per-ACK window adjustment that grows
/// while the window keeps moving in the same direction.
#[derive(Debug)]
struct Velocity {
/// Direction observed at the most recent round check.
direction: Direction,
/// Current velocity multiplier (at least 1).
velocity: u64,
/// Congestion window recorded at the most recent round check; 0 means no
/// check has happened yet.
last_cwnd: u64,
/// Number of consecutive round checks that kept the same direction.
same_direction_cnt: u64,
}
impl Default for Velocity {
fn default() -> Self {
Self {
direction: Direction::Up,
velocity: 1,
last_cwnd: 0,
same_direction_cnt: 0,
}
}
}
/// Scratch state accumulated over one ACK event; it is reset in `begin_ack`.
#[derive(Debug)]
struct AckState {
/// Timestamp passed to `begin_ack` for the current ACK event.
now: Instant,
/// Reset to 0 in `begin_ack`.
/// NOTE(review): never incremented in the visible code — confirm whether it
/// is still needed.
newly_lost_bytes: u64,
/// Sum of the sizes of the packets acknowledged in this ACK event.
newly_acked_bytes: u64,
/// Largest packet number acknowledged in this ACK event.
largest_acked_pkt_num: u64,
/// Smallest `latest_rtt` seen in this ACK event (zero until the first sample).
min_rtt: Duration,
/// Smoothed RTT reported with the most recently processed acknowledged packet.
last_srtt: Duration,
}
impl Default for AckState {
    /// Empty ACK state stamped with the current time: all counters are zero
    /// and both RTT values are `Duration::ZERO`.
    fn default() -> Self {
        let no_rtt = Duration::ZERO;

        Self {
            now: Instant::now(),
            newly_lost_bytes: 0,
            newly_acked_bytes: 0,
            largest_acked_pkt_num: 0,
            min_rtt: no_rtt,
            last_srtt: no_rtt,
        }
    }
}
/// Round-trip bookkeeping, maintained by `Copa::update_round`.
#[derive(Debug, Default)]
struct RoundTripCounter {
/// Number of rounds completed so far.
pub round_count: u64,
/// Whether the most recent ACK event started a new round.
pub is_round_start: bool,
/// Packet number whose acknowledgement ends the current round (the last
/// packet sent when the round started).
pub round_end_pkt_num: u64,
/// Snapshot of the total acked bytes taken when the current round started.
pub last_total_acked_bytes: u64,
/// Snapshot of the total lost bytes taken when the current round started.
pub last_total_lost_bytes: u64,
/// Loss rate of the most recently completed round: lost / (lost + acked).
pub loss_rate: f64,
}
/// Copa congestion controller state.
#[derive(Debug)]
pub struct Copa {
/// Static configuration.
config: CopaConfig,
/// Byte counters exposed through `CongestionController::stats`.
stats: CongestionStats,
/// Creation time; RTT filter timestamps are measured relative to it.
init_time: Instant,
/// Current competing mode, re-evaluated on every ACK event.
mode: CompetingMode,
/// Whether the controller is still in slow start.
slow_start: bool,
/// Current congestion window (same unit as `CopaConfig::initial_cwnd`).
cwnd: u64,
/// Velocity state used to scale window adjustments.
velocity: Velocity,
/// Current delta; a larger delta yields a lower target rate.
delta: f64,
/// Min filter tracking the standing RTT (microseconds).
standing_rtt_filter: MinMax,
/// Min filter tracking the long-term minimum RTT (microseconds).
min_rtt_filter: MinMax,
/// Scratch state of the ACK event being processed.
ack_state: AckState,
/// Whether the most recent model update decided to grow the window.
increase_cwnd: bool,
/// Target sending rate computed by the most recent model update.
target_rate: u64,
/// Packet number of the most recently sent packet.
last_sent_pkt_num: u64,
/// Round-trip counter and per-round loss statistics.
round: RoundTripCounter,
}
impl Copa {
pub fn new(config: CopaConfig) -> Self {
let slow_start_delta = config.slow_start_delta;
let initial_cwnd = config.initial_cwnd;
Self {
config,
stats: Default::default(),
init_time: Instant::now(),
mode: CompetingMode::Default,
slow_start: true,
cwnd: initial_cwnd,
velocity: Velocity::default(),
delta: slow_start_delta,
standing_rtt_filter: MinMax::new(STANDING_RTT_FILTER_WINDOW.as_micros() as u64),
min_rtt_filter: MinMax::new(MIN_RTT_FILTER_WINDOW.as_micros() as u64),
ack_state: Default::default(),
increase_cwnd: false,
target_rate: 0,
last_sent_pkt_num: 0,
round: Default::default(),
}
}
/// Re-evaluates the velocity multiplier; the evaluation itself happens at
/// most once per round.
///
/// Nothing happens while slow start is still growing the window. The first
/// call after that only records the current window. On later round starts
/// the window is compared with the recorded one: a direction change resets
/// the velocity to 1, while `SPEED_UP_THRESHOLD` or more consecutive checks
/// in the same direction double it.
fn update_velocity(&mut self) {
    // Slow-start growth does not use the velocity.
    if self.slow_start && self.increase_cwnd {
        return;
    }

    // First evaluation: remember the window as the reference point.
    if self.velocity.last_cwnd == 0 {
        self.velocity.last_cwnd = self.cwnd.max(self.config.min_cwnd);
        self.velocity.velocity = 1;
        self.velocity.same_direction_cnt = 0;
        return;
    }

    // Only evaluate once per round.
    if !self.is_round_start() {
        return;
    }

    let observed = match self.cwnd > self.velocity.last_cwnd {
        true => Direction::Up,
        false => Direction::Down,
    };

    if observed == self.velocity.direction {
        let streak = self.velocity.same_direction_cnt.saturating_add(1);
        self.velocity.same_direction_cnt = streak;
        if streak >= SPEED_UP_THRESHOLD {
            self.velocity.velocity = self.velocity.velocity.saturating_mul(2);
        }
    } else {
        self.velocity.velocity = 1;
        self.velocity.same_direction_cnt = 0;
    }

    // A velocity built up for one direction must not carry over when the
    // controller now wants to move the other way.
    let wanted = if self.increase_cwnd {
        Direction::Up
    } else {
        Direction::Down
    };
    if self.velocity.velocity > 1 && self.velocity.direction != wanted {
        self.velocity.velocity = 1;
    }

    self.velocity.direction = observed;
    self.velocity.last_cwnd = self.cwnd;
}
/// Selects the competing mode from the last round's loss rate and updates
/// `delta` accordingly.
///
/// In competitive mode delta is doubled (capped at 0.5); otherwise it is
/// reset to the configured slow-start or steady-state delta.
fn update_mode(&mut self) {
    if self.round.loss_rate >= LOSS_RATE_THRESHOLD {
        self.mode = CompetingMode::Competitive;
        self.delta = (self.delta * 2.0_f64).min(0.5);
    } else {
        self.mode = CompetingMode::Default;
        self.delta = match self.slow_start {
            true => self.config.slow_start_delta,
            false => self.config.steady_delta,
        };
    }
}
/// Applies the window adjustment for the current ACK event.
///
/// Slow start ends the first time the model asks for a decrease. While in
/// slow start the window grows by the newly acknowledged bytes. Afterwards
/// the window moves by
/// `velocity * newly_acked_bytes * max_datagram_size / (delta * cwnd)`
/// in the direction given by `increase_cwnd`.
fn update_cwnd(&mut self) {
    if self.slow_start && !self.increase_cwnd {
        self.slow_start = false;
    }

    if self.slow_start {
        if self.increase_cwnd {
            self.cwnd = self.cwnd.saturating_add(self.ack_state.newly_acked_bytes);
        }
        return;
    }

    // The velocity doubles every round without an upper bound, so the
    // integer product must saturate instead of overflowing (which would
    // panic in debug builds and wrap in release builds).
    let numerator = self
        .velocity
        .velocity
        .saturating_mul(self.ack_state.newly_acked_bytes)
        .saturating_mul(self.config.max_datagram_size) as f64;
    // The float-to-integer `as` cast saturates, so an oversized or infinite
    // quotient becomes `u64::MAX` and NaN becomes 0.
    let cwnd_delta = (numerator / (self.delta * (self.cwnd as f64))) as u64;

    self.cwnd = if self.increase_cwnd {
        self.cwnd.saturating_add(cwnd_delta)
    } else {
        self.cwnd.saturating_sub(cwnd_delta)
    };

    if self.cwnd == 0 {
        trace!("{}. cwnd is zero!!!", self.name());
        self.cwnd = self.config.min_cwnd;
        self.velocity.velocity = 1;
    }
}
/// Advances the round-trip counter.
///
/// A round ends once the largest packet number acknowledged in this ACK
/// event reaches `round_end_pkt_num`. At that point the round's loss rate
/// (lost / (lost + acked) bytes) is computed, the byte snapshots are
/// refreshed, and the next round is set to end at the last packet sent so
/// far. Otherwise only `is_round_start` is cleared.
fn update_round(&mut self) {
    if self.ack_state.largest_acked_pkt_num < self.round.round_end_pkt_num {
        self.round.is_round_start = false;
        return;
    }

    let bytes_lost_in_this_round = self
        .stats
        .bytes_lost_in_total
        .saturating_sub(self.round.last_total_lost_bytes);
    let bytes_acked_in_this_round = self
        .stats
        .bytes_acked_in_total
        .saturating_sub(self.round.last_total_acked_bytes);
    let bytes_total_in_this_round =
        bytes_lost_in_this_round.saturating_add(bytes_acked_in_this_round);

    // A round with no acked and no lost bytes would yield 0.0 / 0.0 = NaN;
    // report a loss rate of zero instead.
    self.round.loss_rate = if bytes_total_in_this_round == 0 {
        0.0
    } else {
        bytes_lost_in_this_round as f64 / bytes_total_in_this_round as f64
    };

    self.round.last_total_acked_bytes = self.stats.bytes_acked_in_total;
    self.round.last_total_lost_bytes = self.stats.bytes_lost_in_total;
    self.round.round_count = self.round.round_count.saturating_add(1);
    self.round.round_end_pkt_num = self.last_sent_pkt_num;
    self.round.is_round_start = true;
}
/// Returns whether the most recent ACK event started a new round.
fn is_round_start(&self) -> bool {
self.round.is_round_start
}
/// Runs one Copa model update for the ACK event that just finished.
///
/// Feeds the RTT filters, refreshes the competing mode and delta, derives the
/// current and target rates, decides whether the window should grow, and then
/// updates the velocity and the congestion window.
fn update_model(&mut self) {
// Re-size the standing-RTT filter window from the latest smoothed RTT.
// NOTE(review): the Copa paper measures RTTstanding over a window of srtt/2;
// here the full srtt is used when `use_standing_rtt` is set and srtt/2
// otherwise. Confirm that the two branches are not swapped.
if self.config.use_standing_rtt {
self.standing_rtt_filter
.set_window(self.ack_state.last_srtt.as_micros() as u64);
} else {
self.standing_rtt_filter
.set_window(self.ack_state.last_srtt.as_micros() as u64 / 2);
}
// No RTT sample in this ACK event: fall back to the smoothed RTT, or to the
// configured initial RTT (20 ms if unset) when there is no smoothed RTT.
if self.ack_state.min_rtt == Duration::ZERO {
self.ack_state.min_rtt = if self.ack_state.last_srtt == Duration::ZERO {
self.config.initial_rtt.unwrap_or(Duration::from_millis(20))
} else {
self.ack_state.last_srtt
};
}
// Filter timestamps are microseconds elapsed since the controller was created.
let elapsed = self.ack_state.now.saturating_duration_since(self.init_time);
// Both filters receive the same sample: the smallest RTT of this ACK event.
self.min_rtt_filter.update_min(
elapsed.as_micros() as u64,
self.ack_state.min_rtt.as_micros() as u64,
);
self.standing_rtt_filter.update_min(
elapsed.as_micros() as u64,
self.ack_state.min_rtt.as_micros() as u64,
);
// Refresh the competing mode and delta before computing the target rate.
self.update_mode();
let min_rtt = Duration::from_micros(self.min_rtt_filter.get());
let standing_rtt = self.get_standing_rtt();
trace!(
"{}. round_min_rtt = {}us, elapsed = {}us, min_rtt = {}us, standing_rtt = {}us",
self.name(),
self.ack_state.min_rtt.as_micros(),
elapsed.as_micros(),
min_rtt.as_micros(),
standing_rtt.as_micros(),
);
// Current rate: congestion window divided by the standing RTT in seconds.
let current_rate: u64 = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64;
// Queueing delay: how far the standing RTT sits above the minimum RTT.
let queueing_delay = standing_rtt.saturating_sub(min_rtt);
if queueing_delay.is_zero() {
// No measurable queue: always grow, and target the current rate.
self.increase_cwnd = true;
trace!(
"{}. queuing delay is zero. rtt_standing and min_rtt is the same: {}us",
self.name(),
min_rtt.as_micros()
);
self.target_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64;
} else {
// Target rate: max_datagram_size / (delta * queueing_delay).
self.target_rate = (self.config.max_datagram_size as f64
/ self.delta
/ queueing_delay.max(Duration::from_micros(1)).as_secs_f64())
as u64;
trace!(
"{}. target_rate = {}, delta = {}, max_datagram_size = {}",
self.name(),
self.target_rate,
self.delta,
self.config.max_datagram_size
);
// Grow while the current rate does not exceed the target rate.
self.increase_cwnd = self.target_rate >= current_rate;
}
// The velocity must be updated before the window, which consumes it.
self.update_velocity();
self.update_cwnd();
trace!(
"{}. mode = {:?}, slow_start={}, delta={}, target_rate={}, current_rate={},
increase_cwnd={}, queuing_delay={}us, rtt_standing={}us, cwnd={}",
self.name(),
self.mode,
self.slow_start,
self.delta,
self.target_rate,
current_rate,
self.increase_cwnd,
queueing_delay.as_micros(),
standing_rtt.as_micros(),
self.cwnd
);
}
/// Returns the standing RTT reported by the standing-RTT filter.
///
/// When the filter reports zero, the configured initial RTT (or the
/// crate-wide default) is returned instead, clamped to at least 1 µs so
/// that callers can safely divide by the result.
fn get_standing_rtt(&self) -> Duration {
    match self.standing_rtt_filter.get() {
        0 => self
            .config
            .initial_rtt
            .unwrap_or(crate::INITIAL_RTT)
            .max(Duration::from_micros(1)),
        micros => Duration::from_micros(micros),
    }
}
}
impl CongestionController for Copa {
/// Returns the pacing rate: `PACING_GAIN` times the current rate
/// (`cwnd / standing_rtt`).
///
/// The float-to-integer cast saturates at `u64::MAX`, so the gain is
/// applied with a saturating multiplication to avoid an overflow for
/// extreme window/RTT combinations.
fn pacing_rate(&self) -> Option<u64> {
    let standing_rtt = self.get_standing_rtt();
    let current_rate = (self.cwnd as f64 / standing_rtt.as_secs_f64()) as u64;
    Some(PACING_GAIN.saturating_mul(current_rate))
}
/// Returns the algorithm name used in log messages.
fn name(&self) -> &str {
"COPA"
}
/// Returns the congestion window, never less than the configured minimum.
fn congestion_window(&self) -> u64 {
    let floor = self.config.min_cwnd;
    std::cmp::max(self.cwnd, floor)
}
/// Returns the configured initial congestion window.
fn initial_window(&self) -> u64 {
self.config.initial_cwnd
}
/// Returns the configured minimum congestion window.
fn minimal_window(&self) -> u64 {
self.config.min_cwnd
}
/// Returns whether the controller is still in slow start.
fn in_slow_start(&self) -> bool {
self.slow_start
}
/// Returns the accumulated congestion statistics.
fn stats(&self) -> &CongestionStats {
&self.stats
}
/// Records a sent packet: refreshes the in-flight and total-sent counters
/// and remembers the packet number as the most recently sent one.
fn on_sent(&mut self, now: Instant, packet: &mut SentPacket, bytes_in_flight: u64) {
    let sent_bytes = packet.sent_size as u64;

    let stats = &mut self.stats;
    stats.bytes_in_flight = bytes_in_flight;
    stats.bytes_sent_in_total = stats.bytes_sent_in_total.saturating_add(sent_bytes);

    self.last_sent_pkt_num = packet.pkt_num;
}
fn begin_ack(&mut self, now: Instant, bytes_in_flight: u64) {
self.ack_state.newly_acked_bytes = 0;
self.ack_state.newly_lost_bytes = 0;
self.ack_state.now = now;
self.ack_state.min_rtt = Duration::ZERO;
self.ack_state.last_srtt = Duration::ZERO;
self.ack_state.largest_acked_pkt_num = 0;
}
/// Processes one acknowledged packet within the current ACK event.
///
/// Updates the byte counters, accumulates the newly acknowledged bytes and
/// the largest acknowledged packet number, records the smoothed RTT, and
/// keeps the smallest `latest_rtt` seen during this event.
fn on_ack(
    &mut self,
    packet: &mut SentPacket,
    now: Instant,
    _app_limited: bool,
    rtt: &RttEstimator,
    bytes_in_flight: u64,
) {
    let size = packet.sent_size as u64;
    let was_slow_start = self.in_slow_start();

    // Statistics.
    let stats = &mut self.stats;
    stats.bytes_in_flight = stats.bytes_in_flight.saturating_sub(size);
    stats.bytes_acked_in_total = stats.bytes_acked_in_total.saturating_add(size);
    if was_slow_start {
        stats.bytes_acked_in_slow_start = stats.bytes_acked_in_slow_start.saturating_add(size);
    }

    // Per-event accumulators.
    let state = &mut self.ack_state;
    state.newly_acked_bytes = state.newly_acked_bytes.saturating_add(size);
    state.largest_acked_pkt_num = state.largest_acked_pkt_num.max(packet.pkt_num);
    state.last_srtt = rtt.smoothed_rtt();

    // Keep the smallest RTT sample of this ACK event.
    let latest = rtt.latest_rtt();
    let previous = self.ack_state.min_rtt;
    if previous.is_zero() || latest <= previous {
        trace!(
            "{}. Got a smaller rtt: {}us -> {}us",
            self.name(),
            previous.as_micros(),
            latest.as_micros()
        );
        self.ack_state.min_rtt = latest;
    }

    trace!(
        "{}. ON_ACK. latest_rtt = {}us, srtt = {}us, newly_acked = {}, total_acked = {}",
        self.name(),
        latest.as_micros(),
        rtt.smoothed_rtt().as_micros(),
        self.ack_state.newly_acked_bytes,
        self.stats.bytes_acked_in_total,
    )
}
/// Finishes the ACK event: advances the round counter, logs the round
/// state, and then runs the model update.
fn end_ack(&mut self) {
    self.update_round();

    let round = &self.round;
    trace!(
        "{}. END_ACK. round_start = {:?}, round_count = {}, end_pkt = {}, loss_rate = {}, last_acked = {}, last_lost = {}",
        self.name(),
        round.is_round_start,
        round.round_count,
        round.round_end_pkt_num,
        round.loss_rate,
        round.last_total_acked_bytes,
        round.last_total_lost_bytes,
    );

    self.update_model();
}
/// Records a loss event in the statistics.
///
/// Only the byte counters are updated here; the window itself is adjusted
/// by the model update at the end of each ACK event.
fn on_congestion_event(
    &mut self,
    now: Instant,
    packet: &SentPacket,
    in_persistent_congestion: bool,
    lost_bytes: u64,
    bytes_in_flight: u64,
) {
    let was_slow_start = self.in_slow_start();

    let stats = &mut self.stats;
    stats.bytes_lost_in_total = stats.bytes_lost_in_total.saturating_add(lost_bytes);
    stats.bytes_in_flight = bytes_in_flight;
    if was_slow_start {
        stats.bytes_lost_in_slow_start = stats.bytes_lost_in_slow_start.saturating_add(lost_bytes);
    }
}
}