#![allow(dead_code)]
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use crate::error::{NetError, NetResult};
/// Magic value expected in the first two bytes of every frame (ASCII "ZX" when
/// serialised big-endian).
pub const ZIXI_MAGIC: u16 = 0x5A58;
/// Number of bytes produced by `ZixiHeader::to_bytes`: magic (2) + frame type (1) +
/// sequence number (4) + timestamp (4). The two-byte payload length that follows on
/// the wire is NOT counted here, which is why parsing code adds `+ 2`.
pub const ZIXI_HEADER_SIZE: usize = 11;
/// Largest payload, in bytes, that `ZixiFrame::with_type` accepts.
pub const ZIXI_MAX_PAYLOAD: usize = 65_524;
/// Frame kind tag, stored as a single byte at offset 2 of the wire header.
///
/// The discriminant of each variant is the exact byte value used on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum FrameType {
    /// Wire value `0x01`.
    Hello = 0x01,
    /// Wire value `0x02`.
    Accept = 0x02,
    /// Wire value `0x03`; carries stream payload.
    Data = 0x03,
    /// Wire value `0x04`.
    Ack = 0x04,
    /// Wire value `0x05`.
    Nack = 0x05,
    /// Wire value `0x06`; a re-sent copy of an earlier `Data` frame.
    Retransmit = 0x06,
    /// Wire value `0xFF`; ends the session.
    Bye = 0xFF,
}

impl FrameType {
    /// Every variant, used as the lookup table for [`FrameType::from_byte`].
    const ALL: [Self; 7] = [
        Self::Hello,
        Self::Accept,
        Self::Data,
        Self::Ack,
        Self::Nack,
        Self::Retransmit,
        Self::Bye,
    ];

    /// Decodes a wire byte into a frame type.
    ///
    /// # Errors
    ///
    /// Returns a parse error (reported at offset 0) when `b` does not correspond to
    /// any variant.
    pub fn from_byte(b: u8) -> NetResult<Self> {
        let known = Self::ALL.iter().copied().find(|kind| kind.as_byte() == b);
        match known {
            Some(kind) => Ok(kind),
            None => {
                let other = b;
                Err(NetError::parse(
                    0,
                    format!("unknown Zixi frame type 0x{other:02X}"),
                ))
            }
        }
    }

    /// Returns the byte that represents this frame type on the wire.
    #[must_use]
    pub const fn as_byte(self) -> u8 {
        self as u8
    }
}
/// Header found at the start of every frame.
///
/// Wire layout, all integers big-endian: magic (bytes 0-1), frame type (2),
/// sequence number (3-6), timestamp (7-10), payload length (11-12).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ZixiHeader {
    /// Protocol magic; parsing rejects anything other than `ZIXI_MAGIC`.
    pub magic: u16,
    /// Kind of frame this header introduces.
    pub frame_type: FrameType,
    /// Sequence number of the frame.
    pub seq_no: u32,
    /// Timestamp in milliseconds.
    pub timestamp_ms: u32,
    /// Declared payload length in bytes.
    pub payload_len: u16,
}

impl ZixiHeader {
    /// Serialises magic, frame type, sequence number and timestamp.
    ///
    /// `payload_len` is deliberately NOT part of the returned array; the frame
    /// serialiser appends it separately right after these `ZIXI_HEADER_SIZE` bytes.
    #[must_use]
    pub fn to_bytes(self) -> [u8; ZIXI_HEADER_SIZE] {
        let mut out = [0u8; ZIXI_HEADER_SIZE];
        out[..2].copy_from_slice(&self.magic.to_be_bytes());
        out[2] = self.frame_type.as_byte();
        out[3..7].copy_from_slice(&self.seq_no.to_be_bytes());
        out[7..11].copy_from_slice(&self.timestamp_ms.to_be_bytes());
        out
    }

