#![allow(dead_code)]
use std::collections::{BTreeMap, VecDeque};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::{Duration, Instant};
/// RIST profile selector.
///
/// The variants only carry a display name in this file; no other behaviour
/// is keyed off the profile in the code visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RistProfile {
    /// The Simple profile.
    Simple,
    /// The Main profile.
    Main,
    /// The Advanced profile.
    Advanced,
}

impl RistProfile {
    /// Returns the human-readable name of this profile.
    #[must_use]
    pub const fn name(&self) -> &'static str {
        match *self {
            Self::Advanced => "RIST Advanced Profile",
            Self::Main => "RIST Main Profile",
            Self::Simple => "RIST Simple Profile",
        }
    }
}
/// Tunable parameters for a RIST flow.
#[derive(Debug, Clone)]
pub struct RistConfig {
    /// Protocol profile in use (defaults to [`RistProfile::Simple`]).
    pub profile: RistProfile,
    /// Buffering window (defaults to 500 ms).
    pub buffer_duration: Duration,
    /// Maximum number of NACK transmissions per missing packet (defaults to 5).
    pub max_retransmissions: u32,
    /// Minimum spacing between repeated NACKs for the same packet
    /// (defaults to 20 ms).
    pub nack_interval: Duration,
    /// Whether null-packet deletion handling is enabled (defaults to `true`).
    pub null_packet_deletion: bool,
    /// Largest tolerated sequence gap (defaults to 200).
    // NOTE(review): no code visible in this file reads this field — confirm
    // where it is meant to be enforced.
    pub max_seq_gap: u16,
    /// Local address to bind (defaults to `0.0.0.0:0`).
    pub bind_addr: SocketAddr,
    /// Remote peer address, if one has been configured.
    pub peer_addr: Option<SocketAddr>,
}

impl Default for RistConfig {
    fn default() -> Self {
        let any_ipv4 = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        Self {
            profile: RistProfile::Simple,
            buffer_duration: Duration::from_millis(500),
            max_retransmissions: 5,
            nack_interval: Duration::from_millis(20),
            null_packet_deletion: true,
            max_seq_gap: 200,
            bind_addr: SocketAddr::new(any_ipv4, 0),
            peer_addr: None,
        }
    }
}

impl RistConfig {
    /// Creates a configuration populated with the default values.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the configuration with `buffer_duration` replaced by `d`.
    #[must_use]
    pub const fn with_buffer(self, d: Duration) -> Self {
        Self {
            buffer_duration: d,
            ..self
        }
    }

    /// Returns the configuration with `peer_addr` set to `addr`.
    #[must_use]
    pub fn with_peer(self, addr: SocketAddr) -> Self {
        Self {
            peer_addr: Some(addr),
            ..self
        }
    }

    /// Returns the configuration with null-packet deletion switched off.
    #[must_use]
    pub const fn without_npd(self) -> Self {
        Self {
            null_packet_deletion: false,
            ..self
        }
    }
}
/// An RTP-framed media packet plus receiver-side bookkeeping.
#[derive(Debug, Clone)]
pub struct RistPacket {
    /// 16-bit RTP sequence number.
    pub sequence: u16,
    /// 32-bit RTP timestamp.
    pub timestamp: u32,
    /// RTP synchronisation source identifier.
    pub ssrc: u32,
    /// 7-bit RTP payload type.
    pub payload_type: u8,
    /// Payload bytes (everything after the RTP header).
    pub payload: Vec<u8>,
    /// Local flag marking retransmitted copies. It is not serialised by
    /// [`RistPacket::encode`] and is always `false` after
    /// [`RistPacket::decode`].
    pub is_retransmission: bool,
    /// Instant at which this value was constructed or decoded.
    pub arrived_at: Instant,
}

impl RistPacket {
    /// Length of the fixed part of an RTP header, in bytes.
    const FIXED_HEADER_LEN: usize = 12;

    /// Builds a fresh (non-retransmission) packet with payload type 33 and
    /// `arrived_at` set to the current instant.
    #[must_use]
    pub fn new(sequence: u16, timestamp: u32, ssrc: u32, payload: Vec<u8>) -> Self {
        Self {
            sequence,
            timestamp,
            ssrc,
            payload_type: 33,
            payload,
            is_retransmission: false,
            arrived_at: Instant::now(),
        }
    }

    /// Serialises the packet as a 12-byte RTP header followed by the payload.
    ///
    /// The first byte is always `0x80` (version 2, no padding, no extension,
    /// no CSRC entries) and the marker bit is always clear.
    #[must_use]
    pub fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(Self::FIXED_HEADER_LEN + self.payload.len());
        buf.push(0x80);
        buf.push(self.payload_type & 0x7F);
        buf.extend_from_slice(&self.sequence.to_be_bytes());
        buf.extend_from_slice(&self.timestamp.to_be_bytes());
        buf.extend_from_slice(&self.ssrc.to_be_bytes());
        buf.extend_from_slice(&self.payload);
        buf
    }

    /// Parses an RTP packet.
    ///
    /// Beyond the fixed header this honours the CSRC count, the header
    /// extension bit and the padding bit of the first byte, so that only the
    /// actual payload ends up in [`RistPacket::payload`].
    ///
    /// Returns `None` when the datagram is shorter than the header it
    /// announces, when the RTP version is not 2, or when the padding length
    /// is inconsistent with the remaining data.
    #[must_use]
    pub fn decode(data: &[u8]) -> Option<Self> {
        let header = data.get(..Self::FIXED_HEADER_LEN)?;
        let flags = header[0];
        if flags >> 6 != 2 {
            return None;
        }

        // Low nibble of the first byte: number of 32-bit CSRC entries that
        // follow the fixed header.
        let csrc_len = 4 * usize::from(flags & 0x0F);
        let mut payload = data.get(Self::FIXED_HEADER_LEN + csrc_len..)?;

        // X bit: a 4-byte extension header whose last two bytes give the
        // extension length in 32-bit words.
        if flags & 0x10 != 0 {
            let ext = payload.get(..4)?;
            let ext_words = usize::from(u16::from_be_bytes([ext[2], ext[3]]));
            payload = payload.get(4 + 4 * ext_words..)?;
        }

        // P bit: the final byte counts the padding bytes, itself included.
        if flags & 0x20 != 0 {
            let pad = usize::from(*payload.last()?);
            if pad == 0 || pad > payload.len() {
                return None;
            }
            payload = &payload[..payload.len() - pad];
        }

        Some(Self {
            sequence: u16::from_be_bytes([header[2], header[3]]),
            timestamp: u32::from_be_bytes([header[4], header[5], header[6], header[7]]),
            ssrc: u32::from_be_bytes([header[8], header[9], header[10], header[11]]),
            payload_type: header[1] & 0x7F,
            payload: payload.to_vec(),
            is_retransmission: false,
            arrived_at: Instant::now(),
        })
    }
}
/// A negative acknowledgement for one sequence number or a short run of them.
#[derive(Debug, Clone)]
pub struct RistNack {
    /// SSRC written into the encoded feedback packet.
    pub ssrc: u32,
    /// First missing sequence number.
    pub seq_start: u16,
    /// Number of consecutive missing sequence numbers starting at `seq_start`.
    pub seq_count: u16,
    /// Instant at which this NACK was created.
    pub generated_at: Instant,
    /// Transmission counter; starts at zero.
    pub send_count: u32,
}

