use crate::bytes::{Bytes, BytesMut};
use crate::net::atp::protocol::quic_frames::{QuicFrame, QuicFrameError};
use std::collections::{BTreeMap, VecDeque};
/// Presumably the largest UDP payload size supported, in bytes.
/// NOTE(review): not referenced by the code visible in this section — confirm against callers.
pub const MAX_UDP_PAYLOAD: usize = 65535;
/// MTU assumed for IPv4 paths, in bytes. `PacketConstraints::new` derives its default
/// `max_packet_size` from this value.
pub const DEFAULT_IPV4_MTU: usize = 1500;
/// MTU assumed for IPv6 paths, in bytes.
/// NOTE(review): not referenced by the code visible in this section — confirm against callers.
pub const DEFAULT_IPV6_MTU: usize = 1500;
/// Minimum size, in bytes, targeted for Initial-space packets when
/// `PacketAssembler::assemble_packet` adds padding. Three times this value is the default
/// anti-amplification limit set by `PacketConstraints::new`.
pub const MIN_INITIAL_PACKET_SIZE: usize = 1200;
/// Packet number space a packet is assembled for.
///
/// The space selects which frame types `is_frame_allowed_in_space` accepts and whether
/// `PacketAssembler::assemble_packet` applies Initial padding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PacketNumberSpace {
/// Initial space: restricted frame set; packets are candidates for padding.
Initial,
/// Handshake space: same restricted frame set as `Initial`, no padding.
Handshake,
/// Application data space: every frame type is accepted.
ApplicationData,
}
/// Limits that bound how many bytes a single assembled packet may carry.
///
/// `available_packet_budget` combines these fields into one byte budget.
#[derive(Debug, Clone)]
pub struct PacketConstraints {
/// Hard cap on the size of one packet, in bytes.
pub max_packet_size: usize,
/// Congestion window, in bytes.
pub congestion_window: usize,
/// Bytes currently counted against the congestion window.
pub bytes_in_flight: usize,
/// Optional additional cap on the budget; `None` disables it.
pub anti_amplification_limit: Option<usize>,
/// Packet number space that packets are assembled for.
pub packet_number_space: PacketNumberSpace,
}
impl PacketConstraints {
    /// Bytes subtracted from an MTU to obtain the usable packet size.
    const MTU_OVERHEAD: usize = 40;

    /// Creates the default constraints: Initial space, a packet size derived from
    /// `DEFAULT_IPV4_MTU`, a congestion window of ten 1460-byte segments, nothing in
    /// flight, and an anti-amplification limit of three minimum-size Initial packets.
    pub fn new() -> Self {
        Self {
            max_packet_size: DEFAULT_IPV4_MTU - Self::MTU_OVERHEAD,
            congestion_window: 10 * 1460,
            bytes_in_flight: 0,
            anti_amplification_limit: Some(MIN_INITIAL_PACKET_SIZE * 3),
            packet_number_space: PacketNumberSpace::Initial,
        }
    }

    /// Sets `max_packet_size` from `mtu`, minus the fixed overhead. An MTU smaller than
    /// the overhead yields a packet size of zero instead of underflowing.
    pub fn with_mtu(self, mtu: usize) -> Self {
        Self {
            max_packet_size: mtu.saturating_sub(Self::MTU_OVERHEAD),
            ..self
        }
    }

    /// Replaces the congestion window (in bytes).
    pub fn with_congestion_window(self, window: usize) -> Self {
        Self {
            congestion_window: window,
            ..self
        }
    }

    /// Replaces the packet number space.
    pub fn with_packet_number_space(self, space: PacketNumberSpace) -> Self {
        Self {
            packet_number_space: space,
            ..self
        }
    }

    /// Removes the anti-amplification cap from the budget calculation.
    pub fn without_anti_amplification(self) -> Self {
        Self {
            anti_amplification_limit: None,
            ..self
        }
    }

