use std::time::Instant;
use self::recovery::Acked;
use super::bandwidth::Bandwidth;
use super::RecoveryConfig;
use super::Sent;
use crate::recovery::rtt;
use crate::recovery::rtt::RttStats;
use crate::recovery::CongestionControlAlgorithm;
use crate::StartupExit;
use crate::StartupExitReason;
/// Slow start threshold together with a record of how the startup phase
/// ended.
pub struct SsThresh {
/// Current slow start threshold. The `Default` impl starts it at
/// `usize::MAX`; every call to `update()` overwrites it.
ssthresh: usize,
/// Startup exit information. `None` until the first call to `update()`,
/// which records it once; later updates leave it untouched.
startup_exit: Option<StartupExit>,
}
impl Default for SsThresh {
fn default() -> Self {
Self {
ssthresh: usize::MAX,
startup_exit: None,
}
}
}
impl SsThresh {
    /// Returns the current slow start threshold.
    fn get(&self) -> usize {
        self.ssthresh
    }

    /// Returns the startup exit recorded by the first `update()` call, or
    /// `None` if `update()` has never been called.
    fn startup_exit(&self) -> Option<StartupExit> {
        self.startup_exit
    }

    /// Sets the threshold to `ssthresh`.
    ///
    /// The very first call also records the startup exit, using `ssthresh`
    /// as its value and a reason chosen by `in_css`:
    /// `ConservativeSlowStartRounds` when `true`, `Loss` otherwise. Later
    /// calls only change the threshold; the recorded exit is never replaced.
    fn update(&mut self, ssthresh: usize, in_css: bool) {
        // Only the first update fills in the exit record; the closure is not
        // run when one is already present.
        self.startup_exit.get_or_insert_with(|| {
            let reason = if in_css {
                StartupExitReason::ConservativeSlowStartRounds
            } else {
                StartupExitReason::Loss
            };

            StartupExit::new(ssthresh, None, reason)
        });

        self.ssthresh = ssthresh;
    }
}
/// Congestion controller state shared by the algorithm implementations
/// reachable through `cc_ops`.
pub struct Congestion {
/// Function table of the selected algorithm, resolved from
/// `RecoveryConfig::cc_algorithm` in `from_config()`.
pub(crate) cc_ops: &'static CongestionControlOps,
/// State of type `cubic::State`, default-initialized in `from_config()`.
/// NOTE(review): presumably only used by the `cubic` module — confirm.
cubic_state: cubic::State,
/// HyStart state, built from `RecoveryConfig::hystart`. `on_packet_sent()`
/// starts a round on it while the window is below the slow start threshold.
pub(crate) hystart: hystart::Hystart,
/// `prr::PRR` state; notified of every in-flight packet that is sent.
pub(crate) prr: prr::PRR,
/// Send quantum; initialized to the initial congestion window.
send_quantum: usize,
/// Congestion window in bytes; starts at
/// `max_send_udp_payload_size * initial_congestion_window_packets`.
pub(crate) congestion_window: usize,
/// Slow start threshold and startup exit record.
pub(crate) ssthresh: SsThresh,
/// Starts at 0. NOTE(review): presumably bytes acked while in slow start,
/// maintained by the algorithm implementations — confirm.
bytes_acked_sl: usize,
/// Starts at 0. NOTE(review): presumably bytes acked while in congestion
/// avoidance, maintained by the algorithm implementations — confirm.
bytes_acked_ca: usize,
/// Start of the current congestion recovery period, if any. Packets sent at
/// or before this instant are reported as in recovery by
/// `in_congestion_recovery()`.
pub(crate) congestion_recovery_start_time: Option<Instant>,
/// Set on every in-flight send: `true` when bytes in flight plus the bytes
/// just sent are still below the congestion window.
pub(crate) app_limited: bool,
/// Delivery rate estimator, fed on every packet sent and every packet acked.
pub(crate) delivery_rate: delivery_rate::Rate,
/// Copied from `RecoveryConfig::initial_congestion_window_packets`.
pub(crate) initial_congestion_window_packets: usize,
/// Copied from `RecoveryConfig::max_send_udp_payload_size`.
max_datagram_size: usize,
/// Starts at 0. NOTE(review): presumably a count of lost packets updated
/// outside this file's visible code — confirm.
pub(crate) lost_count: usize,
/// Copied from `RecoveryConfig::enable_cubic_idle_restart_fix`.
pub(crate) enable_cubic_idle_restart_fix: bool,
}
impl Congestion {
    /// Builds the congestion controller state described by `recovery_config`.
    ///
    /// The initial congestion window is
    /// `max_send_udp_payload_size * initial_congestion_window_packets`, and
    /// the send quantum starts out equal to it. Once the state is assembled,
    /// the selected algorithm's `on_init` hook runs on it before it is
    /// returned.
    pub(crate) fn from_config(recovery_config: &RecoveryConfig) -> Self {
        let max_datagram_size = recovery_config.max_send_udp_payload_size;
        let initial_window_packets =
            recovery_config.initial_congestion_window_packets;
        let initial_cwnd = max_datagram_size * initial_window_packets;

        let mut congestion = Congestion {
            cc_ops: recovery_config.cc_algorithm.into(),
            cubic_state: cubic::State::default(),
            hystart: hystart::Hystart::new(recovery_config.hystart),
            prr: prr::PRR::default(),
            send_quantum: initial_cwnd,
            congestion_window: initial_cwnd,
            ssthresh: Default::default(),
            bytes_acked_sl: 0,
            bytes_acked_ca: 0,
            congestion_recovery_start_time: None,
            app_limited: false,
            delivery_rate: delivery_rate::Rate::default(),
            initial_congestion_window_packets: initial_window_packets,
            max_datagram_size,
            lost_count: 0,
            enable_cubic_idle_restart_fix: recovery_config
                .enable_cubic_idle_restart_fix,
        };

        // Give the algorithm a chance to adjust the freshly built state.
        let on_init = congestion.cc_ops.on_init;
        on_init(&mut congestion);

        congestion
    }

