use std::collections::hash_map::IterMut;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::Instant;
use rustc_hash::FxHashMap;
use crate::frame;
use crate::packet;
use crate::ranges::RangeSet;
use crate::tls::Level;
use crate::window::SeqNumWindow;
/// Threshold on consecutively sent non-ack-eliciting packets: once
/// `PacketNumSpace::consecutive_non_ack_eliciting_sent` reaches this value,
/// `PacketNumSpace::need_elicit_ack` returns `true`.
pub const MAX_NON_ACK_ELICITING: usize = 24;
/// NOTE(review): presumably the number of built-in packet number spaces
/// (Initial, Handshake, Data); it is not referenced in this part of the file,
/// so confirm against its users.
pub const SPACE_COUNT: usize = 3;
/// Identifier of a packet number space.
///
/// The three unit variants carry the explicit discriminants 0, 1 and 2, which
/// are also the keys `PacketNumSpaceMap` stores them under. `DataExt` carries
/// its own numeric id, which `PacketNumSpaceMap` uses directly as the map key.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u64)]
pub enum SpaceId {
/// Space mapped to `Level::Initial`.
Initial = 0,
/// Space mapped to `Level::Handshake`.
Handshake = 1,
/// Default application data space, mapped to `Level::OneRTT`.
Data = 2,
/// Additional data space identified by the wrapped id; also mapped to
/// `Level::OneRTT`. Ids handed out by `PacketNumSpaceMap::add` start at 3.
DataExt(u64),
}
impl SpaceId {
    /// Returns the TLS [`Level`] associated with this packet number space.
    ///
    /// Both application data flavours (`Data` and every `DataExt`) map to
    /// `Level::OneRTT`.
    pub fn to_level(self) -> Level {
        match self {
            Self::Data | Self::DataExt(_) => Level::OneRTT,
            Self::Handshake => Level::Handshake,
            Self::Initial => Level::Initial,
        }
    }
}
/// State kept for a single packet number space.
///
/// NOTE(review): most fields are only initialised (in `PacketNumSpace::new`)
/// and never updated in this file section; the per-field notes that say
/// "presumably" are inferred from names and must be confirmed against the code
/// that maintains them.
pub struct PacketNumSpace {
/// Identifier of this space.
pub id: SpaceId,
/// Starts at 0; presumably the packet number assigned to the next sent packet.
pub next_pkt_num: u64,
/// Compared against `MAX_NON_ACK_ELICITING` by `need_elicit_ack`.
pub consecutive_non_ack_eliciting_sent: usize,
/// Starts at `u64::MAX`; presumably the lowest 1-RTT packet number seen so far — TODO confirm.
pub lowest_1rtt_pkt_num: u64,
/// Starts at 0; presumably the largest packet number received.
pub largest_rx_pkt_num: u64,
/// `None` until set; presumably the first packet number received in this space.
pub first_pkt_num_recv: Option<u64>,
/// `None` until set; presumably the first packet number sent in this space.
pub first_pkt_num_sent: Option<u64>,
/// Initialised to the creation time of the space; presumably the receive
/// time of the packet with the largest packet number afterwards.
pub largest_rx_pkt_time: Instant,
/// Starts at 0; presumably the largest received non-probing packet number.
pub largest_rx_non_probing_pkt_num: u64,
/// Starts at 0; presumably the largest received ack-eliciting packet number.
pub largest_rx_ack_eliciting_pkt_num: u64,
/// Range set bounded by `crate::MAX_ACK_RANGES`; presumably the received
/// packet numbers that still have to be acknowledged.
pub recv_pkt_num_need_ack: RangeSet,
/// Window queried by `detect_duplicated_pkt_num` for already-seen packet numbers.
pub recv_pkt_num_win: SeqNumWindow,
/// Starts `false`; presumably set when an ACK should be sent.
pub need_send_ack: bool,
/// Starts at 0; presumably counts ack-eliciting packets received since the last ACK was sent.
pub ack_eliciting_pkts_since_last_sent_ack: u64,
/// Optional deadline; `PacketNumSpaceMap::min_ack_timer` takes the minimum over all spaces.
pub ack_timer: Option<Instant>,
/// Queue of `SentPacket` records; presumably packets sent but not yet fully processed.
pub sent: VecDeque<SentPacket>,
/// Presumably frames from packets declared lost.
pub lost: Vec<frame::Frame>,
/// Presumably frames from packets that were acknowledged.
pub acked: Vec<frame::Frame>,
/// Prioritised queue of buffered frames; `need_send_buffered_frames` reports
/// whether it is non-empty.
pub buffered: BufferQueue,
/// `None` until set; presumably the send time of the last ack-eliciting packet.
pub time_of_last_sent_ack_eliciting_pkt: Option<Instant>,
/// `None` until set; presumably the time at which the next packet is considered lost.
pub loss_time: Option<Instant>,
/// Largest acknowledged packet number, with `u64::MAX` as the "none yet"
/// sentinel (see `get_largest_acked_pkt`).
pub largest_acked_pkt: u64,
/// A non-zero value makes `need_elicit_ack` return `true`.
pub loss_probes: usize,
/// Starts at 0; presumably the number of bytes currently in flight.
pub bytes_in_flight: usize,
/// Starts at 0; presumably the number of ack-eliciting packets in flight.
pub ack_eliciting_in_flight: u64,
/// `true` for every space other than `Initial` and `Handshake`.
pub is_data: bool,
}
impl PacketNumSpace {
    /// Creates an empty packet number space identified by `id`.
    ///
    /// Counters start at zero, optional timestamps start as `None`, and the
    /// two sentinel-based fields (`lowest_1rtt_pkt_num`, `largest_acked_pkt`)
    /// start at `u64::MAX`.
    pub fn new(id: SpaceId) -> Self {
        // Anything that is neither Initial nor Handshake is a data space.
        let is_data = !matches!(id, SpaceId::Initial | SpaceId::Handshake);

        Self {
            id,
            next_pkt_num: 0,
            consecutive_non_ack_eliciting_sent: 0,
            lowest_1rtt_pkt_num: u64::MAX,
            largest_rx_pkt_num: 0,
            first_pkt_num_recv: None,
            first_pkt_num_sent: None,
            largest_rx_pkt_time: Instant::now(),
            largest_rx_non_probing_pkt_num: 0,
            largest_rx_ack_eliciting_pkt_num: 0,
            recv_pkt_num_need_ack: RangeSet::new(crate::MAX_ACK_RANGES),
            recv_pkt_num_win: SeqNumWindow::default(),
            need_send_ack: false,
            ack_eliciting_pkts_since_last_sent_ack: 0,
            ack_timer: None,
            sent: VecDeque::new(),
            lost: Vec::new(),
            acked: Vec::new(),
            buffered: BufferQueue::default(),
            time_of_last_sent_ack_eliciting_pkt: None,
            loss_time: None,
            // `u64::MAX` means "nothing acknowledged yet".
            largest_acked_pkt: u64::MAX,
            loss_probes: 0,
            bytes_in_flight: 0,
            ack_eliciting_in_flight: 0,
            is_data,
        }
    }