    /// Returns the number of bytes one packet may use right now.
    ///
    /// The result is the smallest of: the unused part of the congestion window,
    /// `max_packet_size`, and the anti-amplification limit when one is set.
    pub fn available_packet_budget(&self) -> usize {
        let congestion_room = self.congestion_window.saturating_sub(self.bytes_in_flight);
        let capped = congestion_room.min(self.max_packet_size);
        self.anti_amplification_limit
            .map_or(capped, |limit| capped.min(limit))
    }
}
/// `Default` is the same configuration as [`PacketConstraints::new`].
impl Default for PacketConstraints {
fn default() -> Self {
Self::new()
}
}
/// A frame together with the scheduling metadata `PacketAssembler` uses.
#[derive(Debug, Clone)]
pub struct PrioritizedFrame {
/// The frame to send.
pub frame: QuicFrame,
/// Scheduling priority; higher values are assembled first.
pub priority: u8,
/// Copied onto the assembled packet's `retransmittable` flag when the frame is included.
pub retransmittable: bool,
/// Copied onto the assembled packet's `ack_eliciting` flag when the frame is included.
pub ack_eliciting: bool,
}
impl PrioritizedFrame {
    /// Wraps `frame` with the default priority and flags for its frame type.
    ///
    /// Priorities run from 255 (connection close) down to 0 (padding). Every frame type
    /// except `Ack` and `Padding` is marked both retransmittable and ack-eliciting.
    ///
    /// NOTE(review): RFC 9002 section 2 excludes CONNECTION_CLOSE from the ack-eliciting
    /// frames, yet it is flagged ack-eliciting here; the existing value is preserved.
    pub fn new(frame: QuicFrame) -> Self {
        let priority: u8 = match &frame {
            QuicFrame::ConnectionClose { .. } => 255,
            QuicFrame::HandshakeDone => 200,
            QuicFrame::Crypto { .. } => 180,
            QuicFrame::ResetStream { .. } | QuicFrame::StopSending { .. } => 160,
            QuicFrame::MaxData { .. }
            | QuicFrame::MaxStreamData { .. }
            | QuicFrame::MaxStreams { .. } => 140,
            QuicFrame::DataBlocked { .. }
            | QuicFrame::StreamDataBlocked { .. }
            | QuicFrame::StreamsBlocked { .. } => 120,
            QuicFrame::PathChallenge { .. } | QuicFrame::PathResponse { .. } => 100,
            QuicFrame::Stream { .. } => 80,
            QuicFrame::Ack { .. } => 60,
            QuicFrame::Ping => 40,
            QuicFrame::Padding { .. } => 0,
        };
        // Both flags share one rule: they are off only for ACK and PADDING.
        let needs_tracking =
            !matches!(&frame, QuicFrame::Ack { .. } | QuicFrame::Padding { .. });
        Self {
            frame,
            priority,
            retransmittable: needs_tracking,
            ack_eliciting: needs_tracking,
        }
    }

    /// Overrides the default priority.
    pub fn with_priority(self, priority: u8) -> Self {
        Self { priority, ..self }
    }

    /// Returns a heuristic size in bytes used for budgeting before the frame is encoded.
    ///
    /// These are fixed per-type estimates plus payload lengths, not exact encoded sizes.
    pub fn estimated_size(&self) -> usize {
        match &self.frame {
            QuicFrame::Padding { length } => *length,
            QuicFrame::Ping | QuicFrame::HandshakeDone => 1,
            QuicFrame::MaxData { .. }
            | QuicFrame::MaxStreams { .. }
            | QuicFrame::DataBlocked { .. }
            | QuicFrame::StreamsBlocked { .. } => 5,
            QuicFrame::StopSending { .. }
            | QuicFrame::MaxStreamData { .. }
            | QuicFrame::StreamDataBlocked { .. } => 8,
            QuicFrame::PathChallenge { .. } | QuicFrame::PathResponse { .. } => 9,
            QuicFrame::ResetStream { .. } => 12,
            QuicFrame::Crypto { data, .. } => 8 + data.len(),
            QuicFrame::ConnectionClose { reason_phrase, .. } => 12 + reason_phrase.len(),
            QuicFrame::Ack {
                ack_ranges,
                ecn_counts,
                ..
            } => {
                let ecn_bytes = usize::from(ecn_counts.is_some()) * 6;
                20 + (ack_ranges.len() * 4) + ecn_bytes
            }
            QuicFrame::Stream { data, offset, .. } => {
                // 4 bytes of fixed header, plus 4 for an explicit offset and 4 for a
                // length field that is only counted when there is payload.
                let offset_bytes = usize::from(offset.is_some()) * 4;
                let length_bytes = usize::from(!data.is_empty()) * 4;
                4 + offset_bytes + length_bytes + data.len()
            }
        }
    }
}
/// Queues frames by priority and packs them into packets under `PacketConstraints`.
#[derive(Debug)]
pub struct PacketAssembler {
// Pending frames bucketed by priority; the largest key is served first and each
// bucket is first-in, first-out.
pending_frames: BTreeMap<u8, VecDeque<PrioritizedFrame>>,
// Total number of frames across all buckets, maintained alongside the map.
pending_frame_count: usize,
// Limits applied to the next assembled packet.
constraints: PacketConstraints,
}
impl PacketAssembler {
    /// Upper bound on the number of frames placed into a single packet.
    const MAX_FRAMES_PER_PACKET: usize = 64;
    /// Budgets below this many bytes are treated as "nothing can be sent".
    const MIN_USEFUL_BUDGET: usize = 4;

