use std::time::{Duration, Instant};
use crate::RecoveryConfig;
/// Pacing granularity of 1 ms, passed by this module's unit tests as the
/// `granularity` argument of `Pacer::new`.
// NOTE(review): presumably also the default for `RecoveryConfig::pacing_granularity` - confirm.
const PACING_GRANULARITY: Duration = Duration::from_millis(1);
/// Lower bound of the pacer's burst capacity, expressed in MTU-sized packets
/// (`Pacer::update_capacity` multiplies it by the MTU).
const MIN_BURST_PACKET_NUM: u64 = 10;
/// Upper bound of the pacer's burst capacity, expressed in MTU-sized packets
/// (`Pacer::update_capacity` multiplies it by the MTU).
const MAX_BURST_PACKET_NUM: u64 = 128;
/// Token-bucket pacer: sending consumes byte tokens, and tokens are refilled
/// over time at the pacing rate, up to a burst capacity derived from the
/// congestion window.
#[derive(Debug)]
pub struct Pacer {
/// Whether pacing is active. When `false`, `schedule` always returns `None`
/// and `on_sent` leaves the token count untouched.
enabled: bool,
/// Maximum number of tokens (bytes) the bucket may hold.
capacity: u64,
/// Tokens (bytes) currently available for sending.
tokens: u64,
/// Congestion window seen at construction or by the last `schedule` call
/// that recomputed the capacity; used to detect window changes.
last_cwnd: u64,
/// Time of the last token refill performed by `schedule`.
last_sched_time: Instant,
/// Time span whose share of the congestion window forms the burst capacity
/// (capacity is roughly `cwnd * granularity / srtt`).
granularity: Duration,
}
impl Pacer {
    /// Number of nanoseconds in one second, as the wide type used for rate math.
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    /// Lower bound applied to the smoothed RTT (1 ms, in nanoseconds) when
    /// deriving the burst capacity.
    const MIN_SRTT_NANOS: u128 = 1_000_000;

    /// Creates a pacer whose bucket starts full.
    ///
    /// * `enabled` - whether pacing is applied at all.
    /// * `srtt` - smoothed round-trip time used to size the bucket.
    /// * `cwnd` - congestion window in bytes.
    /// * `mtu` - maximum datagram size in bytes; bounds the bucket capacity.
    /// * `now` - reference time from which token refills are measured.
    /// * `granularity` - time span whose share of `cwnd` forms the capacity.
    pub fn new(
        enabled: bool,
        srtt: Duration,
        cwnd: u64,
        mtu: u64,
        now: Instant,
        granularity: Duration,
    ) -> Self {
        let mut pacer = Pacer {
            enabled,
            capacity: 0,
            tokens: 0,
            last_cwnd: cwnd,
            last_sched_time: now,
            granularity,
        };
        pacer.update_capacity(cwnd, srtt, mtu);
        // Start with a full bucket so the first burst is not delayed.
        pacer.tokens = pacer.capacity;
        pacer
    }

    /// Builds a pacer from the recovery configuration, using the current time
    /// as the refill reference and `initial_congestion_window` (in packets)
    /// times `max_datagram_size` as the initial window in bytes.
    pub fn build_pacer_controller(conf: &RecoveryConfig) -> Self {
        let mtu = conf.max_datagram_size as u64;
        Pacer::new(
            conf.enable_pacing,
            conf.initial_rtt,
            conf.initial_congestion_window.saturating_mul(mtu),
            mtu,
            Instant::now(),
            conf.pacing_granularity,
        )
    }

    /// Returns whether pacing is enabled.
    pub fn enabled(&self) -> bool {
        self.enabled
    }

    /// Consumes `bytes_sent` tokens after a send; the count never drops below
    /// zero. Does nothing when pacing is disabled.
    pub fn on_sent(&mut self, bytes_sent: u64) {
        if self.enabled {
            self.tokens = self.tokens.saturating_sub(bytes_sent);
        }
    }