impl RistNack {
    /// Creates a NACK for exactly one sequence number.
    #[must_use]
    pub fn single(ssrc: u32, seq: u16) -> Self {
        Self::range(ssrc, seq, 1)
    }

    /// Creates a NACK for `seq_count` consecutive sequence numbers starting
    /// at `seq_start`.
    #[must_use]
    pub fn range(ssrc: u32, seq_start: u16, seq_count: u16) -> Self {
        Self {
            ssrc,
            seq_start,
            seq_count,
            generated_at: Instant::now(),
            send_count: 0,
        }
    }

    /// Serialises the NACK into a 16-byte feedback packet.
    ///
    /// Layout: `0x81`, packet type 205, a 16-bit length of 3, the SSRC
    /// (written twice), the first missing sequence number and a 16-bit
    /// bitmask flagging the sequence numbers that follow it.
    ///
    /// The bitmask can flag at most the 16 sequence numbers after
    /// `seq_start`, so a single encoded packet covers at most 17 losses;
    /// any excess in `seq_count` is not represented.
    #[must_use]
    pub fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(16);
        buf.push(0x81);
        buf.push(205);
        buf.extend_from_slice(&3u16.to_be_bytes());
        buf.extend_from_slice(&self.ssrc.to_be_bytes());
        buf.extend_from_slice(&self.ssrc.to_be_bytes());
        buf.extend_from_slice(&self.seq_start.to_be_bytes());

        // Number of losses after `seq_start` that fit in the bitmask (0..=16).
        let following = self.seq_count.saturating_sub(1).min(16);
        // `(1 << following) - 1` would overflow the shift for 16 set bits, so
        // derive the mask by shifting an all-ones value right instead.
        let blp: u16 = if following == 0 {
            0
        } else {
            u16::MAX >> (16 - following)
        };
        buf.extend_from_slice(&blp.to_be_bytes());
        buf
    }
}
/// Receiver-side buffer that re-orders packets by sequence number and tracks
/// which missing sequence numbers still need a NACK.
pub struct RistReorderBuffer {
    /// Packets waiting for in-order delivery, keyed by sequence number.
    buffer: BTreeMap<u16, RistPacket>,
    /// Outstanding NACKs, keyed by the missing sequence number.
    pending_nacks: BTreeMap<u16, RistNack>,
    /// Last sequence number handed out (or skipped) by `drain_ready`.
    highest_delivered: Option<u16>,
    /// How long a gap is tolerated before the missing packet is skipped.
    buffer_duration: Duration,
    /// Minimum spacing between repeated NACKs for the same packet.
    nack_interval: Duration,
    /// Maximum number of times a NACK is sent for the same packet.
    max_retransmissions: u32,
    /// SSRC placed in generated NACKs.
    ssrc: u32,
}

impl RistReorderBuffer {
    /// Half of the 16-bit sequence space. A packet whose wrapping distance
    /// from the last delivered sequence number is at least this large is
    /// treated as being behind it rather than ahead of it.
    const HALF_SEQ_SPACE: u16 = 0x8000;

    /// Creates an empty buffer using the timing and retry limits of `config`.
    #[must_use]
    pub fn new(config: &RistConfig, ssrc: u32) -> Self {
        Self {
            buffer: BTreeMap::new(),
            pending_nacks: BTreeMap::new(),
            highest_delivered: None,
            buffer_duration: config.buffer_duration,
            nack_interval: config.nack_interval,
            max_retransmissions: config.max_retransmissions,
            ssrc,
        }
    }

    /// Stores `pkt`, cancels any NACK pending for its sequence number and
    /// registers NACKs for every sequence number still missing before it.
    ///
    /// Packets at or behind the last delivered sequence number (duplicates
    /// and arrivals after the gap was given up on) are dropped. Buffering
    /// them would make gap detection walk almost the entire 16-bit sequence
    /// space and queue a NACK for every value on the way.
    pub fn insert(&mut self, pkt: RistPacket) {
        let seq = pkt.sequence;
        if let Some(last) = self.highest_delivered {
            let ahead = seq.wrapping_sub(last);
            if ahead == 0 || ahead >= Self::HALF_SEQ_SPACE {
                return;
            }
        }
        self.pending_nacks.remove(&seq);
        self.buffer.insert(seq, pkt);
        self.detect_gaps(seq);
    }

    /// Removes and returns every packet that can be delivered in order.
    ///
    /// Delivery starts at the lowest buffered key when nothing has been
    /// delivered yet. When the next expected packet is absent, it is skipped
    /// only once the first buffered packet has waited at least
    /// `buffer_duration`; otherwise draining stops.
    pub fn drain_ready(&mut self) -> Vec<RistPacket> {
        let mut out = Vec::new();
        loop {
            let next_seq = match self.highest_delivered {
                Some(last) => last.wrapping_add(1),
                None => match self.buffer.keys().next() {
                    Some(&first) => first,
                    None => break,
                },
            };

            if let Some(pkt) = self.buffer.remove(&next_seq) {
                self.highest_delivered = Some(next_seq);
                out.push(pkt);
                continue;
            }

            let gap_expired = self
                .buffer
                .values()
                .next()
                .map_or(false, |p| p.arrived_at.elapsed() >= self.buffer_duration);
            if !gap_expired {
                break;
            }

            // Give up on this packet: stop asking for it and move on.
            self.pending_nacks.remove(&next_seq);
            self.highest_delivered = Some(next_seq);
        }
        out
    }

    /// Returns the NACKs that are due for (re)transmission right now.
    ///
    /// A NACK is due when it has never been sent or when `nack_interval` has
    /// elapsed since it was last sent. Each returned NACK has its
    /// `send_count` incremented and its `generated_at` refreshed. NACKs that
    /// have already been sent `max_retransmissions` times are dropped.
    pub fn pending_nacks(&mut self) -> Vec<RistNack> {
        let interval = self.nack_interval;
        let max_retx = self.max_retransmissions;
        let mut to_send = Vec::new();
        self.pending_nacks.retain(|_, nack| {
            if nack.send_count >= max_retx {
                return false;
            }
            if nack.send_count == 0 || nack.generated_at.elapsed() >= interval {
                nack.send_count += 1;
                nack.generated_at = Instant::now();
                to_send.push(nack.clone());
            }
            true
        });
        to_send
    }

