use crate::{Duration, Instant};
use tracing::warn;
/// A token-bucket pacer that limits how many bytes may be sent in one burst.
///
/// Tokens are counted in bytes. Sending a packet removes tokens
/// (`on_transmit`), and `delay` adds tokens back at a rate of 1.25 congestion
/// windows per smoothed RTT, never exceeding `capacity`. When the bucket does
/// not hold enough tokens for the next send, `delay` reports how long to wait.
#[derive(Debug)]
pub(super) struct Pacer {
/// Maximum number of tokens (bytes) the bucket can hold, as computed by
/// `optimal_capacity`.
capacity: u64,
/// Congestion window that `capacity` was last computed for.
last_window: u64,
/// MTU that `capacity` was last computed for.
last_mtu: u16,
/// Tokens (bytes) currently available for sending.
tokens: u64,
/// Time at which tokens were last added to the bucket.
prev: Instant,
}
impl Pacer {
    /// Creates a pacer whose bucket starts out full.
    ///
    /// The capacity is derived from `smoothed_rtt`, `window` and `mtu` via
    /// `optimal_capacity`; `now` is recorded as the time of the last refill.
    pub(super) fn new(smoothed_rtt: Duration, window: u64, mtu: u16, now: Instant) -> Self {
        let capacity = optimal_capacity(smoothed_rtt, window, mtu);
        Self {
            capacity,
            last_window: window,
            last_mtu: mtu,
            tokens: capacity,
            prev: now,
        }
    }

    /// Records that a packet of `packet_length` bytes was sent.
    ///
    /// Removes that many tokens from the bucket, stopping at zero.
    pub(super) fn on_transmit(&mut self, packet_length: u16) {
        self.tokens = self.tokens.saturating_sub(packet_length.into())
    }

    /// Returns how long the caller should wait before sending `bytes_to_send` bytes.
    ///
    /// Returns `None` when sending may proceed immediately. That is the case when
    /// - the bucket already holds at least `bytes_to_send` tokens (before or after
    ///   the refill performed by this call),
    /// - `window` does not fit into a `u32` (pacing is skipped for such windows), or
    /// - `smoothed_rtt` is zero.
    ///
    /// Otherwise returns `Some(delay)`, where `delay` is 4/5 of the time it takes
    /// to send the missing bytes at a rate of one `window` per `smoothed_rtt`.
    /// The missing bytes are measured against the larger of `bytes_to_send` and
    /// the bucket capacity, so the returned delay covers refilling the bucket.
    ///
    /// The bucket capacity is only recomputed when `window` or `mtu` differ from
    /// the values seen on the previous call.
    ///
    /// `window` must be non-zero: this is checked with a debug assertion, and the
    /// final computation divides by `window`.
    pub(super) fn delay(
        &mut self,
        smoothed_rtt: Duration,
        bytes_to_send: u64,
        mtu: u16,
        window: u64,
        now: Instant,
    ) -> Option<Duration> {
        debug_assert_ne!(
            window, 0,
            "zero-sized congestion control window is nonsense"
        );

        if window != self.last_window || mtu != self.last_mtu {
            self.capacity = optimal_capacity(smoothed_rtt, window, mtu);
            // A smaller bucket must not keep more tokens than it can hold.
            self.tokens = self.capacity.min(self.tokens);
            self.last_window = window;
            self.last_mtu = mtu;
        }

        // Enough tokens are already available: no need to wait.
        if self.tokens >= bytes_to_send {
            return None;
        }

        // Pacing is skipped for windows that do not fit into a `u32`.
        if window > u64::from(u32::MAX) {
            return None;
        }
        // Lossless: guarded by the range check above.
        let window = window as u32;

        let time_elapsed = now.checked_duration_since(self.prev).unwrap_or_else(|| {
            warn!("received a timestamp early than a previous recorded time, ignoring");
            Default::default()
        });

        // Without an RTT estimate there is no rate to pace against.
        if smoothed_rtt.as_nanos() == 0 {
            return None;
        }

        // Refill at 1.25 windows per RTT, capped at the bucket capacity.
        let elapsed_rtts = time_elapsed.as_secs_f64() / smoothed_rtt.as_secs_f64();
        let new_tokens = (window as f64 * 1.25 * elapsed_rtts).round() as u64;
        self.tokens = self.tokens.saturating_add(new_tokens).min(self.capacity);

        // Only advance the refill timestamp when tokens were actually credited, so
        // elapsed time too short to yield a token is not discarded.
        if new_tokens > 0 {
            self.prev = now;
        }

        if self.tokens >= bytes_to_send {
            return None;
        }

        // `tokens <= capacity` holds after the refill above, so this cannot underflow.
        let deficit = bytes_to_send.max(self.capacity) - self.tokens;
        // `Duration::checked_mul` takes a `u32`. Clamp instead of truncating so that an
        // oversized deficit yields a long delay rather than a wrapped, too-short one.
        let deficit = deficit.min(u64::from(u32::MAX)) as u32;

        let unscaled_delay = smoothed_rtt
            .checked_mul(deficit)
            .unwrap_or(Duration::MAX)
            / window;

        // Divide before multiplying to avoid overflow. The 4/5 factor matches the
        // 1.25x refill rate used above.
        Some((unscaled_delay / 5) * 4)
    }
}
/// Computes the token-bucket capacity, in bytes, for a given RTT, congestion
/// window and MTU.
///
/// The target is the share of `window` that corresponds to
/// `TARGET_BURST_INTERVAL` when one window is spread evenly over one
/// `smoothed_rtt`. That target is clamped to between `MIN_BURST_SIZE` and
/// `MAX_BURST_SIZE` packets of `mtu` bytes, and finally limited to the share of
/// `window` that corresponds to `MAX_BURST_INTERVAL` (but never less than one
/// `mtu`).
///
/// A zero `smoothed_rtt` is treated as one nanosecond. Intermediate results
/// that exceed `u64::MAX` saturate instead of wrapping.
fn optimal_capacity(smoothed_rtt: Duration, window: u64, mtu: u16) -> u64 {
    // Avoid dividing by zero when no RTT estimate is available.
    let rtt_nanos = smoothed_rtt.as_nanos().max(1);
    let mtu = u64::from(mtu);

    let target_capacity = burst_bytes(window, TARGET_BURST_INTERVAL, rtt_nanos);
    let max_capacity = burst_bytes(window, MAX_BURST_INTERVAL, rtt_nanos).max(mtu);

    target_capacity
        .clamp(MIN_BURST_SIZE * mtu, MAX_BURST_SIZE * mtu)
        .min(max_capacity)
}