    /// Creates an assembler with no pending frames.
    pub fn new(constraints: PacketConstraints) -> Self {
        Self {
            pending_frames: BTreeMap::new(),
            pending_frame_count: 0,
            constraints,
        }
    }

    /// Queues `frame` behind any frames already pending at the same priority.
    pub fn add_frame(&mut self, frame: PrioritizedFrame) {
        self.pending_frames
            .entry(frame.priority)
            .or_default()
            .push_back(frame);
        self.pending_frame_count += 1;
    }

    /// Queues `frame` with the default priority and flags for its type.
    pub fn add_quic_frame(&mut self, frame: QuicFrame) {
        self.add_frame(PrioritizedFrame::new(frame));
    }

    /// Replaces the constraints used by subsequent calls to `assemble_packet`.
    pub fn set_constraints(&mut self, constraints: PacketConstraints) {
        self.constraints = constraints;
    }

    /// Returns `true` while at least one frame is queued.
    pub fn has_pending_frames(&self) -> bool {
        self.pending_frame_count > 0
    }

    /// Builds the next packet from the pending frames, highest priority first.
    ///
    /// Returns `Ok(None)` when nothing is queued, when the available budget is below
    /// the minimum useful size, or when no queued frame fits.
    ///
    /// Frames that are not permitted in the current packet number space are removed
    /// from the queue and discarded. This happens before the budget check, so a large
    /// frame that would be discarded anyway can never stall the frames queued behind it.
    /// A permitted frame that does not fit ends the packet and stays queued.
    ///
    /// Initial-space packets smaller than `MIN_INITIAL_PACKET_SIZE` are topped up with a
    /// single padding frame so that the packet reaches exactly that size, provided the
    /// budget allows a packet of that size.
    ///
    /// # Errors
    ///
    /// This implementation never returns `Err`; the `Result` is part of the existing
    /// interface.
    pub fn assemble_packet(&mut self) -> Result<Option<AssembledPacket>, PacketAssemblyError> {
        if !self.has_pending_frames() {
            return Ok(None);
        }
        let available_budget = self.constraints.available_packet_budget();
        if available_budget < Self::MIN_USEFUL_BUDGET {
            return Ok(None);
        }

        let space = self.constraints.packet_number_space;
        let mut packet = AssembledPacket::new(space);
        let mut used_budget = 0usize;

        while packet.frames.len() < Self::MAX_FRAMES_PER_PACKET {
            let (allowed, estimated_size) = match self.highest_priority_frame() {
                Some(candidate) => (
                    is_frame_allowed_in_space(&candidate.frame, space),
                    candidate.estimated_size(),
                ),
                None => break,
            };
            // Filter first: a frame that can never be sent in this space must not be
            // allowed to block the queue just because it is also too large.
            if !allowed {
                self.pop_highest_priority_frame();
                continue;
            }
            if used_budget + estimated_size > available_budget {
                break;
            }
            let Some(frame) = self.pop_highest_priority_frame() else {
                break;
            };
            used_budget += estimated_size;
            packet.ack_eliciting |= frame.ack_eliciting;
            packet.retransmittable |= frame.retransmittable;
            packet.frames.push(frame.frame);
        }

        if packet.frames.is_empty() {
            return Ok(None);
        }

        // The size must be computed before deciding on padding; until then it is 0 and
        // would make every Initial packet look empty.
        packet.calculate_size();
        if space == PacketNumberSpace::Initial
            && packet.size < MIN_INITIAL_PACKET_SIZE
            && MIN_INITIAL_PACKET_SIZE <= available_budget
        {
            packet.frames.push(QuicFrame::Padding {
                length: MIN_INITIAL_PACKET_SIZE - packet.size,
            });
            packet.calculate_size();
        }
        Ok(Some(packet))
    }

    /// Drops every queued frame.
    pub fn clear_pending_frames(&mut self) {
        self.pending_frames.clear();
        self.pending_frame_count = 0;
    }

    /// Returns the number of queued frames.
    pub fn pending_frame_count(&self) -> usize {
        self.pending_frame_count
    }

    /// Peeks at the oldest frame in the highest-priority bucket.
    fn highest_priority_frame(&self) -> Option<&PrioritizedFrame> {
        self.pending_frames
            .last_key_value()
            .and_then(|(_priority, frames)| frames.front())
    }

