use super::PacketId;
use super::StreamId;
use super::bbr::DeliveryRateToken;
#[cfg(test)]
use crate::simulation::RealTime;
use crate::simulation::TimeSource;
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::Duration;
/// Bookkeeping kept for every packet that is still awaiting acknowledgement:
/// `(payload, sent_time_nanos, token)`.
///
/// The middle element is the `TimeSource::now_nanos()` reading taken when the packet was
/// last (re)registered; the token is the optional `DeliveryRateToken` supplied by the caller.
type PendingReceiptEntry = (Box<[u8]>, u64, Option<DeliveryRateToken>);
/// Identifies what a tracked packet belongs to, so that every packet of one stream can be
/// released together by `SentPacketTracker::drop_stream`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum PacketStream {
/// Packet registered through `report_sent_stream_packet` for the given stream.
Stream(StreamId),
/// Default owner for packets registered without a stream id.
Control,
}
/// Return type of `SentPacketTracker::report_received_receipts`.
///
/// * `.0` — one `(rtt_sample, payload_len, token)` entry per acknowledged packet that was
///   still pending; `rtt_sample` is `None` when the packet had been retransmitted.
/// * `.1` — the current packet-loss estimate, or `None` while no packet has ever been sent.
type AckProcessingResult = (
Vec<(Option<Duration>, usize, Option<DeliveryRateToken>)>,
Option<f64>,
);
/// Extra time added on top of `MAX_CONFIRMATION_DELAY` when deriving
/// `MESSAGE_CONFIRMATION_TIMEOUT`.
const NETWORK_DELAY_ALLOWANCE: Duration = Duration::from_millis(500);
/// Confirmation-delay component of `MESSAGE_CONFIRMATION_TIMEOUT`.
/// NOTE(review): presumably the longest a receiver may delay a receipt — confirm against
/// the receiving side. The test build only differs by exposing the value crate-wide.
#[cfg(test)]
pub(crate) const MAX_CONFIRMATION_DELAY: Duration = Duration::from_millis(100);
/// Non-test definition of the same 100 ms value, private to this module.
#[cfg(not(test))]
const MAX_CONFIRMATION_DELAY: Duration = Duration::from_millis(100);
/// `MAX_CONFIRMATION_DELAY + NETWORK_DELAY_ALLOWANCE`, computed at compile time with
/// millisecond resolution.
///
/// Compilation fails if the sum cannot be represented as a `u64` number of milliseconds.
pub(super) const MESSAGE_CONFIRMATION_TIMEOUT: Duration = {
    let total_millis: u128 =
        MAX_CONFIRMATION_DELAY.as_millis() + NETWORK_DELAY_ALLOWANCE.as_millis();
    assert!(total_millis <= u64::MAX as u128, "Value too large for u64");
    Duration::from_millis(total_millis as u64)
};
/// Largest value the RTO backoff multiplier may reach.
const MAX_RTO_BACKOFF: u32 = 512;

/// Number of RTO expiries tolerated for one packet; the next expiry abandons it.
const MAX_PACKET_RETRANSMITS: u32 = 12;

/// Lower bound applied to the computed retransmission timeout.
const MIN_RTO: Duration = Duration::from_millis(500);

/// Lower bound applied to the tail-loss-probe timeout.
const MIN_TLP_TIMEOUT: Duration = Duration::from_millis(10);

/// Weight given to each new sample in the packet-loss moving average.
const PACKET_LOSS_DECAY_FACTOR: f64 = 0.001;
/// Tracks packets that have been sent but not yet acknowledged, estimates RTT/RTO from
/// acknowledgements, and decides when a packet should be probed, resent or abandoned.
pub(super) struct SentPacketTracker<T: TimeSource> {
/// Packets awaiting acknowledgement, keyed by packet id.
pending_receipts: HashMap<PacketId, PendingReceiptEntry>,
/// Packet ids in the order they should be checked by `get_resend`. May contain ids that
/// are no longer pending; those are skipped when popped.
resend_queue: VecDeque<ResendQueueEntry>,
/// Moving estimate of packet loss: decays on every reported receipt and is pushed
/// towards 1.0 on every RTO expiry.
packet_loss_proportion: f64,
/// Clock used for all timestamps (nanoseconds).
pub(super) time_source: T,
/// Smoothed RTT; `None` until the first RTT sample has been taken.
srtt: Option<Duration>,
/// RTT variation estimate.
rttvar: Duration,
/// Smallest RTT sample seen so far (100 ms placeholder before the first sample).
min_rtt: Duration,
/// Base retransmission timeout, before the backoff multiplier is applied.
rto: Duration,
/// Packets that have been retransmitted at least once; their acknowledgements do not
/// produce RTT samples.
retransmitted_packets: HashSet<PacketId>,
/// Number of distinct packets registered (refreshing an already-pending id does not count).
total_packets_sent: usize,
/// Multiplier applied to `rto`; doubled by `on_timeout`, reset to 1 when a pending
/// packet is acknowledged.
rto_backoff: u32,
/// Packets for which a tail-loss probe has been issued and not yet cleared by an RTO
/// retransmission, acknowledgement, drop or abandon.
tlp_sent_packets: HashSet<PacketId>,
/// Number of RTO expiries recorded per packet.
retransmit_counts: HashMap<PacketId, u32>,
/// Owner (stream or control) of each tracked packet.
packet_streams: HashMap<PacketId, PacketStream>,
}
impl<T: TimeSource> SentPacketTracker<T> {
    /// Builds an empty tracker that reads the clock from `time_source`.
    ///
    /// Before any RTT sample is available the tracker starts with a 1 s RTO, a 100 ms
    /// `min_rtt` placeholder, no smoothed RTT and a backoff multiplier of 1.
    pub(super) fn new_with_time_source(time_source: T) -> Self {
        Self {
            time_source,
            // Nothing in flight yet.
            pending_receipts: HashMap::new(),
            resend_queue: VecDeque::new(),
            packet_streams: HashMap::new(),
            // Retransmission bookkeeping.
            retransmitted_packets: HashSet::new(),
            tlp_sent_packets: HashSet::new(),
            retransmit_counts: HashMap::new(),
            // Statistics.
            packet_loss_proportion: 0.0,
            total_packets_sent: 0,
            // Timer state prior to the first RTT sample.
            srtt: None,
            rttvar: Duration::ZERO,
            min_rtt: Duration::from_millis(100),
            rto: Duration::from_secs(1),
            rto_backoff: 1,
        }
    }