    /// Decides whether `bytes_to_send` bytes may be sent at `now`.
    ///
    /// Returns `None` when the data may be sent immediately: pacing is
    /// disabled, `srtt`, `cwnd` or `pacing_rate` is zero, or enough tokens are
    /// available. Otherwise returns the instant at which enough tokens will
    /// have accumulated at `pacing_rate` (bytes per second).
    ///
    /// A change of `cwnd` since the previous call recomputes the capacity and
    /// trims the token count to the new capacity.
    pub fn schedule(
        &mut self,
        bytes_to_send: u64,
        pacing_rate: u64,
        srtt: Duration,
        cwnd: u64,
        mtu: u64,
        now: Instant,
    ) -> Option<Instant> {
        if !self.enabled || srtt.is_zero() || cwnd == 0 || pacing_rate == 0 {
            return None;
        }

        if cwnd != self.last_cwnd {
            self.update_capacity(cwnd, srtt, mtu);
            self.tokens = self.capacity.min(self.tokens);
            self.last_cwnd = cwnd;
        }

        // Fast path: enough tokens already, no refill bookkeeping needed.
        if self.tokens >= bytes_to_send {
            return None;
        }

        // Refill for the time elapsed since the last refill. The product is
        // computed in u128 and narrowed with saturation: a plain `as u64`
        // would truncate and could silently drop a huge refill to ~0.
        let elapsed = now.saturating_duration_since(self.last_sched_time);
        let refill = Self::saturate_to_u64(
            u128::from(pacing_rate).saturating_mul(elapsed.as_nanos()) / Self::NANOS_PER_SEC,
        );
        self.tokens = self.tokens.saturating_add(refill).min(self.capacity);
        self.last_sched_time = now;

        if bytes_to_send <= self.tokens {
            return None;
        }

        // Time needed to earn the missing tokens. `deficit * 1e9` does not
        // fit in u64 for deficits above ~18 GB, so widen before multiplying.
        // `pacing_rate` is non-zero here thanks to the guard above.
        let deficit = bytes_to_send - self.tokens;
        let wait_nanos = Self::saturate_to_u64(
            u128::from(deficit) * Self::NANOS_PER_SEC / u128::from(pacing_rate),
        );
        Some(now + Duration::from_nanos(wait_nanos))
    }

    /// Recomputes the bucket capacity as `cwnd * granularity / srtt`, with
    /// `srtt` floored at 1 ms, then clamps it to
    /// `[MIN_BURST_PACKET_NUM * mtu, MAX_BURST_PACKET_NUM * mtu]`.
    fn update_capacity(&mut self, cwnd: u64, srtt: Duration, mtu: u64) {
        let srtt_nanos = srtt.as_nanos().max(Self::MIN_SRTT_NANOS);
        let unclamped = Self::saturate_to_u64(
            u128::from(cwnd).saturating_mul(self.granularity.as_nanos()) / srtt_nanos,
        );
        // Saturating products keep `min <= max`, which `clamp` requires.
        let min_capacity = MIN_BURST_PACKET_NUM.saturating_mul(mtu);
        let max_capacity = MAX_BURST_PACKET_NUM.saturating_mul(mtu);
        self.capacity = unclamped.clamp(min_capacity, max_capacity);
    }