    /// Removes and returns the oldest frame in the highest-priority bucket, deleting the
    /// bucket once it is empty so the next peek sees the next priority level.
    fn pop_highest_priority_frame(&mut self) -> Option<PrioritizedFrame> {
        let mut bucket = self.pending_frames.last_entry()?;
        let frame = bucket.get_mut().pop_front()?;
        if bucket.get().is_empty() {
            bucket.remove();
        }
        self.pending_frame_count -= 1;
        Some(frame)
    }
}
/// The frames chosen for one packet plus summary flags about them.
#[derive(Debug, Clone)]
pub struct AssembledPacket {
/// Frames in the order they were selected.
pub frames: Vec<QuicFrame>,
/// Packet number space the packet was assembled for.
pub packet_number_space: PacketNumberSpace,
/// Set by the assembler when at least one included frame was ack-eliciting.
pub ack_eliciting: bool,
/// Set by the assembler when at least one included frame was retransmittable.
pub retransmittable: bool,
/// Cached sum of the encoded frame sizes in bytes; stays 0 until the private
/// `calculate_size` method has run.
pub size: usize,
}
impl AssembledPacket {
    /// Creates an empty packet for `packet_number_space` with both flags cleared and a
    /// cached size of zero.
    pub fn new(packet_number_space: PacketNumberSpace) -> Self {
        let frames = Vec::new();
        Self {
            frames,
            packet_number_space,
            ack_eliciting: false,
            retransmittable: false,
            size: 0,
        }
    }

    /// Encodes all frames, in order, into one contiguous buffer.
    ///
    /// The buffer is pre-sized from the cached `size`.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by a frame's `encode`.
    pub fn encode_frames(&self) -> Result<Bytes, QuicFrameError> {
        let mut encoded = BytesMut::with_capacity(self.size);
        for frame in self.frames.iter() {
            frame.encode(&mut encoded)?;
        }
        Ok(encoded.freeze())
    }

    /// Refreshes the cached `size` by encoding each frame into a scratch buffer and
    /// summing the resulting lengths.
    fn calculate_size(&mut self) {
        let mut total = 0;
        for frame in &self.frames {
            let mut scratch = BytesMut::new();
            // Encoding errors are ignored here; whatever was written still counts.
            let _ = frame.encode(&mut scratch);
            total += scratch.len();
        }
        self.size = total;
    }

