use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use crate::simulation::{RealTime, TimeSource};
use crate::transport::packet_data::MAX_PACKET_SIZE;
/// Default send rate: 1_250_000 bytes per second (the value that
/// `FixedRateConfig::from_mbps(10)` produces).
pub const DEFAULT_RATE_BYTES_PER_SEC: usize = 1_250_000;
/// Headroom, in bytes, that `current_cwnd` adds on top of the flightsize
/// recorded at loss/timeout time: 50 maximum-size packets.
const LOSS_PAUSE_MARGIN: usize = 50 * MAX_PACKET_SIZE;
// Compile-time guard: the build fails if the margin is ever reduced below
// 10 maximum-size packets.
const _: () = assert!(
LOSS_PAUSE_MARGIN >= 10 * MAX_PACKET_SIZE,
"LOSS_PAUSE_MARGIN must allow enough packets for reliable recovery under loss"
);
/// Configuration for `FixedRateController`: a single, constant send rate.
#[derive(Debug, Clone)]
pub struct FixedRateConfig {
    /// Target send rate in bytes per second.
    pub rate_bytes_per_sec: usize,
}

impl Default for FixedRateConfig {
    /// Returns a config whose rate is `DEFAULT_RATE_BYTES_PER_SEC`.
    fn default() -> Self {
        Self {
            rate_bytes_per_sec: DEFAULT_RATE_BYTES_PER_SEC,
        }
    }
}

impl FixedRateConfig {
    /// Bytes per second carried by one megabit per second (1_000_000 / 8).
    const BYTES_PER_SEC_PER_MBPS: usize = 125_000;

    /// Creates a config from a rate expressed in bytes per second.
    pub fn new(rate_bytes_per_sec: usize) -> Self {
        Self { rate_bytes_per_sec }
    }

    /// Creates a config from a rate expressed in megabits per second
    /// (1 Mbps = 1_000_000 bits/s = 125_000 bytes/s).
    ///
    /// The conversion multiplies by the pre-divided factor instead of
    /// computing `mbps * 1_000_000 / 8`, whose intermediate product overflows
    /// `usize` for rates whose final value still fits (e.g. anything above
    /// ~4.3 Gbps on 32-bit targets). Rates too large to represent at all
    /// saturate at `usize::MAX` rather than panicking or wrapping.
    pub fn from_mbps(mbps: usize) -> Self {
        Self {
            rate_bytes_per_sec: mbps.saturating_mul(Self::BYTES_PER_SEC_PER_MBPS),
        }
    }
}
/// Controller that always reports the same configured send rate and only
/// limits the congestion window while a loss pause is active.
///
/// Counters are atomics, so every method takes `&self`.
pub struct FixedRateController<T: TimeSource = RealTime> {
/// Configured send rate in bytes per second; returned unchanged by
/// `current_rate` and `rate`.
rate: usize,
/// Bytes currently in flight: increased by `on_send`, decreased
/// (saturating at zero) when bytes are acknowledged.
flightsize: AtomicUsize,
/// Flightsize recorded by the most recent `on_loss` / `on_timeout`
/// (stored as at least 1). Zero means no loss pause is active; any ACK
/// resets it to zero.
loss_pause_cwnd: AtomicUsize,
// Stored at construction but not read by any method in this file section,
// hence the lint suppression.
#[allow(dead_code)]
time_source: T,
}
impl FixedRateController<RealTime> {
    /// Builds a controller that uses a freshly constructed `RealTime` as its
    /// time source.
    ///
    /// Shorthand for `new_with_time_source(config, RealTime::new())`.
    pub fn new(config: FixedRateConfig) -> Self {
        let clock = RealTime::new();
        Self::new_with_time_source(config, clock)
    }
}
impl<T: TimeSource> FixedRateController<T> {
    /// Creates a controller with the configured rate and the given time source.
    ///
    /// The controller starts with zero bytes in flight and no loss pause.
    pub fn new_with_time_source(config: FixedRateConfig, time_source: T) -> Self {
        let rate = config.rate_bytes_per_sec;
        Self {
            rate,
            flightsize: AtomicUsize::new(0),
            loss_pause_cwnd: AtomicUsize::new(0),
            time_source,
        }
    }