    /// Parses a header, including the two-byte payload length, from the front of `buf`.
    ///
    /// Returns the header together with its `payload_len` (the same value, repeated
    /// for the caller's convenience).
    ///
    /// # Errors
    ///
    /// Returns a parse error when `buf` holds fewer than `ZIXI_HEADER_SIZE + 2` bytes,
    /// when the magic does not match `ZIXI_MAGIC`, or when the frame-type byte is
    /// unknown. The checks run in that order.
    pub fn from_bytes(buf: &[u8]) -> NetResult<(Self, u16)> {
        // Fixed header plus the trailing big-endian payload length.
        let needed = ZIXI_HEADER_SIZE + 2;
        let available = buf.len();
        if available < needed {
            return Err(NetError::parse(
                0,
                format!(
                    "buffer too short for Zixi header: {} < {}",
                    available, needed
                ),
            ));
        }

        // Length was verified above, so these fixed-offset reads cannot go out of bounds.
        let be16 = |at: usize| u16::from_be_bytes([buf[at], buf[at + 1]]);
        let be32 =
            |at: usize| u32::from_be_bytes([buf[at], buf[at + 1], buf[at + 2], buf[at + 3]]);

        let magic = be16(0);
        if magic != ZIXI_MAGIC {
            return Err(NetError::parse(0, format!("bad Zixi magic 0x{magic:04X}")));
        }

        let header = Self {
            magic,
            frame_type: FrameType::from_byte(buf[2])?,
            seq_no: be32(3),
            timestamp_ms: be32(7),
            payload_len: be16(ZIXI_HEADER_SIZE),
        };
        Ok((header, header.payload_len))
    }
}
/// A complete frame: header plus owned payload bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ZixiFrame {
    /// Parsed or constructed header.
    pub header: ZixiHeader,
    /// Payload bytes that follow the header on the wire.
    pub payload: Vec<u8>,
}

impl ZixiFrame {
    /// Builds a `Data` frame.
    ///
    /// # Errors
    ///
    /// Fails when `payload` is longer than `ZIXI_MAX_PAYLOAD`.
    pub fn data(seq_no: u32, timestamp_ms: u32, payload: Vec<u8>) -> NetResult<Self> {
        Self::with_type(FrameType::Data, seq_no, timestamp_ms, payload)
    }

    /// Builds a `Retransmit` frame.
    ///
    /// # Errors
    ///
    /// Fails when `payload` is longer than `ZIXI_MAX_PAYLOAD`.
    pub fn retransmit(seq_no: u32, timestamp_ms: u32, payload: Vec<u8>) -> NetResult<Self> {
        Self::with_type(FrameType::Retransmit, seq_no, timestamp_ms, payload)
    }

    /// Builds a frame of an arbitrary type, filling in the magic and payload length.
    ///
    /// # Errors
    ///
    /// Returns an encoding error when `payload` is longer than `ZIXI_MAX_PAYLOAD`.
    pub fn with_type(
        frame_type: FrameType,
        seq_no: u32,
        timestamp_ms: u32,
        payload: Vec<u8>,
    ) -> NetResult<Self> {
        let len = payload.len();
        if len > ZIXI_MAX_PAYLOAD {
            return Err(NetError::encoding(format!(
                "payload {} bytes exceeds max {ZIXI_MAX_PAYLOAD}",
                len
            )));
        }

        let header = ZixiHeader {
            magic: ZIXI_MAGIC,
            frame_type,
            seq_no,
            timestamp_ms,
            // Lossless: `len` was just checked against ZIXI_MAX_PAYLOAD, which fits in u16.
            payload_len: len as u16,
        };
        Ok(Self { header, payload })
    }

    /// Serialises the frame as header bytes, big-endian `header.payload_len`, then payload.
    ///
    /// The length written is taken from `header.payload_len`, not from `payload.len()`.
    #[must_use]
    pub fn to_bytes(&self) -> Vec<u8> {
        let fixed = self.header.to_bytes();
        let declared_len = self.header.payload_len.to_be_bytes();

        let mut wire = Vec::with_capacity(ZIXI_HEADER_SIZE + 2 + self.payload.len());
        wire.extend_from_slice(&fixed);
        wire.extend_from_slice(&declared_len);
        wire.extend_from_slice(&self.payload);
        wire
    }