    /// Narrows a `u128` to `u64`, saturating at `u64::MAX` instead of truncating.
    fn saturate_to_u64(value: u128) -> u64 {
        value.min(u128::from(u64::MAX)) as u64
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoothed RTT shared by all tests. It equals `PACING_GRANULARITY`, so
    /// the unclamped capacity equals the congestion window.
    const SRTT: Duration = Duration::from_millis(1);

    /// Builds a pacer with the shared SRTT and the module's granularity.
    fn make_pacer(enabled: bool, cwnd: u64, mtu: u64, now: Instant) -> Pacer {
        Pacer::new(enabled, SRTT, cwnd, mtu, now, PACING_GRANULARITY)
    }

    /// Construction fills the bucket and clamps the capacity to the burst bounds.
    #[test]
    fn pacer_new() {
        let mtu: u64 = 1500;
        let now = Instant::now();

        // Window inside the burst bounds: capacity follows the window.
        let cwnd = 20 * mtu;
        let pacer = make_pacer(true, cwnd, mtu, now);
        assert!(pacer.enabled());
        assert_eq!(pacer.capacity, pacer.tokens);
        assert_eq!(
            pacer.capacity,
            cwnd * PACING_GRANULARITY.as_nanos() as u64 / SRTT.as_nanos() as u64
        );

        // Window below the lower bound: capacity is raised to the minimum burst.
        let pacer = make_pacer(true, mtu, mtu, now);
        assert!(pacer.enabled());
        assert_eq!(pacer.capacity, pacer.tokens);
        assert_eq!(pacer.capacity, MIN_BURST_PACKET_NUM * mtu);

        // Window above the upper bound: capacity is capped at the maximum burst.
        let pacer = make_pacer(true, 200 * mtu, mtu, now);
        assert!(pacer.enabled());
        assert_eq!(pacer.capacity, pacer.tokens);
        assert_eq!(pacer.capacity, MAX_BURST_PACKET_NUM * mtu);
    }

    /// A disabled pacer never delays sending and never consumes tokens.
    #[test]
    fn pacer_disabled() {
        let mtu: u64 = 1500;
        let cwnd: u64 = 20 * 1500;
        let now = Instant::now();
        let bytes_to_send: u64 = 1000;
        let pacing_rate: u64 = 1000000;

        let mut pacer = make_pacer(false, cwnd, mtu, now);
        assert!(!pacer.enabled());
        assert_eq!(pacer.capacity, 20 * 1500);

        let next = pacer.schedule(bytes_to_send, pacing_rate, SRTT, cwnd, mtu, now);
        assert!(next.is_none());

        pacer.on_sent(bytes_to_send);
        assert_eq!(pacer.capacity, pacer.tokens);
    }

    /// Exercises scheduling guards, window changes, bucket draining and refill.
    #[test]
    fn pacer_schedule_and_send() {
        let mtu: u64 = 1000;
        let cwnd: u64 = 10 * mtu;
        let now = Instant::now();
        let bytes_to_send = mtu;
        let pacing_rate: u64 = 1000000;

        // A zero SRTT disables scheduling.
        let mut pacer = make_pacer(true, cwnd, mtu, now);
        assert!(pacer
            .schedule(bytes_to_send, pacing_rate, Duration::ZERO, cwnd, mtu, now)
            .is_none());

        // A zero congestion window disables scheduling.
        let mut pacer = make_pacer(true, cwnd, mtu, now);
        assert!(pacer
            .schedule(bytes_to_send, pacing_rate, SRTT, 0, mtu, now)
            .is_none());

        // Doubling the window doubles the capacity but grants no new tokens.
        let mut pacer = make_pacer(true, cwnd, mtu, now);
        assert_eq!(pacer.capacity, cwnd);
        assert_eq!(pacer.capacity, pacer.tokens);
        assert!(pacer
            .schedule(bytes_to_send, pacing_rate, SRTT, 2 * cwnd, mtu, now)
            .is_none());
        assert_eq!(pacer.capacity, 2 * cwnd);
        assert_eq!(pacer.tokens, cwnd);

        // A full bucket lets a whole burst through without any delay.
        let mut pacer = make_pacer(true, cwnd, mtu, now);
        assert_eq!(pacer.capacity, 10 * mtu);
        assert_eq!(pacer.tokens, 10 * mtu);
        let burst_packets = pacer.capacity / mtu;
        for _ in 0..burst_packets {
            let next = pacer.schedule(bytes_to_send, pacing_rate, SRTT, cwnd, mtu, now);
            assert!(next.is_none());
            pacer.on_sent(mtu);
        }
        assert_eq!(pacer.tokens, 0);

        // With the bucket empty, the next packet must wait for a refill.
        let expected_wait_micros = bytes_to_send * 1_000_000 / pacing_rate;
        let wait = pacer
            .schedule(bytes_to_send, pacing_rate, SRTT, cwnd, mtu, now)
            .unwrap()
            .duration_since(now);
        assert_eq!(wait.as_micros() as u64, expected_wait_micros);

        // Once that time has passed, the packet can go out and drains the bucket again.
        let refill_micros = (bytes_to_send - pacer.tokens) * 1_000_000 / pacing_rate;
        let later = now + Duration::from_micros(refill_micros);
        assert!(pacer
            .schedule(bytes_to_send, pacing_rate, SRTT, cwnd, mtu, later)
            .is_none());
        pacer.on_sent(bytes_to_send);
        assert_eq!(pacer.tokens, 0);
    }
}