use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU16, Ordering};
use tokio::net::UdpSocket;
/// Tunable parameters for a [`UdtlTransport`].
#[derive(Clone, Debug)]
pub struct UdtlConfig {
    /// Upper bound on how many previously sent payloads are appended to each
    /// outgoing datagram after the primary payload.
    pub redundancy_depth: u8,
    /// Buffer-size setting; nothing in this file reads it.
    // NOTE(review): presumably meant as the reorder-buffer capacity — confirm
    // against the callers that construct `UdtlReceiveBuffer`.
    pub max_buffer: u16,
    /// Size in bytes of the scratch buffer allocated for each receive call.
    pub max_datagram: u16,
}
impl Default for UdtlConfig {
fn default() -> Self {
Self {
redundancy_depth: 2,
max_buffer: 1024,
max_datagram: 1400,
}
}
}
/// Sends and receives sequence-numbered datagrams with a single remote peer
/// over a shared UDP socket, repeating recently sent payloads as redundancy.
pub struct UdtlTransport {
/// Socket used for both `send_to` and `recv_from`.
socket: Arc<UdpSocket>,
/// Sequence number handed to the next outgoing datagram (wraps at `u16::MAX`).
local_seq: AtomicU16,
/// Destination address of every outgoing datagram.
remote_addr: SocketAddr,
/// Redundancy depth and receive-buffer sizing.
config: UdtlConfig,
/// Most recently sent payloads, oldest first, re-sent as redundancy entries.
send_history: tokio::sync::Mutex<VecDeque<SentPacket>>,
}
/// One entry of the send history: a payload that has already been transmitted
/// as a primary and may be repeated as redundancy in later datagrams.
struct SentPacket {
// `seq` is stored with each entry but only the tests in this file read it,
// hence the lint suppression.
#[allow(dead_code)]
seq: u16,
/// The payload bytes exactly as they were passed to `send`.
data: Vec<u8>,
}
impl UdtlTransport {
    /// Bytes taken by the datagram header: `seq: u16` plus the primary
    /// payload's `len: u16`.
    const HEADER_LEN: usize = 4;
    /// Bytes taken by the `len: u16` prefix of each redundancy entry.
    const LEN_PREFIX: usize = 2;

    /// Creates a transport that talks to `remote_addr` over `socket` using
    /// `UdtlConfig::default()`.
    pub fn new(socket: Arc<UdpSocket>, remote_addr: SocketAddr) -> Self {
        Self::with_config(socket, remote_addr, UdtlConfig::default())
    }

    /// Creates a transport with an explicit configuration.
    ///
    /// The local sequence counter starts at 1 and the send history is empty.
    pub fn with_config(
        socket: Arc<UdpSocket>,
        remote_addr: SocketAddr,
        config: UdtlConfig,
    ) -> Self {
        Self {
            socket,
            local_seq: AtomicU16::new(1),
            remote_addr,
            config,
            send_history: tokio::sync::Mutex::new(VecDeque::new()),
        }
    }

    /// Sends `ifp_data` as the primary payload of a new datagram, followed by
    /// up to `redundancy_depth` previously sent payloads.
    ///
    /// Datagram layout (all integers big-endian):
    /// `seq: u16 | len: u16 | primary | (len: u16 | payload)*`, where the
    /// trailing entries are earlier payloads in the order they were sent
    /// (oldest first).
    ///
    /// # Errors
    ///
    /// Returns `RtcError::Transport` when `ifp_data` is longer than
    /// `u16::MAX` bytes (the length prefix cannot describe it) or when the
    /// socket write fails.
    pub async fn send(&self, ifp_data: &[u8]) -> Result<(), crate::errors::RtcError> {
        // The length prefix is 16 bits wide; a longer payload would be sent
        // with a truncated length and corrupt the framing for the receiver.
        if ifp_data.len() > usize::from(u16::MAX) {
            return Err(crate::errors::RtcError::Transport(format!(
                "UDPTL payload too large: {} bytes (max {})",
                ifp_data.len(),
                u16::MAX
            )));
        }
        let primary_len = ifp_data.len() as u16;

        // Allocate the sequence number while holding the history lock so that
        // history order always equals sequence order, even when `send` is
        // called concurrently. The guard stays held through the socket write,
        // so datagrams also leave in sequence order.
        let mut history = self.send_history.lock().await;
        let seq = self.local_seq.fetch_add(1, Ordering::SeqCst);

        // Compare in `usize`: narrowing `len()` to `u8` made this check
        // unsatisfiable for a depth of 255 and let the history grow forever.
        let depth = usize::from(self.config.redundancy_depth);
        while history.len() > depth {
            history.pop_front();
        }

        let redundant_bytes: usize = history
            .iter()
            .map(|sent| Self::LEN_PREFIX + sent.data.len())
            .sum();
        let mut packet = Vec::with_capacity(Self::HEADER_LEN + ifp_data.len() + redundant_bytes);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&primary_len.to_be_bytes());
        packet.extend_from_slice(ifp_data);
        for sent in history.iter() {
            // Every stored payload passed the size check above when it was
            // sent as a primary, so this narrowing cannot truncate.
            let entry_len = sent.data.len() as u16;
            packet.extend_from_slice(&entry_len.to_be_bytes());
            packet.extend_from_slice(&sent.data);
        }