    /// Parses one frame from the front of `buf`; trailing bytes beyond the declared
    /// payload are ignored.
    ///
    /// # Errors
    ///
    /// Propagates header parse errors, and returns a parse error when `buf` holds
    /// fewer payload bytes than the header declares.
    pub fn from_bytes(buf: &[u8]) -> NetResult<Self> {
        let (header, payload_len) = ZixiHeader::from_bytes(buf)?;
        let payload_start = ZIXI_HEADER_SIZE + 2;
        let payload_end = payload_start + usize::from(payload_len);

        // `get` yields `None` exactly when the buffer ends before `payload_end`.
        match buf.get(payload_start..payload_end) {
            Some(body) => Ok(Self {
                header,
                payload: body.to_vec(),
            }),
            None => Err(NetError::parse(
                payload_start as u64,
                format!(
                    "frame truncated: declared {} bytes payload, only {} available",
                    payload_len,
                    buf.len().saturating_sub(payload_start)
                ),
            )),
        }
    }
}
/// Bounded FIFO of recently stored frames, addressable by sequence number.
///
/// Once `capacity` frames are held, storing another one evicts the oldest.
#[derive(Debug)]
pub struct RetransmitBuffer {
    /// Maximum number of frames retained; always greater than zero.
    capacity: usize,
    /// Retained frames, oldest at the front.
    frames: VecDeque<ZixiFrame>,
    /// Sequence number -> absolute insertion ordinal of the newest retained frame
    /// carrying that sequence number. Ordinals never change once assigned, so they
    /// stay valid across evictions (unlike raw deque positions).
    index: HashMap<u32, u64>,
    /// Number of frames evicted so far; equals the ordinal of the current front frame.
    evicted: u64,
}

impl RetransmitBuffer {
    /// Creates an empty buffer that retains at most `capacity` frames.
    ///
    /// # Errors
    ///
    /// Returns an invalid-state error when `capacity` is zero.
    pub fn new(capacity: usize) -> NetResult<Self> {
        if capacity == 0 {
            return Err(NetError::invalid_state(
                "retransmit buffer capacity must be > 0",
            ));
        }
        Ok(Self {
            capacity,
            frames: VecDeque::with_capacity(capacity),
            index: HashMap::new(),
            evicted: 0,
        })
    }

    /// Stores `frame`, evicting the oldest retained frame first when the buffer is full.
    ///
    /// If a frame with the same sequence number is already retained, both are kept
    /// but lookups resolve to the newly stored one.
    pub fn store(&mut self, frame: ZixiFrame) {
        if self.frames.len() == self.capacity {
            if let Some(oldest) = self.frames.pop_front() {
                let oldest_ordinal = self.evicted;
                self.evicted += 1;
                // Only drop the index entry if it still refers to the evicted frame;
                // a newer frame with the same sequence number may have replaced it.
                if self.index.get(&oldest.header.seq_no) == Some(&oldest_ordinal) {
                    self.index.remove(&oldest.header.seq_no);
                }
            }
        }
        // Ordinal of the slot about to be filled at the back of the deque.
        let ordinal = self.evicted + self.frames.len() as u64;
        self.index.insert(frame.header.seq_no, ordinal);
        self.frames.push_back(frame);
    }

    /// Returns the retained frame with sequence number `seq_no`, if any.
    ///
    /// Constant-time: the index yields the frame's absolute ordinal, and subtracting
    /// the eviction count converts it to the current deque position.
    #[must_use]
    pub fn get(&self, seq_no: u32) -> Option<&ZixiFrame> {
        let ordinal = *self.index.get(&seq_no)?;
        let position = ordinal.checked_sub(self.evicted)?;
        // `position` is below `frames.len()`, which is a usize, so the cast is lossless.
        self.frames.get(position as usize)
    }

    /// Number of frames currently retained.
    #[must_use]
    pub fn len(&self) -> usize {
        self.frames.len()
    }

    /// Returns `true` when no frames are retained.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.frames.is_empty()
    }
}
/// Running counters kept by senders and receivers.
#[derive(Debug, Clone, Default)]
pub struct ZixiStats {
    /// Frames handed out by the sender's `send`.
    pub frames_sent: u64,
    /// Data/Retransmit frames processed by the receiver.
    pub frames_received: u64,
    /// NACKs the sender answered with a retransmission.
    pub retransmits_sent: u64,
    /// NACKs handled (sender side) or NACKs emitted (receiver side).
    pub retransmits_requested: u64,
    /// Not updated anywhere in this file.
    pub frames_dropped_late: u64,
    /// NACKs the sender could not satisfy because the frame was no longer buffered.
    pub retransmit_failures: u64,
    /// Payload bytes passed to the sender's `send`.
    pub bytes_sent: u64,
    /// Payload bytes of Data/Retransmit frames processed by the receiver.
    pub bytes_received: u64,
}

