use super::PacketId;
use super::bbr::DeliveryRateToken;
#[cfg(test)]
use crate::simulation::RealTime;
use crate::simulation::TimeSource;
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::Duration;
type PendingReceiptEntry = (Box<[u8]>, u64, Option<DeliveryRateToken>);
type AckProcessingResult = (
Vec<(Option<Duration>, usize, Option<DeliveryRateToken>)>,
Option<f64>,
);
const NETWORK_DELAY_ALLOWANCE: Duration = Duration::from_millis(500);
#[cfg(test)]
pub(crate) const MAX_CONFIRMATION_DELAY: Duration = Duration::from_millis(100);
#[cfg(not(test))]
const MAX_CONFIRMATION_DELAY: Duration = Duration::from_millis(100);
pub(super) const MESSAGE_CONFIRMATION_TIMEOUT: Duration = {
let millis: u128 = MAX_CONFIRMATION_DELAY.as_millis() + NETWORK_DELAY_ALLOWANCE.as_millis();
if millis > u64::MAX as u128 {
panic!("Value too large for u64");
}
Duration::from_millis(millis as u64)
};
const MAX_RTO_BACKOFF: u32 = 512;
const MIN_RTO: Duration = Duration::from_millis(500);
const MIN_TLP_TIMEOUT: Duration = Duration::from_millis(10);
const PACKET_LOSS_DECAY_FACTOR: f64 = 1.0 / 1000.0;
pub(super) struct SentPacketTracker<T: TimeSource> {
pending_receipts: HashMap<PacketId, PendingReceiptEntry>,
resend_queue: VecDeque<ResendQueueEntry>,
packet_loss_proportion: f64,
pub(super) time_source: T,
srtt: Option<Duration>,
rttvar: Duration,
min_rtt: Duration,
rto: Duration,
retransmitted_packets: HashSet<PacketId>,
total_packets_sent: usize,
rto_backoff: u32,
tlp_sent_packets: HashSet<PacketId>,
}
impl<T: TimeSource> SentPacketTracker<T> {
pub(super) fn new_with_time_source(time_source: T) -> Self {
SentPacketTracker {
pending_receipts: HashMap::new(),
resend_queue: VecDeque::new(),
packet_loss_proportion: 0.0,
time_source,
srtt: None,
rttvar: Duration::from_secs(0),
min_rtt: Duration::from_millis(100), rto: Duration::from_secs(1),
retransmitted_packets: HashSet::new(),
total_packets_sent: 0,
rto_backoff: 1, tlp_sent_packets: HashSet::new(),
}
}
fn probe_timeout(&self) -> Option<Duration> {
self.srtt.map(|srtt| {
let pto = srtt.saturating_mul(2);
pto.max(MIN_TLP_TIMEOUT)
})
}
}
impl<T: TimeSource> SentPacketTracker<T> {
pub(super) fn report_sent_packet(&mut self, packet_id: PacketId, payload: Box<[u8]>) {
self.report_sent_packet_with_token(packet_id, payload, None);
}
pub(super) fn report_sent_packet_with_token(
&mut self,
packet_id: PacketId,
payload: Box<[u8]>,
token: Option<DeliveryRateToken>,
) {
let sent_time_nanos = self.time_source.now_nanos();
self.pending_receipts
.insert(packet_id, (payload, sent_time_nanos, token));
self.resend_queue.push_back(ResendQueueEntry { packet_id });
self.total_packets_sent += 1;
}
pub(super) fn report_received_receipts(
&mut self,
packet_ids: &[PacketId],
) -> AckProcessingResult {
let now_nanos = self.time_source.now_nanos();
let mut ack_info = Vec::new();
for packet_id in packet_ids {
self.packet_loss_proportion = self.packet_loss_proportion
* (1.0 - PACKET_LOSS_DECAY_FACTOR)
+ (PACKET_LOSS_DECAY_FACTOR * 0.0);
let is_retransmitted = self.retransmitted_packets.contains(packet_id);
if let Some((payload, sent_time_nanos, token)) = self.pending_receipts.get(packet_id) {
let packet_size = payload.len();
let token = *token;
if is_retransmitted {
ack_info.push((None, packet_size, token));
self.rto_backoff = 1;
} else {
let rtt_nanos = now_nanos.saturating_sub(*sent_time_nanos);
let rtt_sample = Duration::from_nanos(rtt_nanos);
ack_info.push((Some(rtt_sample), packet_size, token));
self.update_rtt(rtt_sample);
self.rto_backoff = 1;
}
}
self.pending_receipts.remove(packet_id);
self.retransmitted_packets.remove(packet_id);
self.tlp_sent_packets.remove(packet_id);
}
let loss_rate = if self.total_packets_sent > 0 {
Some(self.packet_loss_proportion)
} else {
None
};
(ack_info, loss_rate)
}
fn update_rtt(&mut self, sample: Duration) {
const ALPHA: f64 = 1.0 / 8.0; const BETA: f64 = 1.0 / 4.0; const K: u32 = 4; const G: Duration = Duration::from_millis(10);
match self.srtt {
None => {
self.srtt = Some(sample);
self.rttvar = sample / 2;
self.min_rtt = sample;
}
Some(srtt) => {
let abs_diff = sample.abs_diff(srtt);
self.rttvar = self.rttvar.mul_f64(1.0 - BETA) + abs_diff.mul_f64(BETA);
self.srtt = Some(srtt.mul_f64(1.0 - ALPHA) + sample.mul_f64(ALPHA));
self.min_rtt = self.min_rtt.min(sample);
}
}
let rto_variance = self.rttvar * K;
let max_variance = if rto_variance > G { rto_variance } else { G };
self.rto = self.srtt.unwrap_or(Duration::from_secs(1)) + max_variance;
if self.rto < MIN_RTO {
self.rto = MIN_RTO;
} else if self.rto > Duration::from_secs(60) {
self.rto = Duration::from_secs(60);
}
}
pub(super) fn mark_retransmitted(&mut self, packet_id: PacketId) {
self.retransmitted_packets.insert(packet_id);
}
#[allow(dead_code)] pub(super) fn smoothed_rtt(&self) -> Option<Duration> {
self.srtt
}
pub(super) fn min_rtt(&self) -> Duration {
self.min_rtt
}
#[allow(dead_code)] pub(super) fn rto(&self) -> Duration {
self.rto
}
pub(super) fn effective_rto(&self) -> Duration {
let backed_off = self.rto.saturating_mul(self.rto_backoff);
backed_off.min(Duration::from_secs(60))
}
pub(super) fn on_timeout(&mut self) {
self.rto_backoff = (self.rto_backoff.saturating_mul(2)).min(MAX_RTO_BACKOFF);
tracing::debug!(
rto_backoff = self.rto_backoff,
effective_rto_ms = self.effective_rto().as_millis(),
"RTO backoff increased on timeout"
);
}
#[allow(dead_code)]
pub(super) fn rto_backoff(&self) -> u32 {
self.rto_backoff
}
pub(super) fn get_resend(&mut self) -> ResendAction {
let now_nanos = self.time_source.now_nanos();
let pto = self.probe_timeout();
let effective_rto_nanos = self.effective_rto().as_nanos() as u64;
while let Some(entry) = self.resend_queue.pop_front() {
if !self.pending_receipts.contains_key(&entry.packet_id) {
continue;
}
let sent_time_nanos = self
.pending_receipts
.get(&entry.packet_id)
.map(|(_, ts, _)| *ts)
.unwrap_or(0);
let rto_deadline = sent_time_nanos + effective_rto_nanos;
let tlp_deadline = if let Some(pto) = pto {
if !self.tlp_sent_packets.contains(&entry.packet_id) {
Some(sent_time_nanos + pto.as_nanos() as u64)
} else {
None
}
} else {
None
};
if let Some(tlp_at) = tlp_deadline {
if now_nanos >= tlp_at && now_nanos < rto_deadline {
if let Some((packet, _, _)) = self.pending_receipts.get(&entry.packet_id) {
let packet_clone = packet.clone();
let packet_id = entry.packet_id;
self.tlp_sent_packets.insert(packet_id);
self.resend_queue.push_front(entry);
self.mark_retransmitted(packet_id);
return ResendAction::TlpProbe(packet_id, packet_clone);
}
}
}
if now_nanos >= rto_deadline {
if let Some((packet, _sent_time_nanos, _token)) =
self.pending_receipts.remove(&entry.packet_id)
{
self.packet_loss_proportion = self.packet_loss_proportion
* (1.0 - PACKET_LOSS_DECAY_FACTOR)
+ PACKET_LOSS_DECAY_FACTOR;
self.mark_retransmitted(entry.packet_id);
self.on_timeout();
self.tlp_sent_packets.remove(&entry.packet_id);
return ResendAction::Resend(entry.packet_id, packet);
}
}
let next_deadline = if let Some(tlp_at) = tlp_deadline {
tlp_at.min(rto_deadline)
} else {
rto_deadline
};
self.resend_queue.push_front(entry);
return ResendAction::WaitUntil(next_deadline);
}
let deadline_nanos = self.time_source.now_nanos() + self.effective_rto().as_nanos() as u64;
ResendAction::WaitUntil(deadline_nanos)
}
}
#[derive(Debug, PartialEq)]
pub enum ResendAction {
WaitUntil(u64),
Resend(u32, Box<[u8]>),
TlpProbe(u32, Box<[u8]>),
}
struct ResendQueueEntry {
packet_id: u32,
}
#[cfg(test)]
impl SentPacketTracker<RealTime> {
pub(super) fn new() -> Self {
Self::new_with_time_source(RealTime::new())
}
}
#[cfg(test)]
pub(in crate::transport) mod tests {
use super::*;
use crate::simulation::VirtualTime;
use rstest::rstest;
pub(in crate::transport) fn mock_sent_packet_tracker() -> SentPacketTracker<VirtualTime> {
let time_source = VirtualTime::new();
SentPacketTracker::new_with_time_source(time_source)
}
#[rstest]
#[case::single_packet(1, vec![1, 2, 3], 1, 1, 1)]
#[case::multiple_packets_first(1, vec![1], 1, 1, 1)]
fn test_report_sent_packet(
#[case] packet_id: PacketId,
#[case] payload: Vec<u8>,
#[case] expected_pending: usize,
#[case] expected_queue: usize,
#[case] expected_total_sent: usize,
) {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(packet_id, payload.into());
assert_eq!(tracker.pending_receipts.len(), expected_pending);
assert_eq!(tracker.resend_queue.len(), expected_queue);
assert_eq!(tracker.packet_loss_proportion, 0.0);
assert_eq!(tracker.total_packets_sent, expected_total_sent);
}
#[test]
fn test_report_received_receipts() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1, 2, 3].into());
let (ack_info, loss_rate) = tracker.report_received_receipts(&[1]);
assert_eq!(tracker.pending_receipts.len(), 0);
assert!(tracker.resend_queue.len() <= 1);
assert_eq!(tracker.packet_loss_proportion, 0.0);
assert_eq!(ack_info.len(), 1); assert!(ack_info[0].0.is_some()); assert_eq!(ack_info[0].1, 3); assert!(loss_rate.is_some()); }
#[test]
fn test_packet_lost() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1, 2, 3].into());
tracker.time_source.advance(tracker.effective_rto());
let resend_action = tracker.get_resend();
assert_eq!(resend_action, ResendAction::Resend(1, vec![1, 2, 3].into()));
assert_eq!(tracker.pending_receipts.len(), 0);
assert_eq!(tracker.resend_queue.len(), 0);
assert_eq!(tracker.packet_loss_proportion, PACKET_LOSS_DECAY_FACTOR);
}
#[test]
fn test_immediate_receipt_then_resend() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1, 2, 3].into());
tracker.report_sent_packet(2, vec![4, 5, 6].into());
let _ = tracker.report_received_receipts(&[1]);
tracker.time_source.advance(Duration::from_millis(9));
match tracker.get_resend() {
ResendAction::WaitUntil(_) => (),
ResendAction::Resend(..) | ResendAction::TlpProbe(..) => {
panic!("Expected WaitUntil, got Resend/TlpProbe too early")
}
}
tracker.time_source.advance(Duration::from_millis(2));
match tracker.get_resend() {
ResendAction::TlpProbe(packet_id, _) => {
assert_eq!(packet_id, 2)
}
ResendAction::Resend(_, _) => panic!("Expected TlpProbe, got Resend"),
ResendAction::WaitUntil(_) => panic!("Expected TlpProbe, got WaitUntil"),
}
}
#[test]
fn test_get_resend_with_pending_receipts() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(0, Box::from(&[][..]));
tracker.time_source.advance(Duration::from_millis(10));
tracker.report_sent_packet(1, Box::from(&[][..]));
let _ = tracker.report_received_receipts(&[0]);
match tracker.get_resend() {
ResendAction::WaitUntil(wait_until_nanos) => {
let now_nanos = tracker.time_source.now_nanos();
assert!(
wait_until_nanos >= now_nanos,
"Wait deadline should be in the future"
);
}
ResendAction::Resend(..) | ResendAction::TlpProbe(..) => {
panic!("Expected ResendAction::WaitUntil")
}
}
}
#[rstest]
#[case::fast_rtt(50, 50, 25, 50)]
#[case::slow_rtt(100, 100, 50, 100)]
#[case::very_fast_rtt(10, 10, 5, 10)]
fn test_rtt_estimation_first_sample(
#[case] delay_ms: u64,
#[case] expected_srtt_ms: u64,
#[case] expected_rttvar_ms: u64,
#[case] expected_min_rtt_ms: u64,
) {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1, 2, 3].into());
tracker.time_source.advance(Duration::from_millis(delay_ms));
let (ack_info, _) = tracker.report_received_receipts(&[1]);
assert_eq!(ack_info.len(), 1);
assert_eq!(ack_info[0].0, Some(Duration::from_millis(delay_ms)));
assert_eq!(ack_info[0].1, 3); assert_eq!(
tracker.smoothed_rtt(),
Some(Duration::from_millis(expected_srtt_ms))
);
assert_eq!(tracker.rttvar, Duration::from_millis(expected_rttvar_ms)); assert_eq!(
tracker.min_rtt(),
Duration::from_millis(expected_min_rtt_ms)
);
}
#[test]
fn test_rtt_estimation_exponential_smoothing() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(100));
tracker.report_received_receipts(&[1]);
tracker.report_sent_packet(2, vec![2].into());
tracker.time_source.advance(Duration::from_millis(200));
let (ack_info, _) = tracker.report_received_receipts(&[2]);
assert_eq!(ack_info[0].0, Some(Duration::from_millis(200))); let srtt = tracker.smoothed_rtt().unwrap();
assert!((srtt.as_millis() as i64 - 112).abs() <= 1); }
#[test]
fn test_rtt_excludes_retransmissions() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.mark_retransmitted(1);
tracker.time_source.advance(Duration::from_millis(50));
let (ack_info, _) = tracker.report_received_receipts(&[1]);
assert_eq!(ack_info.len(), 1);
assert_eq!(ack_info[0].0, None); assert_eq!(ack_info[0].1, 1); assert_eq!(tracker.smoothed_rtt(), None); }
#[test]
fn test_rto_clamping() {
let mut tracker = mock_sent_packet_tracker();
assert_eq!(tracker.rto(), Duration::from_secs(1));
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(10)); tracker.report_received_receipts(&[1]);
assert!(tracker.rto() >= Duration::from_millis(500));
assert!(tracker.rto() <= Duration::from_secs(60));
}
#[rstest]
#[case::three_samples(&[100, 50, 150], 50)]
#[case::descending(&[200, 100, 50], 50)]
#[case::ascending(&[50, 100, 150], 50)]
#[case::single_sample(&[75], 75)]
fn test_min_rtt_tracking(#[case] rtt_samples: &[u64], #[case] expected_min_rtt_ms: u64) {
let mut tracker = mock_sent_packet_tracker();
for (i, &rtt_ms) in rtt_samples.iter().enumerate() {
let packet_id = (i + 1) as PacketId;
tracker.report_sent_packet(packet_id, vec![packet_id as u8].into());
tracker.time_source.advance(Duration::from_millis(rtt_ms));
tracker.report_received_receipts(&[packet_id]);
}
assert_eq!(
tracker.min_rtt(),
Duration::from_millis(expected_min_rtt_ms)
);
}
#[test]
fn test_retransmitted_packet_marked_on_timeout() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1, 2, 3].into());
tracker.time_source.advance(tracker.effective_rto());
match tracker.get_resend() {
ResendAction::Resend(packet_id, _) | ResendAction::TlpProbe(packet_id, _) => {
assert_eq!(packet_id, 1);
assert!(tracker.retransmitted_packets.contains(&1));
}
_ => panic!("Expected Resend or TlpProbe action"),
}
}
#[rstest]
#[case::first_timeout(1, 2, 2)]
#[case::second_timeout(2, 4, 4)]
#[case::third_timeout(3, 8, 8)]
#[case::fourth_timeout(4, 16, 16)]
fn test_rto_backoff_doubles_on_timeout(
#[case] timeout_count: u32,
#[case] expected_backoff: u32,
#[case] expected_rto_secs: u64,
) {
let mut tracker = mock_sent_packet_tracker();
assert_eq!(tracker.rto_backoff(), 1);
assert_eq!(tracker.effective_rto(), Duration::from_secs(1));
for _ in 0..timeout_count {
tracker.on_timeout();
}
assert_eq!(tracker.rto_backoff(), expected_backoff);
assert_eq!(
tracker.effective_rto(),
Duration::from_secs(expected_rto_secs)
);
}
#[test]
fn test_rto_backoff_capped_at_60_seconds() {
let mut tracker = mock_sent_packet_tracker();
for _ in 0..10 {
tracker.on_timeout();
}
assert_eq!(tracker.effective_rto(), Duration::from_secs(60));
assert_eq!(tracker.rto_backoff(), MAX_RTO_BACKOFF);
}
#[rstest]
#[case::non_retransmitted(false)]
#[case::retransmitted(true)]
fn test_rto_backoff_resets_on_ack(#[case] mark_retransmitted: bool) {
let mut tracker = mock_sent_packet_tracker();
tracker.on_timeout();
tracker.on_timeout();
assert_eq!(tracker.rto_backoff(), 4);
tracker.report_sent_packet(1, vec![1, 2, 3].into());
if mark_retransmitted {
tracker.mark_retransmitted(1);
}
tracker.time_source.advance(Duration::from_millis(50));
tracker.report_received_receipts(&[1]);
assert_eq!(tracker.rto_backoff(), 1);
if mark_retransmitted {
assert_eq!(tracker.effective_rto(), Duration::from_secs(1));
} else {
assert_eq!(tracker.effective_rto(), Duration::from_millis(500));
}
}
#[test]
fn test_death_spiral_recovery() {
let mut tracker = mock_sent_packet_tracker();
for _ in 0..6 {
tracker.on_timeout();
}
assert_eq!(tracker.rto_backoff(), 64);
tracker.report_sent_packet(1, vec![1, 2, 3].into());
tracker.mark_retransmitted(1);
tracker.time_source.advance(Duration::from_millis(50));
tracker.report_received_receipts(&[1]);
assert_eq!(tracker.rto_backoff(), 1);
}
#[test]
fn test_mixed_ack_batch_resets_on_fresh_packet() {
let mut tracker = mock_sent_packet_tracker();
for _ in 0..4 {
tracker.on_timeout();
}
assert_eq!(tracker.rto_backoff(), 16);
tracker.report_sent_packet(1, vec![1].into());
tracker.report_sent_packet(2, vec![2].into());
tracker.report_sent_packet(3, vec![3].into());
tracker.mark_retransmitted(1);
tracker.mark_retransmitted(2);
tracker.time_source.advance(Duration::from_millis(50));
tracker.report_received_receipts(&[1, 2, 3]);
assert_eq!(tracker.rto_backoff(), 1);
}
#[test]
fn test_backoff_re_elevation_after_recovery() {
let mut tracker = mock_sent_packet_tracker();
for _ in 0..4 {
tracker.on_timeout();
}
assert_eq!(tracker.rto_backoff(), 16);
tracker.report_sent_packet(1, vec![1].into());
tracker.mark_retransmitted(1);
tracker.time_source.advance(Duration::from_millis(50));
tracker.report_received_receipts(&[1]);
assert_eq!(
tracker.rto_backoff(),
1,
"Single retransmit ACK should fully recover"
);
tracker.on_timeout();
assert_eq!(
tracker.rto_backoff(),
2,
"Timeout after recovery should work normally"
);
tracker.on_timeout();
assert_eq!(
tracker.rto_backoff(),
4,
"Subsequent timeout should continue doubling"
);
tracker.report_sent_packet(10, vec![10].into());
tracker.time_source.advance(Duration::from_millis(50));
tracker.report_received_receipts(&[10]);
assert_eq!(tracker.rto_backoff(), 1, "Fresh ACK should reset to 1");
}
#[test]
fn test_get_resend_triggers_backoff() {
let mut tracker = mock_sent_packet_tracker();
assert_eq!(tracker.rto_backoff(), 1);
let initial_rto = tracker.effective_rto();
tracker.report_sent_packet(1, vec![1, 2, 3].into());
tracker.time_source.advance(initial_rto);
match tracker.get_resend() {
ResendAction::Resend(packet_id, _) => {
assert_eq!(packet_id, 1);
}
ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
_ => panic!("Expected Resend action"),
}
assert_eq!(tracker.rto_backoff(), 2);
}
#[test]
fn test_consecutive_resends_increase_backoff() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1, 2, 3].into());
assert_eq!(tracker.rto_backoff(), 1);
tracker.time_source.advance(Duration::from_secs(1));
match tracker.get_resend() {
ResendAction::Resend(_, payload) => {
tracker.report_sent_packet(1, payload);
}
ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
_ => panic!("Expected Resend"),
}
assert_eq!(tracker.rto_backoff(), 2);
assert_eq!(tracker.effective_rto(), Duration::from_secs(2));
tracker.time_source.advance(Duration::from_secs(2));
match tracker.get_resend() {
ResendAction::Resend(_, payload) => {
tracker.report_sent_packet(1, payload);
}
ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
_ => panic!("Expected Resend"),
}
assert_eq!(tracker.rto_backoff(), 4);
assert_eq!(tracker.effective_rto(), Duration::from_secs(4));
tracker.time_source.advance(Duration::from_secs(4));
match tracker.get_resend() {
ResendAction::Resend(_, _) => {}
ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
_ => panic!("Expected Resend"),
}
assert_eq!(tracker.rto_backoff(), 8);
assert_eq!(tracker.effective_rto(), Duration::from_secs(8));
}
const EXPECTED_MIN_RTO: Duration = Duration::from_millis(500);
#[test]
fn test_min_rto_is_500ms() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(10));
tracker.report_received_receipts(&[1]);
assert_eq!(
tracker.rto(),
EXPECTED_MIN_RTO,
"RTO should be clamped to 500ms minimum for fast networks"
);
}
#[test]
fn test_min_rto_allows_higher_values() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(400));
tracker.report_received_receipts(&[1]);
assert!(
tracker.rto() > EXPECTED_MIN_RTO,
"RTO should not be clamped when naturally above minimum"
);
}
#[test]
fn test_initial_rto_before_samples() {
let tracker = mock_sent_packet_tracker();
assert_eq!(
tracker.rto(),
Duration::from_secs(1),
"Initial RTO before any samples should be 1s"
);
}
#[test]
fn test_effective_rto_with_backoff_respects_min() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(10));
tracker.report_received_receipts(&[1]);
assert_eq!(tracker.rto(), EXPECTED_MIN_RTO);
assert_eq!(tracker.effective_rto(), EXPECTED_MIN_RTO);
tracker.on_timeout();
assert_eq!(tracker.effective_rto(), Duration::from_millis(1000));
tracker.on_timeout();
assert_eq!(tracker.effective_rto(), Duration::from_millis(2000));
}
#[test]
fn test_fast_loss_detection_with_500ms_rto() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(20));
tracker.report_received_receipts(&[1]);
tracker.report_sent_packet(2, vec![2].into());
tracker.time_source.advance(Duration::from_millis(39));
match tracker.get_resend() {
ResendAction::WaitUntil(_) => {} ResendAction::Resend(..) | ResendAction::TlpProbe(..) => {
panic!("Should not fire before TLP timeout (40ms)")
}
}
tracker.time_source.advance(Duration::from_millis(2));
match tracker.get_resend() {
ResendAction::TlpProbe(id, _) => assert_eq!(id, 2, "TLP should probe packet 2"),
ResendAction::Resend(id, _) => {
assert_eq!(id, 2, "Or Resend if TLP not yet implemented")
}
ResendAction::WaitUntil(_) => panic!("Should have triggered TLP after 40ms"),
}
}
#[test]
fn test_timeout_intervals_actually_increase() {
let mut tracker = mock_sent_packet_tracker();
assert_eq!(tracker.effective_rto(), Duration::from_secs(1));
tracker.report_sent_packet(1, vec![1, 2, 3].into());
tracker.time_source.advance(Duration::from_millis(999));
match tracker.get_resend() {
ResendAction::WaitUntil(_) => {} ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
ResendAction::Resend(_, _) => panic!("Should not resend before RTO expires"),
}
tracker.time_source.advance(Duration::from_millis(1));
let payload = match tracker.get_resend() {
ResendAction::Resend(id, payload) => {
assert_eq!(id, 1);
payload
}
ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
_ => panic!("Expected Resend after 1s"),
};
assert_eq!(tracker.rto_backoff(), 2);
tracker.report_sent_packet(1, payload);
tracker.time_source.advance(Duration::from_millis(1999));
match tracker.get_resend() {
ResendAction::WaitUntil(_) => {} ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
ResendAction::Resend(_, _) => panic!("Should not resend before backed-off RTO (2s)"),
}
tracker.time_source.advance(Duration::from_millis(1));
let payload = match tracker.get_resend() {
ResendAction::Resend(id, payload) => {
assert_eq!(id, 1);
payload
}
ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
_ => panic!("Expected Resend after 2s"),
};
assert_eq!(tracker.rto_backoff(), 4);
tracker.report_sent_packet(1, payload);
tracker.time_source.advance(Duration::from_millis(3999));
match tracker.get_resend() {
ResendAction::WaitUntil(_) => {} ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
ResendAction::Resend(_, _) => panic!("Should not resend before backed-off RTO (4s)"),
}
tracker.time_source.advance(Duration::from_millis(1));
match tracker.get_resend() {
ResendAction::Resend(id, _) => assert_eq!(id, 1),
ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
_ => panic!("Expected Resend after 4s"),
}
assert_eq!(tracker.rto_backoff(), 8);
}
#[test]
fn test_tlp_fires_before_rto() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(50));
tracker.report_received_receipts(&[1]);
tracker.report_sent_packet(2, vec![2].into());
tracker.time_source.advance(Duration::from_millis(99));
match tracker.get_resend() {
ResendAction::WaitUntil(_) => {} ResendAction::Resend(..) | ResendAction::TlpProbe(..) => {
panic!("Should not fire before TLP timeout")
}
}
tracker.time_source.advance(Duration::from_millis(2));
match tracker.get_resend() {
ResendAction::TlpProbe(id, _) => assert_eq!(id, 2, "TLP should probe packet 2"),
ResendAction::Resend(_, _) => panic!("Should be TLP probe, not full RTO resend"),
ResendAction::WaitUntil(_) => panic!("TLP should have fired at 2*SRTT"),
}
}
#[test]
fn test_tlp_does_not_apply_backoff() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(50));
tracker.report_received_receipts(&[1]);
assert_eq!(tracker.rto_backoff(), 1);
tracker.report_sent_packet(2, vec![2].into());
tracker.time_source.advance(Duration::from_millis(101));
match tracker.get_resend() {
ResendAction::TlpProbe(_, _) => {}
ResendAction::WaitUntil(_) | ResendAction::Resend(..) => panic!("Expected TLP probe"),
}
assert_eq!(tracker.rto_backoff(), 1, "TLP should not increase backoff");
}
#[test]
fn test_tlp_followed_by_rto_if_no_ack() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(50));
tracker.report_received_receipts(&[1]);
tracker.report_sent_packet(2, vec![2].into());
tracker.time_source.advance(Duration::from_millis(101));
let payload = match tracker.get_resend() {
ResendAction::TlpProbe(id, payload) => {
assert_eq!(id, 2);
payload
}
ResendAction::WaitUntil(_) | ResendAction::Resend(..) => panic!("Expected TLP probe"),
};
tracker.report_sent_packet(2, payload);
tracker.time_source.advance(Duration::from_millis(499));
match tracker.get_resend() {
ResendAction::WaitUntil(_) => {} ResendAction::Resend(..) | ResendAction::TlpProbe(..) => {
panic!("Should still be waiting for RTO")
}
}
tracker.time_source.advance(Duration::from_millis(2));
match tracker.get_resend() {
ResendAction::Resend(id, _) => assert_eq!(id, 2, "RTO should fire after TLP failed"),
ResendAction::TlpProbe(_, _) => panic!("Should be RTO, not another TLP"),
_ => panic!("RTO should have fired"),
}
assert_eq!(tracker.rto_backoff(), 2, "RTO should increase backoff");
}
#[test]
fn test_tlp_cancelled_by_ack() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(50));
tracker.report_received_receipts(&[1]);
tracker.report_sent_packet(2, vec![2].into());
tracker.time_source.advance(Duration::from_millis(60));
tracker.report_received_receipts(&[2]);
match tracker.get_resend() {
ResendAction::WaitUntil(_) => {} ResendAction::TlpProbe(_, _) => panic!("TLP should be cancelled by ACK"),
ResendAction::Resend(_, _) => panic!("No resend needed, packet was ACKed"),
}
}
#[test]
fn test_tlp_minimum_timeout() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(2));
tracker.report_received_receipts(&[1]);
tracker.report_sent_packet(2, vec![2].into());
tracker.time_source.advance(Duration::from_millis(9));
match tracker.get_resend() {
ResendAction::WaitUntil(_) => {}
ResendAction::Resend(..) | ResendAction::TlpProbe(..) => {
panic!("Should not fire before minimum TLP timeout")
}
}
tracker.time_source.advance(Duration::from_millis(2));
match tracker.get_resend() {
ResendAction::TlpProbe(id, _) => assert_eq!(id, 2),
ResendAction::WaitUntil(_) | ResendAction::Resend(..) => {
panic!("TLP should fire at minimum 10ms")
}
}
}
#[test]
fn test_tlp_disabled_before_rtt_samples() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(500));
match tracker.get_resend() {
ResendAction::WaitUntil(_) => {}
ResendAction::TlpProbe(_, _) => panic!("TLP should not fire without RTT samples"),
ResendAction::Resend(_, _) => panic!("RTO is 1s, should not fire at 500ms"),
}
tracker.time_source.advance(Duration::from_millis(501));
match tracker.get_resend() {
ResendAction::Resend(id, _) => assert_eq!(id, 1),
ResendAction::TlpProbe(_, _) => panic!("Should be RTO, not TLP"),
_ => panic!("RTO should fire at 1s"),
}
}
#[test]
fn test_tlp_only_once_per_flight() {
let mut tracker = mock_sent_packet_tracker();
tracker.report_sent_packet(1, vec![1].into());
tracker.time_source.advance(Duration::from_millis(50));
tracker.report_received_receipts(&[1]);
tracker.report_sent_packet(2, vec![2].into());
tracker.report_sent_packet(3, vec![3].into());
tracker.report_sent_packet(4, vec![4].into());
tracker.time_source.advance(Duration::from_millis(101));
match tracker.get_resend() {
ResendAction::TlpProbe(id, _) => {
assert_eq!(id, 2, "TLP should probe oldest unacked packet");
}
ResendAction::WaitUntil(_) | ResendAction::Resend(..) => panic!("Expected TLP probe"),
}
}
}