        // Recorded before the write, so a payload whose datagram failed to
        // send is still offered as redundancy by later datagrams.
        history.push_back(SentPacket {
            seq,
            data: ifp_data.to_vec(),
        });

        self.socket
            .send_to(&packet, self.remote_addr)
            .await
            .map_err(|e| crate::errors::RtcError::Transport(format!("UDPTL send failed: {e}")))?;
        Ok(())
    }

    /// Receives one datagram, parses it and hands it to `recv_buf`.
    ///
    /// Returns whatever `UdtlReceiveBuffer::try_deliver` returns for the
    /// parsed packet, or `Ok(None)` when the datagram is too short to hold
    /// the header or its primary payload.
    ///
    /// Redundancy entries are tagged with the sequence numbers implied by the
    /// layout `send` writes: with `k` entries, entry `i` carries the payload
    /// of `seq - (k - i)` (wrapping). If the trailer is malformed the number
    /// of entries is unknown, so they are dropped rather than mislabelled.
    ///
    /// # Errors
    ///
    /// Returns `RtcError::Transport` when the socket read fails.
    pub async fn recv(
        &self,
        recv_buf: &mut UdtlReceiveBuffer,
    ) -> Result<Option<Vec<u8>>, crate::errors::RtcError> {
        let mut buf = vec![0u8; usize::from(self.config.max_datagram)];
        // NOTE(review): the sender address is discarded, so datagrams from any
        // peer are accepted — confirm whether it should be checked against
        // `remote_addr`.
        let (n, _from) = self.socket.recv_from(&mut buf).await.map_err(|e| {
            crate::errors::RtcError::Transport(format!("UDPTL recv failed: {e}"))
        })?;
        let datagram = &buf[..n];

        if datagram.len() < Self::HEADER_LEN {
            return Ok(None);
        }
        let seq = u16::from_be_bytes([datagram[0], datagram[1]]);
        let primary_len = usize::from(u16::from_be_bytes([datagram[2], datagram[3]]));
        let body = &datagram[Self::HEADER_LEN..];
        if primary_len > body.len() {
            return Ok(None);
        }
        let (primary, mut trailer) = body.split_at(primary_len);

        // Walk the length-prefixed redundancy entries that follow the primary.
        let mut payloads: Vec<&[u8]> = Vec::new();
        while trailer.len() >= Self::LEN_PREFIX {
            let entry_len = usize::from(u16::from_be_bytes([trailer[0], trailer[1]]));
            let rest = &trailer[Self::LEN_PREFIX..];
            if entry_len > rest.len() {
                break;
            }
            let (payload, remaining) = rest.split_at(entry_len);
            payloads.push(payload);
            trailer = remaining;
        }
        if !trailer.is_empty() {
            // Leftover bytes mean the trailer was cut short or corrupted; the
            // entries that did parse can no longer be numbered reliably.
            payloads.clear();
        }

        // The entry count is bounded by the datagram size (at most 65535
        // bytes, two or more bytes per entry), so it always fits in a u16.
        let count = payloads.len();
        let redundant: Vec<(u16, Vec<u8>)> = payloads
            .iter()
            .enumerate()
            .map(|(idx, payload)| {
                let distance = (count - idx) as u16;
                (seq.wrapping_sub(distance), payload.to_vec())
            })
            .collect();

        recv_buf.try_deliver(seq, primary.to_vec(), redundant)
    }

    /// Returns the address the underlying socket is bound to.
    ///
    /// # Errors
    ///
    /// Returns `RtcError::Transport` when the socket cannot report its address.
    pub fn local_addr(&self) -> Result<SocketAddr, crate::errors::RtcError> {
        self.socket
            .local_addr()
            .map_err(|e| crate::errors::RtcError::Transport(format!("local_addr failed: {e}")))
    }

    /// Returns the sequence number the next call to `send` will use.
    pub fn current_seq(&self) -> u16 {
        self.local_seq.load(Ordering::SeqCst)
    }

    /// Replaces the configuration; it applies to subsequent `send`/`recv` calls.
    pub fn set_config(&mut self, config: UdtlConfig) {
        self.config = config;
    }

    /// Returns the shared socket this transport reads from and writes to.
    pub fn socket(&self) -> &Arc<UdpSocket> {
        &self.socket
    }
}
/// Reorders incoming packets by 16-bit sequence number and keeps simple
/// reception statistics.
#[derive(Debug)]
pub struct UdtlReceiveBuffer {
/// Sequence number of the next packet eligible for in-order delivery.
expected_seq: u16,
/// Payloads that arrived ahead of `expected_seq`, keyed by sequence number.
buffer: std::collections::BTreeMap<u16, Vec<u8>>,
/// Maximum number of entries `buffer` may hold; further early packets are
/// not stored.
max_size: u16,
/// Number of packets handed to `try_deliver`, including duplicates.
pub packets_received: u64,
/// Number of buffered packets discarded by the stale-entry cleanup.
pub packets_lost: u64,
/// Number of buffered packets released once the gap before them closed.
pub packets_recovered: u64,
/// Sequence number most recently passed over by in-order delivery, or
/// `None` if nothing has been delivered yet.
pub last_delivered_seq: Option<u16>,
}
impl UdtlReceiveBuffer {
    /// Half of the 16-bit sequence space. A packet whose wrapping distance
    /// ahead of `expected_seq` is below this value is treated as "in the
    /// future"; anything else is treated as already seen.
    const HALF_RANGE: u16 = 1 << 15;
    /// Buffered packets at least this far ahead of `expected_seq` are
    /// discarded by `cleanup_stale`.
    const MAX_GAP: u16 = 32;