impl ZixiStats {
    /// Ratio of `retransmits_requested` to `frames_sent`.
    ///
    /// Returns `0.0` when no frames have been sent, avoiding a division by zero.
    #[must_use]
    pub fn loss_ratio(&self) -> f64 {
        match self.frames_sent {
            0 => 0.0,
            sent => self.retransmits_requested as f64 / sent as f64,
        }
    }
}
/// Settings used to construct a sender.
#[derive(Debug, Clone)]
pub struct ZixiSenderConfig {
    /// Address of the remote peer.
    pub remote_addr: SocketAddr,
    /// Capacity, in frames, of the sender's retransmit buffer.
    pub retransmit_buffer: usize,
    /// Identifier of the stream.
    pub stream_id: String,
    /// Latency setting in milliseconds.
    pub latency_ms: u32,
}

impl ZixiSenderConfig {
    /// Creates a configuration with a 1000-frame retransmit buffer and 1000 ms latency.
    #[must_use]
    pub fn new(remote_addr: SocketAddr, stream_id: impl Into<String>) -> Self {
        let stream_id: String = stream_id.into();
        Self {
            remote_addr,
            stream_id,
            retransmit_buffer: 1000,
            latency_ms: 1000,
        }
    }
}
/// Sender half: numbers outgoing payloads, serialises them and keeps recent frames
/// around so they can be re-sent on request. It produces wire bytes only; no socket
/// I/O happens here.
#[derive(Debug)]
pub struct ZixiSender {
    /// Configuration supplied at construction.
    config: ZixiSenderConfig,
    /// Sequence number the next call to `send` will use.
    next_seq: u32,
    /// Recently sent frames available for retransmission.
    retransmit_buf: RetransmitBuffer,
    /// Running counters.
    stats: ZixiStats,
    /// Creation time; frame timestamps are measured from here.
    started_at: Instant,
}

impl ZixiSender {
    /// Creates a sender whose retransmit buffer holds `config.retransmit_buffer` frames.
    ///
    /// # Errors
    ///
    /// Fails when the retransmit buffer cannot be created from the configured capacity.
    pub fn new(config: ZixiSenderConfig) -> NetResult<Self> {
        let retransmit_buf = RetransmitBuffer::new(config.retransmit_buffer)?;
        let sender = Self {
            config,
            next_seq: 0,
            retransmit_buf,
            stats: ZixiStats::default(),
            started_at: Instant::now(),
        };
        Ok(sender)
    }

    /// Wraps `payload` in a `Data` frame and returns its wire bytes.
    ///
    /// The frame is also stored for retransmission, the sequence number advances
    /// (wrapping at `u32::MAX`) and the sent counters are updated.
    ///
    /// # Errors
    ///
    /// Fails, without changing any state, when the frame cannot be built from `payload`.
    pub fn send(&mut self, payload: &[u8]) -> NetResult<Vec<u8>> {
        let seq = self.next_seq;
        let frame = ZixiFrame::data(seq, self.elapsed_ms(), payload.to_vec())?;
        let wire = frame.to_bytes();

        self.retransmit_buf.store(frame);
        self.next_seq = seq.wrapping_add(1);
        self.stats.frames_sent += 1;
        self.stats.bytes_sent += payload.len() as u64;
        Ok(wire)
    }

    /// Handles a NACK for `seq_no`.
    ///
    /// Returns the wire bytes of a `Retransmit` frame carrying the original timestamp
    /// and payload, or `None` when that frame is no longer buffered. Every call counts
    /// as a requested retransmission.
    ///
    /// # Errors
    ///
    /// Propagates a failure to build the retransmit frame.
    pub fn handle_nack(&mut self, seq_no: u32) -> NetResult<Option<Vec<u8>>> {
        self.stats.retransmits_requested += 1;

        let original = match self.retransmit_buf.get(seq_no) {
            Some(frame) => frame,
            None => {
                self.stats.retransmit_failures += 1;
                return Ok(None);
            }
        };

        let resend = ZixiFrame::retransmit(
            seq_no,
            original.header.timestamp_ms,
            original.payload.clone(),
        )?;
        let wire = resend.to_bytes();
        self.stats.retransmits_sent += 1;
        Ok(Some(wire))
    }

