use core::cmp;
use std::sync::OnceLock;
use std::time::Instant;
pub type size_tp = u64;
pub type window_tp = u64;
pub type rate_tp = u64;
pub type time_tp = i32;
pub type count_tp = i32;
pub type fps_tp = u8;
pub type prob_tp = i64;
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ecn_tp {
ecn_not_ect = 0,
ecn_l4s_id = 1,
ecn_ect0 = 2,
ecn_ce = 3,
}
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum cs_tp {
cs_init = 0,
cs_cong_avoid = 1,
cs_in_loss = 2,
cs_in_cwr = 3,
}
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum cca_tp {
cca_prague_win = 0,
cca_prague_rate = 1,
}
pub const PRAGUE_INITWIN: count_tp = 10;
pub const PRAGUE_MINMTU: size_tp = 150;
pub const PRAGUE_INITMTU: size_tp = 1400;
pub const PRAGUE_INITRATE: rate_tp = 12_500;
pub const PRAGUE_MINRATE: rate_tp = 12_500;
pub const PRAGUE_MAXRATE: rate_tp = 12_500_000_000;
const MIN_STEP: rate_tp = 7; const RATE_STEP: rate_tp = 1_920_000; const QUEUE_GROWTH: time_tp = 1_000; const BURST_TIME: time_tp = 250; const REF_RTT: time_tp = 25_000; const PROB_SHIFT: u8 = 20;
const MAX_PROB: prob_tp = 1i64 << PROB_SHIFT;
const ALPHA_SHIFT: u8 = 4; const MIN_PKT_BURST: count_tp = 1;
const MIN_PKT_WIN: count_tp = 2;
const RATE_OFFSET: u8 = 3; const MIN_FRAME_WIN: count_tp = 2;
#[inline]
fn wrapping_sub_time(a: time_tp, b: time_tp) -> time_tp {
a.wrapping_sub(b)
}
#[inline]
fn wrapping_sub_count(a: count_tp, b: count_tp) -> count_tp {
a.wrapping_sub(b)
}
#[inline]
fn ecn_mask_ce(v: ecn_tp) -> ecn_tp {
match (v as u8) & (ecn_tp::ecn_ce as u8) {
0 => ecn_tp::ecn_not_ect,
1 => ecn_tp::ecn_l4s_id,
2 => ecn_tp::ecn_ect0,
3 => ecn_tp::ecn_ce,
_ => ecn_tp::ecn_not_ect, }
}
#[inline]
fn mul_64_64_shift(left: u64, right: u64, shift: u32) -> u64 {
if shift >= 128 {
return 0;
}
let prod = (left as u128).wrapping_mul(right as u128);
let shifted = prod >> shift;
if shifted > u64::MAX as u128 {
u64::MAX
} else {
shifted as u64
}
}
#[inline]
fn div_64_64_round(a: u64, divisor: u64) -> u64 {
if divisor == 0 {
return u64::MAX;
}
let dividend = (a as u128) + ((divisor >> 1) as u128);
let q = dividend / (divisor as u128);
if q > u64::MAX as u128 {
u64::MAX
} else {
q as u64
}
}
static PROCESS_START: OnceLock<Instant> = OnceLock::new();
#[inline]
fn process_now_u32_micros() -> u32 {
let start = PROCESS_START.get_or_init(Instant::now);
let us = start.elapsed().as_micros();
us as u32
}
#[derive(Clone, Copy, Debug)]
pub struct PragueState {
pub m_start_ref: time_tp,
pub m_init_rate: rate_tp,
pub m_init_window: window_tp,
pub m_min_rate: rate_tp,
pub m_max_rate: rate_tp,
pub m_max_packet_size: size_tp,
pub m_frame_interval: time_tp,
pub m_frame_budget: time_tp,
pub m_ts_remote: time_tp,
pub m_rtt: time_tp,
pub m_srtt: time_tp,
pub m_vrtt: time_tp,
pub m_r_prev_ts: time_tp,
pub m_r_packets_received: count_tp,
pub m_r_packets_CE: count_tp,
pub m_r_packets_lost: count_tp,
pub m_r_error_L4S: bool,
pub m_cc_ts: time_tp,
pub m_packets_received: count_tp,
pub m_packets_CE: count_tp,
pub m_packets_lost: count_tp,
pub m_packets_sent: count_tp,
pub m_error_L4S: bool,
pub m_alpha_ts: time_tp,
pub m_alpha_packets_received: count_tp,
pub m_alpha_packets_CE: count_tp,
pub m_alpha_packets_lost: count_tp,
pub m_alpha_packets_sent: count_tp,
pub m_loss_ts: time_tp,
pub m_loss_cca: cca_tp,
pub m_lost_window: window_tp,
pub m_lost_rate: rate_tp,
pub m_lost_rtts_to_growth: count_tp,
pub m_loss_packets_lost: count_tp,
pub m_loss_packets_sent: count_tp,
pub m_cwr_ts: time_tp,
pub m_cwr_packets_sent: count_tp,
pub m_cc_state: cs_tp,
pub m_cca_mode: cca_tp,
pub m_rtts_to_growth: count_tp,
pub m_alpha: prob_tp,
pub m_pacing_rate: rate_tp,
pub m_fractional_window: window_tp,
pub m_packet_burst: count_tp,
pub m_packet_size: size_tp,
pub m_packet_window: count_tp,
}
impl Default for PragueState {
fn default() -> Self {
Self {
m_start_ref: 0,
m_init_rate: PRAGUE_INITRATE,
m_init_window: 0,
m_min_rate: PRAGUE_MINRATE,
m_max_rate: PRAGUE_MAXRATE,
m_max_packet_size: PRAGUE_INITMTU,
m_frame_interval: 0,
m_frame_budget: 0,
m_ts_remote: 0,
m_rtt: 0,
m_srtt: 0,
m_vrtt: 0,
m_r_prev_ts: 0,
m_r_packets_received: 0,
m_r_packets_CE: 0,
m_r_packets_lost: 0,
m_r_error_L4S: false,
m_cc_ts: 0,
m_packets_received: 0,
m_packets_CE: 0,
m_packets_lost: 0,
m_packets_sent: 0,
m_error_L4S: false,
m_alpha_ts: 0,
m_alpha_packets_received: 0,
m_alpha_packets_CE: 0,
m_alpha_packets_lost: 0,
m_alpha_packets_sent: 0,
m_loss_ts: 0,
m_loss_cca: cca_tp::cca_prague_win,
m_lost_window: 0,
m_lost_rate: 0,
m_lost_rtts_to_growth: 0,
m_loss_packets_lost: 0,
m_loss_packets_sent: 0,
m_cwr_ts: 0,
m_cwr_packets_sent: 0,
m_cc_state: cs_tp::cs_init,
m_cca_mode: cca_tp::cca_prague_win,
m_rtts_to_growth: 0,
m_alpha: 0,
m_pacing_rate: PRAGUE_INITRATE,
m_fractional_window: 0,
m_packet_burst: MIN_PKT_BURST,
m_packet_size: PRAGUE_INITMTU,
m_packet_window: MIN_PKT_WIN,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PragueCongestionSignal {
Stable,
EcnMarked,
LossRecovery,
L4sFallback,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PragueBitrateAction {
Increase,
Hold,
Decrease,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PragueRateAdvice {
pub pacing_rate_bytes_per_sec: rate_tp,
pub packet_window: count_tp,
pub packet_burst: count_tp,
pub packet_size_bytes: size_tp,
pub next_send_ecn: ecn_tp,
pub controller_state: cs_tp,
pub controller_mode: cca_tp,
pub last_rtt_us: time_tp,
pub smoothed_rtt_us: time_tp,
pub virtual_rtt_us: time_tp,
pub ce_ratio_ppm: u32,
pub congestion_signal: PragueCongestionSignal,
pub l4s_fallback_active: bool,
pub packets_received: count_tp,
pub packets_ce: count_tp,
pub packets_lost: count_tp,
}
impl PragueRateAdvice {
fn new(
state: PragueState,
pacing_rate_bytes_per_sec: rate_tp,
packet_window: count_tp,
packet_burst: count_tp,
packet_size_bytes: size_tp,
) -> Self {
Self {
pacing_rate_bytes_per_sec,
packet_window,
packet_burst,
packet_size_bytes,
next_send_ecn: if state.m_error_L4S {
ecn_tp::ecn_not_ect
} else {
ecn_tp::ecn_l4s_id
},
controller_state: state.m_cc_state,
controller_mode: state.m_cca_mode,
last_rtt_us: state.m_rtt,
smoothed_rtt_us: state.m_srtt,
virtual_rtt_us: state.m_vrtt,
ce_ratio_ppm: alpha_to_ppm(state.m_alpha),
congestion_signal: congestion_signal_from_state(&state),
l4s_fallback_active: state.m_error_L4S,
packets_received: state.m_packets_received,
packets_ce: state.m_packets_CE,
packets_lost: state.m_packets_lost,
}
}
pub fn pacing_rate_bits_per_sec(&self) -> u64 {
self.pacing_rate_bytes_per_sec.saturating_mul(8)
}
pub fn bitrate_action_since(
&self,
previous: &Self,
tolerance_percent: u8,
) -> PragueBitrateAction {
let tolerance = u64::from(tolerance_percent.min(100));
let previous_rate = previous.pacing_rate_bytes_per_sec.max(1);
let increase_threshold = previous_rate.saturating_mul(100 + tolerance) / 100;
let decrease_threshold = previous_rate.saturating_mul(100 - tolerance) / 100;
if self.pacing_rate_bytes_per_sec > increase_threshold {
PragueBitrateAction::Increase
} else if self.pacing_rate_bytes_per_sec < decrease_threshold {
PragueBitrateAction::Decrease
} else {
PragueBitrateAction::Hold
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PragueVideoRateAdvice {
pub transport: PragueRateAdvice,
pub target_frame_size_bytes: size_tp,
pub frame_window: count_tp,
}
impl PragueVideoRateAdvice {
pub fn pacing_rate_bits_per_sec(&self) -> u64 {
self.transport.pacing_rate_bits_per_sec()
}
pub fn bitrate_action_since(
&self,
previous: &Self,
tolerance_percent: u8,
) -> PragueBitrateAction {
self.transport
.bitrate_action_since(&previous.transport, tolerance_percent)
}
}
#[inline]
fn alpha_to_ppm(alpha: prob_tp) -> u32 {
let alpha = alpha.clamp(0, MAX_PROB) as u128;
((alpha * 1_000_000u128) / (MAX_PROB as u128)) as u32
}
#[inline]
fn congestion_signal_from_state(state: &PragueState) -> PragueCongestionSignal {
if state.m_error_L4S {
PragueCongestionSignal::L4sFallback
} else if state.m_cc_state == cs_tp::cs_in_loss {
PragueCongestionSignal::LossRecovery
} else if state.m_cc_state == cs_tp::cs_in_cwr || state.m_alpha > 0 {
PragueCongestionSignal::EcnMarked
} else {
PragueCongestionSignal::Stable
}
}
pub struct PragueCC {
state: PragueState,
}
impl Default for PragueCC {
fn default() -> Self {
Self::new(
PRAGUE_INITMTU,
0,
0,
PRAGUE_INITRATE,
PRAGUE_INITWIN,
PRAGUE_MINRATE,
PRAGUE_MAXRATE,
)
}
}
impl PragueCC {
pub fn new(
max_packet_size: size_tp,
fps: fps_tp,
frame_budget: time_tp,
init_rate: rate_tp,
init_window: count_tp,
min_rate: rate_tp,
max_rate: rate_tp,
) -> Self {
let mut cc = Self {
state: PragueState {
m_start_ref: 0,
..PragueState::default()
},
};
let ts_now = cc.Now();
cc.state.m_init_rate = init_rate;
cc.state.m_init_window = (init_window as window_tp)
.wrapping_mul(max_packet_size)
.wrapping_mul(1_000_000);
cc.state.m_min_rate = min_rate;
cc.state.m_max_rate = max_rate;
cc.state.m_max_packet_size = max_packet_size;
cc.state.m_frame_interval = if fps != 0 {
(1_000_000u32 / fps as u32) as time_tp
} else {
0
};
cc.state.m_frame_budget = frame_budget;
if cc.state.m_frame_budget > cc.state.m_frame_interval {
cc.state.m_frame_budget = cc.state.m_frame_interval;
}
cc.state.m_ts_remote = 0;
cc.state.m_rtt = 0;
cc.state.m_srtt = 0;
cc.state.m_vrtt = 0;
cc.state.m_r_prev_ts = 0;
cc.state.m_r_packets_received = 0;
cc.state.m_r_packets_CE = 0;
cc.state.m_r_packets_lost = 0;
cc.state.m_r_error_L4S = false;
cc.state.m_cc_ts = ts_now;
cc.state.m_packets_received = 0;
cc.state.m_packets_CE = 0;
cc.state.m_packets_lost = 0;
cc.state.m_packets_sent = 0;
cc.state.m_error_L4S = false;
cc.state.m_alpha_ts = ts_now;
cc.state.m_alpha_packets_received = 0;
cc.state.m_alpha_packets_CE = 0;
cc.state.m_alpha_packets_lost = 0;
cc.state.m_alpha_packets_sent = 0;
cc.state.m_loss_ts = 0;
cc.state.m_loss_cca = cca_tp::cca_prague_win;
cc.state.m_lost_window = 0;
cc.state.m_lost_rate = 0;
cc.state.m_loss_packets_lost = 0;
cc.state.m_loss_packets_sent = 0;
cc.state.m_lost_rtts_to_growth = 0;
cc.state.m_cwr_ts = 0;
cc.state.m_cwr_packets_sent = 0;
cc.state.m_cc_state = cs_tp::cs_init;
cc.state.m_cca_mode = cca_tp::cca_prague_win;
cc.state.m_rtts_to_growth = (init_rate / RATE_STEP) as count_tp + (MIN_STEP as count_tp);
cc.state.m_alpha = 0;
cc.state.m_pacing_rate = init_rate;
cc.state.m_fractional_window = cc.state.m_init_window;
let ref_rtt = cc.get_ref_rtt();
cc.state.m_packet_size =
cc.state.m_pacing_rate.wrapping_mul(ref_rtt as u64) / 1_000_000 / (MIN_PKT_WIN as u64);
if cc.state.m_packet_size < PRAGUE_MINMTU {
cc.state.m_packet_size = PRAGUE_MINMTU;
}
if cc.state.m_packet_size > cc.state.m_max_packet_size {
cc.state.m_packet_size = cc.state.m_max_packet_size;
}
cc.state.m_packet_burst = (cc.state.m_pacing_rate.wrapping_mul(BURST_TIME as u64)
/ 1_000_000
/ cc.state.m_packet_size) as count_tp;
if cc.state.m_packet_burst < MIN_PKT_BURST {
cc.state.m_packet_burst = MIN_PKT_BURST;
}
cc.state.m_packet_window =
(cc.state.m_fractional_window / 1_000_000).div_ceil(cc.state.m_packet_size) as count_tp;
if cc.state.m_packet_window < MIN_PKT_WIN {
cc.state.m_packet_window = MIN_PKT_WIN;
}
cc
}
pub fn GetStatePtr(&self) -> &PragueState {
&self.state
}
pub fn GetStats(&self, stats: &mut PragueState) {
*stats = self.state;
}
pub fn Now(&mut self) -> time_tp {
if self.state.m_start_ref == 0 {
let mut start = process_now_u32_micros() as time_tp;
if start == 0 {
start = -1; }
self.state.m_start_ref = start;
return 1;
}
let now_abs = process_now_u32_micros() as time_tp;
let now = wrapping_sub_time(now_abs, self.state.m_start_ref);
if now == 0 {
1
} else {
now
}
}
pub fn get_ref_rtt(&self) -> time_tp {
if self.state.m_frame_interval != 0 {
self.state.m_frame_interval
} else {
REF_RTT
}
}
pub fn get_alpha_shift(&self) -> count_tp {
if self.state.m_frame_interval != 0 {
(((1u64 << ALPHA_SHIFT) * (REF_RTT as u64)) / (self.state.m_frame_interval as u64))
as count_tp
} else {
1i32 << ALPHA_SHIFT
}
}
pub fn RFC8888Received(&mut self, num_rtt: usize, pkts_rtt: &[time_tp]) -> bool {
let n = cmp::min(num_rtt, pkts_rtt.len());
for &rtt in &pkts_rtt[..n] {
self.state.m_rtt = rtt;
if self.state.m_cc_state != cs_tp::cs_init {
self.state.m_srtt = self
.state
.m_srtt
.wrapping_add((self.state.m_rtt.wrapping_sub(self.state.m_srtt)) >> 3);
} else {
self.state.m_srtt = self.state.m_rtt;
}
let ref_rtt = self.get_ref_rtt();
self.state.m_vrtt = if self.state.m_srtt > ref_rtt {
self.state.m_srtt
} else {
ref_rtt
};
}
true
}
pub fn PacketReceived(&mut self, timestamp: time_tp, echoed_timestamp: time_tp) -> bool {
if self.state.m_cc_state != cs_tp::cs_init
&& wrapping_sub_time(self.state.m_r_prev_ts, timestamp) > 0
{
return false;
}
let ts = self.Now();
self.state.m_ts_remote = wrapping_sub_time(ts, timestamp);
self.state.m_rtt = wrapping_sub_time(ts, echoed_timestamp);
if self.state.m_cc_state != cs_tp::cs_init {
self.state.m_srtt = self
.state
.m_srtt
.wrapping_add((self.state.m_rtt.wrapping_sub(self.state.m_srtt)) >> 3);
} else {
self.state.m_srtt = self.state.m_rtt;
}
let ref_rtt = self.get_ref_rtt();
self.state.m_vrtt = if self.state.m_srtt > ref_rtt {
self.state.m_srtt
} else {
ref_rtt
};
self.state.m_r_prev_ts = timestamp;
true
}
pub fn ACKReceived(
&mut self,
packets_received: count_tp,
packets_CE: count_tp,
packets_lost: count_tp,
packets_sent: count_tp,
error_L4S: bool,
inflight: &mut count_tp,
) -> bool {
if wrapping_sub_count(self.state.m_packets_received, packets_received) > 0
|| wrapping_sub_count(self.state.m_packets_CE, packets_CE) > 0
{
return false;
}
let pacing_interval: time_tp = (self.state.m_packet_size.wrapping_mul(1_000_000)
/ cmp::max(self.state.m_pacing_rate, 1))
as time_tp;
let srtt: time_tp = self.state.m_srtt;
if self.state.m_cc_state == cs_tp::cs_init {
self.state.m_fractional_window = (srtt as u64).wrapping_mul(self.state.m_pacing_rate);
self.state.m_cc_state = cs_tp::cs_cong_avoid;
}
if srtt <= 2_000 || srtt <= pacing_interval {
self.state.m_cca_mode = cca_tp::cca_prague_rate;
} else {
if self.state.m_cca_mode == cca_tp::cca_prague_rate {
self.state.m_fractional_window =
(srtt as u64).wrapping_mul(self.state.m_pacing_rate);
}
self.state.m_cca_mode = cca_tp::cca_prague_win;
}
let ts = self.Now();
if wrapping_sub_count(
packets_received.wrapping_add(packets_lost),
self.state.m_alpha_packets_sent,
) > 0
&& (wrapping_sub_time(ts, self.state.m_alpha_ts).wrapping_sub(self.state.m_vrtt) >= 0)
{
let denom = packets_received.wrapping_sub(self.state.m_alpha_packets_received);
if denom != 0 {
let num = (packets_CE.wrapping_sub(self.state.m_alpha_packets_CE) as prob_tp)
<< PROB_SHIFT;
let prob = num / (denom as prob_tp);
let alpha_shift = self.get_alpha_shift() as prob_tp;
if alpha_shift != 0 {
self.state.m_alpha += (prob - self.state.m_alpha) / alpha_shift;
}
if self.state.m_alpha > MAX_PROB {
self.state.m_alpha = MAX_PROB;
}
}
self.state.m_alpha_packets_sent = packets_sent;
self.state.m_alpha_packets_CE = packets_CE;
self.state.m_alpha_packets_received = packets_received;
self.state.m_alpha_ts = ts;
if self.state.m_rtts_to_growth > 0 {
self.state.m_rtts_to_growth -= 1;
}
}
if (self.state.m_lost_window > 0 || self.state.m_lost_rate > 0)
&& (wrapping_sub_count(self.state.m_loss_packets_lost, packets_lost) >= 0)
{
self.state.m_cca_mode = self.state.m_loss_cca;
if self.state.m_cca_mode == cca_tp::cca_prague_rate {
self.state.m_pacing_rate = self
.state
.m_pacing_rate
.wrapping_add(self.state.m_lost_rate);
self.state.m_lost_rate = 0;
} else {
self.state.m_fractional_window = self
.state
.m_fractional_window
.wrapping_add(self.state.m_lost_window);
self.state.m_lost_window = 0;
}
self.state.m_rtts_to_growth = self
.state
.m_rtts_to_growth
.wrapping_sub(self.state.m_lost_rtts_to_growth);
if self.state.m_rtts_to_growth < 0 {
self.state.m_rtts_to_growth = 0;
}
self.state.m_lost_rtts_to_growth = 0;
self.state.m_cc_state = cs_tp::cs_cong_avoid;
}
if self.state.m_cc_state == cs_tp::cs_in_loss
&& wrapping_sub_count(
packets_received.wrapping_add(packets_lost),
self.state.m_loss_packets_sent,
) > 0
&& (wrapping_sub_time(ts, self.state.m_loss_ts).wrapping_sub(self.state.m_vrtt) >= 0)
{
self.state.m_cc_state = cs_tp::cs_cong_avoid;
}
if self.state.m_cc_state != cs_tp::cs_in_loss
&& wrapping_sub_count(self.state.m_packets_lost, packets_lost) < 0
{
let mut rtts_to_growth_u64 = self.state.m_pacing_rate / 2;
rtts_to_growth_u64 /= cmp::max(self.state.m_max_packet_size, 1);
rtts_to_growth_u64 = rtts_to_growth_u64.saturating_mul(REF_RTT as u64);
rtts_to_growth_u64 /= cmp::max(self.state.m_vrtt as u64, 1);
rtts_to_growth_u64 = rtts_to_growth_u64.saturating_mul(REF_RTT as u64);
rtts_to_growth_u64 /= 1_000_000;
let rtts_to_growth = rtts_to_growth_u64 as count_tp;
self.state.m_lost_rtts_to_growth += rtts_to_growth - self.state.m_rtts_to_growth;
if self.state.m_lost_rtts_to_growth > rtts_to_growth {
self.state.m_lost_rtts_to_growth = rtts_to_growth;
}
self.state.m_rtts_to_growth = rtts_to_growth;
if self.state.m_cca_mode == cca_tp::cca_prague_win {
self.state.m_lost_window = self.state.m_fractional_window / 2;
self.state.m_fractional_window = self
.state
.m_fractional_window
.wrapping_sub(self.state.m_lost_window);
} else {
self.state.m_lost_rate = self.state.m_pacing_rate / 2;
self.state.m_pacing_rate = self
.state
.m_pacing_rate
.wrapping_sub(self.state.m_lost_rate);
}
self.state.m_cc_state = cs_tp::cs_in_loss;
self.state.m_loss_cca = self.state.m_cca_mode;
self.state.m_loss_packets_sent = packets_sent;
self.state.m_loss_ts = ts;
self.state.m_loss_packets_lost = self.state.m_packets_lost;
}
let acks = (packets_received.wrapping_sub(self.state.m_packets_received))
.wrapping_sub(packets_CE.wrapping_sub(self.state.m_packets_CE));
if self.state.m_cc_state != cs_tp::cs_in_loss && acks > 0 {
let mut increment =
mul_64_64_shift(self.state.m_pacing_rate, QUEUE_GROWTH as u64, 0) / 1_000_000;
if increment < self.state.m_max_packet_size || self.state.m_rtts_to_growth != 0 {
increment = self.state.m_max_packet_size;
}
if self.state.m_cca_mode == cca_tp::cca_prague_win {
let divisor =
mul_64_64_shift(self.state.m_vrtt as u64, self.state.m_vrtt as u64, 0);
let scaler = div_64_64_round(
(srtt as u64)
.wrapping_mul(1_000_000)
.wrapping_mul(srtt as u64),
divisor,
);
let increase = div_64_64_round(
(acks as u64)
.wrapping_mul(self.state.m_packet_size)
.wrapping_mul(scaler)
.wrapping_mul(1_000_000),
cmp::max(self.state.m_fractional_window, 1),
);
let scaled_increase = mul_64_64_shift(increase, increment, 0);
self.state.m_fractional_window =
self.state.m_fractional_window.wrapping_add(scaled_increase);
} else {
let divisor = mul_64_64_shift(self.state.m_packet_size, 1_000_000, 0);
let invscaler = div_64_64_round(
mul_64_64_shift(self.state.m_pacing_rate, self.state.m_vrtt as u64, 0),
divisor,
);
let increase = div_64_64_round(
mul_64_64_shift((acks as u64).wrapping_mul(increment), 1_000_000, 0),
cmp::max(self.state.m_vrtt as u64, 1),
);
let scaled_increase = div_64_64_round(increase, cmp::max(invscaler, 1));
self.state.m_pacing_rate = self.state.m_pacing_rate.wrapping_add(scaled_increase);
}
}
if self.state.m_cc_state == cs_tp::cs_in_cwr
&& wrapping_sub_count(
packets_received.wrapping_add(packets_lost),
self.state.m_cwr_packets_sent,
) > 0
&& (wrapping_sub_time(ts, self.state.m_cwr_ts).wrapping_sub(self.state.m_vrtt) >= 0)
{
self.state.m_cc_state = cs_tp::cs_cong_avoid;
}
if self.state.m_cc_state == cs_tp::cs_cong_avoid
&& wrapping_sub_count(self.state.m_packets_CE, packets_CE) < 0
{
self.state.m_rtts_to_growth =
(self.state.m_pacing_rate / RATE_STEP) as count_tp + (MIN_STEP as count_tp);
let alpha_u64 = self.state.m_alpha as u64;
if self.state.m_cca_mode == cca_tp::cca_prague_win {
self.state.m_fractional_window = self.state.m_fractional_window.wrapping_sub(
(self.state.m_fractional_window.wrapping_mul(alpha_u64)) >> (PROB_SHIFT + 1),
);
} else {
self.state.m_pacing_rate = self.state.m_pacing_rate.wrapping_sub(
(self.state.m_pacing_rate.wrapping_mul(alpha_u64)) >> (PROB_SHIFT + 1),
);
}
self.state.m_cc_state = cs_tp::cs_in_cwr;
self.state.m_cwr_packets_sent = packets_sent;
self.state.m_cwr_ts = ts;
}
if self.state.m_cca_mode != cca_tp::cca_prague_rate {
self.state.m_pacing_rate = self.state.m_fractional_window / cmp::max(srtt as u64, 1);
}
if self.state.m_pacing_rate < self.state.m_min_rate {
self.state.m_pacing_rate = self.state.m_min_rate;
}
if self.state.m_pacing_rate > self.state.m_max_rate {
self.state.m_pacing_rate = self.state.m_max_rate;
}
self.state.m_fractional_window = self.state.m_pacing_rate.wrapping_mul(srtt as u64);
if self.state.m_fractional_window == 0 {
self.state.m_fractional_window = 1;
}
self.state.m_packet_size = self
.state
.m_pacing_rate
.wrapping_mul(self.state.m_vrtt as u64)
/ 1_000_000
/ (MIN_PKT_WIN as u64);
if self.state.m_packet_size < PRAGUE_MINMTU {
self.state.m_packet_size = PRAGUE_MINMTU;
}
if self.state.m_packet_size > self.state.m_max_packet_size {
self.state.m_packet_size = self.state.m_max_packet_size;
}
self.state.m_packet_burst = (self.state.m_pacing_rate.wrapping_mul(BURST_TIME as u64)
/ 1_000_000
/ cmp::max(self.state.m_packet_size, 1))
as count_tp;
if self.state.m_packet_burst < MIN_PKT_BURST {
self.state.m_packet_burst = MIN_PKT_BURST;
}
self.state.m_packet_window =
(((self.state.m_fractional_window * (100 + RATE_OFFSET as u64)) / 100_000_000)
/ cmp::max(self.state.m_packet_size, 1)
+ 1) as count_tp;
if self.state.m_packet_window < MIN_PKT_WIN {
self.state.m_packet_window = MIN_PKT_WIN;
}
self.state.m_cc_ts = ts;
self.state.m_packets_received = packets_received;
self.state.m_packets_CE = packets_CE;
self.state.m_packets_lost = packets_lost;
self.state.m_packets_sent = packets_sent;
if error_L4S {
self.state.m_error_L4S = true;
}
*inflight = packets_sent
.wrapping_sub(self.state.m_packets_received)
.wrapping_sub(self.state.m_packets_lost);
true
}
pub fn DataReceivedSequence(&mut self, mut ip_ecn: ecn_tp, packet_seq_nr: count_tp) {
ip_ecn = ecn_mask_ce(ip_ecn);
self.state.m_r_packets_received = self.state.m_r_packets_received.wrapping_add(1);
let skipped = packet_seq_nr
.wrapping_sub(self.state.m_r_packets_received)
.wrapping_sub(self.state.m_r_packets_lost);
if skipped >= 0 {
self.state.m_r_packets_lost = self.state.m_r_packets_lost.wrapping_add(skipped);
} else if self.state.m_r_packets_lost > 0 {
self.state.m_r_packets_lost = self.state.m_r_packets_lost.wrapping_sub(1);
}
if ip_ecn == ecn_tp::ecn_ce {
self.state.m_r_packets_CE = self.state.m_r_packets_CE.wrapping_add(1);
} else if ip_ecn != ecn_tp::ecn_l4s_id {
self.state.m_r_error_L4S = true;
}
}
pub fn DataReceived(&mut self, mut ip_ecn: ecn_tp, packets_lost: count_tp) {
ip_ecn = ecn_mask_ce(ip_ecn);
self.state.m_r_packets_received = self.state.m_r_packets_received.wrapping_add(1);
self.state.m_r_packets_lost = self.state.m_r_packets_lost.wrapping_add(packets_lost);
if ip_ecn == ecn_tp::ecn_ce {
self.state.m_r_packets_CE = self.state.m_r_packets_CE.wrapping_add(1);
} else if ip_ecn != ecn_tp::ecn_l4s_id {
self.state.m_r_error_L4S = true;
}
}
pub fn ResetCCInfo(&mut self) {
self.state.m_cc_ts = self.Now();
self.state.m_cc_state = cs_tp::cs_init;
self.state.m_cca_mode = cca_tp::cca_prague_win;
self.state.m_alpha_ts = self.state.m_cc_ts;
self.state.m_alpha = 0;
self.state.m_pacing_rate = self.state.m_init_rate;
self.state.m_fractional_window = self.state.m_max_packet_size.wrapping_mul(1_000_000);
self.state.m_packet_burst = MIN_PKT_BURST;
self.state.m_packet_size = self.state.m_max_packet_size;
self.state.m_packet_window = MIN_PKT_WIN;
self.state.m_rtts_to_growth =
(self.state.m_pacing_rate / RATE_STEP) as count_tp + (MIN_STEP as count_tp);
self.state.m_lost_rtts_to_growth = 0;
}
pub fn GetTimeInfo(
&mut self,
timestamp: &mut time_tp,
echoed_timestamp: &mut time_tp,
ip_ecn: &mut ecn_tp,
) {
*timestamp = self.Now();
if self.state.m_ts_remote != 0 {
*echoed_timestamp = timestamp.wrapping_sub(self.state.m_ts_remote);
} else {
*echoed_timestamp = 0;
}
*ip_ecn = if self.state.m_error_L4S {
ecn_tp::ecn_not_ect
} else {
ecn_tp::ecn_l4s_id
};
}
pub fn GetCCInfo(
&mut self,
pacing_rate: &mut rate_tp,
packet_window: &mut count_tp,
packet_burst: &mut count_tp,
packet_size: &mut size_tp,
) {
if wrapping_sub_time(self.Now(), self.state.m_alpha_ts).wrapping_sub(self.state.m_vrtt >> 1)
>= 0
{
*pacing_rate = self.state.m_pacing_rate * 100 / (100 + RATE_OFFSET as u64);
} else {
*pacing_rate = self.state.m_pacing_rate * (100 + RATE_OFFSET as u64) / 100;
}
*packet_window = self.state.m_packet_window;
*packet_burst = self.state.m_packet_burst;
*packet_size = self.state.m_packet_size;
}
pub fn GetCCInfoVideo(
&mut self,
pacing_rate: &mut rate_tp,
frame_size: &mut size_tp,
frame_window: &mut count_tp,
packet_burst: &mut count_tp,
packet_size: &mut size_tp,
) {
*pacing_rate = self.state.m_pacing_rate;
*packet_burst = self.state.m_packet_burst;
*packet_size = self.state.m_packet_size;
let fs = self
.state
.m_pacing_rate
.wrapping_mul(self.state.m_frame_budget as u64)
/ 1_000_000;
*frame_size = if self.state.m_packet_size > fs {
self.state.m_packet_size
} else {
fs
};
*frame_window = (self.state.m_packet_window as i64 * self.state.m_packet_size as i64
/ (*frame_size as i64)) as count_tp;
if *frame_window < MIN_FRAME_WIN {
*frame_window = MIN_FRAME_WIN;
}
}
pub fn bulk_advice(&mut self) -> PragueRateAdvice {
let (mut pacing_rate, mut packet_window, mut packet_burst, mut packet_size) = (0, 0, 0, 0);
self.GetCCInfo(
&mut pacing_rate,
&mut packet_window,
&mut packet_burst,
&mut packet_size,
);
PragueRateAdvice::new(
self.state,
pacing_rate,
packet_window,
packet_burst,
packet_size,
)
}
pub fn video_advice(&mut self) -> PragueVideoRateAdvice {
let (mut pacing_rate, mut frame_size, mut frame_window, mut packet_burst, mut packet_size) =
(0, 0, 0, 0, 0);
self.GetCCInfoVideo(
&mut pacing_rate,
&mut frame_size,
&mut frame_window,
&mut packet_burst,
&mut packet_size,
);
PragueVideoRateAdvice {
transport: PragueRateAdvice::new(
self.state,
pacing_rate,
self.state.m_packet_window,
packet_burst,
packet_size,
),
target_frame_size_bytes: frame_size,
frame_window,
}
}
pub fn GetACKInfo(
&self,
packets_received: &mut count_tp,
packets_CE: &mut count_tp,
packets_lost: &mut count_tp,
error_L4S: &mut bool,
) {
*packets_received = self.state.m_r_packets_received;
*packets_CE = self.state.m_r_packets_CE;
*packets_lost = self.state.m_r_packets_lost;
*error_L4S = self.state.m_r_error_L4S;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn now_skips_zero_and_wraps() {
let mut cc = PragueCC::default();
let t1 = cc.Now();
assert!(t1 > 0);
for _ in 0..10 {
assert_ne!(cc.Now(), 0);
}
}
#[test]
fn helpers_match_expected_simple_cases() {
assert_eq!(mul_64_64_shift(2, 3, 0), 6);
assert_eq!(mul_64_64_shift(u64::MAX, 2, 0), u64::MAX);
assert_eq!(div_64_64_round(10, 3), 3); assert_eq!(div_64_64_round(11, 3), 4); assert_eq!(div_64_64_round(1, 0), u64::MAX);
}
#[test]
fn bulk_advice_exposes_application_facing_guidance() {
let mut cc = PragueCC::default();
cc.state.m_alpha_ts = cc.Now();
cc.state.m_vrtt = 0;
cc.state.m_pacing_rate = 200_000;
cc.state.m_packet_window = 11;
cc.state.m_packet_burst = 3;
cc.state.m_packet_size = 1200;
cc.state.m_cc_state = cs_tp::cs_cong_avoid;
cc.state.m_cca_mode = cca_tp::cca_prague_rate;
cc.state.m_rtt = 11_000;
cc.state.m_srtt = 12_000;
cc.state.m_alpha = MAX_PROB / 8;
let advice = cc.bulk_advice();
assert_eq!(advice.pacing_rate_bytes_per_sec, 200_000 * 100 / 103);
assert_eq!(advice.packet_window, 11);
assert_eq!(advice.packet_burst, 3);
assert_eq!(advice.packet_size_bytes, 1200);
assert_eq!(advice.next_send_ecn, ecn_tp::ecn_l4s_id);
assert_eq!(advice.controller_state, cs_tp::cs_cong_avoid);
assert_eq!(advice.controller_mode, cca_tp::cca_prague_rate);
assert_eq!(advice.last_rtt_us, 11_000);
assert_eq!(advice.smoothed_rtt_us, 12_000);
assert_eq!(advice.ce_ratio_ppm, 125_000);
assert_eq!(advice.congestion_signal, PragueCongestionSignal::EcnMarked);
assert!(!advice.l4s_fallback_active);
assert_eq!(advice.pacing_rate_bits_per_sec(), (200_000 * 100 / 103) * 8);
}
#[test]
fn video_advice_reports_frame_budget_guidance() {
let mut cc = PragueCC::new(
PRAGUE_INITMTU,
60,
10_000,
PRAGUE_INITRATE,
PRAGUE_INITWIN,
PRAGUE_MINRATE,
PRAGUE_MAXRATE,
);
cc.state.m_pacing_rate = 240_000;
cc.state.m_packet_window = 8;
cc.state.m_packet_burst = 2;
cc.state.m_packet_size = 1200;
let advice = cc.video_advice();
assert_eq!(advice.transport.pacing_rate_bytes_per_sec, 240_000);
assert_eq!(advice.transport.packet_burst, 2);
assert_eq!(advice.transport.packet_size_bytes, 1200);
assert_eq!(advice.target_frame_size_bytes, 2400);
assert_eq!(advice.frame_window, 4);
}
#[test]
fn bitrate_action_since_classifies_meaningful_changes() {
let previous = PragueRateAdvice {
pacing_rate_bytes_per_sec: 100_000,
packet_window: 4,
packet_burst: 1,
packet_size_bytes: 1200,
next_send_ecn: ecn_tp::ecn_l4s_id,
controller_state: cs_tp::cs_cong_avoid,
controller_mode: cca_tp::cca_prague_win,
last_rtt_us: 10_000,
smoothed_rtt_us: 10_000,
virtual_rtt_us: 10_000,
ce_ratio_ppm: 0,
congestion_signal: PragueCongestionSignal::Stable,
l4s_fallback_active: false,
packets_received: 0,
packets_ce: 0,
packets_lost: 0,
};
let mut current = previous;
current.pacing_rate_bytes_per_sec = 108_000;
assert_eq!(
current.bitrate_action_since(&previous, 5),
PragueBitrateAction::Increase
);
current.pacing_rate_bytes_per_sec = 97_000;
assert_eq!(
current.bitrate_action_since(&previous, 5),
PragueBitrateAction::Hold
);
current.pacing_rate_bytes_per_sec = 90_000;
assert_eq!(
current.bitrate_action_since(&previous, 5),
PragueBitrateAction::Decrease
);
}
#[test]
fn congestion_signal_prioritizes_fallback_then_loss_then_ecn() {
let mut cc = PragueCC::default();
cc.state.m_alpha_ts = cc.Now();
cc.state.m_alpha = MAX_PROB / 16;
assert_eq!(
cc.bulk_advice().congestion_signal,
PragueCongestionSignal::EcnMarked
);
cc.state.m_cc_state = cs_tp::cs_in_loss;
assert_eq!(
cc.bulk_advice().congestion_signal,
PragueCongestionSignal::LossRecovery
);
cc.state.m_error_L4S = true;
assert_eq!(
cc.bulk_advice().congestion_signal,
PragueCongestionSignal::L4sFallback
);
}
#[test]
fn ack_received_rejects_stale_counters() {
let mut cc = PragueCC::default();
cc.state.m_packets_received = 10;
cc.state.m_packets_CE = 3;
let mut inflight = 77;
assert!(!cc.ACKReceived(9, 3, 0, 20, false, &mut inflight));
assert_eq!(inflight, 77);
assert!(!cc.ACKReceived(10, 2, 0, 20, false, &mut inflight));
assert_eq!(inflight, 77);
assert_eq!(cc.state.m_packets_received, 10);
assert_eq!(cc.state.m_packets_CE, 3);
}
#[test]
fn data_received_sequence_tracks_loss_marks_and_reordering() {
let mut cc = PragueCC::default();
cc.DataReceivedSequence(ecn_tp::ecn_l4s_id, 1);
assert_eq!(cc.state.m_r_packets_received, 1);
assert_eq!(cc.state.m_r_packets_lost, 0);
assert_eq!(cc.state.m_r_packets_CE, 0);
assert!(!cc.state.m_r_error_L4S);
cc.DataReceivedSequence(ecn_tp::ecn_ce, 3);
assert_eq!(cc.state.m_r_packets_received, 2);
assert_eq!(cc.state.m_r_packets_lost, 1);
assert_eq!(cc.state.m_r_packets_CE, 1);
cc.DataReceivedSequence(ecn_tp::ecn_ect0, 2);
assert_eq!(cc.state.m_r_packets_received, 3);
assert_eq!(cc.state.m_r_packets_lost, 0);
assert_eq!(cc.state.m_r_packets_CE, 1);
assert!(cc.state.m_r_error_L4S);
}
#[test]
fn ack_received_enters_loss_and_reordering_undo_restores_window() {
let mut cc = PragueCC::default();
cc.state.m_cc_state = cs_tp::cs_cong_avoid;
cc.state.m_cca_mode = cca_tp::cca_prague_win;
cc.state.m_srtt = 10_000;
cc.state.m_vrtt = 10_000;
cc.state.m_packet_size = 1200;
cc.state.m_max_packet_size = 1200;
cc.state.m_pacing_rate = 200_000;
cc.state.m_fractional_window = 2_000_000_000;
cc.state.m_packet_window = 10;
cc.state.m_packets_received = 10;
cc.state.m_packets_CE = 0;
cc.state.m_packets_lost = 0;
cc.state.m_packets_sent = 20;
cc.state.m_alpha_ts = cc.Now().wrapping_sub(cc.state.m_vrtt);
let original_window = cc.state.m_fractional_window;
let mut inflight = 0;
assert!(cc.ACKReceived(12, 0, 1, 30, false, &mut inflight));
assert_eq!(cc.state.m_cc_state, cs_tp::cs_in_loss);
assert_eq!(cc.state.m_lost_window, original_window / 2);
assert_eq!(cc.state.m_fractional_window, original_window / 2);
assert!(cc.ACKReceived(12, 0, 0, 31, false, &mut inflight));
assert_eq!(cc.state.m_cc_state, cs_tp::cs_cong_avoid);
assert_eq!(cc.state.m_lost_window, 0);
assert_eq!(cc.state.m_fractional_window, original_window);
}
#[test]
fn reset_cc_info_restores_initial_runtime_bounds() {
let mut cc = PragueCC::default();
cc.state.m_cc_state = cs_tp::cs_in_cwr;
cc.state.m_cca_mode = cca_tp::cca_prague_rate;
cc.state.m_pacing_rate = 333_333;
cc.state.m_fractional_window = 999;
cc.state.m_packet_burst = 7;
cc.state.m_packet_size = 777;
cc.state.m_packet_window = 99;
cc.state.m_rtts_to_growth = 44;
cc.state.m_lost_rtts_to_growth = 11;
cc.ResetCCInfo();
assert_eq!(cc.state.m_cc_state, cs_tp::cs_init);
assert_eq!(cc.state.m_cca_mode, cca_tp::cca_prague_win);
assert_eq!(cc.state.m_pacing_rate, cc.state.m_init_rate);
assert_eq!(
cc.state.m_fractional_window,
cc.state.m_max_packet_size * 1_000_000
);
assert_eq!(cc.state.m_packet_burst, MIN_PKT_BURST);
assert_eq!(cc.state.m_packet_size, cc.state.m_max_packet_size);
assert_eq!(cc.state.m_packet_window, MIN_PKT_WIN);
assert_eq!(cc.state.m_lost_rtts_to_growth, 0);
assert_eq!(
cc.state.m_rtts_to_growth,
(cc.state.m_pacing_rate / RATE_STEP) as count_tp + (MIN_STEP as count_tp)
);
}
}