    /// Creates a buffer that expects sequence number 1 first and holds at most
    /// 128 early packets.
    pub fn new() -> Self {
        Self {
            expected_seq: 1,
            buffer: std::collections::BTreeMap::new(),
            max_size: 128,
            packets_received: 0,
            packets_lost: 0,
            packets_recovered: 0,
            last_delivered_seq: None,
        }
    }

    /// Creates a buffer like [`UdtlReceiveBuffer::new`] but holding at most
    /// `max_size` early packets.
    pub fn with_max_size(max_size: u16) -> Self {
        Self {
            max_size,
            ..Self::new()
        }
    }

    /// Offers a received packet to the buffer.
    ///
    /// * `seq == expected_seq`: returns `Ok(Some(primary))` and advances past
    ///   `seq` and past every buffered packet that directly follows it.
    /// * `seq` ahead of `expected_seq` (by less than half the sequence space,
    ///   computed with wrapping arithmetic so it works across 65535 -> 0):
    ///   the packet is stored if there is room and `Ok(None)` is normally
    ///   returned.
    /// * otherwise the packet is a duplicate or arrived too late and
    ///   `Ok(None)` is returned.
    ///
    /// `_redundant` is accepted but not consulted.
    ///
    /// NOTE(review): two limitations are kept because fixing them needs an
    /// interface change. Buffered payloads released when a gap closes are
    /// counted in `packets_recovered` but dropped, not returned to the
    /// caller. And a sequence number that never arrives blocks delivery:
    /// later packets are only buffered until `reset` is called.
    ///
    /// # Errors
    ///
    /// The current implementation never returns `Err`.
    pub fn try_deliver(
        &mut self,
        seq: u16,
        primary: Vec<u8>,
        _redundant: Vec<(u16, Vec<u8>)>,
    ) -> Result<Option<Vec<u8>>, crate::errors::RtcError> {
        self.packets_received += 1;

        // Distance ahead of the expected packet, modulo 2^16. Plain `<`/`>`
        // comparisons misclassify packets on either side of the wrap.
        let ahead = seq.wrapping_sub(self.expected_seq);

        if ahead == 0 {
            self.expected_seq = self.expected_seq.wrapping_add(1);
            self.last_delivered_seq = Some(seq);
            self.flush_contiguous();
            self.cleanup_stale();
            return Ok(Some(primary));
        }

        if ahead >= Self::HALF_RANGE {
            // Behind the delivery point: duplicate or late arrival.
            return Ok(None);
        }

        // Early packet: keep it until the gap in front of it closes.
        if self.buffer.len() < usize::from(self.max_size) {
            self.buffer.insert(seq, primary);
        }
        self.flush_buffer()
    }

