use crate::api::LifecycleId;
use crate::api::SocketTime;
use crate::api::StreamId;
use crate::math::round_up_to_4;
use crate::packet::SkippedStream;
use crate::packet::data::Data;
use crate::packet::forward_tsn_chunk::ForwardTsnChunk;
use crate::packet::iforward_tsn_chunk::IForwardTsnChunk;
use crate::packet::sack_chunk::GapAckBlock;
use crate::types::Mid;
use crate::types::OutgoingMessageId;
use crate::types::Ssn;
use crate::types::StreamKey;
use crate::types::Tsn;
use std::cmp::max;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::VecDeque;
use std::time::Duration;
/// Delivery state of a single chunk tracked by `OutstandingData`.
#[derive(Debug, PartialEq)]
enum ItemState {
/// Sent (or retransmitted) at `time_sent` and neither acked nor nacked since.
InFlight { time_sent: SocketTime },
/// Nacked `nack_count` times, which is not yet enough to be treated as lost.
ReportedMissing { time_sent: SocketTime, nack_count: u8 },
/// Treated as lost and waiting to be sent again.
QueuedForRetransmission { time_sent: SocketTime },
/// Acknowledged by the peer.
Acked,
/// Given up on; will not be retransmitted.
Abandoned,
/// Both abandoned and acknowledged (in either order).
AbandonedAndAcked,
}
impl ItemState {
    /// Returns the send timestamp stored in the state, if the state carries one.
    ///
    /// Only `InFlight`, `ReportedMissing` and `QueuedForRetransmission` remember when the
    /// chunk was sent; every other state yields `None`.
    fn time_sent(&self) -> Option<SocketTime> {
        match *self {
            Self::InFlight { time_sent } => Some(time_sent),
            Self::ReportedMissing { time_sent, .. } => Some(time_sent),
            Self::QueuedForRetransmission { time_sent } => Some(time_sent),
            Self::Acked | Self::Abandoned | Self::AbandonedAndAcked => None,
        }
    }
}
/// Outcome of `Item::nack`, telling the caller what follow-up work is needed.
#[derive(Debug, PartialEq)]
enum NackAction {
/// No follow-up needed (not lost yet, or the item was in a state that ignores nacks).
Nothing,
/// The item was moved to `QueuedForRetransmission`.
Retransmit,
/// The item exhausted its retransmission budget and was moved to `Abandoned`.
Abandon,
}
/// Summary of what a single call to `OutstandingData::handle_sack` changed.
#[derive(Debug)]
pub(crate) struct AckInfo {
/// Serialized size (header + payload, rounded up to 4) of all chunks newly acked.
pub bytes_acked: usize,
/// Set when at least one nack resulted in a retransmission or an abandonment.
pub has_packet_loss: bool,
/// Starts at the SACK's cumulative TSN and is raised to the highest newly acked TSN.
pub highest_tsn_acked: Tsn,
/// Lifecycle ids of cumulatively acked chunks that were not abandoned.
pub acked_lifecycle_ids: Vec<LifecycleId>,
/// Lifecycle ids of cumulatively acked chunks that were abandoned.
pub abandoned_lifecycle_ids: Vec<LifecycleId>,
}
/// Coarse state of a chunk as reported by `OutstandingData::get_chunk_states_for_testing`.
///
/// The enum is fieldless, so it is `Copy` and `Eq`; values can be compared and reused
/// without being moved.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ChunkState {
    /// Sent and not yet acked or nacked.
    InFlight,
    /// Reported missing, but not yet queued for retransmission.
    Nacked,
    /// Queued for retransmission.
    ToBeRetransmitted,
    /// Acknowledged by the peer.
    Acked,
    /// Abandoned (whether or not it was also acked).
    Abandoned,
}
/// Number of nacks after which `Item::nack` treats a chunk as lost, i.e. queues it for
/// retransmission or abandons it.
const NUMBER_OF_NACKS_FOR_RETRANSMISSION: u8 = 3;
/// A single chunk tracked by `OutstandingData`, together with its retransmission bookkeeping.
#[derive(Debug)]
struct Item {
/// Message this chunk is a fragment of; all fragments sharing it are abandoned together.
message_id: OutgoingMessageId,
/// Retransmission budget: a lost chunk is abandoned once `num_retransmissions` reaches it.
max_retransmissions: u16,
/// The chunk counts as expired once `now` is at or past this time (see `has_expired`).
expires_at: SocketTime,
/// Optional id reported back through `AckInfo` when the chunk is cumulatively acked.
lifecycle_id: Option<LifecycleId>,
/// The chunk's payload and stream metadata.
data: Data,
/// Current delivery state.
state: ItemState,
/// How many times the chunk has been retransmitted so far.
num_retransmissions: u16,
}
impl Item {
pub fn is_outstanding(&self) -> bool {
matches!(self.state, ItemState::InFlight { .. } | ItemState::ReportedMissing { .. })
}
pub fn is_acked(&self) -> bool {
matches!(self.state, ItemState::Acked | ItemState::AbandonedAndAcked)
}
pub fn is_nacked(&self) -> bool {
matches!(
self.state,
ItemState::ReportedMissing { .. } | ItemState::QueuedForRetransmission { .. }
)
}
pub fn is_abandoned(&self) -> bool {
matches!(self.state, ItemState::Abandoned | ItemState::AbandonedAndAcked)
}
pub fn should_be_retransmitted(&self) -> bool {
matches!(self.state, ItemState::QueuedForRetransmission { .. })
}
pub fn has_been_retransmitted(&self) -> bool {
self.num_retransmissions > 0
}
pub fn has_expired(&self, now: SocketTime) -> bool {
self.expires_at <= now
}
pub fn ack(&mut self) {
match self.state {
ItemState::InFlight { .. }
| ItemState::ReportedMissing { .. }
| ItemState::QueuedForRetransmission { .. } => {
self.state = ItemState::Acked;
}
ItemState::Abandoned => {
self.state = ItemState::AbandonedAndAcked;
}
ItemState::Acked | ItemState::AbandonedAndAcked => {}
}
}
pub fn nack(&mut self, retransmit_now: bool) -> NackAction {
let (time_sent, nack_count) = match self.state {
ItemState::InFlight { time_sent } => (time_sent, 0),
ItemState::ReportedMissing { time_sent, nack_count } => (time_sent, nack_count),
_ => return NackAction::Nothing,
};
let new_nack_count = nack_count.saturating_add(1);
if retransmit_now || new_nack_count >= NUMBER_OF_NACKS_FOR_RETRANSMISSION {
if self.num_retransmissions < self.max_retransmissions {
self.state = ItemState::QueuedForRetransmission { time_sent };
NackAction::Retransmit
} else {
self.state = ItemState::Abandoned;
NackAction::Abandon
}
} else {
self.state = ItemState::ReportedMissing { time_sent, nack_count: new_nack_count };
NackAction::Nothing
}
}
pub fn mark_as_retransmitted(&mut self, now: SocketTime) {
self.state = ItemState::InFlight { time_sent: now };
self.num_retransmissions = self.num_retransmissions.saturating_add(1);
}
pub fn abandon(&mut self) {
match self.state {
ItemState::InFlight { .. }
| ItemState::ReportedMissing { .. }
| ItemState::QueuedForRetransmission { .. } => {
self.state = ItemState::Abandoned;
}
ItemState::Acked => {
self.state = ItemState::AbandonedAndAcked;
}
ItemState::Abandoned | ItemState::AbandonedAndAcked => {}
}
}
pub fn get_rtt_from(&self, now: SocketTime) -> Option<Duration> {
if self.has_been_retransmitted() {
return None;
}
self.state.time_sent().map(|time_sent| now - time_sent)
}
}
/// Tracks every chunk that has been sent but not yet cumulatively acknowledged, together
/// with the sets of chunks scheduled for retransmission.
#[derive(Debug)]
pub(crate) struct OutstandingData {
/// Added to a chunk's payload length (then rounded up to 4) to get its serialized size.
data_chunk_header_size: usize,
/// TSN immediately preceding the front of `outstanding_data`.
last_cumulative_tsn_ack: Tsn,
/// Tracked chunks in TSN order; the item for `tsn` sits at index
/// `tsn - last_cumulative_tsn_ack - 1`.
outstanding_data: VecDeque<Item>,
/// Total serialized size of the items that are currently outstanding.
unacked_bytes: usize,
/// Number of items that are currently outstanding.
unacked_items: usize,
/// TSNs queued for retransmission that are handed out by
/// `get_chunks_to_be_fast_retransmitted`.
to_be_fast_retransmitted: BTreeSet<Tsn>,
/// TSNs queued for retransmission that are handed out by `get_chunks_to_be_retransmitted`.
to_be_retransmitted: BTreeSet<Tsn>,
/// TSNs at which `create_forward_tsn` / `create_iforward_tsn` stop advancing.
stream_reset_breakpoint_tsns: BTreeSet<Tsn>,
/// (stream, message) pairs of abandoned messages whose end fragment was never inserted.
unsent_messages_to_discard: Vec<(StreamId, OutgoingMessageId)>,
}
impl OutstandingData {
pub fn new(data_chunk_header_size: usize, last_cumulative_tsn_ack: Tsn) -> Self {
OutstandingData {
data_chunk_header_size,
last_cumulative_tsn_ack,
outstanding_data: VecDeque::new(),
unacked_bytes: 0,
unacked_items: 0,
to_be_fast_retransmitted: BTreeSet::new(),
to_be_retransmitted: BTreeSet::new(),
stream_reset_breakpoint_tsns: BTreeSet::new(),
unsent_messages_to_discard: Vec::new(),
}
}
/// Applies a SACK: drops cumulatively acked chunks, acks chunks covered by gap-ack blocks
/// and nacks the chunks in the holes between those blocks.
///
/// Returns an `AckInfo` describing what this SACK changed.
pub fn handle_sack(
    &mut self,
    cumulative_tsn_ack: Tsn,
    gap_ack_blocks: &[GapAckBlock],
    is_in_fast_recovery: bool,
) -> AckInfo {
    // Must be sampled first: `remove_acked` moves `last_cumulative_tsn_ack` forward.
    let advanced = self.last_cumulative_tsn_ack < cumulative_tsn_ack;
    let mut info = AckInfo {
        bytes_acked: 0,
        has_packet_loss: false,
        highest_tsn_acked: cumulative_tsn_ack,
        acked_lifecycle_ids: Vec::new(),
        abandoned_lifecycle_ids: Vec::new(),
    };

    self.remove_acked(cumulative_tsn_ack, &mut info);
    self.ack_gap_blocks(cumulative_tsn_ack, gap_ack_blocks, &mut info);
    self.nack_between_ack_blocks(
        cumulative_tsn_ack,
        gap_ack_blocks,
        is_in_fast_recovery,
        advanced,
        &mut info,
    );

    info
}
/// Acks and removes every chunk up to and including `cumulative_tsn_ack`.
///
/// For each removed chunk that carries a lifecycle id, the id is reported in `ack_info` as
/// either acked or abandoned. Stream reset breakpoints at or below `cumulative_tsn_ack + 1`
/// are dropped afterwards.
fn remove_acked(&mut self, cumulative_tsn_ack: Tsn, ack_info: &mut AckInfo) {
    while self.last_cumulative_tsn_ack < cumulative_tsn_ack && !self.outstanding_data.is_empty()
    {
        // `ack_chunk` locates the item relative to `last_cumulative_tsn_ack`, so it has to
        // run before the item is popped and the cumulative TSN is advanced.
        let tsn = self.last_cumulative_tsn_ack + 1;
        self.ack_chunk(tsn, ack_info);

        // The chunk for `last_cumulative_tsn_ack + 1` is always the front item, so it can be
        // popped directly and its lifecycle id moved out instead of cloned.
        if let Some(item) = self.outstanding_data.pop_front() {
            let abandoned = item.is_abandoned();
            if let Some(lifecycle_id) = item.lifecycle_id {
                debug_assert!(item.data.is_end);
                if abandoned {
                    ack_info.abandoned_lifecycle_ids.push(lifecycle_id);
                } else {
                    ack_info.acked_lifecycle_ids.push(lifecycle_id);
                }
            }
        }
        self.last_cumulative_tsn_ack += 1;
    }
    self.stream_reset_breakpoint_tsns.retain(|b| *b > cumulative_tsn_ack + 1);
}
/// Acks every tracked chunk covered by one of the SACK's gap-ack blocks.
///
/// Block offsets are relative to `cumulative_tsn_ack`. TSNs at or below
/// `last_cumulative_tsn_ack`, and TSNs that were never inserted, are ignored.
fn ack_gap_blocks(
    &mut self,
    cumulative_tsn_ack: Tsn,
    gap_ack_blocks: &[GapAckBlock],
    ack_info: &mut AckInfo,
) {
    for block in gap_ack_blocks {
        let first_tracked = self.last_cumulative_tsn_ack + 1;
        let first = cumulative_tsn_ack.add_to(block.start as u32).max(first_tracked);
        let last = cumulative_tsn_ack.add_to(block.end as u32);

        let mut tsn = first;
        loop {
            if tsn > last || tsn >= self.next_tsn() {
                break;
            }
            self.ack_chunk(tsn, ack_info);
            tsn += 1;
        }
    }
}
/// Nacks the tracked chunks that fall in the holes before and between gap-ack blocks.
///
/// Nacking is capped at `ack_info.highest_tsn_acked`, except when in fast recovery with an
/// advanced cumulative TSN, where the cap is the end of the last gap-ack block instead.
/// Sets `ack_info.has_packet_loss` when any nack led to a retransmission or abandonment.
fn nack_between_ack_blocks(
    &mut self,
    cumulative_tsn_ack: Tsn,
    gap_ack_blocks: &[GapAckBlock],
    is_in_fast_recovery: bool,
    cumulative_tsn_ack_advanced: bool,
    ack_info: &mut AckInfo,
) {
    let max_tsn_to_nack = if is_in_fast_recovery && cumulative_tsn_ack_advanced {
        let last_block_end = match gap_ack_blocks.last() {
            Some(block) => block.end as u32,
            None => 0,
        };
        cumulative_tsn_ack.add_to(last_block_end)
    } else {
        ack_info.highest_tsn_acked
    };

    let mut prev_block_last_acked = cumulative_tsn_ack;
    for block in gap_ack_blocks {
        let cur_block_first_acked = cumulative_tsn_ack.add_to(block.start as u32);
        // The upper bound is fixed before any chunk of this hole is nacked.
        let limit = cur_block_first_acked.min(self.next_tsn());
        let mut tsn = prev_block_last_acked.max(self.last_cumulative_tsn_ack) + 1;
        while tsn < limit && tsn <= max_tsn_to_nack {
            // Fast retransmit is only used outside of fast recovery.
            let lost = self.nack_chunk(tsn, false, !is_in_fast_recovery);
            if lost {
                ack_info.has_packet_loss = true;
            }
            tsn += 1;
        }
        prev_block_last_acked = cumulative_tsn_ack.add_to(block.end as u32);
    }
}
/// Nacks the chunk with the given TSN and performs the resulting bookkeeping.
///
/// Returns `true` when the nack caused the chunk to be queued for retransmission or to be
/// abandoned. `do_fast_retransmit` selects which retransmission set the TSN is queued in.
/// Panics if `tsn` does not refer to a tracked chunk.
fn nack_chunk(&mut self, tsn: Tsn, retransmit_now: bool, do_fast_retransmit: bool) -> bool {
    let position = (tsn.distance_to(self.last_cumulative_tsn_ack) - 1) as usize;
    let item = self.outstanding_data.get_mut(position).unwrap();

    let outstanding_before = item.is_outstanding();
    let action = item.nack(retransmit_now);
    // The chunk stops counting as unacked once the nack takes it out of the outstanding set.
    if outstanding_before && !item.is_outstanding() {
        let chunk_size = round_up_to_4!(self.data_chunk_header_size + item.data.payload.len());
        self.unacked_bytes -= chunk_size;
        self.unacked_items -= 1;
    }

    match action {
        NackAction::Abandon => {
            self.abandon_all_for(tsn);
            true
        }
        NackAction::Retransmit => {
            debug_assert!(matches!(item.state, ItemState::QueuedForRetransmission { .. }));
            let queue = if do_fast_retransmit {
                &mut self.to_be_fast_retransmitted
            } else {
                &mut self.to_be_retransmitted
            };
            queue.insert(tsn);
            true
        }
        NackAction::Nothing => false,
    }
}
fn ack_chunk(&mut self, tsn: Tsn, ack_info: &mut AckInfo) {
let index = tsn.distance_to(self.last_cumulative_tsn_ack) - 1;
let item = self.outstanding_data.get_mut(index as usize).unwrap();
if !item.is_acked() {
let serialized_size =
round_up_to_4!(self.data_chunk_header_size + item.data.payload.len());
ack_info.bytes_acked += serialized_size;
if item.is_outstanding() {
self.unacked_bytes -= serialized_size;
self.unacked_items -= 1;
}
if item.should_be_retransmitted() {
self.to_be_retransmitted.remove(&tsn);
self.to_be_fast_retransmitted.remove(&tsn);
}
item.ack();
ack_info.highest_tsn_acked = max(ack_info.highest_tsn_acked, tsn);
}
}
/// Returns `true` while there are (stream, message) pairs waiting to be collected with
/// `get_unsent_messages_to_discard`.
pub fn has_unsent_messages_to_discard(&self) -> bool {
!self.unsent_messages_to_discard.is_empty()
}
/// Hands out the pending (stream, message) pairs and leaves the internal list empty.
pub fn get_unsent_messages_to_discard(&mut self) -> Vec<(StreamId, OutgoingMessageId)> {
std::mem::take(&mut self.unsent_messages_to_discard)
}
/// Picks chunks from `tsns`, in TSN order, whose serialized size fits in the remaining
/// `max_size` budget.
///
/// Every picked chunk is marked as retransmitted at `now`, counted as unacked again, and
/// removed from `tsns`. Chunks that do not fit stay in `tsns`. The scan stops once the
/// remaining budget is no larger than a chunk header.
fn extract_chunks_that_can_fit(
    &mut self,
    now: SocketTime,
    mut max_size: usize,
    tsns: &mut BTreeSet<Tsn>,
) -> Vec<(Tsn, Data)> {
    let mut extracted: Vec<(Tsn, Data)> = Vec::new();

    for &tsn in tsns.iter() {
        let position = (tsn.distance_to(self.last_cumulative_tsn_ack) - 1) as usize;
        let item = self.outstanding_data.get_mut(position).unwrap();
        debug_assert!(item.should_be_retransmitted());
        debug_assert!(!item.is_outstanding());
        debug_assert!(!item.is_abandoned());
        debug_assert!(!item.is_acked());

        let chunk_size = round_up_to_4!(self.data_chunk_header_size + item.data.payload.len());
        if chunk_size <= max_size {
            item.mark_as_retransmitted(now);
            extracted.push((tsn, item.data.clone()));
            self.unacked_items += 1;
            self.unacked_bytes += chunk_size;
            max_size -= chunk_size;
        }
        // Not even the smallest chunk would fit any more.
        if max_size <= self.data_chunk_header_size {
            break;
        }
    }

    extracted.iter().for_each(|(tsn, _)| {
        tsns.remove(tsn);
    });
    extracted
}
pub fn get_chunks_to_be_fast_retransmitted(
&mut self,
now: SocketTime,
max_size: usize,
) -> Vec<(Tsn, Data)> {
let mut tsns = std::mem::take(&mut self.to_be_fast_retransmitted);
let chunks = self.extract_chunks_that_can_fit(now, max_size, &mut tsns);
self.to_be_retransmitted.append(&mut tsns);
chunks
}
pub fn get_chunks_to_be_retransmitted(
&mut self,
now: SocketTime,
max_size: usize,
) -> Vec<(Tsn, Data)> {
let mut tsns = std::mem::take(&mut self.to_be_retransmitted);
let chunks = self.extract_chunks_that_can_fit(now, max_size, &mut tsns);
std::mem::swap(&mut self.to_be_retransmitted, &mut tsns);
chunks
}
/// Total serialized size of the chunks currently counted as outstanding.
pub fn unacked_bytes(&self) -> usize {
self.unacked_bytes
}
/// Number of chunks currently counted as outstanding.
pub fn unacked_items(&self) -> usize {
self.unacked_items
}
/// Abandons nacked chunks at the head of the queue whose expiry time has passed at `now`.
///
/// The scan skips chunks that are already abandoned and stops at the first chunk that is
/// neither abandoned nor both nacked and expired. Abandoning happens after the scan, so
/// fragments abandoned as a consequence are not yet seen as abandoned while scanning.
pub fn expire_outstanding_chunks(&mut self, now: SocketTime) {
    let mut expired: Vec<Tsn> = Vec::new();
    let mut tsn = self.last_cumulative_tsn_ack;
    for item in self.outstanding_data.iter() {
        tsn += 1;
        if item.is_abandoned() {
            continue;
        }
        if !(item.is_nacked() && item.has_expired(now)) {
            break;
        }
        log::debug!(
            "Marking nacked chunk {} and message {} as expired",
            tsn,
            item.data.mid
        );
        expired.push(tsn);
    }

    for tsn in expired {
        self.abandon_all_for(tsn);
    }
}
/// Returns `true` when no chunks are tracked at all.
pub fn is_empty(&self) -> bool {
self.outstanding_data.is_empty()
}
/// Returns `true` when at least one chunk is queued for fast retransmission.
pub fn has_data_to_be_fast_retransmitted(&self) -> bool {
!self.to_be_fast_retransmitted.is_empty()
}
/// Returns `true` when at least one chunk is queued for retransmission of either kind
/// (regular or fast).
pub fn has_data_to_be_retransmitted(&self) -> bool {
!self.to_be_retransmitted.is_empty() || !self.to_be_fast_retransmitted.is_empty()
}
/// The TSN immediately preceding the oldest tracked chunk.
pub fn last_cumulative_acked_tsn(&self) -> Tsn {
self.last_cumulative_tsn_ack
}
/// The TSN the next inserted chunk will get: one past `highest_outstanding_tsn`.
pub fn next_tsn(&self) -> Tsn {
self.highest_outstanding_tsn() + 1
}
/// The TSN of the newest tracked chunk, or `last_cumulative_tsn_ack` when none are tracked.
pub fn highest_outstanding_tsn(&self) -> Tsn {
self.last_cumulative_tsn_ack.add_to(self.outstanding_data.len() as u32)
}
/// Starts tracking a freshly sent chunk and returns the TSN assigned to it.
///
/// The chunk is stored as in flight and counted as unacked. If it has already expired at
/// `time_sent`, the whole message is abandoned right away and `None` is returned.
pub fn insert(
    &mut self,
    message_id: OutgoingMessageId,
    data: &Data,
    time_sent: SocketTime,
    max_retransmissions: u16,
    expires_at: SocketTime,
    lifecycle_id: Option<LifecycleId>,
) -> Option<Tsn> {
    debug_assert!(self.unsent_messages_to_discard.is_empty());

    // The TSN has to be derived before the item is appended.
    let tsn = self.next_tsn();
    let already_expired = expires_at <= time_sent;

    self.unacked_items += 1;
    self.unacked_bytes += round_up_to_4!(self.data_chunk_header_size + data.payload.len());
    self.outstanding_data.push_back(Item {
        message_id,
        max_retransmissions,
        expires_at,
        lifecycle_id,
        data: data.clone(),
        state: ItemState::InFlight { time_sent },
        num_retransmissions: 0,
    });

    if !already_expired {
        return Some(tsn);
    }

    log::debug!(
        "Marking freshly produced chunk {} and message {} as expired",
        tsn,
        data.mid
    );
    self.abandon_all_for(tsn);
    None
}
/// Abandons every tracked fragment of the message that the chunk at `tsn` belongs to.
///
/// If no tracked fragment of the message is its end fragment, an empty placeholder end
/// fragment (already abandoned and acked) is appended, and the message is recorded in
/// `unsent_messages_to_discard`. Panics if `tsn` does not refer to a tracked chunk.
fn abandon_all_for(&mut self, tsn: Tsn) {
    let position = (tsn.distance_to(self.last_cumulative_tsn_ack) - 1) as usize;
    let (message_id, stream_key, ssn, mid) = {
        let item = self.outstanding_data.get(position).unwrap();
        (item.message_id, item.data.stream_key, item.data.ssn, item.data.mid)
    };

    let mut end_found = false;
    let mut current_tsn = self.last_cumulative_tsn_ack;
    for other in self.outstanding_data.iter_mut() {
        current_tsn += 1;
        if other.message_id != message_id {
            continue;
        }
        if other.data.is_end {
            end_found = true;
        }
        if other.is_abandoned() {
            continue;
        }

        let was_outstanding = other.is_outstanding();
        if other.should_be_retransmitted() {
            self.to_be_retransmitted.remove(&current_tsn);
            self.to_be_fast_retransmitted.remove(&current_tsn);
        }
        other.abandon();
        if was_outstanding {
            let chunk_size =
                round_up_to_4!(self.data_chunk_header_size + other.data.payload.len());
            self.unacked_items -= 1;
            self.unacked_bytes -= chunk_size;
        }
    }

    if !end_found {
        // The end of the message was never inserted: add a placeholder that stands in for it.
        let placeholder = Item {
            message_id,
            max_retransmissions: 0,
            expires_at: SocketTime::zero(),
            lifecycle_id: None,
            data: Data { stream_key, ssn, mid, is_end: true, ..Default::default() },
            state: ItemState::AbandonedAndAcked,
            num_retransmissions: 0,
        };
        self.outstanding_data.push_back(placeholder);
        self.unsent_messages_to_discard.push((stream_key.id(), message_id));
    }
}
/// Nacks every tracked chunk that has not been acked, treating each one as lost right away.
///
/// Chunks are queued in the regular (not fast) retransmission set, or abandoned if they
/// have no retransmission budget left.
pub fn nack_all(&mut self) {
    // Collected up front, as nacking may append placeholder items to the queue.
    let mut unacked_tsns: Vec<Tsn> = Vec::new();
    let mut tsn = self.last_cumulative_tsn_ack;
    for item in self.outstanding_data.iter() {
        tsn += 1;
        if item.is_acked() {
            continue;
        }
        unacked_tsns.push(tsn);
    }

    for tsn in unacked_tsns {
        self.nack_chunk(tsn, true, false);
    }
}
/// Builds a FORWARD-TSN chunk that skips the leading run of abandoned chunks.
///
/// The new cumulative TSN advances over abandoned chunks until the first non-abandoned
/// chunk or stream reset breakpoint. For every ordered stream touched, the highest skipped
/// SSN is reported.
pub fn create_forward_tsn(&self) -> ForwardTsnChunk {
    let mut new_cumulative_tsn = self.last_cumulative_tsn_ack;
    let mut highest_ssn_per_stream: BTreeMap<StreamId, Ssn> = BTreeMap::new();

    let mut tsn = self.last_cumulative_tsn_ack;
    for item in self.outstanding_data.iter() {
        tsn += 1;
        let can_skip = item.is_abandoned()
            && tsn == new_cumulative_tsn + 1
            && !self.stream_reset_breakpoint_tsns.contains(&tsn);
        if !can_skip {
            break;
        }
        new_cumulative_tsn = tsn;

        if !item.data.stream_key.is_ordered() {
            continue;
        }
        let highest = highest_ssn_per_stream.entry(item.data.stream_key.id()).or_insert(Ssn(0));
        if item.data.ssn > *highest {
            *highest = item.data.ssn;
        }
    }

    let skipped_streams: Vec<SkippedStream> = highest_ssn_per_stream
        .into_iter()
        .map(|(stream_id, ssn)| SkippedStream::ForwardTsn(stream_id, ssn))
        .collect();
    ForwardTsnChunk { new_cumulative_tsn, skipped_streams }
}
/// Builds an I-FORWARD-TSN chunk that skips the leading run of abandoned chunks.
///
/// The new cumulative TSN advances over abandoned chunks until the first non-abandoned
/// chunk or stream reset breakpoint. For every stream key touched, the highest skipped MID
/// is reported.
pub fn create_iforward_tsn(&self) -> IForwardTsnChunk {
    let mut new_cumulative_tsn = self.last_cumulative_tsn_ack;
    let mut highest_mid_per_stream: BTreeMap<StreamKey, Mid> = BTreeMap::new();

    let mut tsn = self.last_cumulative_tsn_ack;
    for item in self.outstanding_data.iter() {
        tsn += 1;
        let can_skip = item.is_abandoned()
            && tsn == new_cumulative_tsn + 1
            && !self.stream_reset_breakpoint_tsns.contains(&tsn);
        if !can_skip {
            break;
        }
        new_cumulative_tsn = tsn;

        let highest = highest_mid_per_stream.entry(item.data.stream_key).or_insert(Mid(0));
        if item.data.mid > *highest {
            *highest = item.data.mid;
        }
    }

    let skipped_streams: Vec<SkippedStream> = highest_mid_per_stream
        .into_iter()
        .map(|(stream_key, mid)| SkippedStream::IForwardTsn(stream_key, mid))
        .collect();
    IForwardTsnChunk { new_cumulative_tsn, skipped_streams }
}
/// Returns the time elapsed between sending the chunk with the given TSN and `now`.
///
/// Yields `None` when `tsn` is not a tracked chunk, when the chunk has been retransmitted,
/// or when its state no longer carries a send timestamp.
///
/// This is a read-only query, so it only needs `&self`; callers holding a mutable
/// reference can still call it unchanged.
pub fn measure_rtt(&self, now: SocketTime, tsn: Tsn) -> Option<Duration> {
    if tsn <= self.last_cumulative_tsn_ack || tsn >= self.next_tsn() {
        return None;
    }
    let index = (tsn.distance_to(self.last_cumulative_tsn_ack) - 1) as usize;
    // The range check above guarantees the lookup succeeds; `?` merely avoids a panic path.
    self.outstanding_data.get(index)?.get_rtt_from(now)
}
/// Lists the coarse state of every tracked chunk, in TSN order.
///
/// The first entry is always `last_cumulative_tsn_ack`, reported as acked.
pub fn get_chunk_states_for_testing(&self) -> Vec<(Tsn, ChunkState)> {
    let mut states: Vec<(Tsn, ChunkState)> =
        Vec::with_capacity(self.outstanding_data.len() + 1);
    states.push((self.last_cumulative_tsn_ack, ChunkState::Acked));

    let mut tsn = self.last_cumulative_tsn_ack;
    states.extend(self.outstanding_data.iter().map(|item| {
        tsn += 1;
        let state = match item.state {
            ItemState::InFlight { .. } => ChunkState::InFlight,
            ItemState::ReportedMissing { .. } => ChunkState::Nacked,
            ItemState::QueuedForRetransmission { .. } => ChunkState::ToBeRetransmitted,
            ItemState::Acked => ChunkState::Acked,
            ItemState::Abandoned | ItemState::AbandonedAndAcked => ChunkState::Abandoned,
        };
        (tsn, state)
    }));
    states
}
/// Returns `true` when the oldest tracked chunk is abandoned; `false` when it is not or
/// when nothing is tracked.
pub fn should_send_forward_tsn(&self) -> bool {
    match self.outstanding_data.front() {
        Some(oldest) => oldest.is_abandoned(),
        None => false,
    }
}
/// Overwrites the last cumulatively acked TSN.
///
/// NOTE(review): the TSN of every tracked item is derived from this value, so calling this
/// while items are tracked would renumber them; presumably callers only do so when the
/// queue is empty. Confirm against the call sites.
pub fn reset_sequence_numbers(&mut self, last_cumulative_tsn: Tsn) {
self.last_cumulative_tsn_ack = last_cumulative_tsn;
}
/// Records the TSN of the next chunk to be inserted as a stream reset breakpoint.
///
/// `create_forward_tsn` and `create_iforward_tsn` stop before a breakpoint TSN.
pub fn begin_reset_streams(&mut self) {
self.stream_reset_breakpoint_tsns.insert(self.next_tsn());
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testing::data_sequencer::DataSequencer;
use itertools::Itertools;
use std::collections::HashMap;
// Message id shared by all chunks inserted through the `insert*` test helpers.
const MESSAGE_ID: OutgoingMessageId = OutgoingMessageId(17);
// Chunk header size handed to every `OutstandingData` under test.
const DATA_CHUNK_HEADER_SIZE: usize = 16;
/// The fixed "current time" used by the tests.
fn now() -> SocketTime {
SocketTime::zero()
}
/// An expiry time for chunks that should never expire during a test.
fn no_expiry() -> SocketTime {
SocketTime::infinite_future()
}
/// Inserts `data` with the largest possible retransmission budget and returns its TSN.
fn insert(buf: &mut OutstandingData, data: Data) -> Tsn {
insert_limited_rtx(buf, data, u16::MAX)
}
/// Inserts `data` under `MESSAGE_ID` with the given retransmission budget, no expiry and no
/// lifecycle id, and returns its TSN. Panics if the chunk is rejected.
fn insert_limited_rtx(buf: &mut OutstandingData, data: Data, max_retransmissions: u16) -> Tsn {
buf.insert(MESSAGE_ID, &data, now(), max_retransmissions, no_expiry(), None).unwrap()
}
/// Test helper that inserts chunks for several streams while keeping track of message ids.
struct ChunkGenerator {
// Message id assigned to the chunks of the message currently being generated.
current_message_id: OutgoingMessageId,
// One data sequencer per stream, created on first use.
data_sequencers: HashMap<StreamId, DataSequencer>,
}
impl ChunkGenerator {
/// Creates a generator whose first message id is 17 and that has no sequencers yet.
pub fn new() -> Self {
Self { current_message_id: OutgoingMessageId(17), data_sequencers: HashMap::new() }
}
/// Inserts an ordered chunk on `sid` with the largest possible retransmission budget.
pub fn add(
&mut self,
buf: &mut OutstandingData,
sid: StreamId,
payload: &str,
flags: &str,
) -> Tsn {
self.add_limited_rtx(buf, sid, payload, flags, u16::MAX)
}
/// Inserts an ordered chunk on `sid` with the given retransmission budget and returns its
/// TSN. Panics if the chunk is rejected.
pub fn add_limited_rtx(
&mut self,
buf: &mut OutstandingData,
sid: StreamId,
payload: &str,
flags: &str,
max_retransmissions: u16,
) -> Tsn {
let seq = self.data_sequencers.entry(sid).or_insert_with(|| DataSequencer::new(sid));
let data = seq.ordered(payload, flags);
let tsn = buf.insert(
self.current_message_id,
&data,
now(),
max_retransmissions,
no_expiry(),
None,
);
// An "E" flag ends the message, so the next chunk belongs to a new message.
if flags.contains("E") {
self.current_message_id += 1;
}
tsn.unwrap()
}
}
/// A freshly created buffer is empty and reports only the initial cumulative TSN.
#[test]
fn has_initial_state() {
let buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
assert!(buf.is_empty());
assert_eq!(buf.unacked_bytes(), 0);
assert_eq!(buf.unacked_items(), 0);
assert!(!buf.has_data_to_be_retransmitted());
assert_eq!(buf.last_cumulative_acked_tsn(), Tsn(9));
assert_eq!(buf.next_tsn(), Tsn(10));
assert_eq!(buf.highest_outstanding_tsn(), Tsn(9));
assert_eq!(buf.get_chunk_states_for_testing(), vec![(Tsn(9), ChunkState::Acked)]);
assert!(!buf.should_send_forward_tsn());
}
/// Inserting a chunk assigns the next TSN and counts the chunk as unacked and in flight.
#[test]
fn insert_chunk() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
let tsn = insert(&mut buf, seq.ordered("a", "BE"));
assert_eq!(tsn, Tsn(10));
assert_eq!(buf.unacked_bytes(), DATA_CHUNK_HEADER_SIZE + round_up_to_4!(1));
assert_eq!(buf.unacked_items(), 1);
assert!(!buf.has_data_to_be_retransmitted());
assert_eq!(buf.last_cumulative_acked_tsn(), Tsn(9));
assert_eq!(buf.highest_outstanding_tsn(), Tsn(10));
assert_eq!(buf.next_tsn(), Tsn(11));
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::InFlight)]
);
}
/// A cumulative ack covering the only chunk removes it and reports its bytes as acked.
#[test]
fn acks_single_chunk() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
let tsn = insert(&mut buf, seq.ordered("a", "BE"));
assert_eq!(tsn, Tsn(10));
let ack = buf.handle_sack(Tsn(10), &[], false);
assert_eq!(ack.bytes_acked, DATA_CHUNK_HEADER_SIZE + round_up_to_4!(1));
assert_eq!(ack.highest_tsn_acked, Tsn(10));
assert!(!ack.has_packet_loss);
assert_eq!(buf.unacked_bytes(), 0);
assert_eq!(buf.unacked_items(), 0);
assert!(!buf.has_data_to_be_retransmitted());
assert_eq!(buf.last_cumulative_acked_tsn(), Tsn(10));
assert_eq!(buf.highest_outstanding_tsn(), Tsn(10));
assert_eq!(buf.next_tsn(), Tsn(11));
assert_eq!(buf.get_chunk_states_for_testing(), vec![(Tsn(10), ChunkState::Acked)]);
}
/// A SACK that only repeats the already known cumulative TSN changes nothing.
#[test]
fn acks_previous_chunk_doesnt_update() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
let tsn = insert(&mut buf, seq.ordered("a", "BE"));
assert_eq!(tsn, Tsn(10));
let ack = buf.handle_sack(Tsn(9), &[], false);
assert_eq!(ack.bytes_acked, 0);
assert_eq!(ack.highest_tsn_acked, Tsn(9));
assert!(!ack.has_packet_loss);
assert_eq!(buf.unacked_bytes(), DATA_CHUNK_HEADER_SIZE + round_up_to_4!(1));
assert_eq!(buf.unacked_items(), 1);
assert!(!buf.has_data_to_be_retransmitted());
assert_eq!(buf.last_cumulative_acked_tsn(), Tsn(9));
assert_eq!(buf.highest_outstanding_tsn(), Tsn(10));
assert_eq!(buf.next_tsn(), Tsn(11));
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![(Tsn(9), ChunkState::Acked), (Tsn(10), ChunkState::InFlight)]
);
}
/// A gap-ack block acks the chunk it covers and nacks the chunk in the hole before it; the
/// nacked chunk still counts as unacked.
#[test]
fn acks_and_nacks_with_gap_ack_blocks() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
insert(&mut buf, seq.ordered("a", "B"));
insert(&mut buf, seq.ordered("b", "E"));
let ack = buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 2)], false);
assert_eq!(ack.bytes_acked, DATA_CHUNK_HEADER_SIZE + round_up_to_4!(1));
assert_eq!(ack.highest_tsn_acked, Tsn(11));
assert!(!ack.has_packet_loss);
assert_eq!(buf.unacked_bytes(), DATA_CHUNK_HEADER_SIZE + round_up_to_4!(1));
assert_eq!(buf.unacked_items(), 1);
assert!(!buf.has_data_to_be_retransmitted());
assert_eq!(buf.last_cumulative_acked_tsn(), Tsn(9));
assert_eq!(buf.highest_outstanding_tsn(), Tsn(11));
assert_eq!(buf.next_tsn(), Tsn(12));
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::Nacked),
(Tsn(11), ChunkState::Acked)
]
);
}
/// Repeating the same SACK three times does not count as three nacks: the highest acked
/// TSN does not advance after the first one, so the chunk stays merely nacked.
#[test]
fn nacks_three_times_with_same_tsn_doesnt_retransmit() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
insert(&mut buf, seq.ordered("a", "B"));
insert(&mut buf, seq.ordered("b", "E"));
assert!(!buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 2)], false).has_packet_loss);
assert!(!buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 2)], false).has_packet_loss);
assert!(!buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 2)], false).has_packet_loss);
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::Nacked),
(Tsn(11), ChunkState::Acked)
]
);
}
/// Three SACKs that each ack a new higher TSN nack the missing chunk three times, which
/// queues it for retransmission and reports packet loss.
#[test]
fn nacks_three_times_results_in_retransmission() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
insert(&mut buf, seq.ordered("a", "B"));
insert(&mut buf, seq.ordered("b", ""));
insert(&mut buf, seq.ordered("c", ""));
insert(&mut buf, seq.ordered("d", "E"));
assert!(!buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 2)], false).has_packet_loss);
assert!(!buf.has_data_to_be_retransmitted());
assert!(!buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 3)], false).has_packet_loss);
assert!(!buf.has_data_to_be_retransmitted());
let ack = buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 4)], false);
assert_eq!(ack.bytes_acked, DATA_CHUNK_HEADER_SIZE + round_up_to_4!(1));
assert_eq!(ack.highest_tsn_acked, Tsn(13));
assert!(ack.has_packet_loss);
assert!(buf.has_data_to_be_retransmitted());
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::ToBeRetransmitted),
(Tsn(11), ChunkState::Acked),
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::Acked)
]
);
}
/// With a retransmission budget of zero, the third nack abandons every fragment of the
/// message instead of queueing a retransmission.
#[test]
fn nacks_three_times_results_in_abandoning() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
insert_limited_rtx(&mut buf, seq.ordered("a", "B"), 0);
insert_limited_rtx(&mut buf, seq.ordered("b", ""), 0);
insert_limited_rtx(&mut buf, seq.ordered("c", ""), 0);
insert_limited_rtx(&mut buf, seq.ordered("d", "E"), 0);
assert!(!buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 2)], false).has_packet_loss);
assert!(!buf.has_data_to_be_retransmitted());
assert!(!buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 3)], false).has_packet_loss);
assert!(!buf.has_data_to_be_retransmitted());
let ack = buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 4)], false);
assert_eq!(ack.bytes_acked, DATA_CHUNK_HEADER_SIZE + round_up_to_4!(1));
assert_eq!(ack.highest_tsn_acked, Tsn(13));
assert!(ack.has_packet_loss);
assert!(!buf.has_data_to_be_retransmitted());
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::Abandoned),
(Tsn(11), ChunkState::Abandoned),
(Tsn(12), ChunkState::Abandoned),
(Tsn(13), ChunkState::Abandoned)
]
);
}
/// Handling a long series of SACKs with a growing gap-ack block must not panic on
/// arithmetic overflow.
#[test]
fn nacks_extremely_many_times_doesnt_overflow() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
insert_limited_rtx(&mut buf, seq.ordered("a", "B"), 0);
const FRAGMENT_COUNT: u16 = 1000;
for _ in 0..FRAGMENT_COUNT {
insert_limited_rtx(&mut buf, seq.ordered("b", ""), 0);
}
for i in 0..FRAGMENT_COUNT {
buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 2 + i)], false);
}
}
/// Abandoning a message whose end fragment was never inserted appends a placeholder chunk
/// and reports the message as one whose unsent remainder should be discarded.
#[test]
fn nacks_three_times_results_in_abandoning_with_placeholder() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
insert_limited_rtx(&mut buf, seq.ordered("a", "B"), 0);
insert_limited_rtx(&mut buf, seq.ordered("b", ""), 0);
insert_limited_rtx(&mut buf, seq.ordered("c", ""), 0);
insert_limited_rtx(&mut buf, seq.ordered("d", ""), 0);
assert!(!buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 2)], false).has_packet_loss);
assert!(!buf.has_data_to_be_retransmitted());
assert!(!buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 3)], false).has_packet_loss);
assert!(!buf.has_data_to_be_retransmitted());
assert!(!buf.has_unsent_messages_to_discard());
let ack = buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 4)], false);
assert_eq!(ack.bytes_acked, DATA_CHUNK_HEADER_SIZE + round_up_to_4!(1));
assert_eq!(ack.highest_tsn_acked, Tsn(13));
assert!(ack.has_packet_loss);
assert!(!buf.has_data_to_be_retransmitted());
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::Abandoned),
(Tsn(11), ChunkState::Abandoned),
(Tsn(12), ChunkState::Abandoned),
(Tsn(13), ChunkState::Abandoned),
(Tsn(14), ChunkState::Abandoned)
]
);
assert_eq!(buf.get_unsent_messages_to_discard(), vec![(StreamId(1), MESSAGE_ID)]);
}
/// Inserting the end fragment at its expiry time rejects it and abandons the whole message;
/// no placeholder is needed because the end fragment itself is tracked.
#[test]
fn expires_chunk_before_it_is_inserted() {
let now = now();
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
let expires_at = now + Duration::from_millis(1);
assert!(
buf.insert(
MESSAGE_ID,
&seq.ordered("a", "B"),
now + Duration::from_millis(0),
u16::MAX,
expires_at,
None,
)
.is_some()
);
assert!(
buf.insert(
MESSAGE_ID,
&seq.ordered("b", ""),
now + Duration::from_millis(0),
u16::MAX,
expires_at,
None,
)
.is_some()
);
assert!(
buf.insert(
MESSAGE_ID,
&seq.ordered("c", "E"),
now + Duration::from_millis(1),
u16::MAX,
expires_at,
None,
)
.is_none()
);
assert!(!buf.has_data_to_be_retransmitted());
assert_eq!(buf.last_cumulative_acked_tsn(), Tsn(9));
assert_eq!(buf.highest_outstanding_tsn(), Tsn(12));
assert_eq!(buf.next_tsn(), Tsn(13));
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::Abandoned),
(Tsn(11), ChunkState::Abandoned),
(Tsn(12), ChunkState::Abandoned),
]
);
assert!(!buf.has_unsent_messages_to_discard());
}
/// Inserting a middle fragment at its expiry time abandons the message and, since the end
/// fragment was never inserted, appends a placeholder and requests discarding the rest.
#[test]
fn expires_chunk_before_it_is_inserted_adds_placeholder() {
let now = now();
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
let expires_at = now + Duration::from_millis(1);
assert!(
buf.insert(
MESSAGE_ID,
&seq.ordered("a", "B"),
now + Duration::from_millis(0),
u16::MAX,
expires_at,
None,
)
.is_some()
);
assert!(
buf.insert(
MESSAGE_ID,
&seq.ordered("b", ""),
now + Duration::from_millis(0),
u16::MAX,
expires_at,
None,
)
.is_some()
);
assert!(!buf.has_unsent_messages_to_discard());
assert!(
buf.insert(
MESSAGE_ID,
&seq.ordered("c", ""),
now + Duration::from_millis(1),
u16::MAX,
expires_at,
None,
)
.is_none()
);
assert!(!buf.has_data_to_be_retransmitted());
assert_eq!(buf.last_cumulative_acked_tsn(), Tsn(9));
assert_eq!(buf.highest_outstanding_tsn(), Tsn(13));
assert_eq!(buf.next_tsn(), Tsn(14));
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::Abandoned),
(Tsn(11), ChunkState::Abandoned),
(Tsn(12), ChunkState::Abandoned),
(Tsn(13), ChunkState::Abandoned),
]
);
assert_eq!(buf.get_unsent_messages_to_discard(), vec![(StreamId(1), MESSAGE_ID)]);
}
/// After all chunks of a message are abandoned, the FORWARD-TSN skips past the last one.
#[test]
fn can_generate_forward_tsn() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
insert_limited_rtx(&mut buf, seq.ordered("a", "B"), 0);
insert_limited_rtx(&mut buf, seq.ordered("b", ""), 0);
insert_limited_rtx(&mut buf, seq.ordered("c", "E"), 0);
buf.nack_all();
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::Abandoned),
(Tsn(11), ChunkState::Abandoned),
(Tsn(12), ChunkState::Abandoned),
]
);
assert!(buf.should_send_forward_tsn());
let chunk = buf.create_forward_tsn();
assert_eq!(chunk.new_cumulative_tsn, Tsn(12));
}
/// A SACK with an advanced cumulative TSN and two gap-ack blocks acks the covered chunks
/// and nacks the chunks in the holes between them.
#[test]
fn ack_with_gap_blocks_from_rfc9260_section334() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
insert(&mut buf, seq.ordered("a", "B"));
insert(&mut buf, seq.ordered("b", ""));
insert(&mut buf, seq.ordered("c", ""));
insert(&mut buf, seq.ordered("d", ""));
insert(&mut buf, seq.ordered("e", ""));
insert(&mut buf, seq.ordered("f", ""));
insert(&mut buf, seq.ordered("g", ""));
insert(&mut buf, seq.ordered("h", "E"));
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![
(Tsn(9), ChunkState::Acked),
(Tsn(10), ChunkState::InFlight),
(Tsn(11), ChunkState::InFlight),
(Tsn(12), ChunkState::InFlight),
(Tsn(13), ChunkState::InFlight),
(Tsn(14), ChunkState::InFlight),
(Tsn(15), ChunkState::InFlight),
(Tsn(16), ChunkState::InFlight),
(Tsn(17), ChunkState::InFlight)
]
);
buf.handle_sack(Tsn(12), &[GapAckBlock::new(2, 3), GapAckBlock::new(5, 5)], false);
assert_eq!(
buf.get_chunk_states_for_testing(),
vec![
(Tsn(12), ChunkState::Acked),
(Tsn(13), ChunkState::Nacked),
(Tsn(14), ChunkState::Acked),
(Tsn(15), ChunkState::Acked),
(Tsn(16), ChunkState::Nacked),
(Tsn(17), ChunkState::Acked)
]
);
}
/// The measured RTT is the time between the send time of the requested chunk and `now`.
#[test]
fn measure_rtt() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
let now = now();
buf.insert(OutgoingMessageId(1), &seq.ordered("a", "BE"), now, u16::MAX, no_expiry(), None);
let tsn = buf
.insert(
OutgoingMessageId(2),
&seq.ordered("b", "BE"),
now + Duration::from_millis(1),
u16::MAX,
no_expiry(),
None,
)
.unwrap();
buf.insert(
OutgoingMessageId(3),
&seq.ordered("c", "BE"),
now + Duration::from_millis(2),
u16::MAX,
no_expiry(),
None,
);
// The chunk was sent at +1 ms and measured at +123 ms.
let duration = buf.measure_rtt(now + Duration::from_millis(123), tsn).unwrap();
assert_eq!(duration, Duration::from_millis(122));
}
/// A chunk queued for retransmission ignores further nacks until it has actually been
/// retransmitted; after that, three more nacks exhaust its budget of one retransmission.
#[test]
fn must_retransmit_before_getting_nacked_again() {
let mut buf = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
let mut seq = DataSequencer::new(StreamId(1));
for i in 10..=20 {
let flags = match i {
10 => "B",
20 => "E",
_ => "",
};
insert_limited_rtx(&mut buf, seq.ordered("a", flags), 1);
}
buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 2)], false);
assert!(!buf.has_data_to_be_retransmitted());
buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 3)], false);
assert!(!buf.has_data_to_be_retransmitted());
let ack = buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 4)], false);
assert!(ack.has_packet_loss);
assert!(buf.has_data_to_be_retransmitted());
// Further nacks while the chunk is queued must not count.
buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 5)], false);
assert!(buf.has_data_to_be_retransmitted());
buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 6)], false);
assert!(buf.has_data_to_be_retransmitted());
let ack = buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 7)], false);
assert!(!ack.has_packet_loss);
assert!(buf.has_data_to_be_retransmitted());
let chunks = buf.get_chunks_to_be_fast_retransmitted(now(), 1000);
assert_eq!(chunks.iter().map(|c| c.0).collect_vec(), &[Tsn(10)]);
assert!(buf.get_chunks_to_be_retransmitted(now(), 1000).is_empty());
// Once retransmitted, the chunk is in flight again and nacks count from zero.
buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 8)], false);
assert!(!buf.has_data_to_be_retransmitted());
buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 9)], false);
assert!(!buf.has_data_to_be_retransmitted());
let ack = buf.handle_sack(Tsn(9), &[GapAckBlock::new(2, 10)], false);
assert!(ack.has_packet_loss);
assert!(!buf.has_data_to_be_retransmitted());
}
/// Cumulatively acknowledged chunks report their lifecycle ids through
/// `AckInfo::acked_lifecycle_ids`, each id appearing in the SACK that
/// acknowledges its chunk.
///
/// NOTE(review): the function name misspells "lifecycle"; it is kept as-is so
/// that existing test filters keep matching.
#[test]
fn lifecyle_returns_acked_items_in_ack_info() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut sequencer = DataSequencer::new(StreamId(1));

    // Three single-chunk messages tagged with lifecycle ids 42, 43 and 44.
    for (message_id, payload, lifecycle_id) in [(1, "a", 42), (2, "b", 43), (3, "c", 44)] {
        outstanding.insert(
            OutgoingMessageId(message_id),
            &sequencer.ordered(payload, "BE"),
            now(),
            u16::MAX,
            no_expiry(),
            LifecycleId::new(lifecycle_id),
        );
    }

    // Acking up to TSN 11 covers the first two messages.
    let first_ack = outstanding.handle_sack(Tsn(11), &[], false);
    assert_eq!(
        first_ack.acked_lifecycle_ids,
        vec![LifecycleId::from(42), LifecycleId::from(43)]
    );

    // Acking TSN 12 reports only the remaining one.
    let second_ack = outstanding.handle_sack(Tsn(12), &[], false);
    assert_eq!(second_ack.acked_lifecycle_ids, vec![LifecycleId::from(44)]);
}
/// A message inserted with `0` as its retransmission limit is abandoned after
/// its first fragment has been reported missing three times; the lifecycle id
/// is reported as abandoned only once the FORWARD-TSN range has been acked.
#[test]
fn lifecycle_returns_abandoned_nacked_three_times() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut sequencer = DataSequencer::new(StreamId(1));

    // Four fragments of message 1; only the last one carries a lifecycle id.
    for (payload, flags) in [("a", "B"), ("b", ""), ("c", "")] {
        outstanding.insert(
            OutgoingMessageId(1),
            &sequencer.ordered(payload, flags),
            now(),
            0,
            no_expiry(),
            None,
        );
    }
    outstanding.insert(
        OutgoingMessageId(1),
        &sequencer.ordered("d", "E"),
        now(),
        0,
        no_expiry(),
        LifecycleId::new(42),
    );

    // Two growing gap-ack blocks: nothing is queued for retransmission.
    for block_end in [2, 3] {
        outstanding.handle_sack(Tsn(9), &[GapAckBlock::new(2, block_end)], false);
        assert!(!outstanding.has_data_to_be_retransmitted());
    }

    // The third one reports packet loss, but the lifecycle id is not yet
    // reported as abandoned.
    let loss_report = outstanding.handle_sack(Tsn(9), &[GapAckBlock::new(2, 4)], false);
    assert!(loss_report.has_packet_loss);
    assert!(loss_report.abandoned_lifecycle_ids.is_empty());

    // A FORWARD-TSN covering the whole message is due.
    assert!(outstanding.should_send_forward_tsn());
    let forward_tsn = outstanding.create_forward_tsn();
    assert_eq!(forward_tsn.new_cumulative_tsn, Tsn(13));

    // Once the peer acks up to TSN 13, the abandoned lifecycle id is reported.
    let final_ack = outstanding.handle_sack(Tsn(13), &[], false);
    assert!(!final_ack.has_packet_loss);
    assert_eq!(final_ack.abandoned_lifecycle_ids, vec![LifecycleId::from(42)]);
}
/// A partially acked message inserted with `0` as its retransmission limit is
/// abandoned in full by `nack_all` (which, going by the test name, stands in
/// for a T3-rtx expiry), and its lifecycle id is reported as abandoned once
/// the FORWARD-TSN range has been acked.
#[test]
fn lifecycle_returns_abandoned_after_t3rtx_expired() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut sequencer = DataSequencer::new(StreamId(1));

    // Four fragments of message 1; only the last one carries a lifecycle id.
    for (payload, flags) in [("a", "B"), ("b", ""), ("c", "")] {
        outstanding.insert(
            OutgoingMessageId(1),
            &sequencer.ordered(payload, flags),
            now(),
            0,
            no_expiry(),
            None,
        );
    }
    outstanding.insert(
        OutgoingMessageId(1),
        &sequencer.ordered("d", "E"),
        now(),
        0,
        no_expiry(),
        LifecycleId::new(42),
    );
    assert_eq!(
        outstanding.get_chunk_states_for_testing(),
        &[
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::InFlight),
            (Tsn(11), ChunkState::InFlight),
            (Tsn(12), ChunkState::InFlight),
            (Tsn(13), ChunkState::InFlight),
        ]
    );

    // Everything but the first fragment is gap-acked; one miss indication is
    // not enough to queue a retransmission.
    outstanding.handle_sack(Tsn(9), &[GapAckBlock::new(2, 4)], false);
    assert!(!outstanding.has_data_to_be_retransmitted());
    assert_eq!(
        outstanding.get_chunk_states_for_testing(),
        &[
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::Nacked),
            (Tsn(11), ChunkState::Acked),
            (Tsn(12), ChunkState::Acked),
            (Tsn(13), ChunkState::Acked),
        ]
    );

    // Nacking everything abandons the whole message, including the fragments
    // that were already gap-acked.
    outstanding.nack_all();
    assert_eq!(
        outstanding.get_chunk_states_for_testing(),
        &[
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::Abandoned),
            (Tsn(11), ChunkState::Abandoned),
            (Tsn(12), ChunkState::Abandoned),
            (Tsn(13), ChunkState::Abandoned),
        ]
    );

    // A FORWARD-TSN covering the whole message is due.
    assert!(outstanding.should_send_forward_tsn());
    let forward_tsn = outstanding.create_forward_tsn();
    assert_eq!(forward_tsn.new_cumulative_tsn, Tsn(13));

    // Once the peer acks up to TSN 13, the abandoned lifecycle id is reported.
    let final_ack = outstanding.handle_sack(Tsn(13), &[], false);
    assert!(!final_ack.has_packet_loss);
    assert_eq!(final_ack.abandoned_lifecycle_ids, vec![LifecycleId::from(42)]);
}
/// A generated FORWARD-TSN must stop at the next stream-reset boundary:
///
/// * stream 1: TSN 10, 11, 12, then `begin_reset_streams`
/// * stream 2: TSN 13, 14, then `begin_reset_streams`
/// * stream 3: TSN 15, 16
///
/// With TSN 12-15 abandoned, each FORWARD-TSN may only advance to the end of
/// the current segment until the cumulative ack has caught up with it.
#[test]
fn generates_forward_tsn_until_next_stream_reset_tsn() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut generator = ChunkGenerator::new();

    for payload in ["a", "b", "c"] {
        generator.add_limited_rtx(&mut outstanding, StreamId(1), payload, "BE", 0);
    }
    outstanding.begin_reset_streams();
    for payload in ["d", "e"] {
        generator.add_limited_rtx(&mut outstanding, StreamId(2), payload, "BE", 0);
    }
    outstanding.begin_reset_streams();
    generator.add_limited_rtx(&mut outstanding, StreamId(3), "f", "BE", 0);
    // The final message is added without the limit and ends up at TSN 16.
    let last_tsn = generator.add(&mut outstanding, StreamId(3), "g", "BE");
    assert_eq!(last_tsn, Tsn(16));
    assert!(!outstanding.should_send_forward_tsn());

    // Ack TSN 10-11, then nack the rest: the limited messages are abandoned
    // while the last one is queued for retransmission.
    outstanding.handle_sack(Tsn(11), &[], false);
    outstanding.nack_all();
    assert_eq!(
        outstanding.get_chunk_states_for_testing(),
        &[
            (Tsn(11), ChunkState::Acked),
            (Tsn(12), ChunkState::Abandoned),
            (Tsn(13), ChunkState::Abandoned),
            (Tsn(14), ChunkState::Abandoned),
            (Tsn(15), ChunkState::Abandoned),
            (Tsn(16), ChunkState::ToBeRetransmitted),
        ]
    );

    // First FORWARD-TSN stops at the end of the stream 1 segment.
    assert!(outstanding.should_send_forward_tsn());
    let forward_tsn = outstanding.create_forward_tsn();
    assert_eq!(forward_tsn.new_cumulative_tsn, Tsn(12));
    assert_eq!(
        forward_tsn.skipped_streams,
        vec![SkippedStream::ForwardTsn(StreamId(1), Ssn(2))]
    );

    // With the cumulative ack at 12 and then at 13, the FORWARD-TSN spans
    // the stream 2 segment up to TSN 14.
    for cumulative_tsn in [12, 13] {
        outstanding.handle_sack(Tsn(cumulative_tsn), &[], false);
        assert!(outstanding.should_send_forward_tsn());
        let forward_tsn = outstanding.create_forward_tsn();
        assert_eq!(forward_tsn.new_cumulative_tsn, Tsn(14));
        assert_eq!(
            forward_tsn.skipped_streams,
            vec![SkippedStream::ForwardTsn(StreamId(2), Ssn(1))]
        );
    }

    // With the cumulative ack at 14, the abandoned stream 3 chunk is included.
    outstanding.handle_sack(Tsn(14), &[], false);
    assert!(outstanding.should_send_forward_tsn());
    let forward_tsn = outstanding.create_forward_tsn();
    assert_eq!(forward_tsn.new_cumulative_tsn, Tsn(15));
    assert_eq!(
        forward_tsn.skipped_streams,
        vec![SkippedStream::ForwardTsn(StreamId(3), Ssn(0))]
    );

    // Nothing left to skip once TSN 15 has been acked.
    outstanding.handle_sack(Tsn(15), &[], false);
    assert!(!outstanding.should_send_forward_tsn());
}
/// Three SACKs, each advancing the cumulative TSN while reporting TSN 13 and
/// 15 as missing (the last two with the fast-recovery flag set), must leave
/// both chunks queued for retransmission.
#[test]
fn fast_recovery_increments_nack_count_when_cumulative_tsn_advances() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut sequencer = DataSequencer::new(StreamId(1));

    // Seven single-chunk messages: TSN 10 through 16.
    for _tsn in 10..=16 {
        insert(&mut outstanding, sequencer.ordered("abc", "BE"));
    }

    // Cumulative ack 10, outside fast recovery; blocks cover TSN 12, 14, 16.
    outstanding.handle_sack(
        Tsn(10),
        &[GapAckBlock::new(2, 2), GapAckBlock::new(4, 4), GapAckBlock::new(6, 6)],
        false,
    );

    // Cumulative ack 11, in fast recovery; the same TSNs are covered.
    outstanding.handle_sack(
        Tsn(11),
        &[GapAckBlock::new(1, 1), GapAckBlock::new(3, 3), GapAckBlock::new(5, 5)],
        true,
    );

    // Cumulative ack 12, in fast recovery; blocks cover TSN 14 and 16.
    outstanding.handle_sack(Tsn(12), &[GapAckBlock::new(2, 2), GapAckBlock::new(4, 4)], true);

    assert_eq!(
        outstanding.get_chunk_states_for_testing(),
        &[
            (Tsn(12), ChunkState::Acked),
            (Tsn(13), ChunkState::ToBeRetransmitted),
            (Tsn(14), ChunkState::Acked),
            (Tsn(15), ChunkState::ToBeRetransmitted),
            (Tsn(16), ChunkState::Acked),
        ]
    );
}
/// A chunk that is queued for retransmission but has not been retransmitted
/// yet still yields an RTT measurement.
#[test]
fn rtt_is_measured_even_if_queued_for_retransmission() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut chunk_generator = ChunkGenerator::new();

    // Five single-chunk messages: TSN 10 through 14.
    for payload in ["a", "b", "c", "d", "e"] {
        chunk_generator.add(&mut outstanding, StreamId(1), payload, "BE");
    }

    // Three SACKs with a growing gap-ack block; TSN 11 is missing in each.
    for block_end in [2, 3, 4] {
        outstanding.handle_sack(Tsn(10), &[GapAckBlock::new(2, block_end)], false);
    }
    assert!(outstanding.has_data_to_be_retransmitted());
    assert_eq!(
        outstanding.get_chunk_states_for_testing(),
        vec![
            (Tsn(10), ChunkState::Acked),
            (Tsn(11), ChunkState::ToBeRetransmitted),
            (Tsn(12), ChunkState::Acked),
            (Tsn(13), ChunkState::Acked),
            (Tsn(14), ChunkState::Acked),
        ]
    );

    // The expected 500 ms implies the generator stamped the chunks with
    // `SocketTime::zero()` - presumably what `now()` returns; confirm there.
    let measured_at = SocketTime::zero() + Duration::from_millis(500);
    let rtt = outstanding.measure_rtt(measured_at, Tsn(11));
    assert_eq!(rtt, Some(Duration::from_millis(500)));
}
/// Chunks that are already queued for retransmission are still abandoned
/// when their expiry time passes.
#[test]
fn expires_chunks_queued_for_retransmission() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut sequencer = DataSequencer::new(StreamId(1));
    let sent_at = SocketTime::zero();
    let expires_at = sent_at + Duration::from_millis(100);

    // Messages 1 and 2 expire 100 ms after being sent...
    for (message_id, payload) in [(1, "a"), (2, "b")] {
        outstanding.insert(
            OutgoingMessageId(message_id),
            &sequencer.ordered(payload, "BE"),
            sent_at,
            1,
            expires_at,
            None,
        );
    }
    // ...while messages 3 to 5 are inserted with `no_expiry()`.
    for (message_id, payload) in [(3, "c"), (4, "d"), (5, "e")] {
        outstanding.insert(
            OutgoingMessageId(message_id),
            &sequencer.ordered(payload, "BE"),
            sent_at,
            1,
            no_expiry(),
            None,
        );
    }

    // Three SACKs with a growing gap-ack block; TSN 10 and 11 are missing in
    // each, which queues both for retransmission.
    for block_end in [3, 4, 5] {
        outstanding.handle_sack(Tsn(9), &[GapAckBlock::new(3, block_end)], false);
    }
    assert_eq!(
        outstanding.get_chunk_states_for_testing(),
        vec![
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::ToBeRetransmitted),
            (Tsn(11), ChunkState::ToBeRetransmitted),
            (Tsn(12), ChunkState::Acked),
            (Tsn(13), ChunkState::Acked),
            (Tsn(14), ChunkState::Acked),
        ]
    );

    // Past the expiry time, the queued chunks become abandoned instead.
    outstanding.expire_outstanding_chunks(sent_at + Duration::from_millis(200));
    assert_eq!(
        outstanding.get_chunk_states_for_testing(),
        vec![
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::Abandoned),
            (Tsn(11), ChunkState::Abandoned),
            (Tsn(12), ChunkState::Acked),
            (Tsn(13), ChunkState::Acked),
            (Tsn(14), ChunkState::Acked),
        ]
    );
}
/// A chunk that was gap-acked, then abandoned, and finally covered by the
/// cumulative ack must contribute to `bytes_acked` only once.
#[test]
fn does_not_double_count_bytes_when_abandoned_chunk_is_reacked() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut sequencer = DataSequencer::new(StreamId(1));
    let chunk_size = round_up_to_4!(DATA_CHUNK_HEADER_SIZE + 1);
    let sent_at = SocketTime::zero();
    let expires_at = sent_at + Duration::from_millis(100);

    // Message 1 (TSN 10-11) expires after 100 ms.
    for (payload, flags) in [("a", "B"), ("b", "E")] {
        outstanding.insert(
            OutgoingMessageId(1),
            &sequencer.ordered(payload, flags),
            sent_at,
            0,
            expires_at,
            None,
        );
    }
    // Message 2 (TSN 12-13) is inserted with `no_expiry()`.
    for (payload, flags) in [("c", "B"), ("d", "E")] {
        outstanding.insert(
            OutgoingMessageId(2),
            &sequencer.ordered(payload, flags),
            sent_at,
            1,
            no_expiry(),
            None,
        );
    }

    // Gap-ack the second fragment of each message: TSN 11 and 13.
    let gap_ack = outstanding.handle_sack(
        Tsn(9),
        &[GapAckBlock::new(2, 2), GapAckBlock::new(4, 4)],
        false,
    );
    assert_eq!(gap_ack.bytes_acked, 2 * chunk_size);

    // Expiring message 1 abandons both of its chunks, including TSN 11,
    // which had already been gap-acked.
    outstanding.expire_outstanding_chunks(sent_at + Duration::from_millis(200));
    assert_eq!(
        outstanding.get_chunk_states_for_testing(),
        vec![
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::Abandoned),
            (Tsn(11), ChunkState::Abandoned),
            (Tsn(12), ChunkState::Nacked),
            (Tsn(13), ChunkState::Acked),
        ]
    );

    // The cumulative ack covers all four chunks, but only two chunks' worth
    // of bytes are reported - presumably TSN 10 and 12, the two that had not
    // been gap-acked before.
    let cumulative_ack = outstanding.handle_sack(Tsn(13), &[], false);
    assert_eq!(cumulative_ack.bytes_acked, 2 * chunk_size);
}
/// Abandoning a message whose final fragment was never inserted makes an
/// extra abandoned entry appear at the next TSN (the "placeholder" of the
/// test name); acking it must not add to `bytes_acked`.
#[test]
fn placeholder_fragment_does_not_contribute_to_bytes_acked() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut sequencer = DataSequencer::new(StreamId(1));

    // Only the beginning fragment of the message is inserted.
    let first_fragment = sequencer.ordered("a", "B");
    outstanding.insert(MESSAGE_ID, &first_fragment, now(), 0, no_expiry(), None);

    // Nacking everything abandons the message; TSN 11 shows up although only
    // a single chunk was inserted.
    outstanding.nack_all();
    assert_eq!(
        outstanding.get_chunk_states_for_testing(),
        &[
            (Tsn(9), ChunkState::Acked),
            (Tsn(10), ChunkState::Abandoned),
            (Tsn(11), ChunkState::Abandoned),
        ]
    );

    // Acking both entries accounts for exactly one chunk's worth of bytes.
    let single_chunk_size = round_up_to_4!(DATA_CHUNK_HEADER_SIZE + 1);
    let ack = outstanding.handle_sack(Tsn(11), &[], false);
    assert_eq!(ack.bytes_acked, single_chunk_size);
}
/// `measure_rtt` yields nothing for a chunk that was gap-acked, nor for a
/// chunk that has been handed out for fast retransmission.
#[test]
fn rtt_not_measured_for_gap_acked_tsns() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut chunk_generator = ChunkGenerator::new();
    let start = now();

    // Five single-chunk messages: TSN 10 through 14.
    for payload in ["a", "b", "c", "d", "e"] {
        chunk_generator.add(&mut outstanding, StreamId(1), payload, "BE");
    }

    // Three SACKs with a growing gap-ack block; TSN 11 is missing in each.
    for block_end in [2, 3, 4] {
        outstanding.handle_sack(Tsn(10), &[GapAckBlock::new(2, block_end)], false);
    }
    assert!(outstanding.has_data_to_be_fast_retransmitted());

    // Hand TSN 11 out for fast retransmission 100 ms in.
    let retransmitted_at = start + Duration::from_millis(100);
    let chunks = outstanding.get_chunks_to_be_fast_retransmitted(retransmitted_at, 1000);
    assert_eq!(chunks[0].0, Tsn(11));

    // Neither the gap-acked TSN 12 nor the retransmitted TSN 11 gives an RTT.
    let measured_at = start + Duration::from_millis(500);
    assert_eq!(outstanding.measure_rtt(measured_at, Tsn(12)), None);
    assert_eq!(outstanding.measure_rtt(measured_at, Tsn(11)), None);
}
/// A chunk queued for retransmission that then gets cumulatively acked must
/// no longer be offered for fast or regular retransmission.
#[test]
fn test_acked_chunk_is_removed_from_fast_retransmit() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut sequencer = DataSequencer::new(StreamId(1));

    // One message in five fragments: TSN 10 through 14.
    for (payload, flags) in [("a", "B"), ("b", ""), ("c", ""), ("d", ""), ("e", "E")] {
        insert(&mut outstanding, sequencer.ordered(payload, flags));
    }

    // Three SACKs with a growing gap-ack block; the third reports loss.
    for block_end in [2, 3] {
        outstanding.handle_sack(Tsn(9), &[GapAckBlock::new(2, block_end)], false);
    }
    let loss_report = outstanding.handle_sack(Tsn(9), &[GapAckBlock::new(2, 4)], false);
    assert!(loss_report.has_packet_loss);
    assert!(outstanding.has_data_to_be_retransmitted());

    // Everything is acked before anything was retransmitted.
    outstanding.handle_sack(Tsn(14), &[], false);
    let fast_retransmit = outstanding.get_chunks_to_be_fast_retransmitted(now(), 1000);
    assert!(fast_retransmit.is_empty());
    let retransmit = outstanding.get_chunks_to_be_retransmitted(now(), 1000);
    assert!(retransmit.is_empty());
}
/// A SACK whose gap-ack block lies beyond the highest inserted TSN must be
/// handled without panicking. There are no assertions: surviving the call is
/// the whole check.
#[test]
fn test_gap_ack_block_beyond_highest_outstanding_does_not_panic() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut sequencer = DataSequencer::new(StreamId(1));

    // One message in four fragments, inserted after initial TSN 9.
    for (payload, flags) in [("a", "B"), ("b", ""), ("c", ""), ("d", "E")] {
        insert(&mut outstanding, sequencer.ordered(payload, flags));
    }

    // Cumulative ack 11 with a block at offset 4, past the inserted chunks.
    outstanding.handle_sack(Tsn(11), &[GapAckBlock::new(4, 4)], true);
}
/// A SACK whose cumulative TSN is older than what has already been acked,
/// and which carries a gap-ack block, must be handled without panicking.
/// There are no assertions: surviving the call is the whole check.
#[test]
fn test_old_sack_with_gap_blocks_does_not_panic() {
    let mut outstanding = OutstandingData::new(DATA_CHUNK_HEADER_SIZE, Tsn(9));
    let mut sequencer = DataSequencer::new(StreamId(1));

    // First message in four fragments, acked in full up to TSN 13.
    for (payload, flags) in [("a", "B"), ("b", ""), ("c", ""), ("d", "E")] {
        insert(&mut outstanding, sequencer.ordered(payload, flags));
    }
    outstanding.handle_sack(Tsn(13), &[], false);

    // Second message in two fragments, still unacked.
    for (payload, flags) in [("e", "B"), ("f", "E")] {
        insert(&mut outstanding, sequencer.ordered(payload, flags));
    }

    // Old SACK: cumulative TSN 5 with a gap-ack block at offset 10.
    outstanding.handle_sack(Tsn(5), &[GapAckBlock::new(10, 10)], false);
}
}