use crate::transport::packet_coalescer::{Decoalescer, PacketCoalescer};
use crate::transport::types::{
PacketFlags, PacketHeader, PhantomPacket, SequenceNumber, SessionId, StreamId,
};
/// Wraps an already-coalesced `bundle` in a [`PhantomPacket`].
///
/// The header is built from `session_id`, `stream_id` and `sequence`; its flag
/// bits are `flags_extra` with [`PacketFlags::COALESCED`] OR-ed in, so the
/// resulting packet always carries the `COALESCED` flag in addition to whatever
/// the caller requested. `bundle` becomes the packet payload unchanged.
pub fn wrap_coalesced_packet(
    session_id: SessionId,
    stream_id: StreamId,
    sequence: SequenceNumber,
    flags_extra: u16,
    bundle: Vec<u8>,
) -> PhantomPacket {
    // Caller-supplied bits are kept; COALESCED is forced on.
    let flags = PacketFlags::new(flags_extra | PacketFlags::COALESCED);
    PhantomPacket::new(
        PacketHeader::new(session_id, stream_id, sequence, flags),
        bundle,
    )
}
/// Splits the payload of a `COALESCED` packet back into its sub-payloads.
///
/// Returns `Ok(None)` when the packet's header does not carry
/// [`PacketFlags::COALESCED`]; the payload is not inspected in that case.
/// Otherwise returns `Ok(Some(subs))`, where `subs` holds an owned copy of each
/// sub-payload in the order yielded by [`Decoalescer`].
///
/// # Errors
///
/// * [`CoalescedParseError::EmptyOrTruncatedHeader`] — the payload is shorter
///   than the two-byte big-endian count, or `Decoalescer::new` rejects it.
/// * [`CoalescedParseError::TruncatedSubPacket`] — the number of sub-payloads
///   the decoder yielded differs from the count declared in the first two
///   payload bytes.
pub fn unwrap_coalesced_packet(
    packet: &PhantomPacket,
) -> Result<Option<Vec<Vec<u8>>>, CoalescedParseError> {
    if !packet.header.flags.contains(PacketFlags::COALESCED) {
        return Ok(None);
    }
    if packet.payload.len() < 2 {
        return Err(CoalescedParseError::EmptyOrTruncatedHeader);
    }
    // The declared sub-packet count is the leading big-endian u16.
    let claimed_count = u16::from_be_bytes([packet.payload[0], packet.payload[1]]) as usize;
    let decoder =
        Decoalescer::new(&packet.payload).ok_or(CoalescedParseError::EmptyOrTruncatedHeader)?;

    // `claimed_count` comes straight off the wire and has not been validated
    // yet, so it must not size an allocation by itself: a two-byte payload
    // could otherwise demand room for 65535 entries. The capacity is only a
    // hint, so capping it by the payload length bounds the up-front allocation
    // by the input size without changing what is returned.
    let mut out = Vec::with_capacity(claimed_count.min(packet.payload.len()));
    for sub in decoder {
        out.push(sub.to_vec());
    }

    // The header's count must match what was actually decoded.
    if out.len() != claimed_count {
        return Err(CoalescedParseError::TruncatedSubPacket {
            expected: claimed_count,
            got: out.len(),
        });
    }
    Ok(Some(out))
}
/// Reasons the payload of a packet flagged `COALESCED` could not be split
/// into its sub-payloads.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CoalescedParseError {
    /// The payload ended before the bundle header could be read.
    EmptyOrTruncatedHeader,
    /// The bundle header declared `expected` sub-packets but `got` were
    /// actually decoded.
    TruncatedSubPacket { expected: usize, got: usize },
}

impl std::fmt::Display for CoalescedParseError {
    /// Writes a one-line, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The header variant has no fields, so emit its fixed text and return early.
        let (declared, decoded) = match self {
            Self::EmptyOrTruncatedHeader => {
                return f.write_str("COALESCED payload truncated before the bundle header");
            }
            Self::TruncatedSubPacket { expected, got } => (expected, got),
        };
        write!(
            f,
            "COALESCED bundle declared {} sub-packets, got {} well-formed",
            declared, decoded
        )
    }
}