    /// Advances `expected_seq` over every buffered packet that directly
    /// follows it, removing those packets from the buffer. Their payloads are
    /// dropped; only the counters and `last_delivered_seq` record them.
    fn flush_contiguous(&mut self) {
        while self.buffer.remove(&self.expected_seq).is_some() {
            self.last_delivered_seq = Some(self.expected_seq);
            self.expected_seq = self.expected_seq.wrapping_add(1);
            self.packets_recovered += 1;
        }
    }

    /// If the expected packet is sitting in the buffer, removes and returns
    /// it, then advances over any packets that directly follow.
    fn flush_buffer(&mut self) -> Result<Option<Vec<u8>>, crate::errors::RtcError> {
        let Some(data) = self.buffer.remove(&self.expected_seq) else {
            return Ok(None);
        };
        self.last_delivered_seq = Some(self.expected_seq);
        self.expected_seq = self.expected_seq.wrapping_add(1);
        self.packets_recovered += 1;
        self.flush_contiguous();
        Ok(Some(data))
    }

    /// Drops every buffered packet that is not within `MAX_GAP` sequence
    /// numbers ahead of `expected_seq`, counting each as lost.
    ///
    /// All keys are examined with wrapping distance, so entries stored across
    /// the 65535 -> 0 boundary are covered as well.
    fn cleanup_stale(&mut self) {
        let expected = self.expected_seq;
        let before = self.buffer.len();
        self.buffer
            .retain(|&seq, _| seq.wrapping_sub(expected) < Self::MAX_GAP);
        self.packets_lost += (before - self.buffer.len()) as u64;
    }

    /// Discards all buffered packets and restarts delivery at `expected_seq`.
    /// The statistics counters and `last_delivered_seq` are left untouched.
    pub fn reset(&mut self, expected_seq: u16) {
        self.expected_seq = expected_seq;
        self.buffer.clear();
    }

    /// Returns the sequence number of the next packet to be delivered.
    pub fn expected_seq(&self) -> u16 {
        self.expected_seq
    }

    /// Returns how many early packets are currently buffered.
    pub fn buffered_count(&self) -> usize {
        self.buffer.len()
    }
}
unsafe impl Send for UdtlTransport {}
unsafe impl Sync for UdtlTransport {}
#[cfg(test)]
mod tests {
    use super::*;

    /// Offers one packet with no redundancy entries and unwraps the outcome.
    fn feed(rb: &mut UdtlReceiveBuffer, seq: u16, payload: &[u8]) -> Option<Vec<u8>> {
        rb.try_deliver(seq, payload.to_vec(), Vec::new()).unwrap()
    }

    /// The expected packet is handed straight back and the cursor advances.
    #[test]
    fn test_udtl_receive_buffer_in_order() {
        let mut rb = UdtlReceiveBuffer::new();

        assert_eq!(feed(&mut rb, 1, &[0x01, 0x02]), Some(vec![0x01, 0x02]));
        assert_eq!(rb.expected_seq, 2);
    }

    /// An early packet is parked until its predecessor arrives.
    #[test]
    fn test_udtl_receive_buffer_out_of_order() {
        let mut rb = UdtlReceiveBuffer::new();

        assert_eq!(feed(&mut rb, 2, &[0x03, 0x04]), None);
        assert_eq!(rb.buffered_count(), 1);

        assert_eq!(feed(&mut rb, 1, &[0x01, 0x02]), Some(vec![0x01, 0x02]));
        assert_eq!(rb.expected_seq, 3);
        assert_eq!(rb.buffered_count(), 0);
    }

    /// A second copy of an already delivered packet is ignored.
    #[test]
    fn test_udtl_receive_buffer_duplicate() {
        let mut rb = UdtlReceiveBuffer::new();
        feed(&mut rb, 1, &[0x01]);

        assert_eq!(feed(&mut rb, 1, &[0x02]), None);
        assert_eq!(rb.expected_seq, 2);
    }

    /// A packet behind the delivery cursor is rejected.
    #[test]
    fn test_udtl_receive_buffer_too_old() {
        let mut rb = UdtlReceiveBuffer::new();

        feed(&mut rb, 5, &[0x05]);
        assert_eq!(rb.expected_seq, 1);
        assert_eq!(rb.buffered_count(), 1);

        // Empty the buffer by hand so only the cursor position matters below.
        rb.buffer.remove(&5);

        feed(&mut rb, 1, &[0x01]);
        assert_eq!(rb.expected_seq, 2);

        assert_eq!(feed(&mut rb, 1, &[0x01]), None);
    }