    /// Number of packets currently held for re-ordering.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.buffer.len()
    }

    /// Number of sequence numbers with an outstanding NACK.
    #[must_use]
    pub fn pending_nack_count(&self) -> usize {
        self.pending_nacks.len()
    }

    /// Registers a NACK for every sequence number between the last delivered
    /// packet and `arrived_seq` that is neither buffered nor already pending.
    ///
    /// Does nothing until at least one packet has been delivered.
    fn detect_gaps(&mut self, arrived_seq: u16) {
        let expected_start = match self.highest_delivered {
            Some(last) => last.wrapping_add(1),
            None => return,
        };
        let mut seq = expected_start;
        while seq != arrived_seq {
            if !self.buffer.contains_key(&seq) && !self.pending_nacks.contains_key(&seq) {
                self.pending_nacks
                    .insert(seq, RistNack::single(self.ssrc, seq));
            }
            seq = seq.wrapping_add(1);
        }
    }
}
/// Stateless helpers for removing and re-inserting MPEG-TS null packets
/// (188-byte packets with sync byte `0x47` and PID `0x1FFF`).
pub struct NullPacketFilter;

impl NullPacketFilter {
    /// Size of one transport-stream packet in bytes.
    const TS_PACKET_LEN: usize = 188;
    /// Expected first byte of every transport-stream packet.
    const SYNC_BYTE: u8 = 0x47;
    /// PID identifying a null packet.
    const NULL_PID: u16 = 0x1FFF;

    /// Returns `ts_stream` with every null packet removed.
    ///
    /// The input is scanned in 188-byte steps. Packets without the sync byte
    /// or with any other PID are copied through unchanged. Trailing bytes
    /// that do not form a complete packet are preserved as well, so the only
    /// data ever removed is null packets.
    #[must_use]
    pub fn delete(ts_stream: &[u8]) -> Vec<u8> {
        let whole_len = ts_stream.len() - ts_stream.len() % Self::TS_PACKET_LEN;
        let packets = &ts_stream[..whole_len];
        let tail = &ts_stream[whole_len..];

        let mut out = Vec::with_capacity(ts_stream.len());
        for chunk in packets.chunks_exact(Self::TS_PACKET_LEN) {
            // The PID is the low 5 bits of byte 1 followed by all of byte 2.
            let pid = (u16::from(chunk[1] & 0x1F) << 8) | u16::from(chunk[2]);
            let is_null = chunk[0] == Self::SYNC_BYTE && pid == Self::NULL_PID;
            if !is_null {
                out.extend_from_slice(chunk);
            }
        }
        out.extend_from_slice(tail);
        out
    }

    /// Returns `ts_stream` followed by as many null packets as are needed to
    /// reach `target_len / 188` packets in total.
    ///
    /// Nothing is appended when the input already holds that many packets.
    /// Each appended packet starts with `47 1F FF 10` and is otherwise zero.
    #[must_use]
    pub fn restore(ts_stream: &[u8], target_len: usize) -> Vec<u8> {
        let target_packets = target_len / Self::TS_PACKET_LEN;
        let input_packets = ts_stream.len() / Self::TS_PACKET_LEN;
        let nulls_needed = target_packets.saturating_sub(input_packets);

        let mut null = [0u8; Self::TS_PACKET_LEN];
        null[..4].copy_from_slice(&[Self::SYNC_BYTE, 0x1F, 0xFF, 0x10]);

        let mut out = Vec::with_capacity(ts_stream.len() + nulls_needed * Self::TS_PACKET_LEN);
        out.extend_from_slice(ts_stream);
        for _ in 0..nulls_needed {
            out.extend_from_slice(&null);
        }
        out
    }
}
/// Running counters describing one RIST flow.
#[derive(Debug, Clone, Default)]
pub struct RistFlowStats {
    /// Packet counter.
    pub packets_received: u64,
    /// Retransmission counter.
    pub retransmissions_received: u64,
    /// Count of packets considered lost.
    pub packets_lost: u64,
    /// Count of NACKs sent.
    pub nacks_sent: u64,
    /// Estimated bitrate in bits per second.
    pub estimated_bitrate_bps: f64,
    /// `packets_lost / (packets_received + packets_lost)` as of the last
    /// call to [`RistFlowStats::update_loss_ratio`].
    pub loss_ratio: f64,
}

impl RistFlowStats {
    /// Recomputes `loss_ratio` from the current counters.
    ///
    /// When both counters are zero the previous ratio is left untouched.
    pub fn update_loss_ratio(&mut self) {
        let lost = self.packets_lost;
        let total = self.packets_received + lost;
        if total == 0 {
            return;
        }
        self.loss_ratio = lost as f64 / total as f64;
    }
}
/// Sending side of a RIST flow: assigns sequence numbers, encodes packets
/// and keeps recent packets available for retransmission.
pub struct RistSender {
    /// Flow configuration; `buffer_duration` bounds retransmit-buffer age.
    config: RistConfig,
    /// Sequence number the next packet will carry.
    next_seq: u16,
    /// RTP timestamp the next packet will carry.
    timestamp: u32,
    /// SSRC stamped on every packet.
    ssrc: u32,
    /// Recently sent packets keyed by sequence number, with their send time.
    retransmit_buffer: BTreeMap<u16, (RistPacket, Instant)>,
    /// Maximum number of packets kept in `retransmit_buffer`.
    max_buffer_packets: usize,
    /// Counters for this sender.
    stats: RistFlowStats,
}

impl RistSender {
    /// Creates a sender starting at sequence number 0 and timestamp 0 with
    /// room for 4096 packets in the retransmit buffer.
    #[must_use]
    pub fn new(config: RistConfig, ssrc: u32) -> Self {
        Self {
            config,
            next_seq: 0,
            timestamp: 0,
            ssrc,
            retransmit_buffer: BTreeMap::new(),
            max_buffer_packets: 4096,
            stats: RistFlowStats::default(),
        }
    }

    /// Wraps `payload` in the next packet, remembers it for retransmission
    /// and returns the encoded bytes.
    ///
    /// The sequence number advances by one (wrapping) and the timestamp by
    /// 3003 ticks per packet.
    // NOTE(review): 3003 looks like one 29.97 fps frame at a 90 kHz clock —
    // confirm that a fixed increment is intended.
    ///
    /// When the retransmit buffer is full, the oldest packets are evicted
    /// first. This call also increments `stats.packets_received`, which on
    /// the sender side therefore counts packets sent.
    pub fn send_packet(&mut self, payload: Vec<u8>) -> Vec<u8> {
        let seq = self.next_seq;
        self.next_seq = seq.wrapping_add(1);
        let pkt = RistPacket::new(seq, self.timestamp, self.ssrc, payload);
        self.timestamp = self.timestamp.wrapping_add(3003);
        let encoded = pkt.encode();

        while self.retransmit_buffer.len() >= self.max_buffer_packets {
            match self.oldest_buffered_seq(seq) {
                Some(oldest) => {
                    self.retransmit_buffer.remove(&oldest);
                }
                // Nothing left to evict; never spin on an empty buffer.
                None => break,
            }
        }

        self.retransmit_buffer.insert(seq, (pkt, Instant::now()));
        self.stats.packets_received += 1;
        encoded
    }