    /// Counters accumulated so far.
    #[must_use]
    pub const fn stats(&self) -> &ZixiStats {
        &self.stats
    }

    /// Sequence number the next sent frame will carry.
    #[must_use]
    pub const fn next_seq(&self) -> u32 {
        self.next_seq
    }

    /// Stream identifier from the configuration.
    #[must_use]
    pub fn stream_id(&self) -> &str {
        &self.config.stream_id
    }

    /// Milliseconds since this sender was created, truncated to the low 32 bits
    /// (so the value wraps once the elapsed time exceeds `u32::MAX` ms).
    fn elapsed_ms(&self) -> u32 {
        let millis: u128 = self.started_at.elapsed().as_millis();
        millis as u32
    }
}
/// Settings used to construct a receiver.
#[derive(Debug, Clone)]
pub struct ZixiReceiverConfig {
    /// Local address associated with the receiver.
    pub local_addr: SocketAddr,
    /// Latency budget for the stream.
    pub latency_budget: Duration,
    /// Emit an ACK after this many Data/Retransmit frames; `0` disables ACKs.
    pub ack_every_n_frames: u32,
}

impl ZixiReceiverConfig {
    /// Creates a configuration with a one-second latency budget and an ACK every 10 frames.
    #[must_use]
    pub fn new(local_addr: SocketAddr) -> Self {
        let latency_budget = Duration::from_millis(1_000);
        Self {
            local_addr,
            latency_budget,
            ack_every_n_frames: 10,
        }
    }
}
/// Receiver half: parses incoming datagrams and tells the caller what to do next
/// (deliver payload, request retransmissions, acknowledge, end the session).
/// It performs no socket I/O itself.
#[derive(Debug)]
pub struct ZixiReceiver {
    /// Configuration supplied at construction.
    config: ZixiReceiverConfig,
    /// Most advanced sequence number seen so far, `None` before the first frame.
    highest_seq: Option<u32>,
    /// Arrival time of recently received sequence numbers. Entries older than
    /// `config.latency_budget` are pruned periodically so the map stays bounded.
    received_set: HashMap<u32, Instant>,
    /// Running counters.
    stats: ZixiStats,
    /// Data/Retransmit frames received since the last ACK was emitted.
    since_last_ack: u32,
}

impl ZixiReceiver {
    /// Upper bound on NACKs emitted for a single incoming frame.
    ///
    /// The sequence number comes straight off the network; without a bound, one
    /// datagram with a far-ahead sequence number would make the receiver build up
    /// to 2^31 NACK actions. 1000 equals the sender's default retransmit buffer depth.
    const MAX_NACKS_PER_FRAME: u32 = 1000;

    /// `received_set` is swept once every this many data frames, which amortises
    /// the linear cost of the sweep.
    const PRUNE_INTERVAL: u64 = 256;

    /// Creates a receiver with no frames seen yet.
    #[must_use]
    pub fn new(config: ZixiReceiverConfig) -> Self {
        Self {
            config,
            highest_seq: None,
            received_set: HashMap::new(),
            stats: ZixiStats::default(),
            since_last_ack: 0,
        }
    }