/// Returns `window * interval / rtt_nanos` in bytes, saturating at `u64::MAX`.
///
/// `rtt_nanos` must be non-zero.
fn burst_bytes(window: u64, interval: Duration, rtt_nanos: u128) -> u64 {
    let bytes = u128::from(window).saturating_mul(interval.as_nanos()) / rtt_nanos;
    // Clamp before narrowing: a plain `as u64` would keep only the low 64 bits and
    // could turn a huge burst into a tiny one.
    bytes.min(u128::from(u64::MAX)) as u64
}

/// Burst interval the bucket capacity is sized for.
const TARGET_BURST_INTERVAL: Duration = Duration::from_millis(2);

/// Upper bound on the interval a single burst may correspond to.
const MAX_BURST_INTERVAL: Duration = Duration::from_millis(10);

/// Lower bound of the capacity clamp, in MTU-sized packets.
const MIN_BURST_SIZE: u64 = 10;

/// Upper bound of the capacity clamp, in MTU-sized packets.
const MAX_BURST_SIZE: u64 = 256;
// Unit tests for `Pacer` and `optimal_capacity`.
#[cfg(test)]
mod tests {
use super::*;
// Calls `delay` with a zero RTT and a `now` that lies before the pacer's
// creation time, and expects `None` without panicking.
//
// NOTE(review): each fresh pacer here holds 150_000 tokens, which is more than
// any `bytes_to_send` used below, so every call returns at the first
// `tokens >= bytes_to_send` check and never reaches the timestamp handling.
#[test]
fn does_not_panic_on_bad_instant() {
let old_instant = Instant::now();
let new_instant = old_instant + Duration::from_micros(15);
let rtt = Duration::from_micros(400);
assert!(
Pacer::new(rtt, 30000, 1500, new_instant)
.delay(Duration::from_micros(0), 0, 1500, 1, old_instant)
.is_none()
);
assert!(
Pacer::new(rtt, 30000, 1500, new_instant)
.delay(Duration::from_micros(0), 1600, 1500, 1, old_instant)
.is_none()
);
assert!(
Pacer::new(rtt, 30000, 1500, new_instant)
.delay(Duration::from_micros(0), 1500, 1500, 3000, old_instant)
.is_none()
);
}
// Checks the capacity a new pacer derives and that its bucket starts full.
#[test]
fn derives_initial_capacity() {
let window = 2_000_000;
let mtu = 1500;
let rtt = Duration::from_millis(50);
let now = Instant::now();
// Regular case: capacity is the share of the window sent in one target
// burst interval.
let pacer = Pacer::new(rtt, window, mtu, now);
assert_eq!(
pacer.capacity,
(window as u128 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64
);
assert_eq!(pacer.tokens, pacer.capacity);
// Zero RTT: the capacity is clamped to the maximum burst size.
let pacer = Pacer::new(Duration::from_millis(0), window, mtu, now);
assert_eq!(pacer.capacity, MAX_BURST_SIZE * mtu as u64);
assert_eq!(pacer.tokens, pacer.capacity);
// Tiny window: the capacity is limited to a single MTU.
let pacer = Pacer::new(rtt, 1, mtu, now);
assert_eq!(pacer.capacity, mtu as u64);
assert_eq!(pacer.tokens, pacer.capacity);
}
// Checks that `delay` recomputes the capacity when the window or MTU changes.
#[test]
fn adjusts_capacity() {
let window = 2_000_000;
let mtu = 1500;
let rtt = Duration::from_millis(50);
let now = Instant::now();
let mut pacer = Pacer::new(rtt, window, mtu, now);
assert_eq!(
pacer.capacity,
(window as u128 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64
);
assert_eq!(pacer.tokens, pacer.capacity);
let initial_tokens = pacer.tokens;
// Double the window: the capacity doubles, the tokens stay as they were.
pacer.delay(rtt, mtu as u64, mtu, window * 2, now);
assert_eq!(
pacer.capacity,
(2 * window as u128 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64
);
assert_eq!(pacer.tokens, initial_tokens);
// Halve the window: the capacity halves and the tokens are clamped to it.
pacer.delay(rtt, mtu as u64, mtu, window / 2, now);
assert_eq!(
pacer.capacity,
(window as u128 / 2 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64
);
assert_eq!(pacer.tokens, initial_tokens / 2);
// Double the MTU with the original window: the target capacity still lies
// within the burst-size bounds, so it is unchanged.
pacer.delay(rtt, mtu as u64, mtu * 2, window, now);
assert_eq!(
pacer.capacity,
(window as u128 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64
);
// A very large MTU lifts the minimum burst size above the target capacity.
pacer.delay(rtt, mtu as u64, 20_000, window, now);
assert_eq!(pacer.capacity, 20_000_u64 * MIN_BURST_SIZE);
}
// Drains the bucket, checks the reported delay, then checks refilling.
#[test]
fn computes_pause_correctly() {
let window = 2_000_000u64;
let mtu = 1000;
let rtt = Duration::from_millis(50);
let old_instant = Instant::now();
let mut pacer = Pacer::new(rtt, window, mtu, old_instant);
let packet_capacity = pacer.capacity / mtu as u64;
// Use up the whole bucket, one MTU-sized packet at a time.
for _ in 0..packet_capacity {
assert_eq!(
pacer.delay(rtt, mtu as u64, mtu, window, old_instant),
None,
"When capacity is available packets should be sent immediately"
);
pacer.on_transmit(mtu);
}
// The bucket is empty: the delay is 4/5 of the target burst interval.
// Allow a small rounding difference from the integer duration arithmetic.
let pace_duration = Duration::from_nanos((TARGET_BURST_INTERVAL.as_nanos() * 4 / 5) as u64);
let actual_delay = pacer
.delay(rtt, mtu as u64, mtu, window, old_instant)
.expect("Send must be delayed");
let diff = actual_delay.abs_diff(pace_duration);
assert!(
diff < Duration::from_nanos(2),
"expected ≈ {pace_duration:?}, got {actual_delay:?} (diff {diff:?})"
);
// Waiting half of that delay refills half of the bucket.
assert_eq!(
pacer.delay(
rtt,
mtu as u64,
mtu,
window,
old_instant + pace_duration / 2
),
None
);
assert_eq!(pacer.tokens, pacer.capacity / 2);
// Use up the refilled half again.
for _ in 0..packet_capacity / 2 {
assert_eq!(
pacer.delay(rtt, mtu as u64, mtu, window, old_instant),
None,
"When capacity is available packets should be sent immediately"
);
pacer.on_transmit(mtu);
}
// A full delay after the previous refill, the bucket is full again.
assert_eq!(
pacer.delay(
rtt,
mtu as u64,
mtu,
window,
old_instant + pace_duration * 3 / 2
),
None
);
assert_eq!(pacer.tokens, pacer.capacity);
}
}