use crate::EventSink;
use crate::api::Options;
use crate::api::SocketEvent;
use crate::api::SocketTime;
use crate::api::StreamId;
use crate::api::handover::HandoverReadiness;
use crate::api::handover::SocketHandoverState;
use crate::math::is_divisible_by_4;
use crate::math::round_down_to_4;
use crate::math::round_up_to_4;
use crate::packet::chunk::Chunk;
use crate::packet::data::Data;
use crate::packet::data_chunk;
use crate::packet::idata_chunk;
use crate::packet::sack_chunk::SackChunk;
use crate::timer::Timer;
use crate::timer::{self};
use crate::tx::outstanding_data::ChunkState;
use crate::tx::outstanding_data::OutstandingData;
use crate::tx::send_queue::DataToSend;
use crate::types::OutgoingMessageId;
use crate::types::Tsn;
use std::cell::RefCell;
use std::cmp::max;
use std::cmp::min;
use std::rc::Rc;
use std::time::Duration;
/// The two congestion-control phases the queue distinguishes.
///
/// The current phase is derived on demand by `RetransmissionQueue::phase` from
/// the relation between `cwnd` and `ssthresh`; it is never stored.
#[derive(Debug, PartialEq)]
enum CongestionAlgorithmPhase {
/// Selected while `cwnd <= ssthresh`.
SlowStart,
/// Selected while `cwnd > ssthresh`.
CongestionAvoidance,
}
/// Expiry horizon used by `chunk_expires_at` when partial reliability is not in
/// use: such chunks are stamped with `now + MAX_EXPIRY` (one hour) instead of the
/// per-message expiry.
const MAX_EXPIRY: Duration = Duration::from_secs(3600);
/// Outcome of `RetransmissionQueue::handle_sack`.
#[derive(Debug, PartialEq)]
pub enum HandleSackResult {
/// The SACK's cumulative TSN ack was outside the range the queue accepts; the
/// SACK was not processed.
Invalid,
/// The SACK was processed.
///
/// * `rtt` - round-trip time measurement, only attempted when the SACK carried
///   no gap ack blocks.
/// * `reset_error_counter` - `true` when the SACK acknowledged at least one
///   payload byte.
Valid { rtt: Option<Duration>, reset_error_counter: bool },
}
/// Sender-side bookkeeping for data that has been sent but not yet fully
/// acknowledged: congestion/receiver window state, the T3-rtx retransmission
/// timer and the set of outstanding chunks.
pub struct RetransmissionQueue {
/// Path MTU in bytes, copied from `Options::mtu`; used as the unit for window
/// growth and reduction.
mtu: usize,
/// Lower bound, in MTUs, for `ssthresh` after packet loss is detected.
cwnd_mtus_min: usize,
/// Copied from `Options::avoid_fragmentation_cwnd_mtus`.
// NOTE(review): this field is not read by any method visible in this impl.
avoid_fragmentation_cwnd_mtus: usize,
/// When `false`, per-message retransmission limits and expiry are ignored and
/// FORWARD-TSN is never requested.
partial_reliability: bool,
/// Header size of the data chunk type in use (DATA or I-DATA).
data_chunk_header_size: usize,
/// Selects I-DATA / I-FORWARD-TSN instead of DATA / FORWARD-TSN.
use_message_interleaving: bool,
/// Congestion window, in bytes.
cwnd: usize,
/// Most recent receiver window advertised by the peer, in bytes.
a_rwnd: usize,
/// Slow-start threshold, in bytes.
ssthresh: usize,
/// Bytes acknowledged since `cwnd` last grew in congestion avoidance.
partial_bytes_acked: usize,
/// Number of calls that produced at least one retransmitted chunk.
rtx_packets_count: usize,
/// Total serialized bytes of retransmitted chunks.
rtx_bytes_count: u64,
/// `Some(tsn)` while in fast recovery; cleared once the cumulative TSN ack
/// reaches `tsn`.
fast_recovery_exit_tsn: Option<Tsn>,
/// Per-TSN state of everything sent and not yet removed.
outstanding_data: OutstandingData,
/// Retransmission timer (exponential backoff).
t3_rtx: Timer,
/// Sink for lifecycle events emitted while sending and acknowledging.
events: Rc<RefCell<dyn EventSink>>,
}
impl RetransmissionQueue {
/// Creates a retransmission queue for a new association.
///
/// * `events` - sink that receives lifecycle events.
/// * `my_initial_tsn` - first TSN this side will assign; the TSN immediately
///   before it is passed to `OutstandingData::new` as its starting point.
/// * `a_rwnd` - receiver window advertised by the peer; also used as the
///   initial `ssthresh`.
/// * `options` - supplies the MTU, the congestion window settings and the
///   T3-rtx timer settings.
/// * `supports_partial_reliability` - whether per-message retransmission
///   limits and expiry are honoured.
/// * `use_message_interleaving` - selects the I-DATA header size instead of the
///   DATA header size.
pub fn new(
    events: Rc<RefCell<dyn EventSink>>,
    my_initial_tsn: Tsn,
    a_rwnd: u32,
    options: &Options,
    supports_partial_reliability: bool,
    use_message_interleaving: bool,
) -> Self {
    let data_chunk_header_size = if use_message_interleaving {
        idata_chunk::HEADER_SIZE
    } else {
        data_chunk::HEADER_SIZE
    };
    let initial_cwnd = options.cwnd_mtus_initial * options.mtu;
    // The advertised window doubles as the initial slow-start threshold.
    let peer_window = a_rwnd as usize;
    let outstanding_data = OutstandingData::new(data_chunk_header_size, my_initial_tsn - 1);
    let t3_rtx = Timer::new(
        options.rto_initial,
        timer::BackoffAlgorithm::Exponential,
        None,
        options.max_timer_backoff_duration,
    );

    Self {
        mtu: options.mtu,
        cwnd_mtus_min: options.cwnd_mtus_min,
        avoid_fragmentation_cwnd_mtus: options.avoid_fragmentation_cwnd_mtus,
        partial_reliability: supports_partial_reliability,
        data_chunk_header_size,
        use_message_interleaving,
        cwnd: initial_cwnd,
        a_rwnd: peer_window,
        ssthresh: peer_window,
        partial_bytes_acked: 0,
        rtx_packets_count: 0,
        rtx_bytes_count: 0,
        fast_recovery_exit_tsn: None,
        outstanding_data,
        t3_rtx,
        events,
    }
}
/// Starts the T3-rtx timer if there is outstanding data and the timer is not
/// already running. Does nothing when no data is outstanding.
fn start_t3_rtx_if_outstanding_data(&mut self, now: SocketTime) {
    let has_outstanding_data = !self.outstanding_data.is_empty();
    if has_outstanding_data && !self.t3_rtx.is_running() {
        self.t3_rtx.start(now);
    }
}
/// Returns the next expiry reported by the T3-rtx timer, if any.
pub fn next_timeout(&self) -> Option<SocketTime> {
self.t3_rtx.next_expiry()
}
/// Returns `true` when the SACK's cumulative TSN ack lies between the last
/// cumulative acked TSN and the highest outstanding TSN, both inclusive.
fn is_sack_valid(&self, sack: &SackChunk) -> bool {
    let cumulative_tsn_ack = sack.cumulative_tsn_ack;
    // A cumulative ack that moves backwards is rejected.
    if cumulative_tsn_ack < self.outstanding_data.last_cumulative_acked_tsn() {
        return false;
    }
    // It must not acknowledge anything beyond the highest outstanding TSN.
    cumulative_tsn_ack <= self.outstanding_data.highest_outstanding_tsn()
}
/// Leaves fast recovery once `cumulative_tsn_ack` has reached the recorded exit
/// TSN. Does nothing when not in fast recovery.
fn maybe_exit_fast_recovery(&mut self, cumulative_tsn_ack: Tsn) {
    let reached_exit_point = matches!(
        self.fast_recovery_exit_tsn,
        Some(exit_tsn) if cumulative_tsn_ack >= exit_tsn
    );
    if reached_exit_point {
        self.fast_recovery_exit_tsn = None;
    }
}
/// Returns `true` while a fast recovery exit TSN is recorded.
fn is_in_fast_recovery(&self) -> bool {
self.fast_recovery_exit_tsn.is_some()
}
/// Derives the congestion-control phase: congestion avoidance when `cwnd`
/// exceeds `ssthresh`, slow start otherwise.
fn phase(&self) -> CongestionAlgorithmPhase {
    if self.cwnd > self.ssthresh {
        return CongestionAlgorithmPhase::CongestionAvoidance;
    }
    CongestionAlgorithmPhase::SlowStart
}
/// Grows the congestion window after the cumulative TSN ack has advanced.
///
/// * `unacked_packet_bytes` - unacked packet bytes before the SACK was applied.
/// * `total_packet_bytes_acked` - packet bytes acknowledged by the SACK.
///
/// The window only grows when it was fully utilized, i.e. the unacked bytes
/// plus one MTU reached `cwnd`. In slow start (and outside fast recovery) `cwnd`
/// grows by the acked bytes, capped at one MTU. In congestion avoidance the
/// acked bytes accumulate in `partial_bytes_acked`, and `cwnd` grows by one MTU
/// each time a full window has accumulated.
fn handle_increased_cumulative_tsn_ack(
    &mut self,
    unacked_packet_bytes: usize,
    total_packet_bytes_acked: usize,
) {
    let cwnd_before = self.cwnd;
    let is_fully_utilized = unacked_packet_bytes + self.mtu >= cwnd_before;

    if self.phase() == CongestionAlgorithmPhase::SlowStart {
        if !is_fully_utilized || self.is_in_fast_recovery() {
            return;
        }
        self.cwnd += min(total_packet_bytes_acked, self.mtu);
        log::debug!("SS increase cwnd={} ({})", self.cwnd, cwnd_before);
        return;
    }

    // Congestion avoidance.
    let pba_before = self.partial_bytes_acked;
    self.partial_bytes_acked += total_packet_bytes_acked;
    let window_worth_acked = self.partial_bytes_acked >= self.cwnd;
    if window_worth_acked && is_fully_utilized {
        // Consume one (pre-growth) window's worth of acked bytes, then grow.
        self.partial_bytes_acked -= self.cwnd;
        self.cwnd += self.mtu;
        log::debug!(
            "CA increase cwnd={} ({}), ssthresh={}, pba={} ({})",
            self.cwnd,
            cwnd_before,
            self.ssthresh,
            self.partial_bytes_acked,
            pba_before
        );
        return;
    }
    log::debug!(
        "CA unchanged cwnd={} ({}), ssthresh={}, pba={} ({})",
        self.cwnd,
        cwnd_before,
        self.ssthresh,
        self.partial_bytes_acked,
        pba_before
    );
}
/// Reacts to packet loss reported by a SACK.
///
/// Outside fast recovery, `ssthresh` becomes half of `cwnd` (but at least
/// `cwnd_mtus_min` MTUs), `cwnd` is set to the new `ssthresh`,
/// `partial_bytes_acked` is reset, and fast recovery is entered with the
/// current highest outstanding TSN as exit point. While already in fast
/// recovery nothing changes. `_highest_tsn_acked` is currently unused.
fn handle_packet_loss(&mut self, _highest_tsn_acked: Tsn) {
    if self.is_in_fast_recovery() {
        log::debug!("packet loss detected (fast recovery). No changes.");
        return;
    }

    let cwnd_before = self.cwnd;
    let pba_before = self.partial_bytes_acked;
    let reduced_window = max(cwnd_before / 2, self.cwnd_mtus_min * self.mtu);
    self.ssthresh = reduced_window;
    self.cwnd = reduced_window;
    self.partial_bytes_acked = 0;
    log::debug!(
        "packet loss detected (not fast recovery). cwnd={} ({}), ssthresh={}, pba={} ({})",
        self.cwnd,
        cwnd_before,
        self.ssthresh,
        self.partial_bytes_acked,
        pba_before
    );

    let exit_tsn = self.outstanding_data.highest_outstanding_tsn();
    self.fast_recovery_exit_tsn = Some(exit_tsn);
    log::debug!("fast recovery initiated with exit_point={}", exit_tsn);
}
/// Sets the duration of the T3-rtx timer to `rto`.
pub fn update_rto(&mut self, rto: Duration) {
self.t3_rtx.set_duration(rto);
}
/// Processes a received SACK chunk.
///
/// Returns `HandleSackResult::Invalid` without changing any state when the
/// cumulative TSN ack is out of range. Otherwise the outstanding data is
/// updated, lifecycle events for acked and abandoned messages are emitted, the
/// peer's advertised window is stored, congestion control is updated and the
/// T3-rtx timer is restarted if data remains outstanding.
///
/// In the `Valid` result, `rtt` is only measured for SACKs without gap ack
/// blocks, and `reset_error_counter` is `true` when payload bytes were acked.
pub fn handle_sack(&mut self, now: SocketTime, sack: &SackChunk) -> HandleSackResult {
    if !self.is_sack_valid(sack) {
        return HandleSackResult::Invalid;
    }

    // Snapshot the state before the SACK is applied; needed for congestion
    // control and logging below.
    let prev_cumulative_tsn_ack = self.outstanding_data.last_cumulative_acked_tsn();
    let prev_unacked_packet_bytes = self.outstanding_data.unacked_packet_bytes();
    let prev_unacked_payload_bytes = self.outstanding_data.unacked_payload_bytes();
    let prev_rwnd = self.rwnd();
    let cumulative_tsn_ack = sack.cumulative_tsn_ack;

    let has_gap_ack_blocks = !sack.gap_ack_blocks.is_empty();
    let rtt = if has_gap_ack_blocks {
        None
    } else {
        self.outstanding_data.measure_rtt(now, cumulative_tsn_ack)
    };

    // Fast recovery is exited before the SACK is applied, so the flag handed to
    // the outstanding data reflects the post-exit state.
    self.maybe_exit_fast_recovery(cumulative_tsn_ack);
    let in_fast_recovery = self.is_in_fast_recovery();
    let ack_info = self.outstanding_data.handle_sack(
        cumulative_tsn_ack,
        &sack.gap_ack_blocks,
        in_fast_recovery,
    );

    {
        let mut events = self.events.borrow_mut();
        for lid in ack_info.acked_lifecycle_ids {
            events.add(SocketEvent::OnLifecycleMessageDelivered(lid.clone()));
            events.add(SocketEvent::OnLifecycleEnd(lid));
        }
        for lid in ack_info.abandoned_lifecycle_ids {
            events.add(SocketEvent::OnLifecycleMessageMaybeExpired(lid.clone()));
            events.add(SocketEvent::OnLifecycleEnd(lid));
        }
    }

    self.a_rwnd = sack.a_rwnd as usize;
    log::debug!(
        "Received SACK, cum_tsn_ack={} ({}), unacked_payload_bytes={} ({}), rwnd={} ({})",
        cumulative_tsn_ack,
        prev_cumulative_tsn_ack,
        self.outstanding_data.unacked_payload_bytes(),
        prev_unacked_payload_bytes,
        self.rwnd(),
        prev_rwnd
    );

    let cumulative_ack_advanced = cumulative_tsn_ack > prev_cumulative_tsn_ack;
    if cumulative_ack_advanced {
        // Stopped here; restarted at the end if data is still outstanding.
        self.t3_rtx.stop();
        self.handle_increased_cumulative_tsn_ack(
            prev_unacked_packet_bytes,
            ack_info.packet_bytes_acked,
        );
    }
    if ack_info.has_packet_loss {
        self.handle_packet_loss(ack_info.highest_tsn_acked);
    }

    let reset_error_counter = ack_info.payload_bytes_acked > 0;
    self.start_t3_rtx_if_outstanding_data(now);
    HandleSackResult::Valid { rtt, reset_error_counter }
}
/// Handles a possible T3-rtx expiry at `now`.
///
/// Returns `false` when the timer has not expired. On expiry, `ssthresh`
/// becomes half of `cwnd` (at least four MTUs), `cwnd` collapses to one MTU,
/// `partial_bytes_acked` is reset, every outstanding chunk is nacked, and
/// `true` is returned.
pub fn handle_timeout(&mut self, now: SocketTime) -> bool {
    let expired = self.t3_rtx.expire(now);
    if expired {
        let cwnd_before = self.cwnd;
        let unacked_before = self.unacked_packet_bytes();

        self.ssthresh = max(cwnd_before / 2, 4 * self.mtu);
        self.cwnd = self.mtu;
        self.partial_bytes_acked = 0;
        self.outstanding_data.nack_all();

        log::debug!(
            "t3-rtx expired. new cwnd={} ({}), ssthresh={}, unacked_packet_bytes {} ({})",
            self.cwnd,
            cwnd_before,
            self.ssthresh,
            self.unacked_packet_bytes(),
            unacked_before
        );
    }
    expired
}
/// Returns whether the outstanding data reports chunks awaiting fast
/// retransmission.
pub fn has_data_to_be_fast_retransmitted(&self) -> bool {
self.outstanding_data.has_data_to_be_fast_retransmitted()
}
/// Returns the chunks to fast-retransmit that fit in `bytes_remaining_in_packet`
/// (which must be divisible by four).
///
/// Expects at least one chunk to be available: this is checked by a
/// `debug_assert!`, and indexing the first chunk panics otherwise. The T3-rtx
/// timer is restarted when the first returned chunk is the one right after the
/// last cumulative acked TSN, and started if it was not running. The
/// retransmission counters are updated.
pub fn get_chunks_for_fast_retransmit(
    &mut self,
    now: SocketTime,
    bytes_remaining_in_packet: usize,
) -> Vec<(Tsn, Data)> {
    debug_assert!(is_divisible_by_4!(bytes_remaining_in_packet));
    let unacked_before = self.unacked_packet_bytes();
    let chunks = self
        .outstanding_data
        .get_chunks_to_be_fast_retransmitted(now, bytes_remaining_in_packet);
    debug_assert!(!chunks.is_empty());

    let first_tsn = chunks[0].0;
    let retransmits_earliest_outstanding =
        first_tsn == self.outstanding_data.last_cumulative_acked_tsn() + 1;
    if retransmits_earliest_outstanding {
        // Stop first so that the start below restarts the timer from `now`.
        self.t3_rtx.stop();
    }
    if !self.t3_rtx.is_running() {
        self.t3_rtx.start(now);
    }

    let mut packet_bytes_retransmitted: usize = 0;
    for (_, data) in &chunks {
        packet_bytes_retransmitted +=
            round_up_to_4!(self.data_chunk_header_size + data.payload.len());
    }
    self.rtx_packets_count += 1;
    self.rtx_bytes_count += packet_bytes_retransmitted as u64;

    log::debug!(
        "Fast-retransmitting TSN {} - {} bytes. unacked_packet_bytes={} ({})",
        chunks.iter().map(|(tsn, _)| tsn.to_string()).collect::<Vec<String>>().join(","),
        packet_bytes_retransmitted,
        self.unacked_packet_bytes(),
        unacked_before
    );
    chunks
}
/// Collects the chunks to put in the next packet.
///
/// * `now` - current time, recorded with newly inserted chunks.
/// * `bytes_remaining_in_packet` - space left in the packet; must be divisible
///   by four.
/// * `produce` - called with the maximum payload size and the unsent messages
///   to discard; returns the next chunk of new data, or `None` when there is
///   nothing more to send.
///
/// The byte budget is the smaller of `max_bytes_to_send()` and the packet space,
/// rounded down to a multiple of four. Chunks scheduled for retransmission are
/// taken first; the remaining budget is filled with new data from `produce`.
/// The T3-rtx timer is started if anything is returned and it was not running.
pub fn get_chunks_to_send(
    &mut self,
    now: SocketTime,
    bytes_remaining_in_packet: usize,
    mut produce: impl FnMut(usize, &[(StreamId, OutgoingMessageId)]) -> Option<DataToSend>,
) -> Vec<(Tsn, Data)> {
    debug_assert!(is_divisible_by_4!(bytes_remaining_in_packet));
    let unacked_before = self.unacked_packet_bytes();
    let rwnd_before = self.rwnd();
    let header_size = self.data_chunk_header_size;

    let mut budget = round_down_to_4!(min(self.max_bytes_to_send(), bytes_remaining_in_packet));

    // Retransmissions are served before any new data.
    let mut chunks = self.outstanding_data.get_chunks_to_be_retransmitted(now, budget);
    let mut packet_bytes_retransmitted: usize = 0;
    for (_, data) in &chunks {
        packet_bytes_retransmitted += round_up_to_4!(header_size + data.payload.len());
    }
    budget -= packet_bytes_retransmitted;
    if !chunks.is_empty() {
        self.rtx_packets_count += 1;
        self.rtx_bytes_count += packet_bytes_retransmitted as u64;
    }

    // Fill what is left with new data, as long as a header plus at least one
    // payload byte fits.
    while budget > header_size {
        debug_assert!(is_divisible_by_4!(budget));
        let max_payload = budget - header_size;
        let to_discard = self.outstanding_data.get_unsent_messages_to_discard();
        let Some(chunk) = produce(max_payload, &to_discard) else {
            break;
        };

        let chunk_size = round_up_to_4!(header_size + chunk.data.payload.len());
        budget -= chunk_size;
        let max_retransmissions = self.chunk_max_retransmissions(&chunk);
        let expires_at = self.chunk_expires_at(now, &chunk);
        let inserted = self.outstanding_data.insert(
            chunk.message_id,
            &chunk.data,
            now,
            max_retransmissions,
            expires_at,
            chunk.lifecycle_id.clone(),
        );
        // When the outstanding data does not assign a TSN, the chunk is dropped
        // and the loop carries on with the reduced budget.
        let Some(tsn) = inserted else {
            continue;
        };
        if let Some(lid) = chunk.lifecycle_id {
            self.events.borrow_mut().add(SocketEvent::OnLifecycleMessageFullySent(lid));
        }
        chunks.push((tsn, chunk.data));
    }

    if chunks.is_empty() {
        return chunks;
    }

    if !self.t3_rtx.is_running() {
        self.t3_rtx.start(now);
    }
    let mut sent_bytes: usize = 0;
    for (_, data) in &chunks {
        sent_bytes += round_up_to_4!(header_size + data.payload.len());
    }
    log::debug!(
        "Sending TSN {} - {} bytes. unacked_packet_bytes={} ({}), cwnd={}, rwnd={} ({})",
        chunks.iter().map(|(tsn, _)| tsn.to_string()).collect::<Vec<String>>().join(","),
        sent_bytes,
        self.unacked_packet_bytes(),
        unacked_before,
        self.cwnd,
        self.rwnd(),
        rwnd_before
    );
    chunks
}
/// Returns whether the outstanding data holds chunks scheduled for
/// retransmission.
// NOTE(review): despite its name this does not consult `cwnd`, `rwnd` or
// `avoid_fragmentation_cwnd_mtus` (which is stored by `new` but never read in
// the visible code) - confirm that this is the intended condition.
pub fn can_send_data(&self) -> bool {
self.outstanding_data.has_data_to_be_retransmitted()
}
/// Retransmission limit to record for `chunk`: the chunk's own limit when
/// partial reliability is in use, `u16::MAX` otherwise.
fn chunk_max_retransmissions(&self, chunk: &DataToSend) -> u16 {
    if !self.partial_reliability {
        return u16::MAX;
    }
    chunk.max_retransmissions
}
/// Expiry time to record for `chunk`: the chunk's own expiry when partial
/// reliability is in use, `now + MAX_EXPIRY` otherwise.
fn chunk_expires_at(&self, now: SocketTime, chunk: &DataToSend) -> SocketTime {
    if !self.partial_reliability {
        return now + MAX_EXPIRY;
    }
    chunk.expires_at
}
/// Test helper: returns the `(Tsn, ChunkState)` pairs reported by the
/// outstanding data.
pub fn get_chunk_states_for_testing(&self) -> Vec<(Tsn, ChunkState)> {
self.outstanding_data.get_chunk_states_for_testing()
}
/// Returns the next TSN as reported by the outstanding data.
pub fn next_tsn(&self) -> Tsn {
self.outstanding_data.next_tsn()
}
/// Returns the TSN immediately before the next TSN.
pub fn last_assigned_tsn(&self) -> Tsn {
self.outstanding_data.next_tsn() - 1
}
/// Returns the current congestion window, in bytes.
pub fn cwnd(&self) -> usize {
self.cwnd
}
/// Overrides the congestion window. No other congestion state is adjusted.
pub fn set_cwnd(&mut self, cwnd: usize) {
self.cwnd = cwnd;
}
/// Returns the usable receiver window: the peer's advertised window minus the
/// unacked payload bytes, saturating at zero.
pub fn rwnd(&self) -> usize {
    let unacked_payload = self.outstanding_data.unacked_payload_bytes();
    self.a_rwnd.saturating_sub(unacked_payload)
}
/// Returns how many times a call produced retransmitted chunks.
pub fn rtx_packets_count(&self) -> usize {
self.rtx_packets_count
}
/// Returns the total serialized bytes of retransmitted chunks.
pub fn rtx_bytes_count(&self) -> u64 {
self.rtx_bytes_count
}
/// Returns the unacked packet bytes reported by the outstanding data.
pub fn unacked_packet_bytes(&self) -> usize {
self.outstanding_data.unacked_packet_bytes()
}
/// Returns the number of unacked items reported by the outstanding data.
pub fn unacked_items(&self) -> usize {
self.outstanding_data.unacked_items()
}
/// Returns how many bytes may be sent right now.
///
/// This is the unused part of the congestion window, further limited by the
/// receiver window unless nothing is unacked - in that case the receiver
/// window is ignored.
fn max_bytes_to_send(&self) -> usize {
    let cwnd_left = self.cwnd.saturating_sub(self.unacked_packet_bytes());
    match self.unacked_items() {
        0 => cwnd_left,
        _ => min(self.rwnd(), cwnd_left),
    }
}
/// Returns whether a FORWARD-TSN should be sent.
///
/// Always `false` without partial reliability. Otherwise outstanding chunks
/// are first expired against `now`, then the outstanding data is asked.
pub fn should_send_forward_tsn(&mut self, now: SocketTime) -> bool {
    if self.partial_reliability {
        self.outstanding_data.expire_outstanding_chunks(now);
        self.outstanding_data.should_send_forward_tsn()
    } else {
        false
    }
}
/// Builds the forward-TSN chunk: `Chunk::IForwardTsn` when message interleaving
/// is in use, `Chunk::ForwardTsn` otherwise. Debug-asserts that partial
/// reliability is in use.
pub fn create_forward_tsn(&mut self) -> Chunk {
    debug_assert!(self.partial_reliability);
    if self.use_message_interleaving {
        return Chunk::IForwardTsn(self.outstanding_data.create_iforward_tsn());
    }
    Chunk::ForwardTsn(self.outstanding_data.create_forward_tsn())
}
/// Forwards the start of a stream reset to the outstanding data.
pub fn begin_reset_streams(&mut self) {
self.outstanding_data.begin_reset_streams();
}
/// Reports the handover readiness flags for this queue. Each flag is combined
/// with its condition: outstanding data exists, fast recovery is active, or
/// chunks are waiting to be retransmitted.
pub(crate) fn get_handover_readiness(&self) -> HandoverReadiness {
    let outstanding_data = HandoverReadiness::RETRANSMISSION_QUEUE_OUTSTANDING_DATA
        & !self.outstanding_data.is_empty();
    let fast_recovery = HandoverReadiness::RETRANSMISSION_QUEUE_FAST_RECOVERY
        & self.fast_recovery_exit_tsn.is_some();
    let pending_retransmissions = HandoverReadiness::RETRANSMISSION_QUEUE_NOT_EMPTY
        & self.outstanding_data.has_data_to_be_retransmitted();
    outstanding_data | fast_recovery | pending_retransmissions
}
/// Writes this queue's transmit state (next TSN and window variables) into
/// `state.tx`.
///
/// The window variables are `usize` here but `u32` in the handover state. They
/// are saturated at `u32::MAX` rather than cast with `as`, because a wrapping
/// cast would hand over an oversized window as an arbitrary small value.
pub(crate) fn add_to_handover_state(&self, state: &mut SocketHandoverState) {
    /// Converts to `u32`, clamping values that do not fit.
    fn saturating_u32(value: usize) -> u32 {
        u32::try_from(value).unwrap_or(u32::MAX)
    }

    state.tx.next_tsn = self.next_tsn().0;
    state.tx.cwnd = saturating_u32(self.cwnd);
    state.tx.a_rwnd = saturating_u32(self.a_rwnd);
    state.tx.ssthresh = saturating_u32(self.ssthresh);
    state.tx.partial_bytes_acked = saturating_u32(self.partial_bytes_acked);
}
/// Restores the transmit state from `state.tx`: the sequence numbers are reset
/// so that the TSN before `next_tsn` (wrapping) is the starting point, and the
/// window variables are copied over.
pub(crate) fn restore_from_state(&mut self, state: &SocketHandoverState) {
    let tx = &state.tx;
    let tsn_before_next = Tsn(tx.next_tsn.wrapping_sub(1));
    self.outstanding_data.reset_sequence_numbers(tsn_before_next);

    self.cwnd = tx.cwnd as usize;
    self.a_rwnd = tx.a_rwnd as usize;
    self.ssthresh = tx.ssthresh as usize;
    self.partial_bytes_acked = tx.partial_bytes_acked as usize;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::api::LifecycleId;
use crate::api::Message;
use crate::api::PpId;
use crate::api::SendOptions;
use crate::events::Events;
use crate::packet::SkippedStream;
use crate::packet::sack_chunk::GapAckBlock;
use crate::testing::event_helpers::expect_buffered_amount_low;
use crate::testing::event_helpers::expect_no_event;
use crate::testing::event_helpers::expect_on_lifecycle_end;
use crate::testing::event_helpers::expect_on_lifecycle_message_delivered;
use crate::testing::event_helpers::expect_on_lifecycle_message_fully_sent;
use crate::testing::event_helpers::expect_on_lifecycle_message_maybe_sent;
use crate::tx::send_queue::SendQueue;
use crate::types::Mid;
use crate::types::Ssn;
use crate::types::StreamKey;
use std::collections::VecDeque;
// Receiver window advertised in every SACK built by these tests, and passed to
// the queue at construction.
const A_RWND: u32 = 100000;
// MTU handed to the send queue and used as packet budget in most tests.
const MTU: usize = 1280;
// Time at which every test starts.
const START_TIME: SocketTime = SocketTime::zero();
// Creates a fresh, shared event collector.
fn make_events() -> Rc<RefCell<Events>> {
Rc::new(RefCell::new(Events::new()))
}
// Builds a queue under test with initial TSN 10, the test receiver window and
// default options.
fn create_queue(
    supports_partial_reliability: bool,
    use_message_interleaving: bool,
    events: Rc<RefCell<Events>>,
) -> RetransmissionQueue {
    let options = Options::default();
    let initial_tsn = Tsn(10);
    RetransmissionQueue::new(
        events,
        initial_tsn,
        A_RWND,
        &options,
        supports_partial_reliability,
        use_message_interleaving,
    )
}
// Extracts the TSNs, in order, from a list of chunks.
fn get_tsns(chunks: &[(Tsn, Data)]) -> Vec<Tsn> {
    chunks.iter().map(|chunk| chunk.0).collect()
}
// Extracts `(stream id, TSN)` pairs, in order, from a list of chunks.
fn get_sid_tsns(chunks: &[(Tsn, Data)]) -> Vec<(StreamId, Tsn)> {
    chunks
        .iter()
        .map(|(tsn, data)| {
            let stream_id = data.stream_key.id();
            (stream_id, *tsn)
        })
        .collect()
}
// Queues a small five-byte message on stream 1 with default send options.
fn add_message(sq: &mut SendQueue, now: SocketTime) {
    let payload = vec![1, 2, 4, 5, 6];
    let message = Message::new(StreamId(1), PpId(53), payload);
    sq.add(now, message, &SendOptions::default());
}
// Feeds the queue a SACK that only carries a cumulative TSN ack (no gap ack
// blocks, no duplicates) and the test receiver window.
fn handle_sack(
    rtx: &mut RetransmissionQueue,
    now: SocketTime,
    cumulative_tsn_ack: Tsn,
) -> HandleSackResult {
    let sack = SackChunk {
        cumulative_tsn_ack,
        a_rwnd: A_RWND,
        gap_ack_blocks: Vec::new(),
        duplicate_tsns: Vec::new(),
    };
    rtx.handle_sack(now, &sack)
}
// A freshly created queue (initial TSN 10) reports only TSN 9, already acked.
#[test]
fn initial_acked_prev_tsn() {
let events = Rc::new(RefCell::new(Events::new()));
let rtx = create_queue(false, false, events);
assert_eq!(rtx.get_chunk_states_for_testing(), vec![(Tsn(9), ChunkState::Acked)]);
}
// Sending one message assigns TSN 10 and leaves it in flight.
#[test]
fn send_one_chunk() {
let now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue(false, false, Rc::clone(&events));
add_message(&mut sq, now);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
vec![Tsn(10)]
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::InFlight)]
);
}
// After a SACK for TSN 10, only TSN 10 is reported and it is acked.
#[test]
fn send_one_chunk_and_ack() {
let now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue(false, false, Rc::clone(&events));
add_message(&mut sq, now);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
vec![Tsn(10)]
);
handle_sack(&mut rtx, now, Tsn(10));
assert_eq!(rtx.get_chunk_states_for_testing(), vec![(Tsn(10), ChunkState::Acked)]);
}
// Three messages go out as TSN 10-12 in one call; a cumulative ack of 11
// leaves TSN 12 in flight.
#[test]
fn send_three_chunks_and_ack_two() {
let now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue(false, false, Rc::clone(&events));
add_message(&mut sq, now);
add_message(&mut sq, now);
add_message(&mut sq, now);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
(10..=12).map(Tsn).collect::<Vec<_>>()
);
handle_sack(&mut rtx, now, Tsn(11));
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![(Tsn(11), ChunkState::Acked), (Tsn(12), ChunkState::InFlight)]
);
}
// A SACK with cumulative ack 12 and gap ack blocks (2-3) and (5-5): as the
// asserted states show, the blocks are offsets from the cumulative ack, so
// TSN 14, 15 and 17 are acked while 13 and 16 are nacked.
#[test]
fn ack_with_gap_blocks_from_rfc9260_section334() {
let now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue(false, false, Rc::clone(&events));
for _ in 0..8 {
add_message(&mut sq, now);
}
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
(10..=17).map(Tsn).collect::<Vec<_>>()
);
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(12),
a_rwnd: A_RWND,
gap_ack_blocks: vec![GapAckBlock::new(2, 3), GapAckBlock::new(5, 5)],
duplicate_tsns: vec![],
},
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::Nacked),
(Tsn(14), ChunkState::Acked),
(Tsn(15), ChunkState::Acked),
(Tsn(16), ChunkState::Nacked),
(Tsn(17), ChunkState::Acked),
]
);
}
// TSN 13 and 16 are reported missing by three successive SACKs (each one
// acknowledging one newly sent TSN). After the third report they become
// ToBeRetransmitted and are returned together by the fast-retransmit call.
#[test]
fn resend_packets_when_nacked_three_times() {
let now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue(false, false, Rc::clone(&events));
for _ in 0..8 {
add_message(&mut sq, now);
}
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10), Tsn(11), Tsn(12), Tsn(13), Tsn(14), Tsn(15), Tsn(16), Tsn(17)]
);
// First missing report: TSN 18 is sent and acked through the gap blocks.
add_message(&mut sq, now);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(18)]
);
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(12),
a_rwnd: A_RWND,
gap_ack_blocks: vec![GapAckBlock::new(2, 3), GapAckBlock::new(5, 6)],
duplicate_tsns: vec![],
},
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::Nacked),
(Tsn(14), ChunkState::Acked),
(Tsn(15), ChunkState::Acked),
(Tsn(16), ChunkState::Nacked),
(Tsn(17), ChunkState::Acked),
(Tsn(18), ChunkState::Acked),
]
);
// Second missing report: TSN 19.
add_message(&mut sq, now);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(19)]
);
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(12),
a_rwnd: A_RWND,
gap_ack_blocks: vec![GapAckBlock::new(2, 3), GapAckBlock::new(5, 7)],
duplicate_tsns: vec![],
},
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::Nacked),
(Tsn(14), ChunkState::Acked),
(Tsn(15), ChunkState::Acked),
(Tsn(16), ChunkState::Nacked),
(Tsn(17), ChunkState::Acked),
(Tsn(18), ChunkState::Acked),
(Tsn(19), ChunkState::Acked),
]
);
// Third missing report: TSN 20. The nacked chunks are now scheduled for
// retransmission.
add_message(&mut sq, now);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(20)]
);
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(12),
a_rwnd: A_RWND,
gap_ack_blocks: vec![GapAckBlock::new(2, 3), GapAckBlock::new(5, 8)],
duplicate_tsns: vec![],
},
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::ToBeRetransmitted),
(Tsn(14), ChunkState::Acked),
(Tsn(15), ChunkState::Acked),
(Tsn(16), ChunkState::ToBeRetransmitted),
(Tsn(17), ChunkState::Acked),
(Tsn(18), ChunkState::Acked),
(Tsn(19), ChunkState::Acked),
(Tsn(20), ChunkState::Acked),
]
);
// Fast retransmit puts both chunks back in flight.
assert_eq!(get_tsns(&rtx.get_chunks_for_fast_retransmit(now, MTU)), vec![Tsn(13), Tsn(16)]);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::InFlight),
(Tsn(14), ChunkState::Acked),
(Tsn(15), ChunkState::Acked),
(Tsn(16), ChunkState::InFlight),
(Tsn(17), ChunkState::Acked),
(Tsn(18), ChunkState::Acked),
(Tsn(19), ChunkState::Acked),
(Tsn(20), ChunkState::Acked),
]
);
}
// When the fast-retransmitted chunk is the first outstanding TSN (11, right
// after cumulative ack 10), the T3-rtx timer is restarted: the next timeout
// after the retransmission is later than the one before it.
#[test]
fn restarts_t3_rtx_on_retransmit_first_outstanding_tsn() {
let mut now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue(false, false, Rc::clone(&events));
for _ in 0..3 {
add_message(&mut sq, now);
}
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10), Tsn(11), Tsn(12)]
);
// First missing report for TSN 11, 100 ms later.
now = now + Duration::from_millis(100);
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(10),
a_rwnd: A_RWND,
gap_ack_blocks: vec![GapAckBlock::new(2, 2)],
duplicate_tsns: vec![],
},
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![
(Tsn(10), ChunkState::Acked),
(Tsn(11), ChunkState::Nacked),
(Tsn(12), ChunkState::Acked),
]
);
// Second missing report.
add_message(&mut sq, now);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(13)]
);
now = now + Duration::from_millis(100);
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(10),
a_rwnd: A_RWND,
gap_ack_blocks: vec![GapAckBlock::new(2, 3)],
duplicate_tsns: vec![],
},
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![
(Tsn(10), ChunkState::Acked),
(Tsn(11), ChunkState::Nacked),
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::Acked),
]
);
// Third missing report: TSN 11 is scheduled for retransmission.
add_message(&mut sq, now);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(14)]
);
now = now + Duration::from_millis(100);
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(10),
a_rwnd: A_RWND,
gap_ack_blocks: vec![GapAckBlock::new(2, 4)],
duplicate_tsns: vec![],
},
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![
(Tsn(10), ChunkState::Acked),
(Tsn(11), ChunkState::ToBeRetransmitted),
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::Acked),
(Tsn(14), ChunkState::Acked),
]
);
// Remember the timeout before the retransmission to compare afterwards.
let prev_timeout = rtx.next_timeout().unwrap();
assert_eq!(get_tsns(&rtx.get_chunks_for_fast_retransmit(now, MTU)), vec![Tsn(11)]);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![
(Tsn(10), ChunkState::Acked),
(Tsn(11), ChunkState::InFlight),
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::Acked),
(Tsn(14), ChunkState::Acked),
]
);
assert!(rtx.next_timeout().unwrap() > prev_timeout);
}
// Only two messages are queued, so a full-MTU request yields exactly TSN 10
// and 11 even though more would fit.
#[test]
fn can_only_produce_two_packets_but_wants_to_send_three() {
let now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue(false, false, Rc::clone(&events));
for _ in 0..2 {
add_message(&mut sq, now);
}
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10), Tsn(11)]
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
vec![
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::InFlight),
(Tsn(11), ChunkState::InFlight),
]
);
}
// T3-rtx expiry marks the in-flight chunk ToBeRetransmitted; the next send
// call returns the same TSN again and puts it back in flight. Without partial
// reliability no FORWARD-TSN is ever requested.
#[test]
fn retransmits_on_t3_expiry() {
let mut now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue(false, false, Rc::clone(&events));
add_message(&mut sq, now);
assert!(!rtx.should_send_forward_tsn(now));
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::InFlight),]
);
// Jump to the timer's expiry and let it fire.
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::ToBeRetransmitted),]
);
// Asking about FORWARD-TSN must not change the chunk state.
assert!(!rtx.should_send_forward_tsn(now));
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::ToBeRetransmitted),]
);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::InFlight),]
);
}
// With partial reliability disabled, a message sent with
// `max_retransmissions: Some(0)` is still scheduled for retransmission after
// T3-rtx expiry, and no FORWARD-TSN is requested.
#[test]
fn limited_retransmission_only_with_rfc3758_support() {
let mut now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue( false, false, events);
sq.add(
now,
Message::new(StreamId(1), PpId(53), vec![1, 2, 4, 5, 6]),
&SendOptions { max_retransmissions: Some(0), ..SendOptions::default() },
);
assert!(!rtx.should_send_forward_tsn(now));
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::InFlight),]
);
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::ToBeRetransmitted),]
);
assert!(!rtx.should_send_forward_tsn(now));
}
// With partial reliability enabled and `max_retransmissions: Some(0)`, the
// chunk is abandoned on the first T3-rtx expiry: a FORWARD-TSN is requested
// and nothing is retransmitted.
#[test]
fn limits_retransmissions_as_udp() {
let mut now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue( true, false, events);
sq.add(
now,
Message::new(StreamId(1), PpId(53), vec![1, 2, 4, 5, 6]),
&SendOptions { max_retransmissions: Some(0), ..SendOptions::default() },
);
assert!(!rtx.should_send_forward_tsn(now));
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::InFlight),]
);
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::Abandoned),]
);
assert!(rtx.should_send_forward_tsn(now));
assert!(rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes)).is_empty());
}
// With `max_retransmissions: Some(3)`, the chunk is retransmitted after each
// of the first three T3-rtx expiries. The fourth expiry abandons it: a
// FORWARD-TSN is requested and nothing more is sent.
#[test]
fn limits_retransmissions_to_three_sends() {
let mut now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue( true, false, events);
sq.add(
now,
Message::new(StreamId(1), PpId(53), vec![1, 2, 4, 5, 6]),
&SendOptions { max_retransmissions: Some(3), ..SendOptions::default() },
);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
// First retransmission.
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
assert!(!rtx.should_send_forward_tsn(now));
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
// Second retransmission.
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
assert!(!rtx.should_send_forward_tsn(now));
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
// Third retransmission.
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
assert!(!rtx.should_send_forward_tsn(now));
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
// Fourth expiry: the limit is exhausted.
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
assert!(rtx.should_send_forward_tsn(now));
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[]
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::Abandoned),]
);
}
// With cwnd forced to 1200 bytes, a single 1000-byte chunk (plus header) is in
// flight. After T3-rtx expiry it is no longer counted as unacked, and it can be
// sent again.
#[test]
fn retransmits_when_send_buffer_is_full_t3_expiry() {
let mut now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue( true, false, events);
const CWND: usize = 1200;
rtx.set_cwnd(CWND);
assert_eq!(rtx.cwnd(), CWND);
assert_eq!(rtx.unacked_packet_bytes(), 0);
assert_eq!(rtx.unacked_items(), 0);
sq.add(
now,
Message::new(StreamId(1), PpId(53), vec![0; 1000]),
&SendOptions { max_retransmissions: Some(3), ..SendOptions::default() },
);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, 1500, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::InFlight),]
);
assert_eq!(rtx.unacked_packet_bytes(), 1000 + data_chunk::HEADER_SIZE);
assert_eq!(rtx.unacked_items(), 1);
// Expiry: the chunk leaves the unacked accounting.
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::ToBeRetransmitted),]
);
assert_eq!(rtx.unacked_packet_bytes(), 0);
assert_eq!(rtx.unacked_items(), 0);
// Retransmission puts it back.
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, 1500, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
assert_eq!(rtx.unacked_packet_bytes(), 1000 + data_chunk::HEADER_SIZE);
assert_eq!(rtx.unacked_items(), 1);
}
// A 16-byte message with `max_retransmissions: Some(0)` is sent in 4-byte
// fragments (the packet budget is one header plus 4 bytes). Only three of the
// fragments are sent. After TSN 10 is acked and T3-rtx expires, the rest of the
// message is abandoned and the FORWARD-TSN moves the cumulative TSN to 13.
#[test]
fn produces_valid_forward_tsn() {
let mut now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &Options::default(), events_clone);
let mut rtx = create_queue( true, false, events);
sq.add(
now,
Message::new(StreamId(1), PpId(53), vec![0; 4 * 4]),
&SendOptions { max_retransmissions: Some(0), ..SendOptions::default() },
);
let bytes = 4 + data_chunk::HEADER_SIZE;
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, bytes, |bytes, _| sq.produce(now, bytes))),
[Tsn(10)]
);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, bytes, |bytes, _| sq.produce(now, bytes))),
[Tsn(11)]
);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, bytes, |bytes, _| sq.produce(now, bytes))),
[Tsn(12)]
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::InFlight),
(Tsn(11), ChunkState::InFlight),
(Tsn(12), ChunkState::InFlight),
]
);
handle_sack(&mut rtx, now, Tsn(10));
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
// TSN 13 is listed as Abandoned although it was never returned by
// `get_chunks_to_send` - presumably a placeholder for the unsent remainder of
// the message; confirm against `OutstandingData`.
assert_eq!(
rtx.get_chunk_states_for_testing(),
[
(Tsn(10), ChunkState::Acked),
(Tsn(11), ChunkState::Abandoned),
(Tsn(12), ChunkState::Abandoned),
(Tsn(13), ChunkState::Abandoned),
]
);
assert!(rtx.should_send_forward_tsn(now));
let Chunk::ForwardTsn(fwd) = rtx.create_forward_tsn() else {
panic!();
};
assert_eq!(fwd.new_cumulative_tsn, Tsn(13));
assert_eq!(fwd.skipped_streams, vec![SkippedStream::ForwardTsn(StreamId(1), Ssn(0))]);
}
/// Same scenario as the partially-sent FORWARD-TSN test, but the 12-byte message is
/// requested in three header-plus-four-byte steps, and no extra abandoned TSN is
/// expected after the timeout.
#[test]
fn produces_valid_forward_tsn_when_fully_sent() {
    let mut now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, events);

    let send_options = SendOptions { max_retransmissions: Some(0), ..SendOptions::default() };
    send_queue.add(now, Message::new(StreamId(1), PpId(53), vec![0; 3 * 4]), &send_options);

    // One new TSN per request.
    let max_bytes = 4 + data_chunk::HEADER_SIZE;
    for tsn in 10..=12 {
        let sent =
            queue.get_chunks_to_send(now, max_bytes, |size, _| send_queue.produce(now, size));
        assert_eq!(get_tsns(&sent), [Tsn(tsn)]);
    }
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::InFlight),
            (Tsn(11), ChunkState::InFlight),
            (Tsn(12), ChunkState::InFlight),
        ]
    );

    // Ack the first chunk, then let the timer expire.
    handle_sack(&mut queue, now, Tsn(10));
    now = queue.next_timeout().unwrap();
    queue.handle_timeout(now);
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(10), ChunkState::Acked),
            (Tsn(11), ChunkState::Abandoned),
            (Tsn(12), ChunkState::Abandoned),
        ]
    );

    assert!(queue.should_send_forward_tsn(now));
    let fwd = match queue.create_forward_tsn() {
        Chunk::ForwardTsn(fwd) => fwd,
        _ => panic!(),
    };
    assert_eq!(fwd.new_cumulative_tsn, Tsn(12));
    assert_eq!(fwd.skipped_streams, vec![SkippedStream::ForwardTsn(StreamId(1), Ssn(0))]);
}
// Exercises I-FORWARD-TSN creation when chunks belonging to several streams are abandoned.
// The test is marked `#[ignore]`, so it is skipped by a default test run.
// NOTE(review): unlike the other interleaving tests in this module, `sq` never gets
// `enable_message_interleaving(true)` here - possibly related to the `#[ignore]`; confirm.
#[test]
#[ignore]
fn produces_valid_i_forward_tsn() {
let mut now = START_TIME;
let events = Rc::new(RefCell::new(Events::new()));
let mut sq = SendQueue::new(MTU, &Options::default(), events.clone());
let mut rtx = create_queue(
true, true,
events,
);
// Four streams with equal priority, each given one message of 2 * MTU bytes that is
// queued with `max_retransmissions: Some(0)`.
let s = SendOptions { max_retransmissions: Some(0), ..SendOptions::default() };
sq.set_priority(StreamId(1), 1);
sq.set_priority(StreamId(2), 1);
sq.set_priority(StreamId(3), 1);
sq.set_priority(StreamId(4), 1);
sq.add(now, Message::new(StreamId(1), PpId(53), vec![0; 2 * MTU]), &s);
sq.add(now, Message::new(StreamId(2), PpId(53), vec![0; 2 * MTU]), &s);
sq.add(now, Message::new(StreamId(3), PpId(53), vec![0; 2 * MTU]), &s);
sq.add(now, Message::new(StreamId(4), PpId(53), vec![0; 2 * MTU]), &s);
// Each request of at most MTU bytes is expected to return a single chunk, taken from
// streams 1, 2, 3 and 4 in turn.
let bytes = MTU;
assert_eq!(
get_sid_tsns(&rtx.get_chunks_to_send(now, bytes, |bytes, _| sq.produce(now, bytes))),
[(StreamId(1), Tsn(10))]
);
assert_eq!(
get_sid_tsns(&rtx.get_chunks_to_send(now, bytes, |bytes, _| sq.produce(now, bytes))),
[(StreamId(2), Tsn(11))]
);
assert_eq!(
get_sid_tsns(&rtx.get_chunks_to_send(now, bytes, |bytes, _| sq.produce(now, bytes))),
[(StreamId(3), Tsn(12))]
);
assert_eq!(
get_sid_tsns(&rtx.get_chunks_to_send(now, bytes, |bytes, _| sq.produce(now, bytes))),
[(StreamId(4), Tsn(13))]
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::InFlight),
(Tsn(11), ChunkState::InFlight),
(Tsn(12), ChunkState::InFlight),
(Tsn(13), ChunkState::InFlight),
]
);
// The cumulative ack stays at 9; the single gap-ack block (4, 4) is expected to ack
// TSN 13 and leave TSNs 10-12 nacked, as checked right below.
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(9),
a_rwnd: A_RWND,
gap_ack_blocks: vec![GapAckBlock::new(4, 4)],
duplicate_tsns: vec![],
},
);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::Nacked),
(Tsn(11), ChunkState::Nacked),
(Tsn(12), ChunkState::Nacked),
(Tsn(13), ChunkState::Acked),
]
);
// Fire the retransmission timer at its scheduled expiry.
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
// Expected afterwards: TSNs 10-12 are abandoned and three further abandoned entries
// (14-16) appear - presumably placeholders for the unsent remainders of streams 1-3;
// confirm against the queue implementation.
assert_eq!(
rtx.get_chunk_states_for_testing(),
[
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::Abandoned),
(Tsn(11), ChunkState::Abandoned),
(Tsn(12), ChunkState::Abandoned),
(Tsn(13), ChunkState::Acked),
(Tsn(14), ChunkState::Abandoned),
(Tsn(15), ChunkState::Abandoned),
(Tsn(16), ChunkState::Abandoned),
]
);
// With interleaving enabled on the queue, the forward chunk must be the I-FORWARD-TSN
// variant; it is expected to move the cumulative TSN to 12 and skip MID 0 on the
// ordered streams 1, 2 and 3.
assert!(rtx.should_send_forward_tsn(now));
let Chunk::IForwardTsn(fwd) = rtx.create_forward_tsn() else {
panic!();
};
assert_eq!(fwd.new_cumulative_tsn, Tsn(12));
assert_eq!(
fwd.skipped_streams,
vec![
SkippedStream::IForwardTsn(StreamKey::Ordered(StreamId(1)), Mid(0)),
SkippedStream::IForwardTsn(StreamKey::Ordered(StreamId(2)), Mid(0)),
SkippedStream::IForwardTsn(StreamKey::Ordered(StreamId(3)), Mid(0))
]
);
}
/// A SACK handled 123 ms after the chunk was sent must report an RTT of 123 ms.
#[test]
fn measure_rtt() {
    let sent_at = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, events);

    add_message(&mut send_queue, sent_at);
    let sent =
        queue.get_chunks_to_send(sent_at, MTU, |size, _| send_queue.produce(sent_at, size));
    assert_eq!(get_tsns(&sent), [Tsn(10)]);

    let acked_at = sent_at + Duration::from_millis(123);
    match handle_sack(&mut queue, acked_at, Tsn(10)) {
        HandleSackResult::Valid { rtt, .. } => {
            assert_eq!(rtt.unwrap(), Duration::from_millis(123));
        }
        HandleSackResult::Invalid => panic!(),
    }
}
/// With nothing sent, a SACK is accepted only when its cumulative TSN ack is 9; both 8
/// and 10 must be reported as invalid.
#[test]
fn validate_cum_tsn_at_rest() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let mut queue = create_queue(true, false, events);

    // (cumulative TSN ack, expected result), applied in this order.
    let cases = [
        (Tsn(8), HandleSackResult::Invalid),
        (Tsn(9), HandleSackResult::Valid { rtt: None, reset_error_counter: false }),
        (Tsn(10), HandleSackResult::Invalid),
    ];
    for (cumulative_tsn_ack, expected) in cases {
        assert_eq!(handle_sack(&mut queue, now, cumulative_tsn_ack), expected);
    }
}
/// With TSNs 10 and 11 in flight, cumulative acks 9 through 11 are accepted in sequence
/// while 8 and 12 are rejected.
#[test]
fn validate_cum_tsn_ack_on_inflight_data() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, events);

    for _ in 0..2 {
        add_message(&mut send_queue, now);
    }
    let sent = queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
    assert_eq!(get_tsns(&sent), vec![Tsn(10), Tsn(11)]);

    // (cumulative TSN ack, expected result), applied in this order. The SACKs are handled
    // at the same instant the data was sent, hence the zero RTT.
    let cases = [
        (Tsn(8), HandleSackResult::Invalid),
        (Tsn(9), HandleSackResult::Valid { rtt: None, reset_error_counter: false }),
        (
            Tsn(10),
            HandleSackResult::Valid { rtt: Some(Duration::ZERO), reset_error_counter: true },
        ),
        (
            Tsn(11),
            HandleSackResult::Valid { rtt: Some(Duration::ZERO), reset_error_counter: true },
        ),
        (Tsn(12), HandleSackResult::Invalid),
    ];
    for (cumulative_tsn_ack, expected) in cases {
        assert_eq!(handle_sack(&mut queue, now, cumulative_tsn_ack), expected);
    }
}
/// A SACK whose only gap-ack block is (11, 16) relative to cumulative TSN 9 must leave the
/// states of TSNs 10-17 untouched.
#[test]
fn handle_gap_ack_blocks_matching_no_inflight_data() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, events);

    let message = Message::new(StreamId(1), PpId(53), vec![0; 4 * 8]);
    send_queue.add(now, message, &SendOptions::default());

    // Eight requests, each capped at one DATA header plus four bytes.
    let max_bytes = data_chunk::HEADER_SIZE + 4;
    for _ in 0..8 {
        queue.get_chunks_to_send(now, max_bytes, |size, _| send_queue.produce(now, size));
    }

    // TSN 9 acked, TSNs 10..=17 in flight - expected both before and after the SACK.
    let mut expected_states = vec![(Tsn(9), ChunkState::Acked)];
    expected_states.extend((10..=17).map(|tsn| (Tsn(tsn), ChunkState::InFlight)));
    assert_eq!(queue.get_chunk_states_for_testing(), expected_states);

    let sack = SackChunk {
        cumulative_tsn_ack: Tsn(9),
        a_rwnd: A_RWND,
        gap_ack_blocks: vec![GapAckBlock::new(11, 16)],
        duplicate_tsns: vec![],
    };
    queue.handle_sack(now, &sack);
    assert_eq!(queue.get_chunk_states_for_testing(), expected_states);
}
/// A SACK carrying a gap-ack block while nothing has been sent must not add any chunk
/// state: only the initial acked TSN 9 remains.
#[test]
fn handle_invalid_gap_ack_blocks() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let mut queue = create_queue(true, false, events);

    let sack = SackChunk {
        cumulative_tsn_ack: Tsn(9),
        a_rwnd: A_RWND,
        gap_ack_blocks: vec![GapAckBlock::new(3, 4)],
        duplicate_tsns: vec![],
    };
    queue.handle_sack(now, &sack);

    assert_eq!(queue.get_chunk_states_for_testing(), [(Tsn(9), ChunkState::Acked)]);
}
/// A gap-ack block (1, 5) acks TSNs 10-14, but the oldest tracked entry must still be
/// TSN 9, i.e. the cumulative ack point is not advanced by gap-ack blocks alone.
#[test]
fn gap_ack_blocks_do_not_move_cum_tsn_ack() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(false, false, events);

    for _ in 0..8 {
        add_message(&mut send_queue, now);
    }
    let sent = queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
    assert_eq!(get_tsns(&sent), (10..=17).map(Tsn).collect::<Vec<_>>());

    let sack = SackChunk {
        cumulative_tsn_ack: Tsn(9),
        a_rwnd: A_RWND,
        gap_ack_blocks: vec![GapAckBlock::new(1, 5)],
        duplicate_tsns: vec![],
    };
    queue.handle_sack(now, &sack);

    // TSNs 9..=14 acked, 15..=17 still in flight.
    let expected_states: Vec<_> = (9..=14)
        .map(|tsn| (Tsn(tsn), ChunkState::Acked))
        .chain((15..=17).map(|tsn| (Tsn(tsn), ChunkState::InFlight)))
        .collect();
    assert_eq!(queue.get_chunk_states_for_testing(), expected_states);
}
/// Checks the byte budget offered to the producer callback: it starts at the available
/// size minus one DATA header and shrinks by the padded size of each chunk produced.
#[test]
fn stays_within_available_size() {
    // Local MTU that shadows the module-level constant for this test only.
    const MTU: usize = 1191;
    const BYTES_REMAINING_STEP_0: usize = 1176;
    const FIRST_DATA_PAYLOAD_SIZE: usize = 183;
    const SECOND_DATA_PAYLOAD_SIZE: usize = 957;
    const BYTES_REMAINING_STEP_1: usize = BYTES_REMAINING_STEP_0
        - round_up_to_4!(FIRST_DATA_PAYLOAD_SIZE + data_chunk::HEADER_SIZE);
    const BYTES_REMAINING_STEP_2: usize = BYTES_REMAINING_STEP_1
        - round_up_to_4!(SECOND_DATA_PAYLOAD_SIZE + data_chunk::HEADER_SIZE);

    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(false, false, events);

    // The two payloads are sized so that they consume the budget exactly.
    assert_eq!(BYTES_REMAINING_STEP_2, 0);

    for payload_size in [FIRST_DATA_PAYLOAD_SIZE, SECOND_DATA_PAYLOAD_SIZE] {
        send_queue.add(
            now,
            Message::new(StreamId(1), PpId(53), vec![0; payload_size]),
            &SendOptions::default(),
        );
    }

    // Budgets the producer callback is expected to be offered, in call order.
    let mut expected_sizes = VecDeque::from(vec![
        BYTES_REMAINING_STEP_0 - data_chunk::HEADER_SIZE,
        BYTES_REMAINING_STEP_1 - data_chunk::HEADER_SIZE,
    ]);
    let sent = queue.get_chunks_to_send(now, BYTES_REMAINING_STEP_0, |size, _| {
        let expected = expected_sizes.pop_front().unwrap();
        assert_eq!(size, expected);
        send_queue.produce(now, size)
    });
    assert_eq!(get_tsns(&sent), vec![Tsn(10), Tsn(11)]);
    assert!(expected_sizes.is_empty());
}
/// After a timeout abandons every chunk of a message queued with
/// `max_retransmissions: Some(0)`, the unacked byte and item counters must read zero and
/// stay at zero while the abandoned TSNs are acked one by one.
#[test]
fn accounts_nacked_abandoned_chunks_as_not_outstanding() {
    let mut now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, events);

    let send_options = SendOptions { max_retransmissions: Some(0), ..SendOptions::default() };
    send_queue.add(now, Message::new(StreamId(1), PpId(53), vec![0; 16]), &send_options);

    // Three requests, each capped at one DATA header plus four bytes.
    let chunk_bytes = data_chunk::HEADER_SIZE + 4;
    for tsn in 10..=12 {
        let sent =
            queue.get_chunks_to_send(now, chunk_bytes, |size, _| send_queue.produce(now, size));
        assert_eq!(get_tsns(&sent), vec![Tsn(tsn)]);
    }

    // Four of the sixteen bytes are still buffered in the send queue.
    assert_eq!(send_queue.total_buffered_amount(), 4);
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::InFlight),
            (Tsn(11), ChunkState::InFlight),
            (Tsn(12), ChunkState::InFlight),
        ]
    );
    assert_eq!(queue.unacked_packet_bytes(), chunk_bytes * 3);
    assert_eq!(queue.unacked_items(), 3);

    // Let the retransmission timer expire.
    now = queue.next_timeout().unwrap();
    queue.handle_timeout(now);
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::Abandoned),
            (Tsn(11), ChunkState::Abandoned),
            (Tsn(12), ChunkState::Abandoned),
            (Tsn(13), ChunkState::Abandoned),
        ]
    );
    assert_eq!(queue.unacked_packet_bytes(), 0);
    assert_eq!(queue.unacked_items(), 0);

    // The next request forwards the discard list to the send queue; nothing is sent and
    // the send queue ends up empty.
    let sent = queue.get_chunks_to_send(now, MTU, |size, discard| {
        for (stream_id, message_id) in discard {
            send_queue.discard(*stream_id, *message_id);
        }
        send_queue.produce(now, size)
    });
    assert_eq!(get_tsns(&sent), vec![]);
    assert_eq!(send_queue.total_buffered_amount(), 0);

    // Acking the abandoned TSNs one at a time must keep both counters at zero.
    for tsn in 10..=13 {
        handle_sack(&mut queue, now, Tsn(tsn));
        assert_eq!(queue.unacked_packet_bytes(), 0);
        assert_eq!(queue.unacked_items(), 0);
    }
}
/// When the timer fires while part of a `max_retransmissions: Some(0)` message is still
/// buffered in the send queue, the three sent TSNs plus one additional TSN (13) must all
/// be reported as abandoned.
#[test]
fn expire_from_send_queue_when_partially_sent() {
    let mut now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, events);

    let send_options = SendOptions { max_retransmissions: Some(0), ..SendOptions::default() };
    send_queue.add(now, Message::new(StreamId(1), PpId(53), vec![0; 16]), &send_options);

    // Three requests, each capped at one DATA header plus four bytes.
    let max_bytes = data_chunk::HEADER_SIZE + 4;
    for tsn in 10..=12 {
        let sent =
            queue.get_chunks_to_send(now, max_bytes, |size, _| send_queue.produce(now, size));
        assert_eq!(get_tsns(&sent), vec![Tsn(tsn)]);
    }

    // Four of the sixteen bytes are still buffered in the send queue.
    assert_eq!(send_queue.total_buffered_amount(), 4);
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::InFlight),
            (Tsn(11), ChunkState::InFlight),
            (Tsn(12), ChunkState::InFlight),
        ]
    );

    // Let the retransmission timer expire.
    now = queue.next_timeout().unwrap();
    queue.handle_timeout(now);
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::Abandoned),
            (Tsn(11), ChunkState::Abandoned),
            (Tsn(12), ChunkState::Abandoned),
            (Tsn(13), ChunkState::Abandoned),
        ]
    );
}
/// With two interleaved streams of which only stream 1 uses `max_retransmissions: Some(0)`,
/// a timeout must abandon stream 1's chunk only, and only stream 1 may show up in the
/// discard list passed to the producer callback.
#[test]
fn expire_correct_message_from_send_queue() {
    const MAX_SIZE_IN_FRAGMENT: usize = round_down_to_4!(MTU - idata_chunk::HEADER_SIZE);

    let mut now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, true, events);

    send_queue.enable_message_interleaving(true);
    for stream in [1, 2] {
        send_queue.set_priority(StreamId(stream), 1);
    }

    // Both messages span two maximum-size fragments.
    let limited = SendOptions { max_retransmissions: Some(0), ..SendOptions::default() };
    send_queue.add(
        now,
        Message::new(StreamId(1), PpId(53), vec![0; MAX_SIZE_IN_FRAGMENT * 2]),
        &limited,
    );
    send_queue.add(
        now,
        Message::new(StreamId(2), PpId(54), vec![0; MAX_SIZE_IN_FRAGMENT * 2]),
        &SendOptions::default(),
    );

    // The first two requests return a non-final fragment of stream 1, then of stream 2.
    for stream in [1, 2] {
        let mut sent = queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
        let (_, chunk) = sent.pop().unwrap();
        assert_eq!(chunk.stream_key, StreamKey::Ordered(StreamId(stream)));
        assert!(!chunk.is_end);
    }
    assert_eq!(send_queue.total_buffered_amount(), MAX_SIZE_IN_FRAGMENT * 2);
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::InFlight),
            (Tsn(11), ChunkState::InFlight),
        ]
    );

    // Let the retransmission timer expire.
    now = queue.next_timeout().unwrap();
    queue.handle_timeout(now);
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::Abandoned),
            (Tsn(11), ChunkState::ToBeRetransmitted),
            (Tsn(12), ChunkState::Abandoned),
        ]
    );

    // Record which streams the queue asks the send queue to discard.
    let mut discarded: Vec<StreamId> = Vec::new();
    queue.get_chunks_to_send(now, MTU, |size, discard| {
        for (stream_id, message_id) in discard {
            discarded.push(*stream_id);
            send_queue.discard(*stream_id, *message_id);
        }
        send_queue.produce(now, size)
    });
    assert_eq!(discarded, vec![StreamId(1)]);
}
/// With two interleaved streams that both use `max_retransmissions: Some(0)`, a timeout
/// must abandon both sent chunks, add two further abandoned TSNs, and list both streams
/// for discarding.
#[test]
fn inserts_placeholder_for_every_discarded_stream() {
    const MAX_SIZE_IN_FRAGMENT: usize = round_down_to_4!(MTU - idata_chunk::HEADER_SIZE);

    let mut now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, true, events);

    send_queue.enable_message_interleaving(true);
    for stream in [1, 2] {
        send_queue.set_priority(StreamId(stream), 1);
    }

    // Both messages span two maximum-size fragments.
    let limited = SendOptions { max_retransmissions: Some(0), ..SendOptions::default() };
    send_queue.add(
        now,
        Message::new(StreamId(1), PpId(53), vec![0; MAX_SIZE_IN_FRAGMENT * 2]),
        &limited,
    );
    send_queue.add(
        now,
        Message::new(StreamId(2), PpId(54), vec![0; MAX_SIZE_IN_FRAGMENT * 2]),
        &limited,
    );

    // The first two requests return a non-final fragment of stream 1, then of stream 2.
    for stream in [1, 2] {
        let mut sent = queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
        let (_, chunk) = sent.pop().unwrap();
        assert_eq!(chunk.stream_key, StreamKey::Ordered(StreamId(stream)));
        assert!(!chunk.is_end);
    }
    assert_eq!(send_queue.total_buffered_amount(), MAX_SIZE_IN_FRAGMENT * 2);
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::InFlight),
            (Tsn(11), ChunkState::InFlight),
        ]
    );

    // Let the retransmission timer expire.
    now = queue.next_timeout().unwrap();
    queue.handle_timeout(now);
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::Abandoned),
            (Tsn(11), ChunkState::Abandoned),
            (Tsn(12), ChunkState::Abandoned),
            (Tsn(13), ChunkState::Abandoned),
        ]
    );

    // Record which streams the queue asks the send queue to discard; the order is not
    // asserted, so the list is sorted before comparing.
    let mut discarded: Vec<StreamId> = Vec::new();
    queue.get_chunks_to_send(now, MTU, |size, discard| {
        for (stream_id, message_id) in discard {
            discarded.push(*stream_id);
            send_queue.discard(*stream_id, *message_id);
        }
        send_queue.produce(now, size)
    });
    discarded.sort();
    assert_eq!(discarded, vec![StreamId(1), StreamId(2)]);
}
/// Chunks queued with `max_retransmissions: Some(0)` must only cause a FORWARD-TSN to
/// become pending after the third SACK that reports the same gaps.
#[test]
fn limits_retransmissions_only_when_nacked_three_times() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, Rc::clone(&events));

    let limited = SendOptions { max_retransmissions: Some(0), ..SendOptions::default() };
    for _ in 0..8 {
        send_queue.add(now, Message::new(StreamId(1), PpId(53), vec![0; 4]), &limited);
    }
    let sent = queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
    assert_eq!(get_tsns(&sent), (10..=17).map(Tsn).collect::<Vec<_>>());

    // Each round sends one more message and then handles a SACK whose second gap-ack
    // block grows by one: (new TSN, end of second block, FORWARD-TSN expected afterwards).
    let rounds = [(18, 6, false), (19, 7, false), (20, 8, true)];
    for (new_tsn, gap_end, expect_forward_tsn) in rounds {
        add_message(&mut send_queue, now);
        let sent = queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
        assert_eq!(get_tsns(&sent), [Tsn(new_tsn)]);

        let sack = SackChunk {
            cumulative_tsn_ack: Tsn(12),
            a_rwnd: A_RWND,
            gap_ack_blocks: vec![GapAckBlock::new(2, 3), GapAckBlock::new(5, gap_end)],
            duplicate_tsns: vec![],
        };
        queue.handle_sack(now, &sack);
        assert_eq!(queue.should_send_forward_tsn(now), expect_forward_tsn);
    }
}
/// With `max_retransmissions: Some(2)`, TSN 10 must stay un-abandoned through eight SACKs
/// that report it missing and become abandoned on the ninth.
#[test]
fn abandons_rtx_limit2_when_nacked_nine_times() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, Rc::clone(&events));

    let limited = SendOptions { max_retransmissions: Some(2), ..SendOptions::default() };
    for _ in 0..10 {
        send_queue.add(now, Message::new(StreamId(1), PpId(53), vec![0; 4]), &limited);
    }
    let sent = queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
    assert_eq!(get_tsns(&sent), (10..=19).map(Tsn).collect::<Vec<_>>());

    // Eight SACKs with a gap-ack block that starts at offset 2 and grows by one each time.
    // After each one, whatever the queue wants to (re)send is pulled out.
    for gap_end in 2..=9 {
        let sack = SackChunk {
            cumulative_tsn_ack: Tsn(9),
            a_rwnd: A_RWND,
            gap_ack_blocks: vec![GapAckBlock::new(2, gap_end)],
            duplicate_tsns: vec![],
        };
        queue.handle_sack(now, &sack);
        assert!(!queue.should_send_forward_tsn(now));

        if queue.has_data_to_be_fast_retransmitted() {
            queue.get_chunks_for_fast_retransmit(now, MTU);
        } else {
            queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
        }
    }

    // The ninth SACK covers everything but TSN 10.
    let final_sack = SackChunk {
        cumulative_tsn_ack: Tsn(9),
        a_rwnd: A_RWND,
        gap_ack_blocks: vec![GapAckBlock::new(2, 10)],
        duplicate_tsns: vec![],
    };
    queue.handle_sack(now, &final_sack);
    assert!(queue.should_send_forward_tsn(now));
    assert_eq!(
        queue.get_chunk_states_for_testing(),
        [
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::Abandoned),
            (Tsn(11), ChunkState::Acked),
            (Tsn(12), ChunkState::Acked),
            (Tsn(13), ChunkState::Acked),
            (Tsn(14), ChunkState::Acked),
            (Tsn(15), ChunkState::Acked),
            (Tsn(16), ChunkState::Acked),
            (Tsn(17), ChunkState::Acked),
            (Tsn(18), ChunkState::Acked),
            (Tsn(19), ChunkState::Acked),
        ]
    );
}
/// Starting from a congestion window of 1200 bytes, acking one chunk must grow the window
/// by that chunk's size (payload plus DATA header).
#[test]
fn cwnd_recovers_when_acking() {
    const CWND: usize = 1200;

    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, events);
    queue.set_cwnd(CWND);

    let send_options = SendOptions { max_retransmissions: Some(3), ..SendOptions::default() };
    send_queue.add(now, Message::new(StreamId(1), PpId(53), vec![0; 1000]), &send_options);

    let sent = queue.get_chunks_to_send(now, 1500, |size, _| send_queue.produce(now, size));
    assert_eq!(get_tsns(&sent), [Tsn(10)]);

    let chunk_bytes = 1000 + data_chunk::HEADER_SIZE;
    assert_eq!(queue.unacked_packet_bytes(), chunk_bytes);

    handle_sack(&mut queue, now, Tsn(10));
    assert_eq!(queue.cwnd(), CWND + chunk_bytes);
}
/// The queue reports handover readiness before anything is sent and again once the sent
/// chunk is acked; in between it flags outstanding data.
#[test]
fn ready_for_handover_when_has_no_outstanding_data() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, events);

    // Nothing sent yet.
    assert!(queue.get_handover_readiness().is_ready());

    add_message(&mut send_queue, now);
    let sent = queue.get_chunks_to_send(now, 1500, |size, _| send_queue.produce(now, size));
    assert_eq!(get_tsns(&sent), [Tsn(10)]);

    // One chunk in flight.
    let readiness = queue.get_handover_readiness();
    assert!(readiness.contains(HandoverReadiness::RETRANSMISSION_QUEUE_OUTSTANDING_DATA));

    // Everything acked.
    handle_sack(&mut queue, now, Tsn(10));
    assert!(queue.get_handover_readiness().is_ready());
}
/// Follows the handover readiness flags through a scenario where three SACKs with growing
/// gap-ack blocks leave chunks queued for fast retransmission, the fast retransmission is
/// sent, and finally everything is acked.
#[test]
fn ready_for_handover_when_nothing_to_retransmit() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(false, false, Rc::clone(&events));

    for _ in 0..8 {
        add_message(&mut send_queue, now);
    }
    let sent = queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
    assert_eq!(get_tsns(&sent), (10..=17).map(Tsn).collect::<Vec<_>>());
    assert_eq!(
        queue.get_handover_readiness(),
        HandoverReadiness::RETRANSMISSION_QUEUE_OUTSTANDING_DATA
    );

    // Each round sends one more message and then handles a SACK whose second gap-ack
    // block grows by one: (new TSN, end of second block).
    for (new_tsn, gap_end) in [(18, 6), (19, 7), (20, 8)] {
        add_message(&mut send_queue, now);
        let sent = queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
        assert_eq!(get_tsns(&sent), [Tsn(new_tsn)]);

        let sack = SackChunk {
            cumulative_tsn_ack: Tsn(12),
            a_rwnd: A_RWND,
            gap_ack_blocks: vec![GapAckBlock::new(2, 3), GapAckBlock::new(5, gap_end)],
            duplicate_tsns: vec![],
        };
        queue.handle_sack(now, &sack);
    }

    // Data is outstanding, chunks are waiting to be retransmitted, and the queue is in
    // fast recovery.
    assert_eq!(
        queue.get_handover_readiness(),
        HandoverReadiness::RETRANSMISSION_QUEUE_OUTSTANDING_DATA
            | HandoverReadiness::RETRANSMISSION_QUEUE_NOT_EMPTY
            | HandoverReadiness::RETRANSMISSION_QUEUE_FAST_RECOVERY
    );

    // Sending the fast retransmission clears the "not empty" flag only.
    let retransmitted = queue.get_chunks_for_fast_retransmit(now, MTU);
    assert_eq!(get_tsns(&retransmitted), vec![Tsn(13), Tsn(16)]);
    assert_eq!(
        queue.get_handover_readiness(),
        HandoverReadiness::RETRANSMISSION_QUEUE_OUTSTANDING_DATA
            | HandoverReadiness::RETRANSMISSION_QUEUE_FAST_RECOVERY
    );

    // Acking everything makes the queue ready for handover.
    handle_sack(&mut queue, now, Tsn(20));
    assert!(queue.get_handover_readiness().is_ready());
}
/// Test helper: asserts that `rtx` is ready for handover, exports its state into a fresh
/// `SocketHandoverState`, and returns a newly created queue restored from that state.
fn handover_queue(
    rtx: RetransmissionQueue,
    events: Rc<RefCell<Events>>,
) -> RetransmissionQueue {
    assert!(rtx.get_handover_readiness().is_ready());

    let state = {
        let mut exported = SocketHandoverState::default();
        rtx.add_to_handover_state(&mut exported);
        exported
    };

    let mut restored = create_queue(false, false, events);
    restored.restore_from_state(&state);
    restored
}
/// After a handover, the restored queue must continue numbering at TSN 12 and track acks
/// as usual.
#[test]
fn handover_test() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut original = create_queue(false, false, Rc::clone(&events));

    // Send and fully ack two messages on the original queue.
    for _ in 0..2 {
        add_message(&mut send_queue, now);
    }
    let sent = original.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
    assert_eq!(get_tsns(&sent), [Tsn(10), Tsn(11)]);
    handle_sack(&mut original, now, Tsn(11));

    let mut restored = handover_queue(original, Rc::clone(&events));

    // Three more messages go out through the restored queue.
    for _ in 0..3 {
        add_message(&mut send_queue, now);
    }
    let sent = restored.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
    assert_eq!(get_tsns(&sent), [Tsn(12), Tsn(13), Tsn(14)]);

    handle_sack(&mut restored, now, Tsn(13));
    assert_eq!(
        restored.get_chunk_states_for_testing(),
        [(Tsn(13), ChunkState::Acked), (Tsn(14), ChunkState::InFlight)]
    );
}
// Checks how many chunks the queue hands out for retransmission while the peer
// advertises `a_rwnd: 0`, and that the remaining chunks follow once a non-zero window
// is advertised.
#[test]
fn can_always_retransmit_one_packet() {
let mut now = START_TIME;
let options = Options { mtu: MTU, ..Default::default() };
let events = Rc::new(RefCell::new(Events::new()));
let events_clone = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
let mut sq = SendQueue::new(MTU, &options, events_clone);
let mut rtx = RetransmissionQueue::new(events, Tsn(10), A_RWND, &options, false, false);
// Largest payload, rounded down to a multiple of four, that fits in one MTU together
// with a DATA chunk header.
const MAX_SIZE_IN_FRAGMENT: usize = round_down_to_4!(MTU - data_chunk::HEADER_SIZE);
// Queue and send five such messages one at a time; each must come out as its own TSN,
// 10 through 14.
for tsn in 10..=14 {
sq.add(
now,
Message::new(StreamId(1), PpId(53), vec![0; MAX_SIZE_IN_FRAGMENT]),
&SendOptions::default(),
);
assert_eq!(
get_tsns(&rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes))),
[Tsn(tsn)]
);
}
// SACK with a zero receiver window; its gap-ack block (3, 3) relative to cumulative
// TSN 9 acks TSN 12, as the state check after the timeout shows.
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(9),
a_rwnd: 0,
gap_ack_blocks: vec![GapAckBlock::new(3, 3)],
duplicate_tsns: vec![],
},
);
// Fire the retransmission timer at its scheduled expiry; every chunk that is not acked
// is then expected to be scheduled for retransmission.
now = rtx.next_timeout().unwrap();
rtx.handle_timeout(now);
assert_eq!(
rtx.get_chunk_states_for_testing(),
[
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::ToBeRetransmitted),
(Tsn(11), ChunkState::ToBeRetransmitted),
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::ToBeRetransmitted),
(Tsn(14), ChunkState::ToBeRetransmitted),
]
);
// Only TSN 10 is handed out; an immediate second request returns nothing.
let c = rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes));
assert_eq!(get_tsns(&c), [Tsn(10)]);
assert!(rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes)).is_empty());
// Repeating the same zero-window SACK does not release any further chunk.
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(9),
a_rwnd: 0,
gap_ack_blocks: vec![GapAckBlock::new(3, 3)],
duplicate_tsns: vec![],
},
);
assert!(rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes)).is_empty());
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(9),
a_rwnd: 0,
gap_ack_blocks: vec![GapAckBlock::new(3, 3)],
duplicate_tsns: vec![],
},
);
// The cumulative ack advances to 10 (TSN 12 is now at gap offset 2) while the window
// is still zero; exactly one more chunk, TSN 11, is handed out.
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(10),
a_rwnd: 0,
gap_ack_blocks: vec![GapAckBlock::new(2, 2)],
duplicate_tsns: vec![],
},
);
let c = rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes));
assert_eq!(get_tsns(&c), [Tsn(11)]);
assert!(rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes)).is_empty());
// The peer acks through TSN 12 and advertises a window of five MTUs; TSNs 13 and 14
// are then handed out in two consecutive requests, after which nothing is left.
rtx.handle_sack(
now,
&SackChunk {
cumulative_tsn_ack: Tsn(12),
a_rwnd: (5 * MTU) as u32,
gap_ack_blocks: vec![],
duplicate_tsns: vec![],
},
);
let c = rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes));
assert_eq!(get_tsns(&c), [Tsn(13)]);
let c = rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes));
assert_eq!(get_tsns(&c), [Tsn(14)]);
assert!(rtx.get_chunks_to_send(now, MTU, |bytes, _| sq.produce(now, bytes)).is_empty());
}
/// A queue created with an initial `a_rwnd` of zero must still hand out one chunk.
#[test]
fn can_always_send_one_more_packet() {
    let now = START_TIME;
    let options = Options::default();
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &options, sink);

    // Third argument is the peer's advertised receiver window.
    let mut queue = RetransmissionQueue::new(events, Tsn(10), 0, &options, false, false);

    let message = Message::new(StreamId(1), PpId(53), vec![1, 2, 3]);
    send_queue.add(now, message, &SendOptions::default());

    let sent = queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));
    assert_eq!(get_tsns(&sent), vec![Tsn(10)]);
}
/// Sending a message that carries a lifecycle id must emit a buffered-amount-low event
/// followed by a "fully sent" lifecycle event for that id, and nothing else.
#[test]
fn adds_lifecycle_fully_sent_event() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(false, false, Rc::clone(&events));

    let send_options =
        SendOptions { lifecycle_id: LifecycleId::new(1), ..SendOptions::default() };
    send_queue.add(now, Message::new(StreamId(1), PpId(53), vec![0; 100]), &send_options);
    queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));

    // Events are checked in the order they were emitted.
    expect_buffered_amount_low!(events.borrow_mut().next_event());
    let fully_sent = events.borrow_mut().next_event();
    assert_eq!(expect_on_lifecycle_message_fully_sent!(fully_sent), LifecycleId::from(1));
    expect_no_event!(events.borrow_mut().next_event());
}
/// Acking a message that carries a lifecycle id must emit a "delivered" event and then an
/// "end" event for that id, and nothing else.
#[test]
fn adds_lifecycle_delivered_event() {
    let now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(false, false, Rc::clone(&events));

    let send_options =
        SendOptions { lifecycle_id: LifecycleId::new(1), ..SendOptions::default() };
    send_queue.add(now, Message::new(StreamId(1), PpId(53), vec![0; 100]), &send_options);
    queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));

    // Throw away every event produced by sending.
    loop {
        if events.borrow_mut().next_event().is_none() {
            break;
        }
    }

    handle_sack(&mut queue, now, Tsn(10));
    assert_eq!(
        expect_on_lifecycle_message_delivered!(events.borrow_mut().next_event()),
        LifecycleId::from(1)
    );
    assert_eq!(
        expect_on_lifecycle_end!(events.borrow_mut().next_event()),
        LifecycleId::from(1)
    );
    expect_no_event!(events.borrow_mut().next_event());
}
/// When a message with a lifecycle id and `max_retransmissions: Some(0)` times out and the
/// resulting FORWARD-TSN is acked, a "maybe sent" event and then an "end" event must be
/// emitted for that id, and nothing else.
#[test]
fn adds_lifecycle_expired_event() {
    let mut now = START_TIME;
    let events = Rc::new(RefCell::new(Events::new()));
    let sink = Rc::clone(&events) as Rc<RefCell<dyn EventSink>>;
    let mut send_queue = SendQueue::new(MTU, &Options::default(), sink);
    let mut queue = create_queue(true, false, Rc::clone(&events));

    let send_options = SendOptions {
        lifecycle_id: LifecycleId::new(1),
        max_retransmissions: Some(0),
        ..SendOptions::default()
    };
    send_queue.add(now, Message::new(StreamId(1), PpId(53), vec![0; 100]), &send_options);
    queue.get_chunks_to_send(now, MTU, |size, _| send_queue.produce(now, size));

    // Throw away every event produced by sending.
    loop {
        if events.borrow_mut().next_event().is_none() {
            break;
        }
    }

    // Let the retransmission timer expire, then ack the new cumulative TSN announced by
    // the FORWARD-TSN chunk.
    now = queue.next_timeout().unwrap();
    queue.handle_timeout(now);
    assert!(queue.should_send_forward_tsn(now));
    let fwd = match queue.create_forward_tsn() {
        Chunk::ForwardTsn(fwd) => fwd,
        _ => panic!("Expected Forward TSN"),
    };
    handle_sack(&mut queue, now, fwd.new_cumulative_tsn);

    assert_eq!(
        expect_on_lifecycle_message_maybe_sent!(events.borrow_mut().next_event()),
        LifecycleId::from(1)
    );
    assert_eq!(
        expect_on_lifecycle_end!(events.borrow_mut().next_event()),
        LifecycleId::from(1)
    );
    expect_no_event!(events.borrow_mut().next_event());
}
}