#![allow(dead_code)]
use super::ack::RecordNumber;
use super::reliability::Action;
use alloc::vec::Vec;
use core::time::Duration;
const INITIAL_TIMEOUT: Duration = Duration::from_millis(1000);
const MAX_TIMEOUT: Duration = Duration::from_secs(60);
const MAX_RETRANSMITS: u32 = 6;
type Instant = Duration;
pub(crate) struct InFlightRecord {
pub(crate) record_number: RecordNumber,
pub(crate) datagram: Vec<u8>,
}
pub(crate) struct Retransmit13 {
in_flight: Vec<InFlightRecord>,
deadline: Option<Instant>,
timeout: Duration,
attempts: u32,
}
impl Retransmit13 {
pub(crate) fn new() -> Self {
Self {
in_flight: Vec::new(),
deadline: None,
timeout: INITIAL_TIMEOUT,
attempts: 0,
}
}
pub(crate) fn on_record_sent(&mut self, rec: InFlightRecord, now: Instant) {
let was_empty = self.in_flight.is_empty();
self.in_flight.push(rec);
if was_empty {
self.timeout = INITIAL_TIMEOUT;
self.attempts = 0;
self.deadline = Some(now + INITIAL_TIMEOUT);
}
}
pub(crate) fn on_ack(&mut self, acks: &[RecordNumber]) {
if acks.is_empty() {
return;
}
self.in_flight.retain(|r| !acks.contains(&r.record_number));
if self.in_flight.is_empty() {
self.deadline = None;
self.timeout = INITIAL_TIMEOUT;
self.attempts = 0;
}
}
pub(crate) fn release_epoch(&mut self, epoch: u64) {
if self.in_flight.is_empty() {
return;
}
self.in_flight.retain(|r| r.record_number.epoch != epoch);
if self.in_flight.is_empty() {
self.deadline = None;
self.timeout = INITIAL_TIMEOUT;
self.attempts = 0;
}
}
pub(crate) fn next_timeout(&self) -> Option<Instant> {
self.deadline
}
pub(crate) fn on_timeout(&mut self, now: Instant) -> Action {
let Some(deadline) = self.deadline else {
return Action::Idle;
};
if self.in_flight.is_empty() {
self.deadline = None;
return Action::Idle;
}
if now < deadline {
return Action::Idle;
}
if self.attempts >= MAX_RETRANSMITS {
self.deadline = None;
return Action::GiveUp;
}
self.attempts += 1;
self.timeout = (self.timeout * 2).min(MAX_TIMEOUT);
self.deadline = Some(now + self.timeout);
Action::Retransmit
}
pub(crate) fn in_flight_datagrams(&self) -> impl Iterator<Item = &[u8]> {
self.in_flight.iter().map(|r| r.datagram.as_slice())
}
pub(crate) fn in_flight_len(&self) -> usize {
self.in_flight.len()
}
}
impl Default for Retransmit13 {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloc::vec;
fn rec(epoch: u64, seq: u64, datagram: &[u8]) -> InFlightRecord {
InFlightRecord {
record_number: RecordNumber { epoch, seq },
datagram: datagram.to_vec(),
}
}
#[test]
fn new_is_idle() {
let r = Retransmit13::new();
assert!(r.next_timeout().is_none());
assert_eq!(r.in_flight_len(), 0);
}
#[test]
fn on_record_sent_starts_timer() {
let mut r = Retransmit13::new();
assert_eq!(r.next_timeout(), None);
r.on_record_sent(rec(0, 1, b"hello"), Duration::from_secs(0));
assert_eq!(r.next_timeout(), Some(Duration::from_secs(1)));
assert_eq!(r.in_flight_len(), 1);
}
#[test]
fn sending_more_records_does_not_re_arm_timer() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(0, 1, b"a"), Duration::from_secs(0));
let first = r.next_timeout();
r.on_record_sent(rec(0, 2, b"b"), Duration::from_millis(500));
assert_eq!(r.next_timeout(), first);
assert_eq!(r.in_flight_len(), 2);
}
#[test]
fn ack_clears_matching_record_only() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(1, 10, b"x"), Duration::from_secs(0));
r.on_record_sent(rec(1, 11, b"y"), Duration::from_secs(0));
r.on_record_sent(rec(1, 12, b"z"), Duration::from_secs(0));
r.on_ack(&[RecordNumber { epoch: 1, seq: 11 }]);
assert_eq!(r.in_flight_len(), 2);
assert!(r.next_timeout().is_some());
let remaining: Vec<&[u8]> = r.in_flight_datagrams().collect();
assert_eq!(remaining, vec![b"x".as_slice(), b"z".as_slice()]);
}
#[test]
fn full_ack_cancels_timer() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(0, 1, b"a"), Duration::from_secs(0));
r.on_record_sent(rec(0, 2, b"b"), Duration::from_secs(0));
assert!(r.next_timeout().is_some());
r.on_ack(&[
RecordNumber { epoch: 0, seq: 1 },
RecordNumber { epoch: 0, seq: 2 },
]);
assert_eq!(r.in_flight_len(), 0);
assert_eq!(r.next_timeout(), None);
}
#[test]
fn unknown_ack_is_ignored() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(0, 1, b"a"), Duration::from_secs(0));
let before = r.next_timeout();
r.on_ack(&[RecordNumber {
epoch: 999,
seq: 999,
}]);
assert_eq!(r.in_flight_len(), 1);
assert_eq!(r.next_timeout(), before);
}
#[test]
fn empty_ack_is_no_op() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(0, 1, b"a"), Duration::from_secs(0));
let before = r.next_timeout();
r.on_ack(&[]);
assert_eq!(r.in_flight_len(), 1);
assert_eq!(r.next_timeout(), before);
}
#[test]
fn timeout_triggers_retransmit_of_all_in_flight() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(0, 1, b"alpha"), Duration::from_secs(0));
r.on_record_sent(rec(0, 2, b"beta"), Duration::from_secs(0));
assert_eq!(r.on_timeout(Duration::from_millis(500)), Action::Idle);
assert_eq!(r.on_timeout(Duration::from_secs(1)), Action::Retransmit);
let datagrams: Vec<&[u8]> = r.in_flight_datagrams().collect();
assert_eq!(datagrams, vec![b"alpha".as_slice(), b"beta".as_slice()]);
assert_eq!(r.next_timeout(), Some(Duration::from_secs(3)));
}
#[test]
fn backoff_doubles_and_caps_at_max() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(0, 1, b"x"), Duration::from_secs(0));
let expected_gaps = [
Duration::from_secs(2),
Duration::from_secs(4),
Duration::from_secs(8),
Duration::from_secs(16),
Duration::from_secs(32),
MAX_TIMEOUT, ];
assert_eq!(expected_gaps.len() as u32, MAX_RETRANSMITS);
for expected in expected_gaps {
let deadline = r.next_timeout().expect("armed");
assert_eq!(r.on_timeout(deadline), Action::Retransmit);
let next = r.next_timeout().expect("re-armed");
assert_eq!(next - deadline, expected);
}
}
#[test]
fn give_up_after_max_retransmits() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(0, 1, b"x"), Duration::from_secs(0));
for _ in 0..MAX_RETRANSMITS {
let deadline = r.next_timeout().expect("armed");
assert_eq!(r.on_timeout(deadline), Action::Retransmit);
}
let deadline = r.next_timeout().expect("still armed");
assert_eq!(r.on_timeout(deadline), Action::GiveUp);
assert_eq!(r.next_timeout(), None);
assert_eq!(r.in_flight_len(), 1);
}
#[test]
fn release_epoch_drops_only_that_epoch() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(0, 0, b"plain"), Duration::from_secs(0));
r.on_record_sent(rec(2, 0, b"enc0"), Duration::from_secs(0));
r.on_record_sent(rec(2, 1, b"enc1"), Duration::from_secs(0));
r.release_epoch(0);
assert_eq!(r.in_flight_len(), 2);
assert!(r.next_timeout().is_some());
let remaining: Vec<&[u8]> = r.in_flight_datagrams().collect();
assert_eq!(remaining, vec![b"enc0".as_slice(), b"enc1".as_slice()]);
r.release_epoch(2);
assert_eq!(r.in_flight_len(), 0);
assert_eq!(r.next_timeout(), None);
}
#[test]
fn release_epoch_on_empty_is_no_op() {
let mut r = Retransmit13::new();
r.release_epoch(0);
assert_eq!(r.in_flight_len(), 0);
assert_eq!(r.next_timeout(), None);
}
#[test]
fn idle_with_no_in_flight() {
let mut r = Retransmit13::new();
assert_eq!(r.on_timeout(Duration::from_secs(10)), Action::Idle);
assert_eq!(r.next_timeout(), None);
}
#[test]
fn timer_resets_to_initial_after_full_drain() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(0, 1, b"x"), Duration::from_secs(0));
assert_eq!(r.on_timeout(Duration::from_secs(1)), Action::Retransmit);
r.on_ack(&[RecordNumber { epoch: 0, seq: 1 }]);
assert_eq!(r.next_timeout(), None);
r.on_record_sent(rec(0, 2, b"y"), Duration::from_secs(10));
assert_eq!(r.next_timeout(), Some(Duration::from_secs(11)));
}
#[test]
fn ack_does_not_cancel_until_all_records_drained() {
let mut r = Retransmit13::new();
r.on_record_sent(rec(0, 1, b"a"), Duration::from_secs(0));
r.on_record_sent(rec(0, 2, b"b"), Duration::from_secs(0));
r.on_record_sent(rec(0, 3, b"c"), Duration::from_secs(0));
r.on_ack(&[
RecordNumber { epoch: 0, seq: 1 },
RecordNumber { epoch: 0, seq: 3 },
]);
assert_eq!(r.in_flight_len(), 1);
assert!(r.next_timeout().is_some());
r.on_ack(&[RecordNumber { epoch: 0, seq: 2 }]);
assert_eq!(r.in_flight_len(), 0);
assert!(r.next_timeout().is_none());
}
}