    /// Tail-loss-probe timeout: twice the smoothed RTT, but never below
    /// `MIN_TLP_TIMEOUT`. Returns `None` while no RTT sample exists.
    fn probe_timeout(&self) -> Option<Duration> {
        let srtt = self.srtt?;
        Some(srtt.saturating_mul(2).max(MIN_TLP_TIMEOUT))
    }
}
impl<T: TimeSource> SentPacketTracker<T> {
/// Registers `packet_id` as sent (or refreshes it if already pending) without a
/// delivery-rate token. Shorthand for `report_sent_packet_with_token(.., None)`.
pub(super) fn report_sent_packet(&mut self, packet_id: PacketId, payload: Box<[u8]>) {
self.report_sent_packet_with_token(packet_id, payload, None);
}
/// Registers `packet_id` as sent, attaching an optional delivery-rate `token`.
///
/// If the id already has a recorded owner that owner is reused; otherwise the packet
/// is attributed to `PacketStream::Control`.
pub(super) fn report_sent_packet_with_token(
    &mut self,
    packet_id: PacketId,
    payload: Box<[u8]>,
    token: Option<DeliveryRateToken>,
) {
    let owner = match self.packet_streams.get(&packet_id) {
        Some(&known) => known,
        None => PacketStream::Control,
    };
    self.report_sent_packet_inner(packet_id, payload, token, owner);
}
/// Registers `packet_id` as sent on behalf of `stream_id`, so that a later
/// `drop_stream(stream_id)` releases it.
pub(super) fn report_sent_stream_packet(
    &mut self,
    packet_id: PacketId,
    payload: Box<[u8]>,
    token: Option<DeliveryRateToken>,
    stream_id: StreamId,
) {
    let owner = PacketStream::Stream(stream_id);
    self.report_sent_packet_inner(packet_id, payload, token, owner);
}
/// Replaces the stored payload, send time and token of a packet that is still pending.
///
/// Returns `true` when the packet was pending and has been refreshed, `false` when the
/// id is unknown (in which case nothing is inserted).
pub(super) fn refresh_sent_packet(
    &mut self,
    packet_id: PacketId,
    payload: Box<[u8]>,
    token: Option<DeliveryRateToken>,
) -> bool {
    // The clock is read up front, whether or not the packet turns out to be tracked.
    let refreshed_at = self.time_source.now_nanos();
    let Some(entry) = self.pending_receipts.get_mut(&packet_id) else {
        return false;
    };
    *entry = (payload, refreshed_at, token);
    true
}
/// Shared implementation behind the `report_sent_*` entry points.
///
/// * Already-pending id: only the stored `(payload, send time, token)` is overwritten;
///   owner, queue position and send counter are left untouched.
/// * New id: the packet is recorded, attributed to `stream`, appended to the resend
///   queue and counted in `total_packets_sent`.
fn report_sent_packet_inner(
    &mut self,
    packet_id: PacketId,
    payload: Box<[u8]>,
    token: Option<DeliveryRateToken>,
    stream: PacketStream,
) {
    let record = (payload, self.time_source.now_nanos(), token);
    match self.pending_receipts.entry(packet_id) {
        std::collections::hash_map::Entry::Occupied(mut existing) => {
            existing.insert(record);
        }
        std::collections::hash_map::Entry::Vacant(vacant) => {
            vacant.insert(record);
            self.packet_streams.insert(packet_id, stream);
            self.resend_queue.push_back(ResendQueueEntry { packet_id });
            self.total_packets_sent += 1;
        }
    }
}
/// Forgets every packet attributed to `stream_id`.
///
/// All per-packet state (pending entry, owner, retransmit counters and flags, queue
/// entries) is removed. Returns the total payload size, in bytes, of the packets that
/// were still pending; `0` when the stream owns nothing.
pub(super) fn drop_stream(&mut self, stream_id: StreamId) -> u64 {
    let wanted = PacketStream::Stream(stream_id);
    let doomed: HashSet<PacketId> = self
        .packet_streams
        .iter()
        .filter(|(_, owner)| **owner == wanted)
        .map(|(id, _)| *id)
        .collect();
    if doomed.is_empty() {
        return 0;
    }

    let mut released_bytes = 0u64;
    for id in &doomed {
        if let Some((payload, _, _)) = self.pending_receipts.remove(id) {
            released_bytes = released_bytes.saturating_add(payload.len() as u64);
        }
        self.tlp_sent_packets.remove(id);
        self.retransmitted_packets.remove(id);
        self.retransmit_counts.remove(id);
        self.packet_streams.remove(id);
    }

    // Purge queue entries eagerly so the dropped ids are never inspected again.
    self.resend_queue
        .retain(|queued| !doomed.contains(&queued.packet_id));
    released_bytes
}
/// Returns `true` while `packet_id` is still awaiting acknowledgement.
pub(super) fn contains_packet(&self, packet_id: PacketId) -> bool {
self.pending_receipts.contains_key(&packet_id)
}
/// Processes a batch of acknowledged packet ids.
///
/// For every id the loss estimate is decayed and all per-packet state is dropped. Ids
/// that were still pending additionally:
/// * contribute an `(rtt_sample, payload_len, token)` entry to the result — the sample
///   is `None` for packets that had been retransmitted, since their RTT is ambiguous;
/// * feed the RTT estimator when a sample exists;
/// * reset the RTO backoff multiplier to 1.
///
/// Returns the per-packet entries together with the current loss estimate (`None` if no
/// packet has ever been sent).
pub(super) fn report_received_receipts(
    &mut self,
    packet_ids: &[PacketId],
) -> AckProcessingResult {
    // An acknowledgement contributes a "nothing lost" sample to the moving average.
    const ACK_LOSS_SAMPLE: f64 = 0.0;

    let now_nanos = self.time_source.now_nanos();
    let mut ack_info = Vec::new();

    for packet_id in packet_ids {
        self.packet_loss_proportion = self.packet_loss_proportion
            * (1.0 - PACKET_LOSS_DECAY_FACTOR)
            + (PACKET_LOSS_DECAY_FACTOR * ACK_LOSS_SAMPLE);

        let was_retransmitted = self.retransmitted_packets.remove(packet_id);
        if let Some((payload, sent_time_nanos, token)) = self.pending_receipts.remove(packet_id)
        {
            let rtt_sample = if was_retransmitted {
                None
            } else {
                let elapsed_nanos = now_nanos.saturating_sub(sent_time_nanos);
                Some(Duration::from_nanos(elapsed_nanos))
            };
            ack_info.push((rtt_sample, payload.len(), token));
            if let Some(sample) = rtt_sample {
                self.update_rtt(sample);
            }
            self.rto_backoff = 1;
        }

        self.tlp_sent_packets.remove(packet_id);
        self.retransmit_counts.remove(packet_id);
        self.packet_streams.remove(packet_id);
    }

    let loss_rate = (self.total_packets_sent > 0).then_some(self.packet_loss_proportion);
    (ack_info, loss_rate)
}
/// Folds one RTT `sample` into the estimator and recomputes the base RTO.
///
/// The smoothing follows the RFC 6298 formulas: the first sample initialises
/// `srtt = sample` and `rttvar = sample / 2`; later samples are blended with gains
/// `ALPHA` (SRTT) and `BETA` (RTTVAR). The RTO is `srtt + max(G, K * rttvar)`, clamped
/// to `[MIN_RTO, 60 s]`. `min_rtt` tracks the smallest sample seen.
fn update_rtt(&mut self, sample: Duration) {
    const ALPHA: f64 = 1.0 / 8.0;
    const BETA: f64 = 1.0 / 4.0;
    const K: u32 = 4;
    const G: Duration = Duration::from_millis(10);
    const RTO_CEILING: Duration = Duration::from_secs(60);

    if let Some(previous) = self.srtt {
        // RTTVAR must be updated from the *old* SRTT, hence `previous` is used for both.
        let deviation = sample.abs_diff(previous);
        self.rttvar = self.rttvar.mul_f64(1.0 - BETA) + deviation.mul_f64(BETA);
        self.srtt = Some(previous.mul_f64(1.0 - ALPHA) + sample.mul_f64(ALPHA));
        self.min_rtt = self.min_rtt.min(sample);
    } else {
        self.srtt = Some(sample);
        self.rttvar = sample / 2;
        self.min_rtt = sample;
    }

    let variance_term = (self.rttvar * K).max(G);
    let base = self.srtt.unwrap_or(Duration::from_secs(1));
    self.rto = (base + variance_term).clamp(MIN_RTO, RTO_CEILING);
}
/// Flags `packet_id` as retransmitted so that its eventual acknowledgement does not
/// yield an RTT sample (see `report_received_receipts`).
pub(super) fn mark_retransmitted(&mut self, packet_id: PacketId) {
self.retransmitted_packets.insert(packet_id);
}
/// Current smoothed RTT, or `None` before the first RTT sample.
#[allow(dead_code)] pub(super) fn smoothed_rtt(&self) -> Option<Duration> {
self.srtt
}
/// Smallest RTT sample observed so far (the 100 ms construction-time placeholder until
/// the first sample arrives).
pub(super) fn min_rtt(&self) -> Duration {
self.min_rtt
}
/// Base retransmission timeout, without the backoff multiplier (see `effective_rto`).
#[allow(dead_code)] pub(super) fn rto(&self) -> Duration {
self.rto
}
/// Retransmission timeout actually used by the timer: the base RTO multiplied by the
/// current backoff, capped at 60 seconds.
pub(super) fn effective_rto(&self) -> Duration {
    let ceiling = Duration::from_secs(60);
    let scaled = self.rto.saturating_mul(self.rto_backoff);
    if scaled > ceiling {
        ceiling
    } else {
        scaled
    }
}
/// Records an RTO expiry: doubles the backoff multiplier (capped at `MAX_RTO_BACKOFF`)
/// and logs the new state at debug level.
pub(super) fn on_timeout(&mut self) {
    let doubled = self.rto_backoff.saturating_mul(2);
    self.rto_backoff = std::cmp::min(doubled, MAX_RTO_BACKOFF);
    tracing::debug!(
        rto_backoff = self.rto_backoff,
        effective_rto_ms = self.effective_rto().as_millis(),
        "RTO backoff increased on timeout"
    );
}
/// Current RTO backoff multiplier (1 means no backoff).
#[allow(dead_code)]
pub(super) fn rto_backoff(&self) -> u32 {
self.rto_backoff
}
/// Decides what the retransmission timer should do next.
///
/// Only the first still-pending entry of the resend queue is examined; entries whose
/// packet is no longer pending are discarded on the way. The outcome is one of:
///
/// * `TlpProbe(id, payload)` — the probe timeout has elapsed but the RTO has not, and no
///   probe is currently recorded for this packet. The packet is flagged as retransmitted
///   and stays at the front of the queue; the backoff is not touched.
/// * `Resend(id, payload)` — the effective RTO has elapsed. The loss estimate is bumped,
///   the packet's retransmit count is incremented, its send time is reset to now, it moves
///   to the back of the queue, the backoff is doubled and its probe flag is cleared.
/// * `Abandon { packet_id, payload_len }` — the RTO has elapsed more than
///   `MAX_PACKET_RETRANSMITS` times; all state for the packet is removed.
/// * `WaitUntil(nanos)` — nothing is due; the value is the earliest relevant deadline on
///   the time-source clock (or `now + effective_rto` when the queue is empty).
pub(super) fn get_resend(&mut self) -> ResendAction {
let now_nanos = self.time_source.now_nanos();
let pto = self.probe_timeout();
let effective_rto_nanos = self.effective_rto().as_nanos() as u64;
while let Some(entry) = self.resend_queue.pop_front() {
// Stale queue entry (acknowledged, dropped or abandoned earlier): discard and move on.
if !self.pending_receipts.contains_key(&entry.packet_id) {
continue;
}
// The fallback of 0 is not reachable in practice: presence was checked just above.
let sent_time_nanos = self
.pending_receipts
.get(&entry.packet_id)
.map(|(_, ts, _)| *ts)
.unwrap_or(0);
let rto_deadline = sent_time_nanos + effective_rto_nanos;
// A probe deadline exists only when an RTT estimate is available and no probe is
// currently recorded for this packet.
let tlp_deadline = if let Some(pto) = pto {
if !self.tlp_sent_packets.contains(&entry.packet_id) {
Some(sent_time_nanos + pto.as_nanos() as u64)
} else {
None
}
} else {
None
};
// Tail-loss probe window: probe timeout reached, RTO not yet reached.
if let Some(tlp_at) = tlp_deadline {
if now_nanos >= tlp_at && now_nanos < rto_deadline {
if let Some((packet, _, _)) = self.pending_receipts.get(&entry.packet_id) {
let packet_clone = packet.clone();
let packet_id = entry.packet_id;
self.tlp_sent_packets.insert(packet_id);
// Keep the packet at the head so its RTO is still evaluated first next time.
self.resend_queue.push_front(entry);
self.mark_retransmitted(packet_id);
return ResendAction::TlpProbe(packet_id, packet_clone);
}
}
}
// RTO expired: treat as a loss.
if now_nanos >= rto_deadline && self.pending_receipts.contains_key(&entry.packet_id) {
self.packet_loss_proportion = self.packet_loss_proportion
* (1.0 - PACKET_LOSS_DECAY_FACTOR)
+ PACKET_LOSS_DECAY_FACTOR;
let count = self.retransmit_counts.entry(entry.packet_id).or_insert(0);
*count += 1;
// Too many expiries: give up on the packet and release everything tied to it.
// The entry is intentionally not put back on the queue.
if *count > MAX_PACKET_RETRANSMITS {
let payload_len = self
.pending_receipts
.remove(&entry.packet_id)
.map(|(payload, _, _)| payload.len())
.unwrap_or(0);
self.retransmit_counts.remove(&entry.packet_id);
self.retransmitted_packets.remove(&entry.packet_id);
self.tlp_sent_packets.remove(&entry.packet_id);
self.packet_streams.remove(&entry.packet_id);
tracing::debug!(
packet_id = entry.packet_id,
retransmits = MAX_PACKET_RETRANSMITS,
payload_len,
"Abandoning packet after max retransmits — releasing flight size (#4345)"
);
return ResendAction::Abandon {
packet_id: entry.packet_id,
payload_len,
};
}
// Restart the packet's timer from now and hand out a copy of the payload.
let packet = {
let slot = self
.pending_receipts
.get_mut(&entry.packet_id)
.expect("checked contains_key above");
slot.1 = now_nanos;
slot.0.clone()
};
self.resend_queue.push_back(ResendQueueEntry {
packet_id: entry.packet_id,
});
self.mark_retransmitted(entry.packet_id);
self.on_timeout();
// Clearing the flag allows a fresh probe for this packet after the retransmission.
self.tlp_sent_packets.remove(&entry.packet_id);
return ResendAction::Resend(entry.packet_id, packet);
}
// Nothing due yet for the head packet: report the earlier of its two deadlines.
let next_deadline = if let Some(tlp_at) = tlp_deadline {
tlp_at.min(rto_deadline)
} else {
rto_deadline
};
self.resend_queue.push_front(entry);
return ResendAction::WaitUntil(next_deadline);
}
// Queue exhausted: nothing pending, check back after one effective RTO.
let deadline_nanos = self.time_source.now_nanos() + self.effective_rto().as_nanos() as u64;
ResendAction::WaitUntil(deadline_nanos)
}
}
/// Instruction returned by `SentPacketTracker::get_resend`.
#[derive(Debug, PartialEq)]
pub enum ResendAction {
/// Nothing is due; holds a deadline in nanoseconds on the tracker's time source.
WaitUntil(u64),
/// The packet's retransmission timeout expired; holds its id and a copy of its payload.
Resend(u32, Box<[u8]>),
/// The probe timeout elapsed before the RTO; holds the packet id and a copy of its payload.
TlpProbe(u32, Box<[u8]>),
/// The packet exceeded the retransmit limit and is no longer tracked; `payload_len` is
/// the size of the payload that was released.
Abandon { packet_id: u32, payload_len: usize },
}
/// Element of the resend queue; refers to a packet by id only — payload and timing live
/// in `SentPacketTracker::pending_receipts`.
struct ResendQueueEntry {
packet_id: u32,
}
#[cfg(test)]
impl SentPacketTracker<RealTime> {
    /// Test-only convenience constructor backed by a freshly created `RealTime` source.
    pub(super) fn new() -> Self {
        let clock = RealTime::new();
        Self::new_with_time_source(clock)
    }
}
#[cfg(test)]
pub(in crate::transport) mod tests {
#![allow(clippy::wildcard_enum_match_arm)]
use super::*;
use crate::simulation::VirtualTime;
use rstest::rstest;
/// Builds a tracker driven by a fresh `VirtualTime`, so tests control the clock.
pub(in crate::transport) fn mock_sent_packet_tracker() -> SentPacketTracker<VirtualTime> {
    SentPacketTracker::new_with_time_source(VirtualTime::new())
}
/// Registering a single packet creates exactly one pending entry and one queue entry,
/// bumps the send counter and leaves the loss estimate at zero.
#[rstest]
#[case::single_packet(1, vec![1, 2, 3], 1, 1, 1)]
#[case::multiple_packets_first(1, vec![1], 1, 1, 1)]
fn test_report_sent_packet(
    #[case] packet_id: PacketId,
    #[case] payload: Vec<u8>,
    #[case] expected_pending: usize,
    #[case] expected_queue: usize,
    #[case] expected_total_sent: usize,
) {
    let mut tracker = mock_sent_packet_tracker();

    tracker.report_sent_packet(packet_id, payload.into_boxed_slice());

    assert_eq!(tracker.pending_receipts.len(), expected_pending);
    assert_eq!(tracker.resend_queue.len(), expected_queue);
    assert_eq!(tracker.packet_loss_proportion, 0.0);
    assert_eq!(tracker.total_packets_sent, expected_total_sent);
}
/// Acknowledging the only pending packet clears it and reports its size and an RTT sample.
#[test]
fn test_report_received_receipts() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1, 2, 3].into());

    let (ack_info, loss_rate) = tracker.report_received_receipts(&[1]);

    assert_eq!(tracker.pending_receipts.len(), 0);
    // The queue entry is cleaned up lazily, so it may still be present.
    assert!(tracker.resend_queue.len() <= 1);
    assert_eq!(tracker.packet_loss_proportion, 0.0);
    assert_eq!(ack_info.len(), 1);
    let (rtt_sample, packet_size, _token) = &ack_info[0];
    assert!(rtt_sample.is_some());
    assert_eq!(*packet_size, 3);
    assert!(loss_rate.is_some());
}
/// Once the effective RTO has elapsed the packet is resent, stays tracked, and the loss
/// estimate receives exactly one loss sample.
#[test]
fn test_packet_lost() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1, 2, 3].into());

    let timeout = tracker.effective_rto();
    tracker.time_source.advance(timeout);
    let resend_action = tracker.get_resend();

    let expected_payload: Box<[u8]> = vec![1, 2, 3].into();
    assert_eq!(resend_action, ResendAction::Resend(1, expected_payload));
    assert_eq!(tracker.pending_receipts.len(), 1);
    assert_eq!(tracker.resend_queue.len(), 1);
    assert_eq!(tracker.packet_loss_proportion, PACKET_LOSS_DECAY_FACTOR);
}
/// After an immediate acknowledgement of packet 1, packet 2 is left alone at 9 ms and
/// probed once 11 ms have passed.
#[test]
fn test_immediate_receipt_then_resend() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1, 2, 3].into());
    tracker.report_sent_packet(2, vec![4, 5, 6].into());
    let _ = tracker.report_received_receipts(&[1]);

    // Just before the minimum probe timeout: nothing may fire.
    tracker.time_source.advance(Duration::from_millis(9));
    let early = tracker.get_resend();
    match early {
        ResendAction::WaitUntil(_) => (),
        ResendAction::Resend(..) | ResendAction::TlpProbe(..) | ResendAction::Abandon { .. } => {
            panic!("Expected WaitUntil, got Resend/TlpProbe too early")
        }
    }

    // Past the minimum probe timeout: packet 2 gets a tail-loss probe.
    tracker.time_source.advance(Duration::from_millis(2));
    let late = tracker.get_resend();
    match late {
        ResendAction::TlpProbe(packet_id, _) => {
            assert_eq!(packet_id, 2)
        }
        ResendAction::Resend(_, _) => panic!("Expected TlpProbe, got Resend"),
        ResendAction::WaitUntil(_) => panic!("Expected TlpProbe, got WaitUntil"),
        ResendAction::Abandon { .. } => panic!("Expected TlpProbe, got Abandon"),
    }
}
/// With one packet acknowledged and a younger one still pending, the tracker asks the
/// caller to wait, and the deadline is not in the past.
#[test]
fn test_get_resend_with_pending_receipts() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(0, Box::from(&[][..]));
    tracker.time_source.advance(Duration::from_millis(10));
    tracker.report_sent_packet(1, Box::from(&[][..]));
    let _ = tracker.report_received_receipts(&[0]);

    let action = tracker.get_resend();
    match action {
        ResendAction::WaitUntil(wait_until_nanos) => {
            let now_nanos = tracker.time_source.now_nanos();
            assert!(
                wait_until_nanos >= now_nanos,
                "Wait deadline should be in the future"
            );
        }
        ResendAction::Resend(..) | ResendAction::TlpProbe(..) | ResendAction::Abandon { .. } => {
            panic!("Expected ResendAction::WaitUntil")
        }
    }
}
/// The first RTT sample initialises SRTT to the sample, RTTVAR to half of it, and
/// `min_rtt` to the sample.
#[rstest]
#[case::fast_rtt(50, 50, 25, 50)]
#[case::slow_rtt(100, 100, 50, 100)]
#[case::very_fast_rtt(10, 10, 5, 10)]
fn test_rtt_estimation_first_sample(
    #[case] delay_ms: u64,
    #[case] expected_srtt_ms: u64,
    #[case] expected_rttvar_ms: u64,
    #[case] expected_min_rtt_ms: u64,
) {
    let delay = Duration::from_millis(delay_ms);
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1, 2, 3].into());
    tracker.time_source.advance(delay);

    let (ack_info, _) = tracker.report_received_receipts(&[1]);

    // The acknowledgement itself carries the raw sample and payload size.
    assert_eq!(ack_info.len(), 1);
    assert_eq!(ack_info[0].0, Some(delay));
    assert_eq!(ack_info[0].1, 3);

    // Estimator state after the first sample.
    assert_eq!(
        tracker.smoothed_rtt(),
        Some(Duration::from_millis(expected_srtt_ms))
    );
    assert_eq!(tracker.rttvar, Duration::from_millis(expected_rttvar_ms));
    assert_eq!(
        tracker.min_rtt(),
        Duration::from_millis(expected_min_rtt_ms)
    );
}
/// A second sample is blended in with weight 1/8: 100 ms followed by 200 ms gives an
/// SRTT of about 112 ms.
#[test]
fn test_rtt_estimation_exponential_smoothing() {
    let mut tracker = mock_sent_packet_tracker();

    // First sample: 100 ms.
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(100));
    tracker.report_received_receipts(&[1]);

    // Second sample: 200 ms.
    tracker.report_sent_packet(2, vec![2].into());
    tracker.time_source.advance(Duration::from_millis(200));
    let (ack_info, _) = tracker.report_received_receipts(&[2]);
    assert_eq!(ack_info[0].0, Some(Duration::from_millis(200)));

    let srtt = tracker.smoothed_rtt().unwrap();
    let srtt_ms = srtt.as_millis() as i64;
    assert!((srtt_ms - 112).abs() <= 1);
}
/// Acknowledging a retransmitted packet reports its size but no RTT sample, and leaves
/// the estimator untouched.
#[test]
fn test_rtt_excludes_retransmissions() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1].into());
    tracker.mark_retransmitted(1);
    tracker.time_source.advance(Duration::from_millis(50));

    let (ack_info, _) = tracker.report_received_receipts(&[1]);

    assert_eq!(ack_info.len(), 1);
    let (rtt_sample, packet_size, _token) = &ack_info[0];
    assert_eq!(*rtt_sample, None);
    assert_eq!(*packet_size, 1);
    assert_eq!(tracker.smoothed_rtt(), None);
}
/// The RTO starts at 1 s and, after a very small RTT sample, stays within
/// the 500 ms – 60 s bounds.
#[test]
fn test_rto_clamping() {
    let mut tracker = mock_sent_packet_tracker();
    assert_eq!(tracker.rto(), Duration::from_secs(1));

    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(10));
    tracker.report_received_receipts(&[1]);

    let rto = tracker.rto();
    assert!(rto >= Duration::from_millis(500));
    assert!(rto <= Duration::from_secs(60));
}
/// `min_rtt` ends up as the smallest sample regardless of the order of the samples.
#[rstest]
#[case::three_samples(&[100, 50, 150], 50)]
#[case::descending(&[200, 100, 50], 50)]
#[case::ascending(&[50, 100, 150], 50)]
#[case::single_sample(&[75], 75)]
fn test_min_rtt_tracking(#[case] rtt_samples: &[u64], #[case] expected_min_rtt_ms: u64) {
    let mut tracker = mock_sent_packet_tracker();

    // Send/ack one packet per sample, with ids 1, 2, 3, ...
    for (index, rtt_ms) in rtt_samples.iter().copied().enumerate() {
        let packet_id = (index + 1) as PacketId;
        let payload: Box<[u8]> = vec![packet_id as u8].into();
        tracker.report_sent_packet(packet_id, payload);
        tracker.time_source.advance(Duration::from_millis(rtt_ms));
        tracker.report_received_receipts(&[packet_id]);
    }

    assert_eq!(
        tracker.min_rtt(),
        Duration::from_millis(expected_min_rtt_ms)
    );
}
/// A packet handed out for retransmission is recorded in `retransmitted_packets`.
#[test]
fn test_retransmitted_packet_marked_on_timeout() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1, 2, 3].into());
    let timeout = tracker.effective_rto();
    tracker.time_source.advance(timeout);

    let action = tracker.get_resend();
    match action {
        ResendAction::Resend(packet_id, _) | ResendAction::TlpProbe(packet_id, _) => {
            assert_eq!(packet_id, 1);
            assert!(tracker.retransmitted_packets.contains(&1));
        }
        _ => panic!("Expected Resend or TlpProbe action"),
    }
}
/// Each timeout doubles the backoff multiplier and, with the initial 1 s RTO, the
/// effective RTO in seconds equals the multiplier.
#[rstest]
#[case::first_timeout(1, 2, 2)]
#[case::second_timeout(2, 4, 4)]
#[case::third_timeout(3, 8, 8)]
#[case::fourth_timeout(4, 16, 16)]
fn test_rto_backoff_doubles_on_timeout(
    #[case] timeout_count: u32,
    #[case] expected_backoff: u32,
    #[case] expected_rto_secs: u64,
) {
    let mut tracker = mock_sent_packet_tracker();

    // Baseline before any timeout.
    assert_eq!(tracker.rto_backoff(), 1);
    assert_eq!(tracker.effective_rto(), Duration::from_secs(1));

    let mut remaining = timeout_count;
    while remaining > 0 {
        tracker.on_timeout();
        remaining -= 1;
    }

    assert_eq!(tracker.rto_backoff(), expected_backoff);
    assert_eq!(
        tracker.effective_rto(),
        Duration::from_secs(expected_rto_secs)
    );
}
/// Ten consecutive timeouts saturate both the multiplier (`MAX_RTO_BACKOFF`) and the
/// effective RTO (60 s).
#[test]
fn test_rto_backoff_capped_at_60_seconds() {
    let mut tracker = mock_sent_packet_tracker();

    (0..10).for_each(|_| tracker.on_timeout());

    assert_eq!(tracker.effective_rto(), Duration::from_secs(60));
    assert_eq!(tracker.rto_backoff(), MAX_RTO_BACKOFF);
}
/// Any acknowledgement of a pending packet resets the backoff to 1. Only a
/// non-retransmitted packet also updates the RTO (to the 500 ms floor here); a
/// retransmitted one leaves the initial 1 s RTO in place.
#[rstest]
#[case::non_retransmitted(false)]
#[case::retransmitted(true)]
fn test_rto_backoff_resets_on_ack(#[case] mark_retransmitted: bool) {
    let mut tracker = mock_sent_packet_tracker();

    // Two timeouts: multiplier 1 -> 2 -> 4.
    tracker.on_timeout();
    tracker.on_timeout();
    assert_eq!(tracker.rto_backoff(), 4);

    tracker.report_sent_packet(1, vec![1, 2, 3].into());
    if mark_retransmitted {
        tracker.mark_retransmitted(1);
    }
    tracker.time_source.advance(Duration::from_millis(50));
    tracker.report_received_receipts(&[1]);

    assert_eq!(tracker.rto_backoff(), 1);
    let effective = tracker.effective_rto();
    if mark_retransmitted {
        assert_eq!(effective, Duration::from_secs(1));
    } else {
        assert_eq!(effective, Duration::from_millis(500));
    }
}
/// Even when only a retransmitted packet is acknowledged, a heavily backed-off tracker
/// (multiplier 64) recovers to a multiplier of 1.
#[test]
fn test_death_spiral_recovery() {
    let mut tracker = mock_sent_packet_tracker();
    (0..6).for_each(|_| tracker.on_timeout());
    assert_eq!(tracker.rto_backoff(), 64);

    tracker.report_sent_packet(1, vec![1, 2, 3].into());
    tracker.mark_retransmitted(1);
    tracker.time_source.advance(Duration::from_millis(50));
    tracker.report_received_receipts(&[1]);

    assert_eq!(tracker.rto_backoff(), 1);
}
/// A single ACK batch mixing retransmitted (1, 2) and fresh (3) packets resets the
/// backoff from 16 to 1.
#[test]
fn test_mixed_ack_batch_resets_on_fresh_packet() {
    let mut tracker = mock_sent_packet_tracker();
    (0..4).for_each(|_| tracker.on_timeout());
    assert_eq!(tracker.rto_backoff(), 16);

    tracker.report_sent_packet(1, vec![1].into());
    tracker.report_sent_packet(2, vec![2].into());
    tracker.report_sent_packet(3, vec![3].into());
    // Only the first two count as retransmitted; packet 3 is fresh.
    tracker.mark_retransmitted(1);
    tracker.mark_retransmitted(2);

    tracker.time_source.advance(Duration::from_millis(50));
    tracker.report_received_receipts(&[1, 2, 3]);

    assert_eq!(tracker.rto_backoff(), 1);
}
/// Backoff lifecycle: elevate to 16, recover to 1 via a retransmit ACK, climb again
/// through two timeouts (2, then 4), and recover once more via a fresh ACK.
#[test]
fn test_backoff_re_elevation_after_recovery() {
    let rtt = Duration::from_millis(50);
    let mut tracker = mock_sent_packet_tracker();

    // Phase 1: drive the multiplier up.
    (0..4).for_each(|_| tracker.on_timeout());
    assert_eq!(tracker.rto_backoff(), 16);

    // Phase 2: an ACK for a retransmitted packet recovers fully.
    tracker.report_sent_packet(1, vec![1].into());
    tracker.mark_retransmitted(1);
    tracker.time_source.advance(rtt);
    tracker.report_received_receipts(&[1]);
    assert_eq!(
        tracker.rto_backoff(),
        1,
        "Single retransmit ACK should fully recover"
    );

    // Phase 3: new timeouts escalate from the recovered state.
    tracker.on_timeout();
    assert_eq!(
        tracker.rto_backoff(),
        2,
        "Timeout after recovery should work normally"
    );
    tracker.on_timeout();
    assert_eq!(
        tracker.rto_backoff(),
        4,
        "Subsequent timeout should continue doubling"
    );

    // Phase 4: an ACK for a fresh packet recovers again.
    tracker.report_sent_packet(10, vec![10].into());
    tracker.time_source.advance(rtt);
    tracker.report_received_receipts(&[10]);
    assert_eq!(tracker.rto_backoff(), 1, "Fresh ACK should reset to 1");
}
/// An RTO-driven resend returned by `get_resend` doubles the backoff multiplier.
#[test]
fn test_get_resend_triggers_backoff() {
    let mut tracker = mock_sent_packet_tracker();
    assert_eq!(tracker.rto_backoff(), 1);

    let initial_rto = tracker.effective_rto();
    tracker.report_sent_packet(1, vec![1, 2, 3].into());
    tracker.time_source.advance(initial_rto);

    let action = tracker.get_resend();
    match action {
        ResendAction::Resend(packet_id, _) => {
            assert_eq!(packet_id, 1);
        }
        ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
        _ => panic!("Expected Resend action"),
    }

    assert_eq!(tracker.rto_backoff(), 2);
}
/// Three consecutive RTO resends of the same packet take the multiplier 1 -> 2 -> 4 -> 8,
/// with the effective RTO following (2 s, 4 s, 8 s).
#[test]
fn test_consecutive_resends_increase_backoff() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1, 2, 3].into());
    assert_eq!(tracker.rto_backoff(), 1);

    // First expiry after 1 s; the resent payload is re-registered.
    tracker.time_source.advance(Duration::from_secs(1));
    let first = tracker.get_resend();
    match first {
        ResendAction::Resend(_, payload) => {
            tracker.report_sent_packet(1, payload);
        }
        ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
        _ => panic!("Expected Resend"),
    }
    assert_eq!(tracker.rto_backoff(), 2);
    assert_eq!(tracker.effective_rto(), Duration::from_secs(2));

    // Second expiry after a further 2 s.
    tracker.time_source.advance(Duration::from_secs(2));
    let second = tracker.get_resend();
    match second {
        ResendAction::Resend(_, payload) => {
            tracker.report_sent_packet(1, payload);
        }
        ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
        _ => panic!("Expected Resend"),
    }
    assert_eq!(tracker.rto_backoff(), 4);
    assert_eq!(tracker.effective_rto(), Duration::from_secs(4));

    // Third expiry after a further 4 s; no re-registration this time.
    tracker.time_source.advance(Duration::from_secs(4));
    let third = tracker.get_resend();
    match third {
        ResendAction::Resend(_, _) => {}
        ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
        _ => panic!("Expected Resend"),
    }
    assert_eq!(tracker.rto_backoff(), 8);
    assert_eq!(tracker.effective_rto(), Duration::from_secs(8));
}
/// RTO floor the tests below expect, spelled out independently of the production
/// `MIN_RTO` constant so that a change to the latter is caught.
const EXPECTED_MIN_RTO: Duration = Duration::from_millis(500);
/// A 10 ms RTT sample would give a tiny RTO; it must be raised to the 500 ms floor.
#[test]
fn test_min_rto_is_500ms() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(10));
    tracker.report_received_receipts(&[1]);

    let rto = tracker.rto();
    assert_eq!(
        rto, EXPECTED_MIN_RTO,
        "RTO should be clamped to 500ms minimum for fast networks"
    );
}
/// A 400 ms RTT sample produces an RTO above the floor, which must not be clamped down.
#[test]
fn test_min_rto_allows_higher_values() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(400));
    tracker.report_received_receipts(&[1]);

    let rto = tracker.rto();
    assert!(
        rto > EXPECTED_MIN_RTO,
        "RTO should not be clamped when naturally above minimum"
    );
}
/// A freshly constructed tracker reports the 1 s initial RTO.
#[test]
fn test_initial_rto_before_samples() {
    let tracker = mock_sent_packet_tracker();
    let initial = tracker.rto();
    assert_eq!(
        initial,
        Duration::from_secs(1),
        "Initial RTO before any samples should be 1s"
    );
}
/// Backoff multiplies the clamped (500 ms) RTO: 500 ms -> 1000 ms -> 2000 ms.
#[test]
fn test_effective_rto_with_backoff_respects_min() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(10));
    tracker.report_received_receipts(&[1]);

    // No backoff yet: base and effective RTO both sit at the floor.
    assert_eq!(tracker.rto(), EXPECTED_MIN_RTO);
    assert_eq!(tracker.effective_rto(), EXPECTED_MIN_RTO);

    for expected_ms in [1000, 2000] {
        tracker.on_timeout();
        assert_eq!(tracker.effective_rto(), Duration::from_millis(expected_ms));
    }
}
/// With a 20 ms RTT the probe timeout is 40 ms: nothing fires at 39 ms, and packet 2 is
/// retransmitted (as a probe, or a resend) at 41 ms.
#[test]
fn test_fast_loss_detection_with_500ms_rto() {
    let mut tracker = mock_sent_packet_tracker();

    // Establish a 20 ms RTT estimate.
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(20));
    tracker.report_received_receipts(&[1]);

    tracker.report_sent_packet(2, vec![2].into());

    tracker.time_source.advance(Duration::from_millis(39));
    let before_timeout = tracker.get_resend();
    match before_timeout {
        ResendAction::WaitUntil(_) => {}
        ResendAction::Resend(..) | ResendAction::TlpProbe(..) | ResendAction::Abandon { .. } => {
            panic!("Should not fire before TLP timeout (40ms)")
        }
    }

    tracker.time_source.advance(Duration::from_millis(2));
    let after_timeout = tracker.get_resend();
    match after_timeout {
        ResendAction::TlpProbe(id, _) => assert_eq!(id, 2, "TLP should probe packet 2"),
        ResendAction::Resend(id, _) => {
            assert_eq!(id, 2, "Or Resend if TLP not yet implemented")
        }
        ResendAction::WaitUntil(_) => panic!("Should have triggered TLP after 40ms"),
        ResendAction::Abandon { .. } => panic!("unexpected Abandon"),
    }
}
/// The wall-clock gap between consecutive RTO resends really doubles: 1 s, 2 s, 4 s.
///
/// Each stage waits until 1 ms before the expected expiry (nothing may fire), then one
/// more millisecond (the resend must fire), and checks the resulting backoff multiplier.
#[test]
fn test_timeout_intervals_actually_increase() {
    // (quiet period in ms, panic text if a resend fires early,
    //  panic text if no resend fires on time, multiplier after the resend)
    let stages: [(u64, &str, &str, u32); 3] = [
        (
            999,
            "Should not resend before RTO expires",
            "Expected Resend after 1s",
            2,
        ),
        (
            1999,
            "Should not resend before backed-off RTO (2s)",
            "Expected Resend after 2s",
            4,
        ),
        (
            3999,
            "Should not resend before backed-off RTO (4s)",
            "Expected Resend after 4s",
            8,
        ),
    ];

    let mut tracker = mock_sent_packet_tracker();
    assert_eq!(tracker.effective_rto(), Duration::from_secs(1));
    tracker.report_sent_packet(1, vec![1, 2, 3].into());

    for (index, &(quiet_ms, too_early_msg, missing_msg, backoff_after)) in
        stages.iter().enumerate()
    {
        // One millisecond short of the deadline: must still be waiting.
        tracker.time_source.advance(Duration::from_millis(quiet_ms));
        match tracker.get_resend() {
            ResendAction::WaitUntil(_) => {}
            ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
            ResendAction::Resend(_, _) => panic!("{}", too_early_msg),
            ResendAction::Abandon { .. } => panic!("unexpected Abandon"),
        }

        // Exactly at the deadline: the resend must fire.
        tracker.time_source.advance(Duration::from_millis(1));
        let payload = match tracker.get_resend() {
            ResendAction::Resend(id, payload) => {
                assert_eq!(id, 1);
                payload
            }
            ResendAction::TlpProbe(_, _) => panic!("TLP shouldn't fire without RTT samples"),
            _ => panic!("{}", missing_msg),
        };
        assert_eq!(tracker.rto_backoff(), backoff_after);

        // Re-register the resent packet before every stage except after the last one.
        if index + 1 < stages.len() {
            tracker.report_sent_packet(1, payload);
        }
    }
}
/// With a 50 ms RTT the probe timeout is 100 ms: nothing fires at 99 ms and a tail-loss
/// probe (not a full resend) fires at 101 ms.
#[test]
fn test_tlp_fires_before_rto() {
    let mut tracker = mock_sent_packet_tracker();

    // Establish a 50 ms RTT estimate.
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(50));
    tracker.report_received_receipts(&[1]);

    tracker.report_sent_packet(2, vec![2].into());

    tracker.time_source.advance(Duration::from_millis(99));
    let before_timeout = tracker.get_resend();
    match before_timeout {
        ResendAction::WaitUntil(_) => {}
        ResendAction::Resend(..) | ResendAction::TlpProbe(..) | ResendAction::Abandon { .. } => {
            panic!("Should not fire before TLP timeout")
        }
    }

    tracker.time_source.advance(Duration::from_millis(2));
    let after_timeout = tracker.get_resend();
    match after_timeout {
        ResendAction::TlpProbe(id, _) => assert_eq!(id, 2, "TLP should probe packet 2"),
        ResendAction::Resend(_, _) => panic!("Should be TLP probe, not full RTO resend"),
        ResendAction::WaitUntil(_) => panic!("TLP should have fired at 2*SRTT"),
        ResendAction::Abandon { .. } => panic!("unexpected Abandon"),
    }
}
/// Sending a tail-loss probe leaves the RTO backoff multiplier at 1.
#[test]
fn test_tlp_does_not_apply_backoff() {
    let mut tracker = mock_sent_packet_tracker();

    // Establish a 50 ms RTT estimate.
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(50));
    tracker.report_received_receipts(&[1]);
    assert_eq!(tracker.rto_backoff(), 1);

    // Let packet 2 sit just past the 100 ms probe timeout.
    tracker.report_sent_packet(2, vec![2].into());
    tracker.time_source.advance(Duration::from_millis(101));

    let action = tracker.get_resend();
    match action {
        ResendAction::TlpProbe(_, _) => {}
        ResendAction::WaitUntil(_) | ResendAction::Resend(..) | ResendAction::Abandon { .. } => {
            panic!("Expected TLP probe")
        }
    }
    assert_eq!(tracker.rto_backoff(), 1, "TLP should not increase backoff");
}
/// If the probe is not acknowledged either, the regular RTO takes over: after the probe
/// is re-registered, nothing fires at 499 ms, a full resend fires at 501 ms, and the
/// backoff doubles.
#[test]
fn test_tlp_followed_by_rto_if_no_ack() {
    let mut tracker = mock_sent_packet_tracker();

    // Establish a 50 ms RTT estimate.
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(50));
    tracker.report_received_receipts(&[1]);

    // Packet 2 goes unanswered long enough for the probe to fire.
    tracker.report_sent_packet(2, vec![2].into());
    tracker.time_source.advance(Duration::from_millis(101));
    let probe = tracker.get_resend();
    let payload = match probe {
        ResendAction::TlpProbe(id, payload) => {
            assert_eq!(id, 2);
            payload
        }
        ResendAction::WaitUntil(_) | ResendAction::Resend(..) | ResendAction::Abandon { .. } => {
            panic!("Expected TLP probe")
        }
    };
    // Re-registering the probe restarts the packet's timer.
    tracker.report_sent_packet(2, payload);

    tracker.time_source.advance(Duration::from_millis(499));
    let before_rto = tracker.get_resend();
    match before_rto {
        ResendAction::WaitUntil(_) => {}
        ResendAction::Resend(..) | ResendAction::TlpProbe(..) | ResendAction::Abandon { .. } => {
            panic!("Should still be waiting for RTO")
        }
    }

    tracker.time_source.advance(Duration::from_millis(2));
    let after_rto = tracker.get_resend();
    match after_rto {
        ResendAction::Resend(id, _) => assert_eq!(id, 2, "RTO should fire after TLP failed"),
        ResendAction::TlpProbe(_, _) => panic!("Should be RTO, not another TLP"),
        _ => panic!("RTO should have fired"),
    }
    assert_eq!(tracker.rto_backoff(), 2, "RTO should increase backoff");
}
/// An acknowledgement that arrives before the probe timeout means no probe and no
/// resend is issued afterwards.
#[test]
fn test_tlp_cancelled_by_ack() {
    let mut tracker = mock_sent_packet_tracker();

    // Establish a 50 ms RTT estimate.
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(50));
    tracker.report_received_receipts(&[1]);

    // Packet 2 is acknowledged after 60 ms, before the 100 ms probe timeout.
    tracker.report_sent_packet(2, vec![2].into());
    tracker.time_source.advance(Duration::from_millis(60));
    tracker.report_received_receipts(&[2]);

    let action = tracker.get_resend();
    match action {
        ResendAction::WaitUntil(_) => {}
        ResendAction::TlpProbe(_, _) => panic!("TLP should be cancelled by ACK"),
        ResendAction::Resend(_, _) => panic!("No resend needed, packet was ACKed"),
        ResendAction::Abandon { .. } => panic!("unexpected Abandon"),
    }
}
/// With a 2 ms RTT, twice the SRTT (4 ms) is below the 10 ms floor, so the probe
/// must not fire at 9 ms but must fire at 11 ms.
#[test]
fn test_tlp_minimum_timeout() {
    let mut tracker = mock_sent_packet_tracker();

    // Establish a 2 ms RTT estimate.
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(2));
    tracker.report_received_receipts(&[1]);

    tracker.report_sent_packet(2, vec![2].into());

    tracker.time_source.advance(Duration::from_millis(9));
    let before_floor = tracker.get_resend();
    match before_floor {
        ResendAction::WaitUntil(_) => {}
        ResendAction::Resend(..) | ResendAction::TlpProbe(..) | ResendAction::Abandon { .. } => {
            panic!("Should not fire before minimum TLP timeout")
        }
    }

    tracker.time_source.advance(Duration::from_millis(2));
    let after_floor = tracker.get_resend();
    match after_floor {
        ResendAction::TlpProbe(id, _) => assert_eq!(id, 2),
        ResendAction::WaitUntil(_) | ResendAction::Resend(..) | ResendAction::Abandon { .. } => {
            panic!("TLP should fire at minimum 10ms")
        }
    }
}
/// Without any RTT sample there is no probe timeout: the tracker waits at 500 ms and
/// issues a plain RTO resend once the initial 1 s RTO has passed.
#[test]
fn test_tlp_disabled_before_rtt_samples() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1].into());

    tracker.time_source.advance(Duration::from_millis(500));
    let halfway = tracker.get_resend();
    match halfway {
        ResendAction::WaitUntil(_) => {}
        ResendAction::TlpProbe(_, _) => panic!("TLP should not fire without RTT samples"),
        ResendAction::Resend(_, _) => panic!("RTO is 1s, should not fire at 500ms"),
        ResendAction::Abandon { .. } => panic!("unexpected Abandon"),
    }

    tracker.time_source.advance(Duration::from_millis(501));
    let expired = tracker.get_resend();
    match expired {
        ResendAction::Resend(id, _) => assert_eq!(id, 1),
        ResendAction::TlpProbe(_, _) => panic!("Should be RTO, not TLP"),
        _ => panic!("RTO should fire at 1s"),
    }
}
/// With three packets outstanding, the first probe reported after 101ms must
/// target the oldest one (packet 2).
///
/// NOTE(review): despite the name, this test only inspects the first
/// `get_resend` result; it does not check what later calls return.
#[test]
fn test_tlp_only_once_per_flight() {
    let mut tracker = mock_sent_packet_tracker();

    // Packet 1 completes a 50ms round trip before the flight is sent.
    tracker.report_sent_packet(1, vec![1].into());
    tracker.time_source.advance(Duration::from_millis(50));
    tracker.report_received_receipts(&[1]);

    // A flight of three packets, none of which is acknowledged.
    tracker.report_sent_packet(2, vec![2].into());
    tracker.report_sent_packet(3, vec![3].into());
    tracker.report_sent_packet(4, vec![4].into());

    tracker.time_source.advance(Duration::from_millis(101));
    if let ResendAction::TlpProbe(id, _) = tracker.get_resend() {
        assert_eq!(id, 2, "TLP should probe oldest unacked packet");
    } else {
        panic!("Expected TLP probe");
    }
}
/// A packet that is never acknowledged must be re-sent exactly
/// `MAX_PACKET_RETRANSMITS` times, then reported as `Abandon` with its id and
/// payload length, and leave no bookkeeping behind.
#[test]
fn test_issue_4345_packet_abandoned_after_max_retransmits() {
    let mut tracker = mock_sent_packet_tracker();
    let payload_len = 3usize;
    tracker.report_sent_packet(1, vec![1, 2, 3].into());

    let mut resends = 0u32;
    let mut abandoned = false;
    // At most 1000 rounds, stopping as soon as the abandon is observed.
    let mut rounds_left = 1000;
    while !abandoned && rounds_left > 0 {
        rounds_left -= 1;
        tracker.time_source.advance(Duration::from_secs(120));
        let action = tracker.get_resend();
        match action {
            ResendAction::Abandon {
                packet_id,
                payload_len: len,
            } => {
                assert_eq!(packet_id, 1);
                assert_eq!(len, payload_len);
                abandoned = true;
            }
            ResendAction::Resend(id, packet) => {
                assert_eq!(id, 1);
                // Re-register the packet as sent, as a caller would.
                tracker.report_sent_packet(id, packet);
                resends += 1;
            }
            ResendAction::WaitUntil(_) | ResendAction::TlpProbe(..) => {}
        }
    }

    assert!(
        abandoned,
        "issue #4345: packet must be abandoned, never was"
    );
    assert_eq!(
        resends, MAX_PACKET_RETRANSMITS,
        "issue #4345: packet should be re-sent exactly MAX_PACKET_RETRANSMITS \
         times before abandonment (got {resends})"
    );

    // The abandon must have cleaned up all per-packet state.
    assert!(
        tracker.pending_receipts.is_empty(),
        "abandoned packet must be removed from pending_receipts"
    );
    assert!(
        tracker.retransmit_counts.is_empty(),
        "abandoned packet's retransmit count must be cleared"
    );

    // One more long wait: the abandoned packet must not come back.
    tracker.time_source.advance(Duration::from_secs(120));
    let after_abandon = tracker.get_resend();
    assert!(
        matches!(after_abandon, ResendAction::WaitUntil(_)),
        "abandoned packet must NOT be re-queued for resend"
    );
}
/// After `MAX_PACKET_RETRANSMITS - 2` resends the per-packet counter must
/// hold exactly that value, and an ACK must then clear it.
#[test]
fn test_issue_4345_ack_resets_retransmit_count() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(1, vec![1, 2, 3].into());

    // Stop two resends short of the limit; every round must yield a resend.
    let resend_rounds = MAX_PACKET_RETRANSMITS - 2;
    for _ in 0..resend_rounds {
        tracker.time_source.advance(Duration::from_secs(120));
        let action = tracker.get_resend();
        match action {
            ResendAction::Resend(id, packet) => tracker.report_sent_packet(id, packet),
            other => panic!("expected Resend, got {other:?}"),
        }
    }

    let recorded = tracker.retransmit_counts.get(&1).copied();
    assert_eq!(recorded, Some(resend_rounds));

    tracker.report_received_receipts(&[1]);
    assert!(
        tracker.retransmit_counts.is_empty(),
        "ACK must clear the retransmit count so the packet can't be abandoned later"
    );
}
/// A connection whose packets are resent repeatedly but acknowledged before
/// reaching the retransmit limit must never see an `Abandon`.
#[test]
fn test_issue_4345_flaky_but_alive_packet_never_abandoned() {
    let mut tracker = mock_sent_packet_tracker();

    let retransmit_limit = MAX_PACKET_RETRANSMITS as usize;
    let total_cycles = retransmit_limit * 5;
    // An ACK is delivered on every cycle whose 1-based index is a multiple
    // of this value, i.e. one resend short of the limit.
    let ack_period = retransmit_limit - 1;

    let mut live_id = 1u32;
    tracker.report_sent_packet(live_id, vec![1, 2, 3].into());

    for cycle in 0..total_cycles {
        tracker.time_source.advance(Duration::from_secs(120));
        match tracker.get_resend() {
            ResendAction::WaitUntil(_) | ResendAction::TlpProbe(..) => {}
            ResendAction::Abandon { .. } => {
                panic!(
                    "issue #4345: a flaky-but-alive packet was abandoned at \
                     cycle {cycle} — ACKs before the limit must keep it alive"
                );
            }
            ResendAction::Resend(resent_id, packet) => {
                tracker.report_sent_packet(resent_id, packet);
                let ack_due = (cycle + 1) % ack_period == 0;
                if ack_due {
                    // Acknowledge the resent packet and start over with a
                    // fresh packet id.
                    tracker.report_received_receipts(&[resent_id]);
                    live_id += 1;
                    tracker.report_sent_packet(live_id, vec![1, 2, 3].into());
                }
            }
        }
    }
}
/// An ACK that arrives after its packet was abandoned must yield no ACK
/// entries, so the caller has nothing to release a second time.
#[test]
fn test_issue_4345_late_ack_after_abandon_is_noop() {
    let mut tracker = mock_sent_packet_tracker();
    tracker.report_sent_packet(7, vec![9, 9, 9].into());

    // Reuse the module's shared helper instead of a hand-rolled copy of the
    // same loop: it resends until packet 7 is abandoned, asserts the
    // abandoned id, and panics if the abandon never happens.
    drive_to_abandon(&mut tracker, 7);

    let (ack_info, _) = tracker.report_received_receipts(&[7]);
    assert!(
        ack_info.is_empty(),
        "issue #4345: a late ACK for an abandoned packet must be a no-op \
         (no double release of flight size); got {ack_info:?}"
    );
}
/// Test helper: advances the clock in 120s steps, re-registering every
/// resend, until `tracker` reports an `Abandon`.
///
/// Asserts that the abandoned packet is `packet_id`. Panics if no abandon is
/// observed within 1000 steps.
fn drive_to_abandon(tracker: &mut SentPacketTracker<VirtualTime>, packet_id: PacketId) {
    let mut rounds_left = 1000;
    while rounds_left > 0 {
        rounds_left -= 1;
        tracker.time_source.advance(Duration::from_secs(120));
        let action = tracker.get_resend();
        match action {
            ResendAction::Abandon { packet_id: id, .. } => {
                assert_eq!(
                    id, packet_id,
                    "an unexpected packet ({id}) abandoned before the target ({packet_id})"
                );
                return;
            }
            ResendAction::Resend(id, packet) => tracker.report_sent_packet(id, packet),
            ResendAction::WaitUntil(_) | ResendAction::TlpProbe(..) => {}
        }
    }
    panic!("packet {packet_id} was never abandoned");
}
/// `drop_stream` must return the summed payload length of the stream's
/// packets (3 + 5 + 7) and leave every tracking collection empty.
#[test]
fn test_drop_stream_returns_exact_byte_total() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();

    // (packet id, payload length) for each packet sent on the stream.
    for (id, len) in [(1, 3usize), (2, 5), (3, 7)] {
        tracker.report_sent_stream_packet(id, vec![0u8; len].into(), None, stream);
    }

    assert_eq!(
        tracker.drop_stream(stream),
        15,
        "must return the sum of dropped payload lengths"
    );

    assert!(tracker.pending_receipts.is_empty());
    assert!(tracker.packet_streams.is_empty());
    assert!(tracker.retransmit_counts.is_empty());
    assert!(
        tracker.resend_queue.is_empty(),
        "dropped packets must be purged from the resend queue"
    );
}
/// ACKs arriving for packets already removed by `drop_stream` must produce
/// no ACK entries.
#[test]
fn test_drop_stream_then_ack_is_noop() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();

    // 4 + 2 payload bytes on the same stream.
    tracker.report_sent_stream_packet(10, vec![1, 2, 3, 4].into(), None, stream);
    tracker.report_sent_stream_packet(11, vec![5, 6].into(), None, stream);
    assert_eq!(tracker.drop_stream(stream), 6);

    let ack_info = tracker.report_received_receipts(&[10, 11]).0;
    assert!(
        ack_info.is_empty(),
        "issue #4345: ACK after drop_stream must not release bytes again; got {ack_info:?}"
    );
}
/// After `drop_stream`, repeated long waits must only ever produce
/// `WaitUntil`: the dropped packet is never resent, probed or abandoned.
#[test]
fn test_drop_stream_then_abandon_path_is_noop() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();
    tracker.report_sent_stream_packet(20, vec![0u8; 8].into(), None, stream);
    assert_eq!(tracker.drop_stream(stream), 8);

    // Poll a few more times than the retransmit limit allows.
    let polls = MAX_PACKET_RETRANSMITS + 5;
    for _ in 0..polls {
        tracker.time_source.advance(Duration::from_secs(120));
        let action = tracker.get_resend();
        match action {
            ResendAction::WaitUntil(_) => {}
            other => {
                panic!("issue #4345: a dropped packet must never resend/abandon; got {other:?}")
            }
        }
    }
}
/// Dropping stream A must remove only A's packets; stream B's packets stay
/// tracked, stay tagged with B, and can still be acknowledged for their full
/// byte count.
#[test]
fn test_drop_stream_leaves_other_stream_intact() {
    let mut tracker = mock_sent_packet_tracker();
    let stream_a = StreamId::next();
    let stream_b = StreamId::next();

    // Interleave the two streams: A carries ids 1 and 3, B carries 2 and 4.
    tracker.report_sent_stream_packet(1, vec![0u8; 3].into(), None, stream_a);
    tracker.report_sent_stream_packet(2, vec![0u8; 100].into(), None, stream_b);
    tracker.report_sent_stream_packet(3, vec![0u8; 3].into(), None, stream_a);
    tracker.report_sent_stream_packet(4, vec![0u8; 200].into(), None, stream_b);

    assert_eq!(tracker.drop_stream(stream_a), 6);

    // Stream B is untouched.
    for id in [2, 4] {
        assert!(tracker.pending_receipts.contains_key(&id));
    }
    let expected_tag = PacketStream::Stream(stream_b);
    for id in [2, 4] {
        assert_eq!(tracker.packet_streams.get(&id), Some(&expected_tag));
    }

    // Stream A is gone.
    for id in [1, 3] {
        assert!(!tracker.pending_receipts.contains_key(&id));
    }

    let (ack_info, _) = tracker.report_received_receipts(&[2, 4]);
    let acked_bytes = ack_info.iter().map(|entry| entry.1).sum::<usize>();
    assert_eq!(
        acked_bytes, 300,
        "stream B's bytes must still be ACK-creditable"
    );

    assert_eq!(
        tracker.drop_stream(stream_b),
        0,
        "stream B was already fully ACKed, so nothing left to release"
    );
}
/// A packet registered through `report_sent_packet` is tagged `Control` and
/// must survive a `drop_stream` of an unrelated stream.
#[test]
fn test_drop_stream_leaves_control_packets_intact() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();

    // Packet 1: 50 bytes, no stream. Packet 2: 9 bytes on `stream`.
    tracker.report_sent_packet(1, vec![0u8; 50].into());
    tracker.report_sent_stream_packet(2, vec![0u8; 9].into(), None, stream);

    assert_eq!(tracker.drop_stream(stream), 9);

    assert!(tracker.pending_receipts.contains_key(&1));
    let tag = tracker.packet_streams.get(&1);
    assert_eq!(tag, Some(&PacketStream::Control));

    // The surviving packet is still acknowledged for its full 50 bytes.
    let ack_info = tracker.report_received_receipts(&[1]).0;
    assert_eq!(ack_info.len(), 1);
    assert_eq!(ack_info[0].1, 50);
}
/// `drop_stream` must return 0 for a stream that never sent anything, for a
/// stream whose only packet was already acknowledged, and when called again.
#[test]
fn test_drop_stream_unknown_or_drained_returns_zero() {
    let mut tracker = mock_sent_packet_tracker();

    // A stream the tracker has never seen.
    let unknown = StreamId::next();
    assert_eq!(tracker.drop_stream(unknown), 0);

    // A stream whose single packet is acknowledged before the drop.
    let stream = StreamId::next();
    tracker.report_sent_stream_packet(1, vec![0u8; 5].into(), None, stream);
    tracker.report_received_receipts(&[1]);
    let first_drop = tracker.drop_stream(stream);
    assert_eq!(
        first_drop,
        0,
        "a fully-ACKed stream has no bytes left to release"
    );

    // Dropping the same stream a second time is equally harmless.
    let second_drop = tracker.drop_stream(stream);
    assert_eq!(second_drop, 0);
}
/// Dropping a stream with one 42-byte packet returns 42 and empties both
/// `pending_receipts` and `packet_streams`.
#[test]
fn test_drop_stream_single_packet() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();
    tracker.report_sent_stream_packet(1, vec![0u8; 42].into(), None, stream);

    let released = tracker.drop_stream(stream);
    assert_eq!(released, 42);

    assert!(tracker.pending_receipts.is_empty());
    assert!(tracker.packet_streams.is_empty());
}
/// Dropping a stream with 500 four-byte packets returns the full byte total
/// and empties the pending, stream-tag and resend-queue collections.
#[test]
fn test_drop_stream_many_packets() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();
    let n = 500u32;
    let per_packet = 4usize;

    (0..n).for_each(|id| {
        tracker.report_sent_stream_packet(id, vec![0u8; per_packet].into(), None, stream);
    });

    let expected_total = u64::from(n) * (per_packet as u64);
    assert_eq!(tracker.drop_stream(stream), expected_total);

    assert!(tracker.pending_receipts.is_empty());
    assert!(tracker.packet_streams.is_empty());
    assert!(tracker.resend_queue.is_empty());
}
/// `drop_stream` must release both a packet that has already been resent
/// once and a freshly sent one, and clear the retransmission bookkeeping.
#[test]
fn test_drop_stream_sweeps_retransmitted_and_pending() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();

    // Packet 1 (10 bytes) goes through one resend cycle.
    tracker.report_sent_stream_packet(1, vec![0u8; 10].into(), None, stream);
    tracker.time_source.advance(Duration::from_secs(2));
    let action = tracker.get_resend();
    match action {
        ResendAction::Resend(id, packet) => {
            assert_eq!(id, 1);
            // Re-registered through the non-stream entry point.
            tracker.report_sent_packet(id, packet);
        }
        other => panic!("expected Resend for packet 1, got {other:?}"),
    }

    let tag = tracker.packet_streams.get(&1);
    assert_eq!(
        tag,
        Some(&PacketStream::Stream(stream)),
        "resend re-registration must preserve the stream tag"
    );
    assert!(tracker.retransmitted_packets.contains(&1));

    // Packet 2 (20 bytes) is sent once and never resent.
    tracker.report_sent_stream_packet(2, vec![0u8; 20].into(), None, stream);

    let released = tracker.drop_stream(stream);
    assert_eq!(released, 30);

    assert!(tracker.pending_receipts.is_empty());
    assert!(tracker.packet_streams.is_empty());
    assert!(
        tracker.retransmitted_packets.is_empty(),
        "retransmitted-packet state for dropped packets must be cleared"
    );
    assert!(tracker.retransmit_counts.is_empty());
}
/// A packet that was already abandoned must not be counted again when its
/// stream is later dropped; only the still-tracked packet is released.
#[test]
fn test_drop_stream_does_not_double_release_abandoned_packet() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();

    // Packet 1 (4 bytes) is driven all the way to abandonment.
    tracker.report_sent_stream_packet(1, vec![0u8; 4].into(), None, stream);
    drive_to_abandon(&mut tracker, 1);
    let still_tagged = tracker.packet_streams.contains_key(&1);
    assert!(!still_tagged);
    let still_pending = tracker.pending_receipts.contains_key(&1);
    assert!(!still_pending);

    // Packet 2 (6 bytes) is the only one left on the stream.
    tracker.report_sent_stream_packet(2, vec![0u8; 6].into(), None, stream);
    let released = tracker.drop_stream(stream);
    assert_eq!(
        released,
        6,
        "issue #4345: drop_stream must not re-release bytes already released by Abandon"
    );
}
/// A stream whose only packet has an empty payload releases 0 bytes, but the
/// packet must still be removed from the tracking maps.
#[test]
fn test_drop_stream_zero_length_packet() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();
    tracker.report_sent_stream_packet(1, Box::from(&[][..]), None, stream);

    let released = tracker.drop_stream(stream);
    assert_eq!(released, 0);

    assert!(tracker.pending_receipts.is_empty());
    assert!(tracker.packet_streams.is_empty());
}
/// A packet that `get_resend` has handed out but that was not re-registered
/// must still be released by `drop_stream`, and a later ACK must be a no-op.
#[test]
fn test_drop_stream_releases_packet_out_for_resend() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();
    tracker.report_sent_stream_packet(1, vec![0u8; 13].into(), None, stream);

    // Wait exactly one effective RTO so the packet is handed out for resend.
    let rto = tracker.effective_rto();
    tracker.time_source.advance(rto);
    let action = tracker.get_resend();
    match action {
        // The payload is deliberately not re-registered here.
        ResendAction::Resend(id, _packet) => assert_eq!(id, 1),
        other => panic!("expected Resend for the out-for-resend packet, got {other:?}"),
    }

    // The tracker still knows the packet and its stream at this point.
    assert!(tracker.pending_receipts.contains_key(&1));
    let tag = tracker.packet_streams.get(&1);
    assert_eq!(tag, Some(&PacketStream::Stream(stream)));

    let released = tracker.drop_stream(stream);
    assert_eq!(
        released,
        13,
        "issue #4345: a packet out-for-resend must still be released by drop_stream"
    );
    assert!(tracker.pending_receipts.is_empty());
    assert!(tracker.packet_streams.is_empty());

    let ack_info = tracker.report_received_receipts(&[1]).0;
    assert!(
        ack_info.is_empty(),
        "a late ACK after the gap-drop must not release bytes again"
    );
}
/// `refresh_sent_packet` on a packet whose stream was dropped must return
/// `false` and leave no trace: no pending entry, no stream tag, no
/// retransmit count, no ACK credit and no later resend or abandon.
#[test]
fn test_refresh_after_drop_stream_does_not_resurrect() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();
    tracker.report_sent_stream_packet(1, vec![0u8; 13].into(), None, stream);

    // Take the payload out for resend after one effective RTO.
    let rto = tracker.effective_rto();
    tracker.time_source.advance(rto);
    let action = tracker.get_resend();
    let resent_payload = match action {
        ResendAction::Resend(id, packet) => {
            assert_eq!(id, 1);
            packet
        }
        other => panic!("expected Resend, got {other:?}"),
    };

    // The stream is dropped while the payload is still in the caller's hands.
    let released = tracker.drop_stream(stream);
    assert_eq!(released, 13);
    assert!(!tracker.pending_receipts.contains_key(&1));

    let refreshed = tracker.refresh_sent_packet(1, resent_payload, None);
    assert!(
        !refreshed,
        "refresh_sent_packet must report no-op for a dropped packet"
    );
    assert!(
        !tracker.pending_receipts.contains_key(&1),
        "dropped packet must NOT be resurrected into pending_receipts"
    );
    assert!(!tracker.packet_streams.contains_key(&1));
    assert!(!tracker.retransmit_counts.contains_key(&1));

    let ack_info = tracker.report_received_receipts(&[1]).0;
    assert!(
        ack_info.is_empty(),
        "ACK of a dropped-then-refreshed packet must release zero bytes; got {ack_info:?}"
    );

    // Keep polling past the retransmit limit: nothing may ever surface.
    let polls = MAX_PACKET_RETRANSMITS + 5;
    for _ in 0..polls {
        tracker.time_source.advance(Duration::from_secs(120));
        let action = tracker.get_resend();
        match action {
            ResendAction::WaitUntil(_) => {}
            other => panic!(
                "dropped packet must never resend/abandon after refresh no-op; got {other:?}"
            ),
        }
    }
}
/// Pins the current (hazardous) behaviour of `report_sent_packet`: calling it
/// for a packet whose stream was dropped re-inserts the packet, tagged
/// `Control` instead of its original stream.
#[test]
fn test_report_sent_packet_resurrects_dropped_packet_footgun() {
    let mut tracker = mock_sent_packet_tracker();
    let stream = StreamId::next();
    tracker.report_sent_stream_packet(1, vec![0u8; 13].into(), None, stream);

    let released = tracker.drop_stream(stream);
    assert_eq!(released, 13);

    // Re-register the same id through the non-stream entry point.
    tracker.report_sent_packet(1, vec![0u8; 13].into());

    let resurrected = tracker.pending_receipts.contains_key(&1);
    assert!(
        resurrected,
        "report_sent_packet IS insert-capable (resurrects) — this is why the \
         recv loop must use refresh_sent_packet instead"
    );
    let tag = tracker.packet_streams.get(&1);
    assert_eq!(
        tag,
        Some(&PacketStream::Control),
        "the resurrected packet is mis-tagged Control (the double-release footgun)"
    );
}
}