use std::{
cmp::min,
collections::VecDeque,
convert::TryFrom,
ops::Range,
time::{Duration, Instant},
};
use bytes::{Bytes, BytesMut};
use crate::{options::PacketCount, packet::*};
use super::{
time::{ClockAdjustment, SynchronizedRemoteClock},
DataPacketAction, DataPacketError,
};
#[derive(Debug)]
pub struct LostPacket {
data_sequence_number: SeqNumber,
feedback_time: Instant,
k: i32,
}
impl LostPacket {
pub fn new(data_sequence_number: SeqNumber, feedback_time: Instant) -> Self {
Self {
data_sequence_number,
feedback_time,
k: 2,
}
}
}
#[derive(Debug)]
pub enum BufferPacket {
Lost(LostPacket),
Received(DataPacket),
Dropped(SeqNumber),
}
impl BufferPacket {
pub fn data_sequence_number(&self) -> SeqNumber {
match self {
BufferPacket::Lost(LostPacket {
data_sequence_number: seq_number,
..
})
| BufferPacket::Dropped(seq_number)
| BufferPacket::Received(DataPacket { seq_number, .. }) => *seq_number,
}
}
pub fn data_packet(&self) -> Option<&DataPacket> {
match self {
BufferPacket::Received(data) => Some(data),
_ => None,
}
}
pub fn into_data_packet(self) -> Option<DataPacket> {
match self {
BufferPacket::Received(data) => Some(data),
_ => None,
}
}
pub fn in_message(&self, message: MsgNumber) -> bool {
self.data_packet()
.map_or(false, |d| d.message_number == message)
}
pub fn is_first(&self) -> bool {
matches!(self, BufferPacket::Received(data) if data.message_loc.contains(PacketLocation::FIRST))
}
fn lost_or_dropped(&self) -> Option<SeqNumber> {
match self {
BufferPacket::Lost(LostPacket {
data_sequence_number: sn,
..
})
| BufferPacket::Dropped(sn) => Some(*sn),
_ => None,
}
}
pub fn lost_ready_for_feedback_mut(
&mut self,
now: Instant,
rtt_mean: TimeSpan,
) -> Option<&mut LostPacket> {
match self {
BufferPacket::Lost(lost) if now > lost.feedback_time + (rtt_mean * lost.k) => {
Some(lost)
}
_ => None,
}
}
pub fn update_data(&mut self, data: DataPacket) -> Result<(), DataPacketError> {
use BufferPacket::*;
if matches!(self, Lost(_)) {
*self = Received(data);
Ok(())
} else {
Err(DataPacketError::DiscardedDuplicate {
seq_number: data.seq_number,
})
}
}
pub fn drop_unreceived(&mut self) -> Option<SeqNumber> {
use BufferPacket::*;
let dsn = self.data_sequence_number();
if matches!(self, Lost(_)) {
*self = Dropped(dsn);
Some(dsn)
} else {
None
}
}
}
pub struct MessagePacketCount {
count: usize,
done: bool,
}
impl MessagePacketCount {
pub fn new() -> Self {
Self {
count: 0,
done: false,
}
}
pub fn accumulate(mut self, packet: &BufferPacket) -> Option<Self> {
let location = packet.data_packet()?.message_loc;
if !self.done {
if self.count == 0 && !location.contains(PacketLocation::FIRST) {
return None;
}
if location.contains(PacketLocation::LAST) {
self.done = true;
}
self.count += 1;
}
Some(self)
}
pub fn calculate(self) -> Option<usize> {
if self.done {
Some(self.count)
} else {
None
}
}
}
#[derive(Debug, Eq, PartialEq)]
pub struct MessageError {
pub too_late_packets: Range<SeqNumber>,
pub delay: TimeSpan,
}
#[derive(Debug)]
pub struct ReceiveBuffer {
tsbpd_latency: Duration,
tsbpd_tolerance: Duration,
lrsn: SeqNumber,
seqno0: SeqNumber,
too_late_packet_drop: bool,
remote_clock: SynchronizedRemoteClock,
buffer: VecDeque<BufferPacket>,
max_buffer_size: PacketCount,
}
impl ReceiveBuffer {
pub fn new(
socket_start_time: Instant,
tsbpd_latency: Duration,
too_late_packet_drop: bool,
init_seq_num: SeqNumber,
max_buffer_size: PacketCount,
) -> Self {
Self {
tsbpd_latency,
tsbpd_tolerance: Duration::from_millis(5),
too_late_packet_drop,
lrsn: init_seq_num,
seqno0: init_seq_num,
remote_clock: SynchronizedRemoteClock::new(socket_start_time),
buffer: VecDeque::with_capacity(max_buffer_size.into()),
max_buffer_size,
}
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub fn next_ack_dsn(&self) -> SeqNumber {
self.lrsn
}
pub fn clear(&mut self) {
self.buffer.clear();
}
pub fn synchronize_clock(
&mut self,
now: Instant,
now_ts: TimeStamp,
) -> Option<ClockAdjustment> {
self.remote_clock.synchronize(now, now_ts)
}
pub fn buffer_available(&self) -> usize {
usize::from(self.max_buffer_size) - self.buffer.len()
}
fn next_packet_dsn(&self) -> SeqNumber {
self.seqno0 + u32::try_from(self.buffer.len()).unwrap()
}
fn index_for_seqno(&self, seq_number: SeqNumber) -> Option<usize> {
if seq_number < self.seqno0 {
None
} else {
Some((seq_number - self.seqno0) as usize)
}
}
fn clamped_index_for_seqno(&self, seq_number: SeqNumber) -> usize {
min(seq_number.saturating_sub(self.seqno0), self.buffer.len())
}
pub fn push_packet(
&mut self,
now: Instant,
data: DataPacket,
) -> Result<DataPacketAction, DataPacketError> {
use std::cmp::Ordering::*;
match data.seq_number.cmp(&self.next_packet_dsn()) {
Equal => self.append_next(data),
Greater => self.append_with_loss(now, data),
Less => self.recover_data(data),
}
}
pub fn pop_next_message(
&mut self,
now: Instant,
) -> Result<Option<(Instant, Bytes)>, MessageError> {
let timestamp = match self.front_ts() {
Some(timestamp) => timestamp,
None => return self.drop_too_late_packets(now),
};
let sent_time = self.remote_clock.instant_from(timestamp);
if now < sent_time + self.tsbpd_latency {
return Ok(None);
}
let packet_count = match self.next_message_packet_count() {
Some(packet_count) => packet_count,
None => return self.drop_too_late_packets(now),
};
self.seqno0 += u32::try_from(packet_count).unwrap();
let release_time = self.remote_clock.monotonic_instant_from(timestamp);
let message = if packet_count == 1 {
self.release_single_packet_message(release_time)
} else {
self.release_full_message(release_time, packet_count)
};
Ok(message)
}
fn front_ts(&mut self) -> Option<TimeStamp> {
self.buffer.front()?.data_packet().map(|d| d.timestamp)
}
fn release_single_packet_message(&mut self, release_time: Instant) -> Option<(Instant, Bytes)> {
Some((
release_time,
self.buffer.pop_front()?.into_data_packet()?.payload,
))
}
fn release_full_message(
&mut self,
release_time: Instant,
packet_count: usize,
) -> Option<(Instant, Bytes)> {
Some((
release_time,
self.buffer
.drain(0..packet_count)
.fold(BytesMut::new(), |mut bytes, pack| {
bytes.extend(pack.into_data_packet().unwrap().payload);
bytes
})
.freeze(),
))
}
pub fn prepare_loss_list(
&mut self,
now: Instant,
rtt_mean: TimeSpan,
) -> Option<CompressedLossList> {
let loss_list = self
.buffer
.range_mut(self.lost_list_index()..)
.filter_map(|p| p.lost_ready_for_feedback_mut(now, rtt_mean))
.map(|lost| {
lost.k += 1;
lost.feedback_time = now;
lost.data_sequence_number
});
CompressedLossList::try_from_iter(loss_list)
}
pub fn drop_packets(&mut self, range: Range<SeqNumber>) -> usize {
let first_idx = self.clamped_index_for_seqno(range.start);
let last_idx = self.clamped_index_for_seqno(range.end);
self.buffer
.range_mut(first_idx..last_idx)
.filter_map(|p| p.drop_unreceived())
.count()
}
pub fn next_message_release_time(&self) -> Option<Instant> {
self.buffer
.front()
.filter(|p| p.is_first())?
.data_packet()
.map(|d| self.remote_clock.instant_from(d.timestamp) + self.tsbpd_latency)
}
fn append_next(&mut self, data: DataPacket) -> Result<DataPacketAction, DataPacketError> {
if self.buffer_available() == 0 {
Err(DataPacketError::BufferFull {
seq_number: data.seq_number,
buffer_size: self.buffer.len(),
})
} else {
self.append_data(data);
Ok(DataPacketAction::Received {
lrsn: self.lrsn,
recovered: false,
})
}
}
fn append_with_loss(
&mut self,
now: Instant,
data: DataPacket,
) -> Result<DataPacketAction, DataPacketError> {
let seq_number = data.seq_number;
let lost = self.next_packet_dsn()..seq_number;
let lost_count = lost.end - lost.start;
let buffer_required = usize::try_from(lost_count).unwrap() + 1; let buffer_available = self.buffer_available();
if buffer_available < buffer_required {
Err(DataPacketError::PacketTooEarly {
seq_number,
buffer_available,
buffer_required,
})
} else {
self.append_lost_packets(now, &lost);
self.append_data(data);
Ok(DataPacketAction::ReceivedWithLoss(lost.into()))
}
}
fn recover_data(&mut self, data: DataPacket) -> Result<DataPacketAction, DataPacketError> {
let seq_number = data.seq_number;
let index = self
.index_for_seqno(seq_number)
.ok_or(DataPacketError::PacketTooLate {
seq_number,
seq_number_0: self.seqno0,
})?;
self.buffer.get_mut(index).unwrap().update_data(data)?;
if self.lrsn == seq_number {
self.recalculate_lrsn(index);
}
Ok(DataPacketAction::Received {
lrsn: self.lrsn,
recovered: true,
})
}
fn append_data(&mut self, data: DataPacket) {
let seq_number = data.seq_number;
if self.lrsn == seq_number {
self.lrsn = seq_number + 1;
}
self.buffer.push_back(BufferPacket::Received(data));
}
fn append_lost_packets(&mut self, now: Instant, lost: &Range<SeqNumber>) {
let lost_count = lost.end - lost.start;
for i in 0..lost_count {
let loss = LostPacket::new(lost.start + i, now);
self.buffer.push_back(BufferPacket::Lost(loss));
}
}
fn lost_list_index(&self) -> usize {
self.buffer
.iter()
.take_while(|b| b.data_packet().is_some())
.count()
}
fn next_message_packet_count(&self) -> Option<usize> {
let first = self.buffer.front()?.data_packet()?;
self.buffer
.iter()
.take_while(|p| p.in_message(first.message_number))
.try_fold(MessagePacketCount::new(), |a, p| a.accumulate(p))?
.calculate()
}
fn drop_too_late_packets(
&mut self,
now: Instant,
) -> Result<Option<(Instant, Bytes)>, MessageError> {
if !self.too_late_packet_drop {
return Ok(None);
}
let data_packets = self.buffer.iter().map(|packet| {
packet.data_packet().map(|data| {
(
data.seq_number,
self.remote_clock.instant_from(data.timestamp),
data.message_loc,
)
})
});
let tsbpd_threshold = now - self.tsbpd_latency - self.tsbpd_tolerance;
let too_late_packets = data_packets.take_while(|packet| {
packet.map_or(true, |(_, packet_time, message_loc)| {
packet_time <= tsbpd_threshold || !message_loc.contains(PacketLocation::FIRST)
})
});
let dropped_packets = too_late_packets.fold(None, |last, packet| match (last, packet) {
(None, Some((seq_number, packet_time, _))) => Some((seq_number, packet_time)),
(Some((_, first_packet_time)), Some((seq_number, _, _))) => {
Some((seq_number, first_packet_time))
}
(last, _) => last,
});
let (last_seq_number, first_packet_time) = match dropped_packets {
Some(packets) => packets,
None => return Ok(None),
};
let begin_packet = self.seqno0;
let end_packet = last_seq_number + 1;
let drop_count = end_packet.saturating_sub(begin_packet);
self.seqno0 = end_packet;
self.buffer.drain(0..drop_count);
self.recalculate_lrsn(0);
Err(MessageError {
delay: TimeSpan::from_interval(first_packet_time + self.tsbpd_latency, now),
too_late_packets: begin_packet..end_packet,
})
}
fn recalculate_lrsn(&mut self, start_idx: usize) {
self.lrsn = self
.buffer
.range(start_idx..)
.filter_map(|p| p.lost_or_dropped())
.next()
.unwrap_or_else(|| self.next_packet_dsn())
}
pub fn rx_acknowledged_time(&self) -> Duration {
let start_idx = 0;
let end_idx = self.clamped_index_for_seqno(self.lrsn - 1);
if let (Some(BufferPacket::Received(s)), Some(BufferPacket::Received(e))) =
(self.buffer.get(start_idx), self.buffer.get(end_idx))
{
Duration::from_micros(
u64::try_from((e.timestamp - s.timestamp).as_micros()).unwrap_or(0),
)
} else {
Duration::from_micros(0)
}
}
}
#[cfg(test)]
mod receive_buffer {
use pretty_assertions::assert_eq;
use super::*;
use DataPacketAction::*;
use DataPacketError::*;
fn basic_pack() -> DataPacket {
DataPacket {
seq_number: SeqNumber(1),
message_loc: PacketLocation::ONLY,
in_order_delivery: false,
encryption: DataEncryption::None,
retransmitted: false,
message_number: MsgNumber(0),
timestamp: TimeStamp::from_micros(0),
dest_sockid: SocketId(4),
payload: b"basic payload"[..].into(),
}
}
#[test]
fn not_ready_empty() {
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(3);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(8192));
assert_eq!(buf.next_ack_dsn(), init_seq_num);
assert_eq!(buf.next_message_release_time(), None);
assert_eq!(buf.pop_next_message(start), Ok(None));
}
#[test]
fn multi_packet_message_not_ready() {
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(5);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(8192));
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num,
message_loc: PacketLocation::FIRST,
..basic_pack()
}
),
Ok(Received {
lrsn: init_seq_num + 1,
recovered: false
})
);
assert_eq!(buf.next_ack_dsn(), init_seq_num + 1);
assert_eq!(buf.next_message_release_time(), Some(start + tsbpd));
assert_eq!(buf.pop_next_message(start + tsbpd), Ok(None));
}
#[test]
fn multi_packet_message_lost_last_packet() {
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(0);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(8192));
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num,
message_loc: PacketLocation::FIRST,
..basic_pack()
}
),
Ok(Received {
lrsn: init_seq_num + 1,
recovered: false
})
);
assert_eq!(buf.next_ack_dsn(), init_seq_num + 1);
assert_eq!(buf.next_message_release_time(), Some(start + tsbpd));
assert_eq!(buf.pop_next_message(start + tsbpd), Ok(None));
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num + 2,
message_loc: PacketLocation::FIRST,
..basic_pack()
}
),
Ok(ReceivedWithLoss([init_seq_num + 1].iter().collect()))
);
assert_eq!(buf.next_ack_dsn(), init_seq_num + 1);
assert_eq!(buf.next_message_release_time(), Some(start + tsbpd));
assert_eq!(
buf.pop_next_message(start + tsbpd * 2),
Err(MessageError {
too_late_packets: SeqNumber(0)..SeqNumber(3),
delay: TimeSpan::from_millis(2_000)
})
);
}
#[test]
fn multi_packet_message_incomplete_transmission() {
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(5);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(8192));
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num,
message_loc: PacketLocation::FIRST,
..basic_pack()
}
),
Ok(Received {
lrsn: init_seq_num + 1,
recovered: false
})
);
assert_eq!(buf.next_ack_dsn(), init_seq_num + 1);
assert_eq!(buf.next_message_release_time(), Some(start + tsbpd));
assert_eq!(buf.pop_next_message(start + tsbpd), Ok(None));
assert_eq!(buf.pop_next_message(start + tsbpd), Ok(None));
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num + 1,
message_loc: PacketLocation::empty(),
..basic_pack()
}
),
Ok(Received {
lrsn: init_seq_num + 2,
recovered: false
})
);
assert_eq!(buf.next_ack_dsn(), init_seq_num + 2);
assert_eq!(buf.next_message_release_time(), Some(start + tsbpd));
assert_eq!(buf.pop_next_message(start + tsbpd), Ok(None));
}
#[test]
fn single_packet_message_ready() {
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(5);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(8192));
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num,
message_loc: PacketLocation::FIRST | PacketLocation::LAST,
payload: b"hello"[..].into(),
..basic_pack()
}
),
Ok(Received {
lrsn: init_seq_num + 1,
recovered: false
})
);
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num + 1,
message_loc: PacketLocation::empty(),
payload: b"no"[..].into(),
..basic_pack()
}
),
Ok(Received {
lrsn: init_seq_num + 2,
recovered: false
})
);
assert_eq!(buf.next_ack_dsn(), init_seq_num + 2);
assert_eq!(buf.next_message_release_time(), Some(start + tsbpd));
assert_eq!(
buf.pop_next_message(start + tsbpd * 2),
Ok(Some((start, b"hello"[..].into())))
);
}
#[test]
fn push_packet_multi_packet_message_ready() {
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(5);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(8192));
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num,
message_loc: PacketLocation::FIRST,
payload: b"hello"[..].into(),
..basic_pack()
}
),
Ok(Received {
lrsn: init_seq_num + 1,
recovered: false
})
);
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num + 1,
message_loc: PacketLocation::empty(),
payload: b"yas"[..].into(),
..basic_pack()
}
),
Ok(Received {
lrsn: init_seq_num + 2,
recovered: false
})
);
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num + 2,
message_loc: PacketLocation::LAST,
payload: b"nas"[..].into(),
..basic_pack()
}
),
Ok(Received {
lrsn: init_seq_num + 3,
recovered: false
})
);
assert_eq!(buf.next_ack_dsn(), init_seq_num + 3);
assert_eq!(buf.next_message_release_time(), Some(start + tsbpd));
assert_eq!(buf.pop_next_message(start), Ok(None));
assert_eq!(
buf.pop_next_message(start + tsbpd * 2),
Ok(Some((start, b"helloyasnas"[..].into())))
);
assert_eq!(buf.buffer.len(), 0);
}
#[test]
fn prepare_loss_list() {
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(5);
let mean_rtt = TimeSpan::from_micros(10_000);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(8192));
assert_eq!(buf.prepare_loss_list(start, mean_rtt), None);
let now = start;
assert_eq!(
buf.push_packet(
now,
DataPacket {
seq_number: init_seq_num,
message_loc: PacketLocation::FIRST,
payload: b"hello"[..].into(),
..basic_pack()
}
),
Ok(Received {
lrsn: init_seq_num + 1,
recovered: false
})
);
assert_eq!(
buf.push_packet(
now,
DataPacket {
seq_number: init_seq_num + 5,
message_loc: PacketLocation::LAST,
payload: b"yas"[..].into(),
..basic_pack()
},
),
Ok(ReceivedWithLoss(
(init_seq_num + 1..init_seq_num + 5).into()
))
);
assert_eq!(buf.prepare_loss_list(now, mean_rtt), None);
let now = now + mean_rtt;
assert_eq!(
buf.push_packet(
now,
DataPacket {
seq_number: init_seq_num + 15,
message_loc: PacketLocation::LAST,
payload: b"nas"[..].into(),
..basic_pack()
}
),
Ok(ReceivedWithLoss(
(init_seq_num + 6..init_seq_num + 15).into()
))
);
assert_eq!(buf.prepare_loss_list(now, mean_rtt), None);
let now = now + mean_rtt * 3;
assert_eq!(
buf.prepare_loss_list(now, mean_rtt),
Some((1..5).chain(6..15).map(|a| init_seq_num + a).collect())
);
assert_eq!(buf.prepare_loss_list(now, mean_rtt), None);
}
#[test]
fn drop_too_late_packets() {
let _ = pretty_env_logger::try_init();
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(0);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(8192));
let now = start;
let _ = buf.push_packet(
now,
DataPacket {
seq_number: init_seq_num + 1,
message_loc: PacketLocation::FIRST,
..basic_pack()
},
);
assert_eq!(buf.pop_next_message(now), Ok(None));
assert_eq!(buf.next_ack_dsn(), init_seq_num);
let now = now + tsbpd;
let _ = buf.push_packet(
now,
DataPacket {
timestamp: TimeStamp::MIN + tsbpd,
seq_number: init_seq_num + 3,
payload: b"test"[..].into(),
..basic_pack()
},
);
assert_eq!(buf.pop_next_message(now), Ok(None));
assert_eq!(buf.next_ack_dsn(), init_seq_num);
let now = now + Duration::from_millis(5);
assert_eq!(
buf.pop_next_message(now),
Err(MessageError {
too_late_packets: SeqNumber(0)..SeqNumber(2),
delay: TimeSpan::from_millis(5)
})
);
assert_eq!(buf.next_ack_dsn(), SeqNumber(2));
assert_eq!(buf.pop_next_message(now), Ok(None));
assert_eq!(buf.next_ack_dsn(), SeqNumber(2));
}
#[test]
fn not_dropping_too_late_packets() {
let _ = pretty_env_logger::try_init();
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(5);
let mut buf = ReceiveBuffer::new(start, tsbpd, false, init_seq_num, PacketCount(8192));
let now = start;
let _ = buf.push_packet(
now,
DataPacket {
seq_number: init_seq_num + 1,
message_loc: PacketLocation::FIRST,
payload: b"hello"[..].into(),
..basic_pack()
},
);
let _ = buf.push_packet(
now,
DataPacket {
seq_number: init_seq_num + 2,
message_loc: PacketLocation::MIDDLE,
payload: b"hello"[..].into(),
..basic_pack()
},
);
assert_eq!(buf.pop_next_message(now), Ok(None));
assert_eq!(buf.next_ack_dsn(), init_seq_num);
let now = now + tsbpd;
let _ = buf.push_packet(
now,
DataPacket {
timestamp: TimeStamp::MIN + tsbpd,
seq_number: init_seq_num + 5,
message_loc: PacketLocation::ONLY,
payload: b"yas"[..].into(),
..basic_pack()
},
);
assert_eq!(buf.pop_next_message(now), Ok(None));
assert_eq!(buf.next_ack_dsn(), init_seq_num);
let now = now + Duration::from_millis(5);
assert_eq!(buf.pop_next_message(now), Ok(None));
assert_eq!(buf.next_ack_dsn(), init_seq_num, "{buf:?}");
let now = now + tsbpd + Duration::from_millis(5);
assert_eq!(buf.pop_next_message(now), Ok(None));
assert_eq!(buf.next_ack_dsn(), init_seq_num);
assert_eq!(buf.pop_next_message(now), Ok(None));
assert_eq!(buf.next_ack_dsn(), init_seq_num);
}
#[test]
fn drop_message() {
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(5);
let mean_rtt = TimeSpan::from_micros(10_000);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(8192));
let now = start;
assert_eq!(
buf.push_packet(
now,
DataPacket {
seq_number: init_seq_num + 3,
message_loc: PacketLocation::LAST,
payload: b"yas"[..].into(),
..basic_pack()
},
),
Ok(ReceivedWithLoss((init_seq_num..init_seq_num + 3).into()))
);
assert_eq!(buf.drop_packets(init_seq_num + 9..init_seq_num + 12), 0);
assert_eq!(buf.drop_packets(init_seq_num - 1..init_seq_num + 5), 3);
let now = now + mean_rtt * 3;
assert_eq!(buf.prepare_loss_list(now, mean_rtt), None);
}
#[test]
fn buffer_sizing() {
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(5);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(10));
assert_eq!(buf.buffer_available(), 10);
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num,
..basic_pack()
},
),
Ok(Received {
lrsn: init_seq_num + 1,
recovered: false
})
);
assert_eq!(buf.buffer_available(), 9);
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num + 8,
..basic_pack()
},
),
Ok(ReceivedWithLoss(
(init_seq_num + 1..init_seq_num + 8).into()
))
);
assert_eq!(buf.buffer_available(), 1);
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num + 10,
..basic_pack()
},
),
Err(PacketTooEarly {
seq_number: init_seq_num + 10,
buffer_available: 1,
buffer_required: 2
})
);
assert_eq!(buf.buffer_available(), 1);
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num + 9,
..basic_pack()
},
),
Ok(Received {
lrsn: init_seq_num + 1,
recovered: false
})
);
assert_eq!(buf.buffer_available(), 0);
assert_eq!(
buf.push_packet(
start,
DataPacket {
seq_number: init_seq_num + 10,
..basic_pack()
},
),
Err(BufferFull {
seq_number: init_seq_num + 10,
buffer_size: 10
})
);
assert_eq!(buf.buffer_available(), 0);
buf.pop_next_message(start + tsbpd).unwrap();
assert_eq!(buf.buffer_available(), 1);
assert_eq!(
buf.push_packet(
start + tsbpd,
DataPacket {
seq_number: init_seq_num + 10,
..basic_pack()
},
),
Ok(Received {
lrsn: init_seq_num + 1,
recovered: false
})
);
assert_eq!(buf.buffer_available(), 0);
}
#[test]
fn rx_acknowledged_time() {
let tsbpd = Duration::from_secs(2);
let start = Instant::now();
let init_seq_num = SeqNumber(5);
let mut buf = ReceiveBuffer::new(start, tsbpd, true, init_seq_num, PacketCount(10));
let add_packet = |i, buf: &mut ReceiveBuffer| {
buf.push_packet(
start + Duration::from_micros(u64::from(i) * 10),
DataPacket {
seq_number: init_seq_num + i,
timestamp: TimeStamp::from_micros(i * 10),
..basic_pack()
},
)
.unwrap();
};
assert_eq!(buf.rx_acknowledged_time(), Duration::from_secs(0));
add_packet(0, &mut buf);
assert_eq!(buf.rx_acknowledged_time(), Duration::from_secs(0));
add_packet(1, &mut buf);
assert_eq!(buf.rx_acknowledged_time(), Duration::from_micros(10));
add_packet(3, &mut buf);
assert_eq!(buf.rx_acknowledged_time(), Duration::from_micros(10));
add_packet(2, &mut buf);
assert_eq!(buf.rx_acknowledged_time(), Duration::from_micros(30));
buf.pop_next_message(start + tsbpd + Duration::from_micros(10))
.unwrap()
.unwrap();
assert_eq!(buf.rx_acknowledged_time(), Duration::from_micros(20));
buf.pop_next_message(start + tsbpd + Duration::from_micros(20))
.unwrap()
.unwrap();
assert_eq!(buf.rx_acknowledged_time(), Duration::from_micros(10));
buf.pop_next_message(start + tsbpd + Duration::from_micros(30))
.unwrap()
.unwrap();
assert_eq!(buf.rx_acknowledged_time(), Duration::from_micros(0));
buf.pop_next_message(start + tsbpd + Duration::from_micros(40))
.unwrap()
.unwrap();
assert_eq!(buf.rx_acknowledged_time(), Duration::from_micros(0));
assert_eq!(
buf.pop_next_message(start + tsbpd + Duration::from_micros(40))
.unwrap(),
None
);
assert_eq!(buf.rx_acknowledged_time(), Duration::from_micros(0));
}
}