    /// Reports whether `pkt_num` is already contained in the receive window.
    pub fn detect_duplicated_pkt_num(&mut self, pkt_num: u64) -> bool {
        self.recv_pkt_num_win.contains(pkt_num)
    }

    /// Reports whether the next packet should be ack-eliciting.
    ///
    /// This is the case when `MAX_NON_ACK_ELICITING` or more non-ack-eliciting
    /// packets were sent in a row, or when loss probes are pending.
    pub fn need_elicit_ack(&self) -> bool {
        let quiet_streak_exhausted =
            self.consecutive_non_ack_eliciting_sent >= MAX_NON_ACK_ELICITING;
        quiet_streak_exhausted || self.loss_probes > 0
    }

    /// Reports whether this space holds buffered frames.
    pub fn need_send_buffered_frames(&self) -> bool {
        !self.buffered.is_empty()
    }

    /// Returns the largest acknowledged packet number, or `None` while the
    /// field still holds its `u64::MAX` sentinel.
    pub fn get_largest_acked_pkt(&self) -> Option<u64> {
        match self.largest_acked_pkt {
            u64::MAX => None,
            pkt_num => Some(pkt_num),
        }
    }
}
/// Collection of packet number spaces keyed by a numeric id.
///
/// `Initial`, `Handshake` and `Data` live under keys 0, 1 and 2; a
/// `SpaceId::DataExt(i)` lives under key `i`.
pub struct PacketNumSpaceMap {
/// All currently existing spaces, keyed as described on the struct.
spaces: FxHashMap<u64, PacketNumSpace>,
/// Id that the next call to `add` assigns; starts at 3 and grows by one per call.
next_data_ext_id: u64,
}
impl PacketNumSpaceMap {
    /// Translates a `SpaceId` into the key used by the underlying map.
    ///
    /// The built-in spaces use the fixed keys 0, 1 and 2; a `DataExt` id is
    /// used verbatim, so `DataExt(0..=2)` addresses the same entries as the
    /// built-in spaces.
    fn slot_key(space_id: SpaceId) -> u64 {
        match space_id {
            SpaceId::Initial => 0,
            SpaceId::Handshake => 1,
            SpaceId::Data => 2,
            SpaceId::DataExt(ext_id) => ext_id,
        }
    }