    /// Returns the cached size; this is 0 until `calculate_size` has run.
    pub fn estimated_size(&self) -> usize {
        self.size
    }
}
/// Reports whether `frame` may be carried in a packet of the given packet number space.
///
/// The Initial and Handshake spaces accept only padding, ping, ACK, crypto and
/// connection-close frames; the application data space accepts every frame type.
fn is_frame_allowed_in_space(frame: &QuicFrame, space: PacketNumberSpace) -> bool {
    match space {
        PacketNumberSpace::ApplicationData => true,
        PacketNumberSpace::Initial | PacketNumberSpace::Handshake => matches!(
            frame,
            QuicFrame::Padding { .. }
                | QuicFrame::Ping
                | QuicFrame::Ack { .. }
                | QuicFrame::Crypto { .. }
                | QuicFrame::ConnectionClose { .. }
        ),
    }
}
/// Errors in the return type of `PacketAssembler::assemble_packet`.
///
/// NOTE(review): the code visible in this section never constructs any of these
/// variants explicitly; `FrameEncoding` can only arise through its `From` conversion.
#[derive(Debug, thiserror::Error)]
pub enum PacketAssemblyError {
/// A frame failed to encode; wraps the underlying `QuicFrameError`.
#[error("frame encoding error: {0}")]
FrameEncoding(#[from] QuicFrameError),
/// A packet of `size` bytes would exceed the allowed maximum `max`.
#[error("packet exceeds maximum size: {size} > {max}")]
PacketTooLarge {
size: usize,
max: usize,
},
/// No budget is available for assembling a packet.
#[error("no budget available for packet assembly")]
NoBudgetAvailable,
/// A frame is not permitted in packet number space `space`.
#[error("frame not allowed in packet number space: {space:?}")]
InvalidFrameForSpace {
space: PacketNumberSpace,
},
}
// Unit tests for the constraints, frame prioritisation and packet assembly.
#[cfg(test)]
mod tests {
use super::*;
use crate::net::atp::protocol::varint::VarInt;
// The congestion window (1000) is the tightest limit here: it is below both the
// MTU-derived packet size (1200 - 40) and the default anti-amplification limit.
#[test]
fn test_packet_constraints_budget() {
let constraints = PacketConstraints::new()
.with_congestion_window(1000)
.with_mtu(1200);
assert_eq!(constraints.available_packet_budget(), 1000);
}
// Connection close outranks ping, and both receive the default flags.
#[test]
fn test_frame_prioritization() {
let ping = PrioritizedFrame::new(QuicFrame::Ping);
let close = PrioritizedFrame::new(QuicFrame::ConnectionClose {
error_code: VarInt::new(0).unwrap(),
frame_type: None,
reason_phrase: Bytes::from_static(b"test"),
});
assert!(close.priority > ping.priority);
assert!(close.ack_eliciting);
assert!(ping.retransmittable); }
// Default constraints use the Initial space, where MaxData is not an allowed frame.
// NOTE(review): the assembler discards disallowed frames and may append padding in
// this space, so confirm that the expected frame count of 2 is what is intended.
#[test]
fn test_packet_assembly() {
let mut assembler = PacketAssembler::new(PacketConstraints::new());
assembler.add_quic_frame(QuicFrame::Ping);
assembler.add_quic_frame(QuicFrame::MaxData {
maximum_data: VarInt::new(1024).unwrap(),
});
let packet = assembler.assemble_packet().unwrap().unwrap();
assert_eq!(packet.frames.len(), 2);
assert!(packet.ack_eliciting);
}
// Higher priority buckets are drained first; frames sharing a priority keep their
// insertion order.
#[test]
fn test_packet_assembly_priority_buckets_preserve_order() {
let mut assembler = PacketAssembler::new(
PacketConstraints::new()
.with_packet_number_space(PacketNumberSpace::ApplicationData)
.without_anti_amplification(),
);
assembler.add_frame(PrioritizedFrame::new(QuicFrame::Ping).with_priority(10));
assembler.add_frame(
PrioritizedFrame::new(QuicFrame::MaxData {
maximum_data: VarInt::new(1024).unwrap(),
})
.with_priority(10),
);
assembler.add_frame(
PrioritizedFrame::new(QuicFrame::ConnectionClose {
error_code: VarInt::new(0).unwrap(),
frame_type: None,
reason_phrase: Bytes::from_static(b"test"),
})
.with_priority(20),
);
assert_eq!(assembler.pending_frame_count(), 3);
let packet = assembler.assemble_packet().unwrap().unwrap();
assert!(matches!(
packet.frames[0],
QuicFrame::ConnectionClose { .. }
));
assert!(matches!(packet.frames[1], QuicFrame::Ping));
assert!(matches!(packet.frames[2], QuicFrame::MaxData { .. }));
assert_eq!(assembler.pending_frame_count(), 0);
}
// In the Initial space the stream frame is filtered out; crypto (priority 180) is
// emitted ahead of ping (priority 40). Padding is ignored by the assertions.
#[test]
fn test_packet_number_space_filtering() {
let mut assembler = PacketAssembler::new(
PacketConstraints::new().with_packet_number_space(PacketNumberSpace::Initial),
);
assembler.add_quic_frame(QuicFrame::Ping); assembler.add_quic_frame(QuicFrame::Stream {
stream_id: VarInt::new(0).unwrap(),
offset: None,
data: Bytes::from_static(b"test"),
fin: false,
}); assembler.add_quic_frame(QuicFrame::Crypto {
offset: VarInt::new(0).unwrap(),
data: Bytes::from_static(b"handshake"),
});
let packet = assembler.assemble_packet().unwrap().unwrap();
let non_padding_frames: Vec<_> = packet
.frames
.iter()
.filter(|frame| !matches!(frame, QuicFrame::Padding { .. }))
.collect();
assert_eq!(non_padding_frames.len(), 2);
assert!(matches!(non_padding_frames[0], QuicFrame::Crypto { .. })); assert!(matches!(non_padding_frames[1], QuicFrame::Ping));
}
// Despite its name, this test exercises Initial-packet padding: a lone ping in the
// Initial space must be accompanied by a padding frame that brings the packet up to
// at least MIN_INITIAL_PACKET_SIZE.
#[test]
fn test_anti_amplification_padding() {
let mut assembler = PacketAssembler::new(
PacketConstraints::new()
.with_packet_number_space(PacketNumberSpace::Initial)
.with_mtu(2000), );
assembler.add_quic_frame(QuicFrame::Ping);
let packet = assembler.assemble_packet().unwrap().unwrap();
let has_padding = packet
.frames
.iter()
.any(|f| matches!(f, QuicFrame::Padding { .. }));
assert!(has_padding);
assert!(packet.estimated_size() >= MIN_INITIAL_PACKET_SIZE);
}
}