    /// Returns encoded copies of every requested packet that is still in the
    /// retransmit buffer, in request order.
    ///
    /// Each hit increments `stats.retransmissions_received`, which on the
    /// sender side therefore counts retransmissions produced.
    pub fn handle_nack(&mut self, nack: &RistNack) -> Vec<Vec<u8>> {
        let mut out = Vec::new();
        for offset in 0..nack.seq_count {
            let seq = nack.seq_start.wrapping_add(offset);
            if let Some((pkt, _)) = self.retransmit_buffer.get(&seq) {
                let mut retx = pkt.clone();
                retx.is_retransmission = true;
                out.push(retx.encode());
                self.stats.retransmissions_received += 1;
            }
        }
        out
    }

    /// Drops every buffered packet that was sent at least
    /// `config.buffer_duration` ago.
    pub fn evict_old_packets(&mut self) {
        let cutoff = self.config.buffer_duration;
        self.retransmit_buffer
            .retain(|_, (_, sent_at)| sent_at.elapsed() < cutoff);
    }

    /// Counters for this sender.
    #[must_use]
    pub fn stats(&self) -> &RistFlowStats {
        &self.stats
    }

    /// Number of packets currently held for retransmission.
    #[must_use]
    pub fn buffer_size(&self) -> usize {
        self.retransmit_buffer.len()
    }

    /// Finds the oldest buffered sequence number, given that `next` is the
    /// sequence number about to be inserted.
    ///
    /// Packets are inserted in sequence order, so in circular order the
    /// oldest entry is the first key at or after `next`, falling back to the
    /// smallest key when no key is that large. Taking the numerically
    /// smallest key would evict the newest packets once the 16-bit sequence
    /// number has wrapped around.
    fn oldest_buffered_seq(&self, next: u16) -> Option<u16> {
        self.retransmit_buffer
            .range(next..)
            .next()
            .or_else(|| self.retransmit_buffer.iter().next())
            .map(|(&seq, _)| seq)
    }
}
/// Receiving side of a RIST flow: decodes datagrams, re-orders them and
/// reports which NACKs should be sent.
pub struct RistReceiver {
    /// Flow configuration.
    config: RistConfig,
    /// Re-order buffer that also tracks missing sequence numbers.
    buffer: RistReorderBuffer,
    /// Counters for this receiver.
    stats: RistFlowStats,
    /// Every packet released by the re-order buffer so far.
    // NOTE(review): nothing visible in this file ever pops from this queue,
    // so it grows for the lifetime of the receiver — confirm whether a
    // consumer exists elsewhere.
    ready_queue: VecDeque<RistPacket>,
}

impl RistReceiver {
    /// Creates a receiver whose re-order buffer is configured from `config`.
    #[must_use]
    pub fn new(config: RistConfig, ssrc: u32) -> Self {
        Self {
            buffer: RistReorderBuffer::new(&config, ssrc),
            stats: RistFlowStats::default(),
            ready_queue: VecDeque::new(),
            config,
        }
    }

    /// Feeds one datagram into the receiver and returns the payloads that
    /// became deliverable in order as a result.
    ///
    /// Datagrams that fail to decode are ignored and yield an empty vector.
    /// With null-packet deletion enabled each payload is passed through
    /// [`NullPacketFilter::restore`] with its own length as the target.
    pub fn receive_datagram(&mut self, data: &[u8]) -> Vec<Vec<u8>> {
        let pkt = if let Some(decoded) = RistPacket::decode(data) {
            decoded
        } else {
            return Vec::new();
        };

        // NOTE(review): `RistPacket::decode` as visible in this file never
        // sets `is_retransmission`, so this branch looks unreachable.
        if pkt.is_retransmission {
            self.stats.retransmissions_received += 1;
        }
        self.stats.packets_received += 1;
        self.buffer.insert(pkt);

        let restore_nulls = self.config.null_packet_deletion;
        let released = self.buffer.drain_ready();
        let mut payloads = Vec::with_capacity(released.len());
        for packet in released {
            let bytes = if restore_nulls {
                NullPacketFilter::restore(&packet.payload, packet.payload.len())
            } else {
                packet.payload.clone()
            };
            payloads.push(bytes);
            self.ready_queue.push_back(packet);
        }
        payloads
    }

    /// Returns the NACKs that are due now and adds their count to
    /// `stats.nacks_sent`.
    pub fn nacks_to_send(&mut self) -> Vec<RistNack> {
        let due = self.buffer.pending_nacks();
        self.stats.nacks_sent += due.len() as u64;
        due
    }

    /// Number of packets released by the re-order buffer so far.
    #[must_use]
    pub fn ready_count(&self) -> usize {
        self.ready_queue.len()
    }

    /// Counters for this receiver.
    #[must_use]
    pub fn stats(&self) -> &RistFlowStats {
        &self.stats
    }