    /// Records that `bytes` more bytes are now in flight.
    pub fn on_send(&self, bytes: usize) {
        let _previous = self.flightsize.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Handles an ACK that carries an RTT sample.
    ///
    /// The sample is ignored; this is the same as [`Self::on_ack_without_rtt`].
    pub fn on_ack(&self, _rtt_sample: Duration, bytes_acked: usize) {
        self.on_ack_without_rtt(bytes_acked);
    }

    /// Handles an ACK: ends any loss pause and removes `bytes_acked` from the
    /// in-flight count, saturating at zero.
    pub fn on_ack_without_rtt(&self, bytes_acked: usize) {
        // Clearing the snapshot first makes `current_cwnd` unlimited again.
        self.loss_pause_cwnd.store(0, Ordering::Release);

        let shrink = |in_flight: usize| Some(in_flight.saturating_sub(bytes_acked));
        // `shrink` always yields `Some`, so the update cannot fail; the
        // previous value it returns is not needed.
        let _ = self
            .flightsize
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, shrink);
    }

    /// Starts a loss pause anchored at the current flightsize.
    pub fn on_loss(&self) {
        self.enter_loss_pause();
    }

    /// Starts a loss pause anchored at the current flightsize (same handling
    /// as [`Self::on_loss`]).
    pub fn on_timeout(&self) {
        self.enter_loss_pause();
    }

    /// Snapshots the current flightsize into `loss_pause_cwnd`.
    fn enter_loss_pause(&self) {
        let in_flight = self.flightsize.load(Ordering::Relaxed);
        // Zero is the "not paused" sentinel, so an empty pipe is stored as 1.
        let snapshot = if in_flight == 0 { 1 } else { in_flight };
        self.loss_pause_cwnd.store(snapshot, Ordering::Release);
    }

    /// Returns the congestion window in bytes.
    ///
    /// During a loss pause this is the recorded flightsize plus
    /// `LOSS_PAUSE_MARGIN`; otherwise it is `usize::MAX / 2`.
    pub fn current_cwnd(&self) -> usize {
        match self.loss_pause_cwnd.load(Ordering::Acquire) {
            0 => usize::MAX / 2,
            snapshot => snapshot + LOSS_PAUSE_MARGIN,
        }
    }

    /// Returns the configured rate in bytes per second; the RTT is ignored.
    pub fn current_rate(&self, _rtt: Duration) -> usize {
        self.rate
    }

    /// Returns the number of bytes currently counted as in flight.
    pub fn flightsize(&self) -> usize {
        self.flightsize.load(Ordering::Relaxed)
    }

    /// Always returns a zero duration.
    pub fn base_delay(&self) -> Duration {
        Duration::ZERO
    }

    /// Always returns a zero duration.
    pub fn queuing_delay(&self) -> Duration {
        Duration::ZERO
    }

    /// Returns the same value as [`Self::current_cwnd`].
    pub fn peak_cwnd(&self) -> usize {
        self.current_cwnd()
    }