    /// Creates a map pre-populated with the Initial, Handshake and Data
    /// spaces. Extension ids handed out by `add` start at 3.
    pub fn new() -> Self {
        let mut spaces = FxHashMap::default();
        for id in [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data] {
            spaces.insert(Self::slot_key(id), PacketNumSpace::new(id));
        }
        PacketNumSpaceMap {
            spaces,
            next_data_ext_id: 3,
        }
    }

    /// Returns a shared reference to the space with the given id, if present.
    pub fn get(&self, space_id: SpaceId) -> Option<&PacketNumSpace> {
        let key = Self::slot_key(space_id);
        self.spaces.get(&key)
    }

    /// Returns a mutable reference to the space with the given id, if present.
    pub fn get_mut(&mut self, space_id: SpaceId) -> Option<&mut PacketNumSpace> {
        let key = Self::slot_key(space_id);
        self.spaces.get_mut(&key)
    }

    /// Iterates mutably over all `(key, space)` entries, in the map's order.
    pub fn iter_mut(&mut self) -> IterMut<'_, u64, PacketNumSpace> {
        self.spaces.iter_mut()
    }

    /// Creates a new `DataExt` space under the next free extension id and
    /// returns its `SpaceId`.
    pub fn add(&mut self) -> SpaceId {
        let ext_id = self.next_data_ext_id;
        let space_id = SpaceId::DataExt(ext_id);
        self.spaces.insert(ext_id, PacketNumSpace::new(space_id));
        self.next_data_ext_id += 1;
        space_id
    }

    /// Removes the space with the given id; does nothing if it is absent.
    pub fn drop(&mut self, space_id: SpaceId) {
        let key = Self::slot_key(space_id);
        self.spaces.remove(&key);
    }

    /// Reports whether any space holds buffered frames.
    pub fn need_send_buffered_frames(&self) -> bool {
        self.spaces
            .values()
            .any(|space| space.need_send_buffered_frames())
    }

    /// Returns the earliest `ack_timer` over all spaces, or `None` when no
    /// space has one set.
    pub fn min_ack_timer(&self) -> Option<Instant> {
        self.spaces
            .values()
            .filter_map(|space| space.ack_timer)
            .min()
    }
}
impl Default for PacketNumSpaceMap {
fn default() -> Self {
Self::new()
}
}
/// Per-packet snapshot stored in `SentPacket::rate_sample_state`.
///
/// NOTE(review): the field meanings below are inferred from their names (they
/// resemble the per-packet state of a delivery-rate estimator); nothing in
/// this file section reads or writes them — confirm against the
/// congestion-control code.
#[derive(Debug, Default, Clone)]
pub struct RateSamplePacketState {
/// Presumably the delivered counter at the time the packet was sent.
pub delivered: u64,
/// Presumably the time associated with `delivered` when the packet was sent.
pub delivered_time: Option<Instant>,
/// Presumably the send time of the first packet in the current sampling window.
pub first_sent_time: Option<Instant>,
/// Presumably whether the sender was application-limited when the packet was sent.
pub is_app_limited: bool,
/// Presumably the amount of data in flight when the packet was sent.
pub tx_in_flight: u64,
/// Presumably the lost counter at the time the packet was sent.
pub lost: u64,
}
/// Record describing one sent packet.
///
/// `Debug` is implemented by hand and prints only `pkt_num`, `frames` and
/// `sent_size`.
///
/// NOTE(review): field notes marked "presumably" are inferred from names only.
#[derive(Clone)]
pub struct SentPacket {
/// Packet type; `Default` uses `PacketType::OneRTT`.
pub pkt_type: packet::PacketType,
/// Packet number of the packet.
pub pkt_num: u64,
/// Frames carried by the packet.
pub frames: Vec<frame::Frame>,
/// Time the packet was sent; `Default` uses the time of construction.
pub time_sent: Instant,
/// Presumably the time the packet was acknowledged, if it was.
pub time_acked: Option<Instant>,
/// Presumably the time the packet was declared lost, if it was.
pub time_lost: Option<Instant>,
/// Presumably whether the packet is ack-eliciting.
pub ack_eliciting: bool,
/// Presumably whether the packet counts towards bytes in flight.
pub in_flight: bool,
/// Presumably whether the packet carries stream data — TODO confirm.
pub has_data: bool,
/// Presumably whether the packet is a path MTU probe.
pub pmtu_probe: bool,
/// Presumably whether the packet was subject to pacing — TODO confirm.
pub pacing: bool,
/// Size of the sent packet; unit is presumably bytes.
pub sent_size: usize,
/// Rate-sampling snapshot attached to the packet.
pub rate_sample_state: RateSamplePacketState,
/// Flags recording which buffer queue priorities contributed frames.
pub buffer_flags: BufferFlags,
}
impl Default for SentPacket {
    /// Builds a placeholder packet record: a 1-RTT packet with number 0, no
    /// frames, zero size, all flags cleared, and `time_sent` set to the
    /// current instant.
    fn default() -> Self {
        Self {
            pkt_type: packet::PacketType::OneRTT,
            pkt_num: 0,
            frames: Vec::new(),
            time_sent: Instant::now(),
            time_acked: None,
            time_lost: None,
            ack_eliciting: false,
            in_flight: false,
            has_data: false,
            pmtu_probe: false,
            pacing: false,
            sent_size: 0,
            rate_sample_state: Default::default(),
            buffer_flags: Default::default(),
        }
    }
}
impl std::fmt::Debug for SentPacket {
    /// Writes a compact one-line summary of the form
    /// `pn=<pkt_num> frames=<frames> sent_size=<sent_size>`; the remaining
    /// fields are intentionally left out.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Each piece is only written when the previous one succeeded.
        write!(f, "pn={:?}", self.pkt_num)
            .and_then(|_| write!(f, " frames={:?}", self.frames))
            .and_then(|_| write!(f, " sent_size={}", self.sent_size))
    }
}
/// Summary of an acknowledged packet.
///
/// NOTE(review): this type is not used within this file section; the field
/// notes are inferred from names — confirm against the loss-recovery code.
pub struct AckedPacket {
/// Packet number of the acknowledged packet.
pub pkt_num: u64,
/// Time the packet was originally sent.
pub time_sent: Instant,
/// Presumably the round-trip time measured for this packet.
pub rtt: Duration,
}
/// Priority class of a buffered frame.
///
/// The discriminants double as indices into the per-priority queues of
/// `BufferQueue`, with `High` (0) being served first.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BufferType {
    High = 0,
    Mid = 1,
    Low = 2,
}