    /// Number of packets still waiting in the re-order buffer.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.buffer.buffered_count()
    }
}
use std::collections::HashSet;
use crate::error::NetError;
/// Fields of the fixed 12-byte RTP header as read by `parse_rtp_header` and
/// written by `build_rtp_header`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RistPacketHeader {
/// 16-bit sequence number (header bytes 2-3, big-endian).
pub rtp_seq: u16,
/// Synchronisation source identifier (header bytes 8-11, big-endian).
pub ssrc: u32,
/// Timestamp (header bytes 4-7, big-endian).
pub timestamp: u32,
/// 7-bit payload type (low 7 bits of header byte 1).
pub payload_type: u8,
/// Marker bit (top bit of header byte 1).
pub marker: bool,
}
/// Parses the fixed 12-byte RTP header at the start of `data`.
///
/// Only the version, marker, payload type, sequence number, timestamp and
/// SSRC are read; the padding, extension and CSRC-count bits of the first
/// byte are ignored.
///
/// # Errors
///
/// Returns `NetError::Protocol` when `data` holds fewer than 12 bytes or when
/// the RTP version field is not 2.
pub fn parse_rtp_header(data: &[u8]) -> Result<RistPacketHeader, NetError> {
    let (flags, marker_and_pt, seq, ts, ssrc) = match *data {
        [b0, b1, s0, s1, t0, t1, t2, t3, c0, c1, c2, c3, ..] => {
            (b0, b1, [s0, s1], [t0, t1, t2, t3], [c0, c1, c2, c3])
        }
        _ => {
            return Err(NetError::Protocol(
                "RTP header too short (need 12 bytes)".into(),
            ));
        }
    };

    let version = (flags >> 6) & 0x03;
    if version != 2 {
        return Err(NetError::Protocol(format!(
            "RTP version {version} unsupported (expected 2)"
        )));
    }

    Ok(RistPacketHeader {
        rtp_seq: u16::from_be_bytes(seq),
        ssrc: u32::from_be_bytes(ssrc),
        timestamp: u32::from_be_bytes(ts),
        payload_type: marker_and_pt & 0x7F,
        marker: marker_and_pt & 0x80 != 0,
    })
}
/// Serialises `header` into a fixed 12-byte RTP header.
///
/// The first byte is always `0x80` (version 2 with the padding, extension
/// and CSRC-count bits clear). The payload type is truncated to 7 bits and
/// all multi-byte fields are written big-endian.
#[must_use]
pub fn build_rtp_header(header: &RistPacketHeader) -> [u8; 12] {
    let marker_bit: u8 = if header.marker { 0x80 } else { 0x00 };

    let mut buf = [0u8; 12];
    buf[0] = 0x80;
    buf[1] = marker_bit | (header.payload_type & 0x7F);
    buf[2..4].copy_from_slice(&header.rtp_seq.to_be_bytes());
    buf[4..8].copy_from_slice(&header.timestamp.to_be_bytes());
    buf[8..12].copy_from_slice(&header.ssrc.to_be_bytes());
    buf
}
/// A NACK expressed as an explicit list of missing sequence numbers.
#[derive(Debug, Clone)]
pub struct RistNackList {
    /// SSRC of the flow the NACK refers to.
    pub ssrc: u32,
    /// Sequence numbers being requested, in the order supplied.
    pub seq_numbers: Vec<u16>,
}

impl RistNackList {
    /// Bundles `seq_numbers` into a NACK list for the flow `ssrc`.
    #[must_use]
    pub fn new(ssrc: u32, seq_numbers: Vec<u16>) -> Self {
        let requested = seq_numbers;
        Self {
            seq_numbers: requested,
            ssrc,
        }
    }
}
/// Reduced configuration used by the task-style sender and receiver.
#[derive(Debug, Clone)]
pub struct RistSimpleConfig {
    /// Protocol profile in use (defaults to [`RistProfile::Simple`]).
    pub profile: RistProfile,
    /// Buffering window in milliseconds (defaults to 500).
    pub buffer_size_ms: u32,
    /// Maximum number of retransmission requests (defaults to 5).
    pub max_retransmits: u32,
    /// Bandwidth overhead fraction (defaults to 0.05).
    // NOTE(review): this value is dropped by `into_full_config` and is not
    // read anywhere else in the code visible here.
    pub overhead_bandwidth: f32,
}

impl Default for RistSimpleConfig {
    fn default() -> Self {
        Self {
            overhead_bandwidth: 0.05,
            max_retransmits: 5,
            buffer_size_ms: 500,
            profile: RistProfile::Simple,
        }
    }
}

impl RistSimpleConfig {
    /// Creates a configuration populated with the default values.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Expands this configuration into a full [`RistConfig`].
    ///
    /// The profile, buffer size and retransmit limit are carried over; every
    /// other field takes its `RistConfig` default and `overhead_bandwidth`
    /// is discarded.
    #[must_use]
    pub fn into_full_config(self) -> RistConfig {
        let Self {
            profile,
            buffer_size_ms,
            max_retransmits,
            overhead_bandwidth: _,
        } = self;
        let buffer_duration = Duration::from_millis(u64::from(buffer_size_ms));
        RistConfig {
            profile,
            buffer_duration,
            max_retransmissions: max_retransmits,
            ..RistConfig::default()
        }
    }
}
/// Sender variant that takes the RTP timestamp from the caller and answers
/// list-style NACKs, built on top of [`RistSender`].
pub struct RistTaskSender {
    /// Underlying sender that owns sequence numbering and the retransmit
    /// buffer.
    inner: RistSender,
    /// SSRC this sender was created with.
    ssrc: u32,
}

impl RistTaskSender {
    /// Creates a sender from a simplified configuration.
    #[must_use]
    pub fn new(config: RistSimpleConfig, ssrc: u32) -> Self {
        Self {
            inner: RistSender::new(config.into_full_config(), ssrc),
            ssrc,
        }
    }

    /// Encodes `payload` as the next packet, stamped with `timestamp`, and
    /// stores it for retransmission.
    ///
    /// After the call the sender's current timestamp equals `timestamp`, so
    /// [`RistTaskSender::prune_buffer`] measures packet age relative to the
    /// most recently sent packet.
    pub fn packetize(&mut self, payload: &[u8], timestamp: u32) -> Vec<u8> {
        self.inner.timestamp = timestamp;
        let encoded = self.inner.send_packet(payload.to_vec());
        // `send_packet` advances the inner timestamp by its own fixed
        // per-packet increment. This sender is driven by caller-supplied
        // timestamps, so undo that; otherwise every buffered packet would
        // appear one increment older than it is when pruning.
        self.inner.timestamp = timestamp;
        encoded
    }

    /// Returns encoded copies of every requested packet that is still in the
    /// retransmit buffer, in request order. Unknown sequence numbers are
    /// skipped.
    #[must_use]
    pub fn handle_nack(&self, nack: &RistNackList) -> Vec<Vec<u8>> {
        nack.seq_numbers
            .iter()
            .filter_map(|seq| self.inner.retransmit_buffer.get(seq))
            .map(|(pkt, _)| {
                let mut retx = pkt.clone();
                retx.is_retransmission = true;
                retx.encode()
            })
            .collect()
    }

    /// Drops buffered packets whose timestamp lies more than `max_age_ms`
    /// behind the sender's current timestamp.
    ///
    /// `clock_rate` is the number of timestamp ticks per second; a value of
    /// zero makes the call a no-op. Age is computed with wrapping
    /// subtraction, so a packet stamped later than the current timestamp
    /// appears very old and is removed.
    pub fn prune_buffer(&mut self, max_age_ms: u32, clock_rate: u32) {
        if clock_rate == 0 {
            return;
        }
        let current_ts = self.inner.timestamp;
        let ticks_per_second = u64::from(clock_rate);
        let max_age = u64::from(max_age_ms);
        self.inner.retransmit_buffer.retain(|_, (pkt, _)| {
            let age_ticks = u64::from(current_ts.wrapping_sub(pkt.timestamp));
            let age_ms = age_ticks.saturating_mul(1000) / ticks_per_second;
            age_ms <= max_age
        });
    }

    /// Number of packets currently held for retransmission.
    #[must_use]
    pub fn buffer_size(&self) -> usize {
        self.inner.buffer_size()
    }