    /// Filling a one-packet gap releases the packet buffered behind it.
    #[test]
    fn test_udtl_receive_buffer_gap_recovery() {
        let mut rb = UdtlReceiveBuffer::new();

        feed(&mut rb, 1, &[0x01]);
        assert_eq!(rb.expected_seq, 2);

        feed(&mut rb, 3, &[0x03]);
        assert_eq!(rb.buffered_count(), 1);

        assert_eq!(feed(&mut rb, 2, &[0x02]), Some(vec![0x02]));
        assert_eq!(rb.expected_seq, 4);
        assert_eq!(rb.buffered_count(), 0);
    }

    /// In-order delivery keeps working across the 65535 -> 0 boundary.
    #[test]
    fn test_udtl_sequence_wrapping() {
        let mut rb = UdtlReceiveBuffer::with_max_size(256);
        rb.expected_seq = 65530;
        rb.last_delivered_seq = Some(65529);

        assert_eq!(feed(&mut rb, 65530, &[0x01]), Some(vec![0x01]));
        assert_eq!(rb.expected_seq, 65531);

        assert_eq!(feed(&mut rb, 65531, &[0x02]), Some(vec![0x02]));
        assert_eq!(rb.expected_seq, 65532);

        // Jump the cursor to the last sequence number before the wrap.
        rb.expected_seq = 65535;
        assert_eq!(feed(&mut rb, 65535, &[0x03]), Some(vec![0x03]));
        assert_eq!(rb.expected_seq, 0);

        assert_eq!(feed(&mut rb, 0, &[0x04]), Some(vec![0x04]));
        assert_eq!(rb.expected_seq, 1);
    }

    /// Mirrors the prune-then-push order used when sending: the history
    /// settles at `redundancy_depth + 1` entries.
    #[test]
    fn test_send_history_pruning() {
        let config = UdtlConfig {
            redundancy_depth: 2,
            ..UdtlConfig::default()
        };
        let mut history: VecDeque<SentPacket> = VecDeque::new();

        for seq in 0..5u16 {
            while history.len() as u8 > config.redundancy_depth {
                history.pop_front();
            }
            history.push_back(SentPacket {
                seq,
                data: vec![seq as u8],
            });
        }

        assert_eq!(history.len(), 3);
        let kept: Vec<u16> = history.iter().map(|sent| sent.seq).collect();
        assert_eq!(kept, vec![2, 3, 4]);
    }

    /// A packet buffered far ahead of the cursor is discarded, and counted as
    /// lost, on the next in-order delivery.
    #[test]
    fn test_cleanup_stale_removes_old_packets() {
        let mut rb = UdtlReceiveBuffer::new();

        feed(&mut rb, 1, &[0x01]);
        assert_eq!(rb.expected_seq, 2);

        for seq in 2..=4u16 {
            feed(&mut rb, seq, &[seq as u8]);
        }
        assert_eq!(rb.expected_seq, 5);

        feed(&mut rb, 100, &[0x64]);
        assert_eq!(rb.buffered_count(), 1);

        feed(&mut rb, 5, &[0x05]);
        assert_eq!(rb.expected_seq, 6);
        assert_eq!(rb.buffered_count(), 0);
        assert_eq!(rb.packets_lost, 1);
    }

    /// `reset` moves the cursor and empties the buffer.
    #[test]
    fn test_reset_buffer() {
        let mut rb = UdtlReceiveBuffer::new();
        feed(&mut rb, 1, &[0x01]);
        feed(&mut rb, 3, &[0x03]);
        assert_eq!(rb.buffered_count(), 1);

        rb.reset(10);

        assert_eq!(rb.expected_seq, 10);
        assert_eq!(rb.buffered_count(), 0);
    }

    /// Every offered packet is counted, duplicates included.
    #[test]
    fn test_packet_stats_tracking() {
        let mut rb = UdtlReceiveBuffer::new();

        for (seq, byte) in [(1u16, 0x01u8), (3, 0x03), (2, 0x02), (1, 0x01)] {
            feed(&mut rb, seq, &[byte]);
        }

        assert_eq!(rb.packets_received, 4);
        assert_eq!(rb.last_delivered_seq, Some(3));
    }
}