impl From<usize> for BufferType {
    /// Converts a queue index into its priority class.
    ///
    /// Index 0 is `High` and 1 is `Mid`; every other index (including values
    /// above 2) falls back to `Low`.
    fn from(index: usize) -> BufferType {
        const BY_INDEX: [BufferType; 2] = [BufferType::High, BufferType::Mid];
        BY_INDEX.get(index).copied().unwrap_or(BufferType::Low)
    }
}
/// Three FIFO queues of frames, one per `BufferType` priority, plus a running
/// total of queued frames.
#[derive(Default)]
pub struct BufferQueue {
/// Queues indexed by `BufferType as usize` (0 = High, 1 = Mid, 2 = Low).
queues: [VecDeque<frame::Frame>; 3],
/// Total number of frames across all three queues.
count: usize,
}
impl BufferQueue {
    /// Removes and returns the front frame of the first non-empty queue,
    /// scanning in index order (High, then Mid, then Low), together with the
    /// priority it came from. Returns `None` when every queue is empty.
    pub fn pop_front(&mut self) -> Option<(frame::Frame, BufferType)> {
        // Popping an empty queue is a no-op that yields `None`, so the scan
        // stops at the first queue that actually hands out a frame.
        let (frame, index) = self
            .queues
            .iter_mut()
            .enumerate()
            .find_map(|(index, queue)| queue.pop_front().map(|frame| (frame, index)))?;
        self.count -= 1;
        Some((frame, BufferType::from(index)))
    }