    /// Counters of the underlying sender.
    #[must_use]
    pub fn stats(&self) -> &RistFlowStats {
        self.inner.stats()
    }
}
/// Receiver that tracks sequence continuity and produces list-style NACKs.
// NOTE(review): payloads of packets that arrive ahead of `expected_seq` are
// not buffered; only the packet matching `expected_seq` yields its payload.
// Confirm whether out-of-order payloads are meant to be delivered later.
pub struct RistTaskReceiver {
    /// Configuration; `buffer_size_ms` is the give-up timeout for missing
    /// packets.
    config: RistSimpleConfig,
    /// Sequence numbers received ahead of `expected_seq`. Entries are removed
    /// as `expected_seq` passes them, so the set stays bounded.
    received: HashSet<u16>,
    /// Next sequence number expected in order.
    expected_seq: u16,
    /// Missing sequence numbers with the instant they were first noticed.
    missing: VecDeque<(u16, Instant)>,
    /// SSRC placed in generated NACK lists.
    ssrc: u32,
    /// Whether the first packet has been seen.
    initialised: bool,
}

impl RistTaskReceiver {
    /// Half of the 16-bit sequence space. A packet whose wrapping distance
    /// from `expected_seq` is at least this large counts as behind it.
    const HALF_SEQ_SPACE: u16 = 0x8000;

    /// Creates a receiver that synchronises to the first packet it sees.
    #[must_use]
    pub fn new(config: RistSimpleConfig, ssrc: u32) -> Self {
        Self {
            config,
            received: HashSet::new(),
            expected_seq: 0,
            missing: VecDeque::new(),
            ssrc,
            initialised: false,
        }
    }

    /// Processes one RTP packet.
    ///
    /// Returns `Ok(Some(payload))` when the packet carries the next expected
    /// sequence number. Returns `Ok(None)` for duplicates, for packets behind
    /// the expected sequence number and for packets that arrive ahead of it;
    /// in the last case the skipped sequence numbers are recorded as missing.
    ///
    /// # Errors
    ///
    /// Propagates the error from `parse_rtp_header`, and returns
    /// `NetError::Protocol` if `data` is shorter than the 12-byte header.
    pub fn receive_packet(&mut self, data: &[u8]) -> Result<Option<Vec<u8>>, NetError> {
        let hdr = parse_rtp_header(data)?;
        if data.len() < 12 {
            return Err(NetError::Protocol("packet too short".into()));
        }
        let seq = hdr.rtp_seq;
        if !self.initialised {
            self.expected_seq = seq;
            self.initialised = true;
        }

        // Wrapping distance ahead of the expected sequence number. Anything
        // in the far half of the sequence space is an old packet (already
        // delivered or given up on). Walking "forward" to it would traverse
        // almost the whole 16-bit range.
        let ahead = seq.wrapping_sub(self.expected_seq);
        if ahead >= Self::HALF_SEQ_SPACE || self.received.contains(&seq) {
            return Ok(None);
        }

        self.missing.retain(|(s, _)| *s != seq);

        if ahead == 0 {
            self.advance_expected();
            return Ok(Some(data[12..].to_vec()));
        }

        self.received.insert(seq);
        // Snapshot the known-missing set once so the gap walk stays linear.
        let already_missing: HashSet<u16> = self.missing.iter().map(|(s, _)| *s).collect();
        let noticed_at = Instant::now();
        let mut gap_seq = self.expected_seq;
        while gap_seq != seq {
            if !self.received.contains(&gap_seq) && !already_missing.contains(&gap_seq) {
                self.missing.push_back((gap_seq, noticed_at));
            }
            gap_seq = gap_seq.wrapping_add(1);
        }
        Ok(None)
    }

    /// Drops missing entries older than `buffer_size_ms`, moves the expected
    /// sequence number past packets that were given up on, and returns one
    /// NACK list naming everything still missing (or nothing if the list is
    /// empty).
    pub fn generate_nacks(&mut self) -> Vec<RistNackList> {
        let timeout = Duration::from_millis(u64::from(self.config.buffer_size_ms));
        self.missing.retain(|(_, t)| t.elapsed() < timeout);
        self.skip_abandoned();
        if self.missing.is_empty() {
            return Vec::new();
        }
        let seq_numbers: Vec<u16> = self.missing.iter().map(|(s, _)| *s).collect();
        vec![RistNackList::new(self.ssrc, seq_numbers)]
    }

    /// Number of sequence numbers currently recorded as missing.
    #[must_use]
    pub fn missing_count(&self) -> usize {
        self.missing.len()
    }

    /// Steps `expected_seq` past the packet just delivered and past any
    /// sequence numbers that were already received ahead of it, forgetting
    /// those entries on the way.
    fn advance_expected(&mut self) {
        self.expected_seq = self.expected_seq.wrapping_add(1);
        while self.received.remove(&self.expected_seq) {
            self.expected_seq = self.expected_seq.wrapping_add(1);
        }
    }