    /// Processes one raw datagram and returns the resulting actions, in order.
    ///
    /// For `Data` and `Retransmit` frames: a `Nack` for each missing sequence number
    /// between the previous highest and this frame (oldest first, at most
    /// `MAX_NACKS_PER_FRAME` — for larger gaps only the newest ones are requested),
    /// then `Deliver` with the payload, then an `Ack` when the configured cadence is
    /// reached. `Bye` yields `SessionEnded`. All other frame types yield no actions.
    ///
    /// NOTE(review): frames are delivered in arrival order and duplicates are not
    /// suppressed — confirm callers expect that.
    ///
    /// # Errors
    ///
    /// Returns the parse error when `raw` is not a well-formed frame.
    pub fn receive(&mut self, raw: &[u8]) -> NetResult<Vec<ReceiverAction>> {
        let frame = ZixiFrame::from_bytes(raw)?;
        let kind = frame.header.frame_type;
        let mut actions = Vec::new();
        match kind {
            FrameType::Data | FrameType::Retransmit => {
                let seq = frame.header.seq_no;
                let now = Instant::now();
                self.stats.frames_received += 1;
                self.stats.bytes_received += frame.payload.len() as u64;
                self.received_set.insert(seq, now);
                if self.stats.frames_received % Self::PRUNE_INTERVAL == 0 {
                    self.prune_received(now);
                }

                match self.highest_seq {
                    None => self.highest_seq = Some(seq),
                    Some(prev_highest) => {
                        // `seq != prev_highest` keeps a duplicate of the newest frame
                        // from being treated as an advance.
                        if seq != prev_highest && seq_after(seq, prev_highest) {
                            self.push_gap_nacks(prev_highest, seq, &mut actions);
                            self.highest_seq = Some(seq);
                        }
                    }
                }

                // The frame is not needed afterwards, so the payload is moved, not cloned.
                actions.push(ReceiverAction::Deliver(frame.payload));

                // Saturate: with ACKs disabled the counter is never reset.
                self.since_last_ack = self.since_last_ack.saturating_add(1);
                let cadence = self.config.ack_every_n_frames;
                if cadence > 0 && self.since_last_ack >= cadence {
                    actions.push(ReceiverAction::Ack(seq));
                    self.since_last_ack = 0;
                }
            }
            FrameType::Bye => {
                actions.push(ReceiverAction::SessionEnded);
            }
            // Listed explicitly so that adding a frame type forces a decision here.
            FrameType::Hello | FrameType::Accept | FrameType::Ack | FrameType::Nack => {}
        }
        Ok(actions)
    }

    /// Counters accumulated so far.
    #[must_use]
    pub const fn stats(&self) -> &ZixiStats {
        &self.stats
    }

    /// Most advanced sequence number seen, or `None` before the first frame.
    #[must_use]
    pub const fn highest_seq(&self) -> Option<u32> {
        self.highest_seq
    }

    /// Pushes a `Nack` for each sequence number strictly between `prev_highest` and
    /// `seq` that is not recorded as received, limited to the newest
    /// `MAX_NACKS_PER_FRAME` candidates. Requires `seq` to be strictly after
    /// `prev_highest`.
    fn push_gap_nacks(&mut self, prev_highest: u32, seq: u32, actions: &mut Vec<ReceiverAction>) {
        // Count of sequence numbers strictly between the two, in wrapping arithmetic.
        let missing = seq.wrapping_sub(prev_highest).wrapping_sub(1);
        let burst = missing.min(Self::MAX_NACKS_PER_FRAME);
        let mut candidate = seq.wrapping_sub(burst);
        while candidate != seq {
            if !self.received_set.contains_key(&candidate) {
                self.stats.retransmits_requested += 1;
                actions.push(ReceiverAction::Nack(candidate));
            }
            candidate = candidate.wrapping_add(1);
        }
    }