    /// Inserts `frame` at the front of the queue for `queue_type`.
    pub fn push_front(&mut self, frame: frame::Frame, queue_type: BufferType) {
        let target = &mut self.queues[queue_type as usize];
        target.push_front(frame);
        self.count += 1;
    }

    /// Appends `frame` to the back of the queue for `queue_type`.
    pub fn push_back(&mut self, frame: frame::Frame, queue_type: BufferType) {
        let target = &mut self.queues[queue_type as usize];
        target.push_back(frame);
        self.count += 1;
    }

    /// Moves every frame out of `frames` to the back of the queue for
    /// `queue_type`, leaving `frames` empty.
    pub fn append(&mut self, frames: &mut VecDeque<frame::Frame>, queue_type: BufferType) {
        let moved = frames.len();
        self.queues[queue_type as usize].append(frames);
        self.count += moved;
    }

    /// Total number of frames held across all priorities.
    pub fn len(&self) -> usize {
        self.count
    }

    /// Returns `true` when no frame is queued at any priority.
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }
}
/// One flag per `BufferType` priority; all flags start cleared and are set
/// through `BufferFlags::mark`.
#[derive(Clone, Default, Debug)]
pub struct BufferFlags {
/// Set by `mark(BufferType::High)`.
pub from_high: bool,
/// Set by `mark(BufferType::Mid)`.
pub from_mid: bool,
/// Set by `mark(BufferType::Low)`.
pub from_low: bool,
}
impl BufferFlags {
    /// Returns `true` when at least one priority flag is set.
    pub fn has_buffered(&self) -> bool {
        [self.from_high, self.from_mid, self.from_low]
            .iter()
            .any(|&is_set| is_set)
    }