    /// Moves `expected_seq` forward over sequence numbers that are no longer
    /// being waited for.
    ///
    /// While later packets are on record, an expected sequence number that is
    /// absent from `missing` has either been received ahead of time or has
    /// timed out; in both cases the receiver must move on, otherwise no
    /// packet would ever match `expected_seq` again.
    fn skip_abandoned(&mut self) {
        if self.received.is_empty() {
            return;
        }
        let still_wanted: HashSet<u16> = self.missing.iter().map(|(s, _)| *s).collect();
        while !self.received.is_empty() && !still_wanted.contains(&self.expected_seq) {
            self.received.remove(&self.expected_seq);
            self.expected_seq = self.expected_seq.wrapping_add(1);
        }
    }
}
// Unit tests for the RIST packet, NACK, re-order buffer, sender/receiver and
// RTP header helpers defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// Shorthand for the default full configuration used by most tests.
fn default_config() -> RistConfig {
RistConfig::default()
}
// --- RistProfile / RistConfig ---
#[test]
fn test_rist_profile_names() {
assert_eq!(RistProfile::Simple.name(), "RIST Simple Profile");
assert_eq!(RistProfile::Main.name(), "RIST Main Profile");
assert_eq!(RistProfile::Advanced.name(), "RIST Advanced Profile");
}
#[test]
fn test_rist_config_default() {
let cfg = default_config();
assert_eq!(cfg.profile, RistProfile::Simple);
assert!(cfg.null_packet_deletion);
assert_eq!(cfg.max_retransmissions, 5);
}
#[test]
fn test_rist_config_builder() {
let cfg = RistConfig::new()
.with_buffer(Duration::from_millis(200))
.without_npd();
assert!(!cfg.null_packet_deletion);
assert_eq!(cfg.buffer_duration, Duration::from_millis(200));
}
// --- RistPacket encode/decode ---
#[test]
fn test_rist_packet_encode_decode() {
let payload = vec![0x47, 0x1F, 0xFF, 0x10]; let pkt = RistPacket::new(42, 12345, 0xDEAD_BEEF, payload.clone());
let encoded = pkt.encode();
let decoded = RistPacket::decode(&encoded).expect("decode should succeed");
assert_eq!(decoded.sequence, 42);
assert_eq!(decoded.timestamp, 12345);
assert_eq!(decoded.ssrc, 0xDEAD_BEEF);
assert_eq!(decoded.payload, payload);
}
// A first byte of 0x00 carries RTP version 0, which decode rejects.
#[test]
fn test_rist_packet_decode_invalid_version() {
let mut raw = vec![0u8; 16];
raw[0] = 0x00; assert!(RistPacket::decode(&raw).is_none());
}
// Eight bytes is shorter than the 12-byte fixed header.
#[test]
fn test_rist_packet_decode_too_short() {
let raw = vec![0x80; 8]; assert!(RistPacket::decode(&raw).is_none());
}
// --- RistNack ---
#[test]
fn test_rist_nack_single_encode() {
let nack = RistNack::single(0xABCD_1234, 100);
let encoded = nack.encode();
assert_eq!(encoded.len(), 16);
assert_eq!(encoded[0], 0x81);
assert_eq!(encoded[1], 205);
}
#[test]
fn test_rist_nack_range() {
let nack = RistNack::range(0, 10, 5);
assert_eq!(nack.seq_start, 10);
assert_eq!(nack.seq_count, 5);
}
// --- NullPacketFilter ---
// Two 188-byte packets: the first has PID 0, the second has PID 0x1FFF
// (null) and must be removed.
#[test]
fn test_null_packet_deletion() {
let mut stream = vec![0u8; 376]; stream[0] = 0x47;
stream[1] = 0x00; stream[2] = 0x00; stream[188] = 0x47;
stream[189] = 0x1F;
stream[190] = 0xFF;
let filtered = NullPacketFilter::delete(&stream);
assert_eq!(filtered.len(), 188); }
// Restoring one packet to a two-packet target appends a single null packet.
#[test]
fn test_null_packet_restore() {
let ts = vec![0x47u8; 188]; let restored = NullPacketFilter::restore(&ts, 376);
assert_eq!(restored.len(), 376);
assert_eq!(restored[188], 0x47);
assert_eq!(restored[189], 0x1F);
assert_eq!(restored[190], 0xFF);
}
// --- RistSender ---
// Note: the sender counts sent packets in `stats.packets_received`.
#[test]
fn test_rist_sender_send_packet() {
let mut sender = RistSender::new(default_config(), 1);
let encoded = sender.send_packet(vec![0x47; 188]);
assert!(!encoded.is_empty());
assert_eq!(sender.buffer_size(), 1);
assert_eq!(sender.stats().packets_received, 1);
}
// Sending from u16::MAX uses sequence numbers 65535 and 0, leaving 1 next.
#[test]
fn test_rist_sender_sequence_wraps() {
let mut sender = RistSender::new(default_config(), 1);
sender.next_seq = u16::MAX;
sender.send_packet(vec![1]);
sender.send_packet(vec![2]);
assert_eq!(sender.next_seq, 1); }
#[test]
fn test_rist_sender_handle_nack() {
let mut sender = RistSender::new(default_config(), 5);
sender.send_packet(vec![0u8; 10]);
let nack = RistNack::single(5, 0);
let retx = sender.handle_nack(&nack);
assert_eq!(retx.len(), 1);
}
// A NACK for a sequence number that was never sent yields nothing.
#[test]
fn test_rist_sender_nack_missing() {
let mut sender = RistSender::new(default_config(), 5);
let nack = RistNack::single(5, 99); let retx = sender.handle_nack(&nack);
assert!(retx.is_empty());
}
// --- RistReorderBuffer ---
#[test]
fn test_reorder_buffer_insert_drain() {
let cfg = default_config();
let mut buf = RistReorderBuffer::new(&cfg, 1);
let pkt0 = RistPacket::new(0, 0, 1, vec![0]);
buf.insert(pkt0);
let ready = buf.drain_ready();
assert_eq!(ready.len(), 1);
assert_eq!(ready[0].sequence, 0);
}
// The 10 s buffer keeps the gap at sequence 1 from expiring during the test,
// so a NACK for it must be pending after sequence 2 arrives.
#[test]
fn test_reorder_buffer_gap_nack() {
let cfg = RistConfig::new().with_buffer(Duration::from_secs(10)); let mut buf = RistReorderBuffer::new(&cfg, 1);
buf.insert(RistPacket::new(0, 0, 1, vec![0]));
let _ = buf.drain_ready();
buf.insert(RistPacket::new(2, 0, 1, vec![2]));
let _ = buf.drain_ready();
assert!(buf.pending_nack_count() > 0);
}
// --- RistReceiver ---
#[test]
fn test_rist_receiver_receive_datagram() {
let mut sender = RistSender::new(default_config(), 7);
let mut receiver = RistReceiver::new(default_config(), 7);
let encoded = sender.send_packet(vec![0x47; 188]);
let payloads = receiver.receive_datagram(&encoded);
assert_eq!(payloads.len(), 1);
assert_eq!(receiver.stats().packets_received, 1);
}
// --- RistFlowStats ---
#[test]
fn test_rist_flow_stats_loss_ratio() {
let mut stats = RistFlowStats {
packets_received: 90,
packets_lost: 10,
..Default::default()
};
stats.update_loss_ratio();
assert!((stats.loss_ratio - 0.1).abs() < 1e-9);
}
#[test]
fn test_rist_flow_stats_zero_total() {
let mut stats = RistFlowStats::default();
stats.update_loss_ratio();
assert_eq!(stats.loss_ratio, 0.0);
}
// A 1 ns buffer duration plus a 1 ms sleep guarantees the packet is stale.
#[test]
fn test_rist_sender_evict_old_packets() {
let cfg = RistConfig::new().with_buffer(Duration::from_nanos(1));
let mut sender = RistSender::new(cfg, 1);
sender.send_packet(vec![0u8; 188]);
std::thread::sleep(Duration::from_millis(1));
sender.evict_old_packets();
assert_eq!(sender.buffer_size(), 0);
}
// --- RTP header helpers ---
#[test]
fn test_parse_build_rtp_header_roundtrip() {
let hdr = RistPacketHeader {
rtp_seq: 0x1234,
ssrc: 0xDEAD_BEEF,
timestamp: 0x0011_2233,
payload_type: 33,
marker: true,
};
let raw = build_rtp_header(&hdr);
let parsed = parse_rtp_header(&raw).expect("should parse");
assert_eq!(parsed.rtp_seq, hdr.rtp_seq);
assert_eq!(parsed.ssrc, hdr.ssrc);
assert_eq!(parsed.timestamp, hdr.timestamp);
assert_eq!(parsed.payload_type, hdr.payload_type);
assert_eq!(parsed.marker, hdr.marker);
}
#[test]
fn test_parse_rtp_header_too_short() {
let err = parse_rtp_header(&[0x80u8; 8]);
assert!(err.is_err());
}
#[test]
fn test_parse_rtp_header_bad_version() {
let mut raw = [0u8; 12];
raw[0] = 0x00; let err = parse_rtp_header(&raw);
assert!(err.is_err());
}
#[test]
fn test_build_rtp_header_version() {
let hdr = RistPacketHeader {
rtp_seq: 1,
ssrc: 0,
timestamp: 0,
payload_type: 96,
marker: false,
};
let raw = build_rtp_header(&hdr);
assert_eq!((raw[0] >> 6) & 0x03, 2);
}
#[test]
fn test_build_rtp_header_marker() {
let hdr = RistPacketHeader {
rtp_seq: 0,
ssrc: 0,
timestamp: 0,
payload_type: 96,
marker: true,
};
let raw = build_rtp_header(&hdr);
assert_eq!(raw[1] & 0x80, 0x80);
}
// --- RistSimpleConfig / RistNackList ---
#[test]
fn test_rist_simple_config_defaults() {
let cfg = RistSimpleConfig::default();
assert_eq!(cfg.buffer_size_ms, 500);
assert_eq!(cfg.max_retransmits, 5);
assert!((cfg.overhead_bandwidth - 0.05).abs() < 1e-6);
}
#[test]
fn test_rist_simple_config_into_full() {
let simple = RistSimpleConfig {
buffer_size_ms: 200,
max_retransmits: 3,
..Default::default()
};
let full = simple.into_full_config();
assert_eq!(full.buffer_duration, Duration::from_millis(200));
assert_eq!(full.max_retransmissions, 3);
}
#[test]
fn test_rist_nack_list_new() {
let nack = RistNackList::new(42, vec![10, 11, 12]);
assert_eq!(nack.ssrc, 42);
assert_eq!(nack.seq_numbers, vec![10, 11, 12]);
}
// --- RistTaskSender ---
#[test]
fn test_rist_task_sender_packetize_and_handle_nack() {
let cfg = RistSimpleConfig::default();
let mut sender = RistTaskSender::new(cfg, 0xABCD);
let pkt = sender.packetize(&[0x47u8; 188], 90_000);
assert!(!pkt.is_empty());
assert_eq!(sender.buffer_size(), 1);
let nack = RistNackList::new(0xABCD, vec![0]);
let retx = sender.handle_nack(&nack);
assert_eq!(retx.len(), 1);
}
#[test]
fn test_rist_task_sender_handle_nack_unknown() {
let cfg = RistSimpleConfig::default();
let sender = RistTaskSender::new(cfg, 1);
let nack = RistNackList::new(1, vec![99]);
let retx = sender.handle_nack(&nack);
assert!(retx.is_empty());
}
// 9001 ticks at 90 kHz is about 100 ms, which exceeds the 50 ms limit.
#[test]
fn test_rist_task_sender_prune_buffer() {
let cfg = RistSimpleConfig {
buffer_size_ms: 500,
..Default::default()
};
let mut sender = RistTaskSender::new(cfg, 1);
sender.packetize(&[0u8; 4], 0); sender.inner.timestamp = 9001;
sender.prune_buffer(50, 90_000);
assert_eq!(sender.buffer_size(), 0);
}
#[test]
fn test_rist_task_sender_prune_buffer_keeps_recent() {
let cfg = RistSimpleConfig::default();
let mut sender = RistTaskSender::new(cfg, 1);
sender.packetize(&[0u8; 4], 90_000); sender.prune_buffer(500, 90_000);
assert_eq!(sender.buffer_size(), 1);
}
// --- RistTaskReceiver ---
#[test]
fn test_rist_task_receiver_in_order() {
let mut tx = RistTaskSender::new(RistSimpleConfig::default(), 7);
let mut rx = RistTaskReceiver::new(RistSimpleConfig::default(), 7);
let pkt0 = tx.packetize(&[0xAAu8; 4], 0);
let result = rx.receive_packet(&pkt0).expect("no error");
assert!(result.is_some());
}
// Feeding the same packet twice must not yield its payload a second time.
#[test]
fn test_rist_task_receiver_duplicate() {
let mut tx = RistTaskSender::new(RistSimpleConfig::default(), 7);
let mut rx = RistTaskReceiver::new(RistSimpleConfig::default(), 7);
let pkt0 = tx.packetize(&[0xBBu8; 4], 0);
let _ = rx.receive_packet(&pkt0).expect("first ok");
let result = rx.receive_packet(&pkt0).expect("duplicate ok");
assert!(result.is_none());
}
// Receiving sequence 0 then 2 leaves sequence 1 missing, so it must be NACKed.
#[test]
fn test_rist_task_receiver_generate_nacks_gap() {
let hdr0 = RistPacketHeader {
rtp_seq: 0,
ssrc: 1,
timestamp: 0,
payload_type: 33,
marker: false,
};
let hdr2 = RistPacketHeader {
rtp_seq: 2,
ssrc: 1,
timestamp: 90,
payload_type: 33,
marker: false,
};
let make_pkt = |hdr: &RistPacketHeader| {
let header_bytes = build_rtp_header(hdr);
let mut v: Vec<u8> = header_bytes.to_vec();
v.extend_from_slice(&[0xFFu8; 4]);
v
};
let mut rx = RistTaskReceiver::new(RistSimpleConfig::default(), 1);
let _ = rx.receive_packet(&make_pkt(&hdr0)).expect("seq 0 ok");
let _ = rx.receive_packet(&make_pkt(&hdr2)).expect("seq 2 ok");
let nacks = rx.generate_nacks();
assert!(!nacks.is_empty());
assert!(nacks[0].seq_numbers.contains(&1));
}
// Receiving sequence 0 then 3 leaves sequences 1 and 2 missing.
#[test]
fn test_rist_task_receiver_missing_count() {
let hdr0 = RistPacketHeader {
rtp_seq: 0,
ssrc: 5,
timestamp: 0,
payload_type: 33,
marker: false,
};
let hdr3 = RistPacketHeader {
rtp_seq: 3,
ssrc: 5,
timestamp: 270,
payload_type: 33,
marker: false,
};
let make_pkt = |hdr: &RistPacketHeader| {
let mut v: Vec<u8> = build_rtp_header(hdr).to_vec();
v.extend_from_slice(&[0u8; 4]);
v
};
let mut rx = RistTaskReceiver::new(RistSimpleConfig::default(), 5);
let _ = rx.receive_packet(&make_pkt(&hdr0));
let _ = rx.receive_packet(&make_pkt(&hdr3));
assert_eq!(rx.missing_count(), 2);
}
}