    /// Returns `true` when a recovery period has been started and `sent_time`
    /// is at or before its start.
    pub(crate) fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
        matches!(
            self.congestion_recovery_start_time,
            Some(recovery_start) if sent_time <= recovery_start
        )
    }

    /// Returns the delivery rate sampled by the delivery rate estimator.
    pub(crate) fn delivery_rate(&self) -> Bandwidth {
        self.delivery_rate.sample_delivery_rate()
    }

    /// Returns the current send quantum.
    pub(crate) fn send_quantum(&self) -> usize {
        self.send_quantum
    }

    /// Returns the current congestion window.
    pub(crate) fn congestion_window(&self) -> usize {
        self.congestion_window
    }

    /// Records whether the sender is currently application limited.
    fn update_app_limited(&mut self, v: bool) {
        self.app_limited = v;
    }

    /// Accounts for a packet that is being sent at `now`.
    ///
    /// For in-flight packets this refreshes the app-limited flag, runs the
    /// algorithm's `on_packet_sent` hook, informs PRR and, when HyStart is
    /// enabled and the window is still below the slow start threshold,
    /// starts a HyStart round at the packet's number. For every packet,
    /// in flight or not, it stamps `pkt.time_sent` and feeds the delivery
    /// rate estimator.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn on_packet_sent(
        &mut self, bytes_in_flight: usize, sent_bytes: usize, now: Instant,
        pkt: &mut Sent, bytes_lost: u64, in_flight: bool,
    ) {
        if in_flight {
            // The window is not filled even after this packet, so the
            // sender is limited by the application.
            let window_not_filled =
                bytes_in_flight + sent_bytes < self.congestion_window;
            self.update_app_limited(window_not_filled);

            let algorithm_on_sent = self.cc_ops.on_packet_sent;
            algorithm_on_sent(self, sent_bytes, bytes_in_flight, now);

            self.prr.on_packet_sent(sent_bytes);

            // Evaluated after the algorithm hook, which may have changed
            // the window.
            if self.hystart.enabled() {
                let below_ssthresh =
                    self.congestion_window < self.ssthresh.get();

                if below_ssthresh {
                    self.hystart.start_round(pkt.pkt_num);
                }
            }
        }

        pkt.time_sent = now;

        self.delivery_rate
            .on_packet_sent(pkt, bytes_in_flight, bytes_lost);
    }

    /// Processes newly acknowledged packets.
    ///
    /// Each packet first updates the delivery rate sample, a rate sample is
    /// then generated using the minimum RTT from `rtt_stats`, and finally
    /// the algorithm's `on_packets_acked` hook is invoked with the same
    /// `acked` list.
    pub(crate) fn on_packets_acked(
        &mut self, bytes_in_flight: usize, acked: &mut Vec<Acked>,
        rtt_stats: &RttStats, now: Instant,
    ) {
        for acked_pkt in acked.iter() {
            self.delivery_rate.update_rate_sample(acked_pkt, now);
        }

        self.delivery_rate.generate_rate_sample(*rtt_stats.min_rtt);

        let algorithm_on_acked = self.cc_ops.on_packets_acked;
        algorithm_on_acked(self, bytes_in_flight, acked, now, rtt_stats);
    }
}
/// Table of function pointers implementing one congestion control algorithm.
///
/// A `&'static` reference to one of these tables is obtained from a
/// `CongestionControlAlgorithm` through the `From` impl and stored in
/// `Congestion::cc_ops`.
pub(crate) struct CongestionControlOps {
/// Called once by `Congestion::from_config()` on the freshly built state.
pub on_init: fn(r: &mut Congestion),
/// Called by `Congestion::on_packet_sent()` for every in-flight packet,
/// after the app-limited flag has been refreshed.
pub on_packet_sent: fn(
r: &mut Congestion,
sent_bytes: usize,
bytes_in_flight: usize,
now: Instant,
),
/// Called by `Congestion::on_packets_acked()` after the delivery rate
/// sample has been generated for the acked packets.
pub on_packets_acked: fn(
r: &mut Congestion,
bytes_in_flight: usize,
packets: &mut Vec<Acked>,
now: Instant,
rtt_stats: &RttStats,
),
/// Not invoked in this file's visible code.
/// NOTE(review): presumably called when packet loss signals congestion —
/// confirm against the caller.
pub congestion_event: fn(
r: &mut Congestion,
bytes_in_flight: usize,
lost_bytes: usize,
largest_lost_packet: &Sent,
now: Instant,
),
/// Not invoked in this file's visible code.
/// NOTE(review): presumably saves state that `rollback` can restore —
/// confirm.
pub checkpoint: fn(r: &mut Congestion),
/// Not invoked in this file's visible code.
/// NOTE(review): presumably restores checkpointed state and returns whether
/// it did so — confirm.
pub rollback: fn(r: &mut Congestion) -> bool,
/// Returns a static string naming the algorithm state at `now`; only
/// compiled in when the `qlog` feature is enabled.
#[cfg(feature = "qlog")]
pub state_str: fn(r: &Congestion, now: Instant) -> &'static str,
/// Writes a debug representation of the algorithm state to `formatter`.
pub debug_fmt: fn(
r: &Congestion,
formatter: &mut std::fmt::Formatter,
) -> std::fmt::Result,
}
impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
    /// Maps an algorithm selector to its static function table.
    ///
    /// # Panics
    ///
    /// Panics for `CongestionControlAlgorithm::Bbr2Gcongestion`, which has
    /// no table in this module.
    fn from(algo: CongestionControlAlgorithm) -> Self {
        match algo {
            // No `CongestionControlOps` table exists for this variant, so
            // it is treated as never reaching this conversion.
            CongestionControlAlgorithm::Bbr2Gcongestion => unreachable!(),

            CongestionControlAlgorithm::CUBIC => &cubic::CUBIC,

            CongestionControlAlgorithm::Reno => &reno::RENO,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh threshold is unbounded and has no startup exit recorded.
    #[test]
    fn ssthresh_init() {
        let tracker = SsThresh::default();

        assert_eq!(tracker.get(), usize::MAX);
        assert_eq!(tracker.startup_exit(), None);
    }

    /// When the first update happens in conservative slow start, that exit
    /// is recorded and survives every later update.
    #[test]
    fn ssthresh_in_css() {
        let first_exit = StartupExit::new(
            1000,
            None,
            StartupExitReason::ConservativeSlowStartRounds,
        );
        let mut tracker = SsThresh::default();

        // (new threshold, in_css) applied in order.
        for (value, in_css) in [(1000, true), (2000, true), (500, false)] {
            tracker.update(value, in_css);

            assert_eq!(tracker.get(), value);
            assert_eq!(tracker.startup_exit(), Some(first_exit));
        }
    }

    /// When the first update happens outside conservative slow start, the
    /// exit is attributed to loss and survives every later update.
    #[test]
    fn ssthresh_in_slow_start() {
        let first_exit = StartupExit::new(1000, None, StartupExitReason::Loss);
        let mut tracker = SsThresh::default();

        // (new threshold, in_css) applied in order.
        for (value, in_css) in [(1000, false), (2000, true), (500, false)] {
            tracker.update(value, in_css);

            assert_eq!(tracker.get(), value);
            assert_eq!(tracker.startup_exit(), Some(first_exit));
        }
    }
}
mod cubic;
mod delivery_rate;
mod hystart;
mod prr;
pub(crate) mod recovery;
mod reno;
#[cfg(test)]
mod test_sender;