    /// Drops `received_set` entries that arrived more than `latency_budget` before `now`.
    fn prune_received(&mut self, now: Instant) {
        // Copied out first so the closure does not borrow `self` during `retain`.
        let budget = self.config.latency_budget;
        self.received_set
            .retain(|_, seen| now.saturating_duration_since(*seen) <= budget);
    }
}
/// Instruction returned by `ZixiReceiver::receive` for the caller to carry out.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReceiverAction {
/// Hand this payload (from a Data or Retransmit frame) to the application.
Deliver(Vec<u8>),
/// Request retransmission of the frame with this sequence number.
Nack(u32),
/// Acknowledge; carries the sequence number of the frame that triggered the ACK.
Ack(u32),
/// A Bye frame was received.
SessionEnded,
}
/// Returns `true` when sequence number `a` is strictly after `b` in wrapping
/// (serial-number) order, i.e. `a` is between 1 and 2^31 - 1 steps ahead of `b`.
///
/// Equal values are not "after" each other, and values exactly 2^31 apart are
/// treated as unordered (`false` in both directions).
#[inline]
fn seq_after(a: u32, b: u32) -> bool {
    a != b && a.wrapping_sub(b) < 0x8000_0000
}
// Unit tests for frame encoding/decoding, the sender, the receiver, the retransmit
// buffer and the sequence-number helper. Nothing here touches the network; the
// socket addresses only populate configuration structs.
#[cfg(test)]
mod tests {
use super::*;
use std::net::{Ipv4Addr, SocketAddrV4};
// Loopback address used for receiver configurations.
fn local_addr() -> SocketAddr {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999))
}
// Loopback address used for sender configurations.
fn remote_addr() -> SocketAddr {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 10000))
}
// Every frame type survives a byte round trip.
#[test]
fn test_frame_type_round_trip() {
for &ft in &[
FrameType::Hello,
FrameType::Accept,
FrameType::Data,
FrameType::Ack,
FrameType::Nack,
FrameType::Retransmit,
FrameType::Bye,
] {
let b = ft.as_byte();
let parsed = FrameType::from_byte(b).expect("known type");
assert_eq!(ft, parsed);
}
}
// A byte that matches no variant is rejected.
#[test]
fn test_unknown_frame_type_errors() {
assert!(FrameType::from_byte(0xAB).is_err());
}
// A data frame decodes back to the same sequence number, timestamp, payload and type.
#[test]
fn test_frame_serialise_deserialise() {
let payload = b"hello zixi".to_vec();
let frame = ZixiFrame::data(42, 1000, payload.clone()).expect("valid");
let bytes = frame.to_bytes();
let decoded = ZixiFrame::from_bytes(&bytes).expect("valid");
assert_eq!(decoded.header.seq_no, 42);
assert_eq!(decoded.header.timestamp_ms, 1000);
assert_eq!(decoded.payload, payload);
assert_eq!(decoded.header.frame_type, FrameType::Data);
}
// Corrupting the first magic byte makes parsing fail.
#[test]
fn test_magic_bytes_checked() {
let payload = b"test".to_vec();
let frame = ZixiFrame::data(0, 0, payload).expect("valid");
let mut bytes = frame.to_bytes();
bytes[0] = 0xDE; assert!(ZixiFrame::from_bytes(&bytes).is_err());
}
// A buffer shorter than the header is rejected.
#[test]
fn test_truncated_buffer_errors() {
let bytes = [0u8; 5]; assert!(ZixiFrame::from_bytes(&bytes).is_err());
}
// One byte over the maximum payload size is rejected at construction.
#[test]
fn test_oversized_payload_rejected() {
let too_big = vec![0u8; ZIXI_MAX_PAYLOAD + 1];
assert!(ZixiFrame::data(0, 0, too_big).is_err());
}
// The serialised header is exactly ZIXI_HEADER_SIZE bytes long.
#[test]
fn test_header_to_bytes_length() {
let hdr = ZixiHeader {
magic: ZIXI_MAGIC,
frame_type: FrameType::Data,
seq_no: 100,
timestamp_ms: 500,
payload_len: 0,
};
let bytes = hdr.to_bytes();
assert_eq!(bytes.len(), ZIXI_HEADER_SIZE);
}
// Each send advances the next sequence number by one, starting from zero.
#[test]
fn test_sender_send_increments_seq() {
let mut sender =
ZixiSender::new(ZixiSenderConfig::new(remote_addr(), "test")).expect("valid");
sender.send(b"a").expect("ok");
assert_eq!(sender.next_seq(), 1);
sender.send(b"b").expect("ok");
assert_eq!(sender.next_seq(), 2);
}
// frames_sent counts calls to send; bytes_sent sums payload lengths only.
#[test]
fn test_sender_stats_accumulate() {
let mut sender =
ZixiSender::new(ZixiSenderConfig::new(remote_addr(), "stream")).expect("valid");
sender.send(b"payload").expect("ok");
sender.send(b"payload2").expect("ok");
assert_eq!(sender.stats().frames_sent, 2);
assert_eq!(
sender.stats().bytes_sent,
b"payload".len() as u64 + b"payload2".len() as u64
);
}
// A NACK for a buffered frame yields a Retransmit frame with the original payload.
#[test]
fn test_sender_nack_retransmits_frame() {
let mut sender = ZixiSender::new(ZixiSenderConfig::new(remote_addr(), "s")).expect("valid");
sender.send(b"original").expect("ok");
let retx = sender.handle_nack(0).expect("ok").expect("frame present");
let frame = ZixiFrame::from_bytes(&retx).expect("valid frame");
assert_eq!(frame.header.frame_type, FrameType::Retransmit);
assert_eq!(frame.payload, b"original");
}
// A NACK for a sequence number that was never sent returns None and counts as a failure.
#[test]
fn test_sender_nack_missing_frame() {
let mut sender = ZixiSender::new(ZixiSenderConfig::new(remote_addr(), "s")).expect("valid");
let result = sender.handle_nack(999).expect("no error");
assert!(result.is_none(), "seq 999 not in buffer");
assert_eq!(sender.stats().retransmit_failures, 1);
}
// A data frame produces a Deliver action carrying its payload.
#[test]
fn test_receiver_delivers_payload() {
let mut receiver = ZixiReceiver::new(ZixiReceiverConfig::new(local_addr()));
let frame = ZixiFrame::data(0, 0, b"data".to_vec()).expect("valid");
let actions = receiver.receive(&frame.to_bytes()).expect("ok");
assert!(actions
.iter()
.any(|a| a == &ReceiverAction::Deliver(b"data".to_vec())));
}
// Receiving seq 0 and then seq 2 makes the receiver NACK the skipped seq 1.
// ACKs are disabled (ack_every_n_frames: 0) for this receiver.
#[test]
fn test_receiver_emits_nack_on_gap() {
let mut receiver = ZixiReceiver::new(ZixiReceiverConfig {
ack_every_n_frames: 0, ..ZixiReceiverConfig::new(local_addr())
});
let f0 = ZixiFrame::data(0, 0, b"a".to_vec()).expect("v");
receiver.receive(&f0.to_bytes()).expect("ok");
let f2 = ZixiFrame::data(2, 10, b"c".to_vec()).expect("v");
let actions = receiver.receive(&f2.to_bytes()).expect("ok");
let nacks: Vec<_> = actions
.iter()
.filter_map(|a| {
if let ReceiverAction::Nack(s) = a {
Some(*s)
} else {
None
}
})
.collect();
assert!(nacks.contains(&1), "should NACK seq=1, got {nacks:?}");
}
// A Bye frame produces SessionEnded.
#[test]
fn test_receiver_session_ended_on_bye() {
let mut receiver = ZixiReceiver::new(ZixiReceiverConfig::new(local_addr()));
let bye = ZixiFrame::with_type(FrameType::Bye, 0, 0, vec![]).expect("valid");
let actions = receiver.receive(&bye.to_bytes()).expect("ok");
assert!(actions.contains(&ReceiverAction::SessionEnded));
}
// With ack_every_n_frames = 3, the third consecutive frame triggers an ACK.
#[test]
fn test_receiver_ack_every_n_frames() {
let mut receiver = ZixiReceiver::new(ZixiReceiverConfig {
ack_every_n_frames: 3,
..ZixiReceiverConfig::new(local_addr())
});
for seq in 0..3u32 {
let f = ZixiFrame::data(seq, 0, vec![seq as u8]).expect("v");
let actions = receiver.receive(&f.to_bytes()).expect("ok");
if seq == 2 {
assert!(
actions.iter().any(|a| matches!(a, ReceiverAction::Ack(_))),
"expected ACK on 3rd frame"
);
}
}
}
// Storing five frames in a capacity-3 buffer keeps three; seqs 0 and 1 are evicted
// and seq 4 is still retrievable.
#[test]
fn test_retransmit_buffer_capacity() {
let mut buf = RetransmitBuffer::new(3).expect("valid");
for i in 0..5u32 {
let f = ZixiFrame::data(i, 0, vec![i as u8]).expect("v");
buf.store(f);
}
assert_eq!(buf.len(), 3);
assert!(buf.get(0).is_none());
assert!(buf.get(1).is_none());
assert!(buf.get(4).is_some());
}
// A zero-capacity buffer cannot be constructed.
#[test]
fn test_retransmit_buffer_zero_capacity_error() {
assert!(RetransmitBuffer::new(0).is_err());
}
// Ordering holds for adjacent values and across the u32 wrap-around.
#[test]
fn test_seq_after() {
assert!(seq_after(1, 0));
assert!(!seq_after(0, 1));
assert!(seq_after(0, u32::MAX));
assert!(!seq_after(u32::MAX, 0));
}
// 5 requested retransmits over 100 sent frames gives a ratio of 0.05.
#[test]
fn test_stats_loss_ratio() {
let mut stats = ZixiStats::default();
stats.frames_sent = 100;
stats.retransmits_requested = 5;
assert!((stats.loss_ratio() - 0.05).abs() < 1e-10);
}
}