use std::time::Duration;
use std::time::Instant;
const MIN_RTT_THRESH: Duration = Duration::from_millis(4);
const MAX_RTT_THRESH: Duration = Duration::from_millis(16);
const MIN_RTT_DIVISOR: u32 = 8;
pub const N_RTT_SAMPLE: u32 = 8;
pub const CSS_GROWTH_DIVISOR: u32 = 4;
pub const CSS_ROUNDS: u32 = 5;
const HYSTART_L: u32 = 64;
#[derive(Debug, PartialEq)]
enum HystartPhase {
InStandardSlowStart,
InConservativeSlowStart,
Exited,
}
pub struct HystartPlusPlus {
enabled: bool,
phase: HystartPhase,
last_round_min_rtt: Duration,
current_round_min_rtt: Duration,
rtt_sample_count: u32,
last_sent_pkt_num: u64,
max_acked_packet_num: u64,
window_end: u64,
css_round_count: u32,
css_baseline_min_rtt: Duration,
}
impl std::fmt::Debug for HystartPlusPlus {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Hystart ")?;
write!(f, "Hystart_enabled={:?} ", self.enabled)?;
write!(f, "phase={:?} ", self.phase)?;
write!(
f,
"last_round_min_rtt={:?} ",
self.last_round_min_rtt.as_millis()
)?;
write!(
f,
"current_round_min_rtt={:?} ",
self.current_round_min_rtt.as_millis()
)?;
write!(f, "rtt_sample_count={:?} ", self.rtt_sample_count)?;
write!(f, "last_sent_pkt_num={:?} ", self.last_sent_pkt_num)?;
write!(f, "max_acked_pkt_num={:?} ", self.max_acked_packet_num)?;
write!(f, "window_end={:?} ", self.window_end)?;
write!(f, "css_round_count={:?} ", self.css_round_count)?;
write!(
f,
"css_baseline_min_rtt={:?} ",
self.css_baseline_min_rtt.as_millis()
)?;
Ok(())
}
}
impl HystartPlusPlus {
pub fn new(enabled: bool) -> Self {
Self {
enabled,
phase: HystartPhase::InStandardSlowStart,
last_round_min_rtt: Duration::MAX,
current_round_min_rtt: Duration::MAX,
rtt_sample_count: 0,
last_sent_pkt_num: 0,
max_acked_packet_num: 0,
window_end: 0,
css_round_count: 0,
css_baseline_min_rtt: Duration::MAX,
}
}
pub fn enabled(&self) -> bool {
self.enabled
}
pub fn has_exited(&self) -> bool {
self.phase == HystartPhase::Exited
}
pub fn in_conservative_slow_start(&self) -> bool {
self.phase == HystartPhase::InConservativeSlowStart
}
pub fn in_standard_slow_start(&self) -> bool {
self.phase == HystartPhase::InStandardSlowStart
}
pub fn on_sent(&mut self, pkt_num: u64) {
self.last_sent_pkt_num = pkt_num;
}
pub fn on_ack(&mut self, pkt_num: u64, acked_bytes: u64, rtt: Duration) {
if !self.enabled && !self.has_exited() {
return;
}
self.max_acked_packet_num = pkt_num.max(self.max_acked_packet_num);
self.current_round_min_rtt = self.current_round_min_rtt.min(rtt);
self.rtt_sample_count += 1;
match self.phase {
HystartPhase::InStandardSlowStart => {
if self.rtt_sample_count >= N_RTT_SAMPLE
&& self.current_round_min_rtt != Duration::MAX
&& self.last_round_min_rtt != Duration::MAX
{
let rtt_thresh = (self.last_round_min_rtt / MIN_RTT_DIVISOR)
.clamp(MIN_RTT_THRESH, MAX_RTT_THRESH);
if self.current_round_min_rtt
>= self.last_round_min_rtt.saturating_add(rtt_thresh)
{
self.css_baseline_min_rtt = self.current_round_min_rtt;
self.phase = HystartPhase::InConservativeSlowStart;
}
}
}
HystartPhase::InConservativeSlowStart => {
if self.rtt_sample_count >= N_RTT_SAMPLE
&& self.current_round_min_rtt < self.css_baseline_min_rtt
{
self.css_baseline_min_rtt = Duration::MAX;
self.phase = HystartPhase::InStandardSlowStart;
self.css_round_count = 0;
}
}
_ => (),
};
}
pub fn end_ack(&mut self) {
if !self.enabled() && !self.has_exited() {
return;
}
if self.max_acked_packet_num > self.window_end {
self.window_end = self.last_sent_pkt_num;
self.last_round_min_rtt = self.current_round_min_rtt;
self.current_round_min_rtt = Duration::MAX;
self.rtt_sample_count = 0;
if self.in_conservative_slow_start() {
self.css_round_count += 1;
if self.css_round_count >= CSS_ROUNDS {
self.css_round_count = 0;
self.phase = HystartPhase::Exited;
}
}
}
}
pub fn on_congestion_event(&mut self) {
if self.enabled {
self.window_end = 0;
self.phase = HystartPhase::Exited;
}
}
pub fn cwnd_increment(&self, acked_bytes: u64, max_datagram_size: u64) -> u64 {
match self.phase {
HystartPhase::InStandardSlowStart => {
acked_bytes.min(HYSTART_L as u64 * max_datagram_size)
}
HystartPhase::InConservativeSlowStart => {
(acked_bytes / CSS_GROWTH_DIVISOR as u64).min(HYSTART_L as u64 * max_datagram_size)
}
_ => 0,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Clone, Copy)]
struct AckPacket {
pkt_num: u64,
acked_bytes: u64,
rtt: Duration,
}
#[test]
fn hystart_on_event() {
let mut hspp = HystartPlusPlus::new(true);
assert_eq!(hspp.enabled(), true);
assert_eq!(hspp.in_standard_slow_start(), true);
let max_datagram_size: u64 = 1350; let acked_bytes: u64 = max_datagram_size;
let pkt_num: u64 = 1;
let mut rtt_ms: Vec<Duration> = Vec::new();
let mut ack_packets: Vec<AckPacket> = Vec::new();
let min_round_rtt_in_ms: Vec<u64> = vec![
30, 32, 38, 40, 36, 42, 43, 44, 45, 46, ];
let n_rounds: usize = min_round_rtt_in_ms.len();
let sample_cnt: u64 = N_RTT_SAMPLE as u64 * n_rounds as u64;
assert_eq!(min_round_rtt_in_ms.len(), n_rounds);
for round in 0..n_rounds {
for i in 0..N_RTT_SAMPLE {
rtt_ms.push(
Duration::from_millis(min_round_rtt_in_ms[round])
+ Duration::from_millis(i as u64),
);
}
}
for i in 0..sample_cnt {
ack_packets.push(AckPacket {
pkt_num: i + 1,
acked_bytes: acked_bytes,
rtt: rtt_ms[i as usize],
});
}
let mut round: usize = 0;
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
println!("sent {}", i);
hspp.on_sent((i + 1) as u64);
}
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(
hspp.last_sent_pkt_num,
(round + 1) as u64 * N_RTT_SAMPLE as u64
);
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
hspp.on_ack(
ack_packets[i as usize].pkt_num,
ack_packets[i as usize].acked_bytes,
ack_packets[i as usize].rtt,
);
println!("{:?}", hspp);
}
assert_eq!(
hspp.max_acked_packet_num,
(round + 1) as u64 * N_RTT_SAMPLE as u64
);
assert_eq!(hspp.last_round_min_rtt, Duration::MAX);
assert_eq!(
hspp.current_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.rtt_sample_count, N_RTT_SAMPLE);
hspp.end_ack();
assert_eq!(hspp.in_standard_slow_start(), true);
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.current_round_min_rtt, Duration::MAX);
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(hspp.window_end, hspp.last_sent_pkt_num);
round += 1;
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
println!("sent {}", i);
hspp.on_sent((i + 1) as u64);
}
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(
hspp.last_sent_pkt_num,
(round + 1) as u64 * N_RTT_SAMPLE as u64
);
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
hspp.on_ack(
ack_packets[i as usize].pkt_num,
ack_packets[i as usize].acked_bytes,
ack_packets[i as usize].rtt,
);
println!("{:?}", hspp);
}
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round.saturating_sub(1)])
);
assert_eq!(
hspp.current_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.rtt_sample_count, N_RTT_SAMPLE);
hspp.end_ack();
assert_eq!(hspp.in_standard_slow_start(), true);
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.current_round_min_rtt, Duration::MAX);
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(hspp.window_end, hspp.last_sent_pkt_num);
assert_eq!(
hspp.cwnd_increment(acked_bytes * N_RTT_SAMPLE as u64, max_datagram_size),
acked_bytes * N_RTT_SAMPLE as u64
);
round += 1;
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
println!("sent {}", i);
hspp.on_sent((i + 1) as u64);
}
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(
hspp.last_sent_pkt_num,
(round + 1) as u64 * N_RTT_SAMPLE as u64
);
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
hspp.on_ack(
ack_packets[i as usize].pkt_num,
ack_packets[i as usize].acked_bytes,
ack_packets[i as usize].rtt,
);
println!("{:?}", hspp);
}
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round.saturating_sub(1)])
);
assert_eq!(
hspp.current_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.rtt_sample_count, N_RTT_SAMPLE);
hspp.end_ack();
assert_eq!(hspp.in_conservative_slow_start(), true);
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.current_round_min_rtt, Duration::MAX);
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(hspp.window_end, hspp.last_sent_pkt_num);
round += 1;
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
println!("sent {}", i);
hspp.on_sent((i + 1) as u64);
}
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(
hspp.last_sent_pkt_num,
(round + 1) as u64 * N_RTT_SAMPLE as u64
);
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
hspp.on_ack(
ack_packets[i as usize].pkt_num,
ack_packets[i as usize].acked_bytes,
ack_packets[i as usize].rtt,
);
println!("{:?}", hspp);
}
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round.saturating_sub(1)])
);
assert_eq!(
hspp.current_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.rtt_sample_count, N_RTT_SAMPLE);
hspp.end_ack();
assert_eq!(hspp.in_conservative_slow_start(), true);
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.current_round_min_rtt, Duration::MAX);
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(hspp.window_end, hspp.last_sent_pkt_num);
assert_eq!(
hspp.cwnd_increment(acked_bytes * N_RTT_SAMPLE as u64, max_datagram_size),
acked_bytes * N_RTT_SAMPLE as u64 / CSS_GROWTH_DIVISOR as u64
);
round += 1;
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
println!("sent {}", i);
hspp.on_sent((i + 1) as u64);
}
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(
hspp.last_sent_pkt_num,
(round + 1) as u64 * N_RTT_SAMPLE as u64
);
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
hspp.on_ack(
ack_packets[i as usize].pkt_num,
ack_packets[i as usize].acked_bytes,
ack_packets[i as usize].rtt,
);
println!("{:?}", hspp);
}
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round.saturating_sub(1)])
);
assert_eq!(
hspp.current_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.rtt_sample_count, N_RTT_SAMPLE);
hspp.end_ack();
assert_eq!(hspp.in_standard_slow_start(), true);
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.current_round_min_rtt, Duration::MAX);
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(hspp.window_end, hspp.last_sent_pkt_num);
round += 1;
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
println!("sent {}", i);
hspp.on_sent((i + 1) as u64);
}
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(
hspp.last_sent_pkt_num,
(round + 1) as u64 * N_RTT_SAMPLE as u64
);
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
hspp.on_ack(
ack_packets[i as usize].pkt_num,
ack_packets[i as usize].acked_bytes,
ack_packets[i as usize].rtt,
);
println!("{:?}", hspp);
}
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round.saturating_sub(1)])
);
assert_eq!(
hspp.current_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.rtt_sample_count, N_RTT_SAMPLE);
hspp.end_ack();
assert_eq!(hspp.in_conservative_slow_start(), true);
assert_eq!(hspp.css_round_count, 1);
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.current_round_min_rtt, Duration::MAX);
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(hspp.window_end, hspp.last_sent_pkt_num);
let loop_start = round;
for idx in loop_start + 1..loop_start + CSS_ROUNDS as usize - 1 {
round = idx;
println!("==> round = {}", round);
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
println!("sent {}", i);
hspp.on_sent((i + 1) as u64);
}
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(
hspp.last_sent_pkt_num,
(round + 1) as u64 * N_RTT_SAMPLE as u64
);
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
hspp.on_ack(
ack_packets[i as usize].pkt_num,
ack_packets[i as usize].acked_bytes,
ack_packets[i as usize].rtt,
);
println!("{:?}", hspp);
}
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round.saturating_sub(1)])
);
assert_eq!(
hspp.current_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.rtt_sample_count, N_RTT_SAMPLE);
hspp.end_ack();
assert_eq!(hspp.in_conservative_slow_start(), true);
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.current_round_min_rtt, Duration::MAX);
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(hspp.window_end, hspp.last_sent_pkt_num);
}
assert_eq!(hspp.css_round_count, CSS_ROUNDS - 1);
round = loop_start + CSS_ROUNDS as usize - 1;
println!("==> round = {}", round);
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
println!("sent {}", i);
hspp.on_sent((i + 1) as u64);
}
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(
hspp.last_sent_pkt_num,
(round + 1) as u64 * N_RTT_SAMPLE as u64
);
for i in round * N_RTT_SAMPLE as usize..(round + 1) * N_RTT_SAMPLE as usize {
hspp.on_ack(
ack_packets[i as usize].pkt_num,
ack_packets[i as usize].acked_bytes,
ack_packets[i as usize].rtt,
);
println!("{:?}", hspp);
}
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round.saturating_sub(1)])
);
assert_eq!(
hspp.current_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.rtt_sample_count, N_RTT_SAMPLE);
hspp.end_ack();
assert_eq!(hspp.has_exited(), true);
assert_eq!(
hspp.last_round_min_rtt,
Duration::from_millis(min_round_rtt_in_ms[round])
);
assert_eq!(hspp.current_round_min_rtt, Duration::MAX);
assert_eq!(hspp.rtt_sample_count, 0);
assert_eq!(hspp.window_end, hspp.last_sent_pkt_num);
assert_eq!(hspp.cwnd_increment(acked_bytes, max_datagram_size), 0);
}
}