    /// Returns the configured rate in bytes per second.
    pub fn rate(&self) -> usize {
        self.rate
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::simulation::VirtualTime;

    /// Threshold the tests use to recognise the unpaused, very large cwnd.
    const UNLIMITED_CWND_FLOOR: usize = 1_000_000_000;

    /// Real-time controller built from the default configuration.
    fn default_controller() -> FixedRateController {
        FixedRateController::new(FixedRateConfig::default())
    }

    /// The default config carries the 1_250_000 B/s rate.
    #[test]
    fn test_default_config() {
        let FixedRateConfig { rate_bytes_per_sec } = FixedRateConfig::default();
        assert_eq!(rate_bytes_per_sec, 1_250_000);
    }

    /// 10 Mbps converts to 1_250_000 bytes per second.
    #[test]
    fn test_from_mbps() {
        let FixedRateConfig { rate_bytes_per_sec } = FixedRateConfig::from_mbps(10);
        assert_eq!(rate_bytes_per_sec, 1_250_000);
    }

    /// Sends add to the flightsize and both ACK entry points subtract from it.
    #[test]
    fn test_flightsize_tracking() {
        let ctl = default_controller();
        assert_eq!(ctl.flightsize(), 0);

        ctl.on_send(1000);
        assert_eq!(ctl.flightsize(), 1000);
        ctl.on_send(500);
        assert_eq!(ctl.flightsize(), 1500);

        ctl.on_ack(Duration::from_millis(100), 800);
        assert_eq!(ctl.flightsize(), 700);
        ctl.on_ack_without_rtt(700);
        assert_eq!(ctl.flightsize(), 0);
    }

    /// Acknowledging more than is in flight clamps the flightsize at zero.
    #[test]
    fn test_flightsize_saturating() {
        let ctl = default_controller();
        ctl.on_send(100);
        ctl.on_ack_without_rtt(200);
        assert_eq!(ctl.flightsize(), 0);
    }

    /// A fresh controller reports a very large cwnd.
    #[test]
    fn test_cwnd_always_large() {
        let ctl = default_controller();
        let cwnd = ctl.current_cwnd();
        assert!(cwnd > UNLIMITED_CWND_FLOOR);
    }

    /// The reported rate is the configured one for every RTT.
    #[test]
    fn test_rate_constant() {
        let ctl = FixedRateController::new(FixedRateConfig::from_mbps(50));
        for rtt_millis in [10, 100, 1000] {
            assert_eq!(
                ctl.current_rate(Duration::from_millis(rtt_millis)),
                6_250_000
            );
        }
    }

    /// A loss freezes cwnd at loss-time flightsize plus margin until an ACK.
    #[test]
    fn test_loss_pause_caps_cwnd_with_margin() {
        let ctl = default_controller();
        ctl.on_send(1000);
        let flightsize_at_loss = ctl.flightsize();
        assert!(ctl.current_cwnd() > UNLIMITED_CWND_FLOOR);

        ctl.on_loss();
        assert_eq!(ctl.current_cwnd(), flightsize_at_loss + LOSS_PAUSE_MARGIN);

        // Sending more must not move the frozen window.
        ctl.on_send(2000);
        assert_eq!(ctl.flightsize(), 3000);
        assert_eq!(
            ctl.current_cwnd(),
            flightsize_at_loss + LOSS_PAUSE_MARGIN,
            "cwnd must be frozen at loss-time flightsize, not current flightsize"
        );

        // An ACK lifts the pause and shrinks the flightsize.
        ctl.on_ack(Duration::from_millis(50), 500);
        assert!(ctl.current_cwnd() > UNLIMITED_CWND_FLOOR);
        assert_eq!(ctl.flightsize(), 2500);
    }

    /// The margin admits at least one packet, then blocks once consumed.
    #[test]
    fn test_loss_pause_allows_packet_then_blocks() {
        let ctl = default_controller();
        let packet_size = MAX_PACKET_SIZE;

        ctl.on_send(5000);
        ctl.on_loss();

        let flightsize = ctl.flightsize();
        let cwnd = ctl.current_cwnd();
        assert!(
            flightsize + packet_size <= cwnd,
            "loss_pause cwnd ({cwnd}) must allow at least one packet \
             (flightsize={flightsize}, packet_size={packet_size}). \
             Without margin, sending stalls completely. See #3702."
        );

        ctl.on_send(LOSS_PAUSE_MARGIN);
        let new_flightsize = ctl.flightsize();
        assert!(
            new_flightsize + packet_size > cwnd,
            "After consuming the margin, loss_pause should block further sending \
             (flightsize={new_flightsize}, cwnd={cwnd})"
        );
    }

    /// After a timeout the margin leaves room for at least 20 packets.
    #[test]
    fn test_loss_pause_margin_sustains_progress_under_loss() {
        let ctl = default_controller();
        let initial_flightsize = 100_000;
        ctl.on_send(initial_flightsize);
        ctl.on_timeout();

        let margin = ctl.current_cwnd() - initial_flightsize;
        let packets_allowed = margin / MAX_PACKET_SIZE;
        assert!(
            packets_allowed >= 20,
            "loss_pause margin should allow at least 20 packets for recovery, \
             got {packets_allowed} (margin={margin}B). A 2-packet margin caused \
             production stream stalls when both trickle packets were lost."
        );
    }

    /// The controller can be built with a `VirtualTime` time source.
    #[test]
    fn test_with_virtual_time() {
        let clock = VirtualTime::new();
        let cfg = FixedRateConfig::from_mbps(25);
        let ctl = FixedRateController::new_with_time_source(cfg, clock);
        assert_eq!(ctl.rate(), 3_125_000);
        assert_eq!(ctl.flightsize(), 0);
    }
}