    /// Sets the flag that corresponds to `queue_type`; flags are never
    /// cleared by this method.
    pub fn mark(&mut self, queue_type: BufferType) {
        let flag = match queue_type {
            BufferType::High => &mut self.from_high,
            BufferType::Mid => &mut self.from_mid,
            BufferType::Low => &mut self.from_low,
        };
        *flag = true;
    }
}
// Unit tests for the packet number space map, sent-packet formatting and the
// frame buffer queue/flags.
#[cfg(test)]
mod tests {
use super::*;
use crate::frame::Frame;
// A default map holds exactly the Initial, Handshake and Data spaces, each
// carrying its own id and mapping to the expected TLS level.
#[test]
fn initial_spaces() {
let mut spaces = PacketNumSpaceMap::default();
assert_eq!(spaces.iter_mut().count(), 3);
assert_eq!(
spaces.get_mut(SpaceId::Initial).unwrap().id,
SpaceId::Initial
);
assert_eq!(
spaces.get_mut(SpaceId::Handshake).unwrap().id,
SpaceId::Handshake
);
assert_eq!(spaces.get_mut(SpaceId::Data).unwrap().id, SpaceId::Data);
assert_eq!(
spaces.get(SpaceId::Initial).unwrap().id.to_level(),
Level::Initial
);
assert_eq!(
spaces.get(SpaceId::Handshake).unwrap().id.to_level(),
Level::Handshake
);
assert_eq!(
spaces.get(SpaceId::Data).unwrap().id.to_level(),
Level::OneRTT
);
// A fresh receive window reports no packet number as duplicated.
let space = spaces.get_mut(SpaceId::Initial).unwrap();
assert_eq!(space.detect_duplicated_pkt_num(0), false);
assert_eq!(space.detect_duplicated_pkt_num(5), false);
}
// Extension spaces can be added and dropped; the first added id is
// DataExt(3), and built-in spaces can be dropped as well.
#[test]
fn extra_spaces() {
let mut spaces = PacketNumSpaceMap::default();
assert!(spaces.get(SpaceId::DataExt(3)).is_none());
let space_id = spaces.add();
assert_eq!(spaces.iter_mut().count(), 4);
assert_eq!(space_id, SpaceId::DataExt(3));
assert!(spaces.get(space_id).is_some());
assert_eq!(
spaces.get_mut(space_id).unwrap().id.to_level(),
Level::OneRTT
);
spaces.drop(space_id);
assert!(spaces.get(SpaceId::DataExt(3)).is_none());
assert_eq!(spaces.iter_mut().count(), 3);
// After dropping the three built-in spaces only the newly added one remains.
spaces.add();
spaces.drop(SpaceId::Initial);
spaces.drop(SpaceId::Handshake);
spaces.drop(SpaceId::Data);
assert_eq!(spaces.iter_mut().count(), 1);
}
// The hand-written Debug output contains only pn, frames and sent_size.
#[test]
fn sent_packet() {
let sent_pkt = SentPacket {
pkt_num: 9,
frames: vec![
frame::Frame::Ping { pmtu_probe: None },
frame::Frame::Paddings { len: 200 },
],
time_sent: Instant::now(),
time_acked: None,
time_lost: None,
ack_eliciting: true,
in_flight: true,
has_data: false,
sent_size: 240,
rate_sample_state: Default::default(),
..SentPacket::default()
};
assert_eq!(
format!("{:?}", sent_pkt),
"pn=9 frames=[PING, PADDINGS len=200] sent_size=240"
);
}
// Frames are popped in priority order (High before Low), FIFO within one
// priority, and `len`/`is_empty` track the total across queues.
#[test]
fn buffer_queue() {
let mut queue = BufferQueue::default();
assert_eq!(queue.len(), 0);
assert_eq!(queue.is_empty(), true);
let f1 = Frame::MaxStreamData {
stream_id: 4,
max: 10240,
};
queue.push_back(f1.clone(), BufferType::High);
assert_eq!(queue.len(), 1);
assert_eq!(queue.is_empty(), false);
let f2 = Frame::MaxStreamData {
stream_id: 8,
max: 24000,
};
// push_front places f2 ahead of f1 within the High queue.
queue.push_front(f2.clone(), BufferType::High);
assert_eq!(queue.len(), 2);
assert_eq!(queue.is_empty(), false);
let f3 = Frame::Ping { pmtu_probe: None };
queue.push_back(f3.clone(), BufferType::Low);
assert_eq!(queue.pop_front(), Some((f2.clone(), BufferType::High)));
assert_eq!(queue.pop_front(), Some((f1.clone(), BufferType::High)));
assert_eq!(queue.pop_front(), Some((f3.clone(), BufferType::Low)));
assert_eq!(queue.pop_front(), None);
assert_eq!(queue.is_empty(), true);
// `append` drains the source deque into the chosen priority queue.
let mut fs = VecDeque::new();
fs.push_back(f1.clone());
fs.push_back(f2.clone());
queue.append(&mut fs, BufferType::Mid);
assert_eq!(queue.len(), 2);
assert_eq!(fs.len(), 0);
assert_eq!(queue.pop_front(), Some((f1.clone(), BufferType::Mid)));
assert_eq!(queue.pop_front(), Some((f2.clone(), BufferType::Mid)));
}
// `has_buffered` is true as soon as any priority has been marked.
#[test]
fn buffer_flags() {
use BufferType::*;
// Each case: (priorities to mark, expected `has_buffered` result).
let cases = [
(vec![], false),
(vec![High], true),
(vec![Mid], true),
(vec![Low], true),
(vec![Low, High], true),
(vec![Low, Mid], true),
(vec![High, Mid], true),
(vec![High, Mid, Low], true),
];
for case in cases {
let mut flags = BufferFlags::default();
for flag in case.0 {
flags.mark(flag);
}
assert_eq!(flags.has_buffered(), case.1);
}
}
}