impl std::error::Error for CoalescedParseError {}
/// Flushes `coalescer` until it reports nothing left and wraps every bundle
/// it produces in a `COALESCED` packet.
///
/// Each packet is built with [`wrap_coalesced_packet`] using `session_id`,
/// `stream_id`, `flags_extra` and the current value of `*next_sequence`.
/// After each packet `*next_sequence` is advanced by one with wrapping
/// arithmetic, so on return it holds the sequence number for the next packet
/// the caller sends. Packets are returned in flush order; the vector is empty
/// if the first `flush()` yields `None`.
pub fn drain_coalescer_to_packets(
    coalescer: &mut PacketCoalescer,
    session_id: SessionId,
    stream_id: StreamId,
    next_sequence: &mut SequenceNumber,
    flags_extra: u16,
) -> Vec<PhantomPacket> {
    // `from_fn` keeps calling `flush()` lazily and stops at the first `None`,
    // so bundles are wrapped one at a time in the order they are flushed.
    std::iter::from_fn(|| coalescer.flush())
        .map(|bundle| {
            let sequence = *next_sequence;
            let packet =
                wrap_coalesced_packet(session_id, stream_id, sequence, flags_extra, bundle);
            *next_sequence = sequence.wrapping_add(1);
            packet
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::packet_coalescer::CoalescerConfig;

    /// Deterministic session id shared by every test in this module.
    fn fixed_session_id() -> SessionId {
        SessionId::from_bytes([0xAB; 32])
    }

    /// Builds a packet on stream 0 / sequence 0 whose header carries exactly
    /// `flag_bits` and whose payload is `payload`.
    fn packet_with_flags(flag_bits: u16, payload: Vec<u8>) -> PhantomPacket {
        let header = PacketHeader::new(fixed_session_id(), 0, 0, PacketFlags::new(flag_bits));
        PhantomPacket::new(header, payload)
    }

    #[test]
    fn wrap_sets_coalesced_flag_and_preserves_payload() {
        let bundle = vec![0u8; 64];
        let packet = wrap_coalesced_packet(fixed_session_id(), 7, 1, 0, bundle.clone());

        assert!(packet.header.flags.contains(PacketFlags::COALESCED));
        assert_eq!(packet.payload, bundle);
    }

    #[test]
    fn wrap_or_s_in_extra_flags() {
        let packet =
            wrap_coalesced_packet(fixed_session_id(), 7, 1, PacketFlags::ENCRYPTED, vec![]);

        // Both the forced flag and the caller-supplied one must be present.
        assert!(packet.header.flags.contains(PacketFlags::COALESCED));
        assert!(packet.header.flags.contains(PacketFlags::ENCRYPTED));
    }

    #[test]
    fn round_trip_three_subpayloads() {
        let mut coalescer = PacketCoalescer::new(CoalescerConfig::default());
        coalescer.push(b"aaaa");
        coalescer.push(b"bb");
        coalescer.push(b"cccccc");
        let bundle = coalescer.flush().expect("flush");

        let packet = wrap_coalesced_packet(fixed_session_id(), 0, 0, 0, bundle);
        let subs = unwrap_coalesced_packet(&packet)
            .expect("ok")
            .expect("coalesced flag matched");

        assert_eq!(subs.len(), 3);
        assert_eq!(subs[0], b"aaaa");
        assert_eq!(subs[1], b"bb");
        assert_eq!(subs[2], b"cccccc");
    }

    #[test]
    fn unwrap_returns_none_when_not_coalesced() {
        let packet = packet_with_flags(PacketFlags::ENCRYPTED, vec![0u8; 16]);

        let outcome = unwrap_coalesced_packet(&packet).expect("no error");
        assert!(outcome.is_none());
    }

    #[test]
    fn unwrap_errors_on_truncated_header() {
        // A single byte cannot hold the two-byte bundle count.
        let packet = packet_with_flags(PacketFlags::COALESCED, vec![0u8]);

        let failure = unwrap_coalesced_packet(&packet).expect_err("err");
        assert_eq!(failure, CoalescedParseError::EmptyOrTruncatedHeader);
    }

    #[test]
    fn unwrap_errors_when_bundle_count_exceeds_actual_subs() {
        // Header claims five sub-packets, but only one ("abc", length 3) follows.
        let bundle = [
            &5u16.to_be_bytes()[..],
            &3u16.to_be_bytes()[..],
            &b"abc"[..],
        ]
        .concat();
        let packet = packet_with_flags(PacketFlags::COALESCED, bundle);

        match unwrap_coalesced_packet(&packet).expect_err("err") {
            CoalescedParseError::TruncatedSubPacket { expected, got } => {
                assert_eq!(expected, 5);
                assert_eq!(got, 1);
            }
            other => panic!("unexpected error variant: {:?}", other),
        }
    }

    #[test]
    fn drain_coalescer_helper_emits_one_packet_per_flush() {
        let mut coalescer = PacketCoalescer::new(CoalescerConfig::default());
        coalescer.push(b"x");
        coalescer.push(b"yy");

        let mut next_seq: SequenceNumber = 100;
        let packets = drain_coalescer_to_packets(
            &mut coalescer,
            fixed_session_id(),
            3,
            &mut next_seq,
            PacketFlags::ENCRYPTED,
        );

        assert_eq!(packets.len(), 1);
        let only = packets.into_iter().next().unwrap();
        assert!(only.header.flags.contains(PacketFlags::COALESCED));
        assert!(only.header.flags.contains(PacketFlags::ENCRYPTED));
        assert_eq!(only.header.stream_id, 3);
        assert_eq!(only.header.sequence, 100);
        // The helper must leave the counter pointing at the next free sequence.
        assert_eq!(next_seq, 101);
    }
}