#![allow(dead_code)]
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PathId {
Primary,
Secondary,
}
impl PathId {
#[must_use]
pub const fn name(&self) -> &'static str {
match self {
Self::Primary => "Path-A (Primary)",
Self::Secondary => "Path-B (Secondary)",
}
}
}
#[derive(Debug, Clone)]
pub struct St2022Config {
pub merge_delay: Duration,
pub max_buffer_packets: usize,
pub prefer_primary: bool,
pub fail_threshold: u32,
pub restore_threshold: u32,
pub primary_addr: SocketAddr,
pub secondary_addr: SocketAddr,
}
impl Default for St2022Config {
fn default() -> Self {
Self {
merge_delay: Duration::from_millis(50),
max_buffer_packets: 512,
prefer_primary: true,
fail_threshold: 10,
restore_threshold: 5,
primary_addr: "0.0.0.0:5000".parse().expect("valid addr"),
secondary_addr: "0.0.0.0:5002".parse().expect("valid addr"),
}
}
}
impl St2022Config {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub const fn with_merge_delay(mut self, delay: Duration) -> Self {
self.merge_delay = delay;
self
}
#[must_use]
pub const fn with_max_buffer(mut self, max: usize) -> Self {
self.max_buffer_packets = max;
self
}
}
#[derive(Debug, Clone, Default)]
pub struct PathStats {
pub packets_received: u64,
pub packets_used: u64,
pub duplicates_discarded: u64,
pub late_arrivals: u64,
pub out_of_order: u64,
pub consecutive_failures: u32,
pub is_up: bool,
}
impl PathStats {
fn new() -> Self {
Self {
is_up: true,
..Default::default()
}
}
#[must_use]
pub fn health_score(&self) -> f64 {
let total = self.packets_received;
if total == 0 {
return 1.0; }
let bad = self.late_arrivals + self.out_of_order;
let ratio = bad as f64 / total as f64;
(1.0 - ratio).clamp(0.0, 1.0)
}
}
#[derive(Debug)]
struct MergeEntry {
payload: Vec<u8>,
first_path: PathId,
arrived_at: Instant,
delivered: bool,
}
pub struct ProtectionSwitcher {
config: St2022Config,
buffer: BTreeMap<u16, MergeEntry>,
stats: [PathStats; 2],
highest_seen: Option<u16>,
total_delivered: u64,
}
impl ProtectionSwitcher {
#[must_use]
pub fn new(config: St2022Config) -> Self {
Self {
config,
buffer: BTreeMap::new(),
stats: [PathStats::new(), PathStats::new()],
highest_seen: None,
total_delivered: 0,
}
}
const fn path_index(path: PathId) -> usize {
match path {
PathId::Primary => 0,
PathId::Secondary => 1,
}
}
pub fn receive(&mut self, seq: u16, payload: Vec<u8>, path: PathId) -> bool {
let idx = Self::path_index(path);
self.stats[idx].packets_received += 1;
if let Some(highest) = self.highest_seen {
let diff = seq.wrapping_sub(highest);
if diff > 32768 {
self.stats[idx].out_of_order += 1;
} else {
self.highest_seen = Some(seq);
}
} else {
self.highest_seen = Some(seq);
}
if let Some(entry) = self.buffer.get_mut(&seq) {
if entry.delivered {
self.stats[idx].duplicates_discarded += 1;
return false;
}
if entry.arrived_at.elapsed() > self.config.merge_delay {
self.stats[idx].late_arrivals += 1;
return false;
}
self.stats[idx].duplicates_discarded += 1;
return false;
}
while self.buffer.len() >= self.config.max_buffer_packets {
if let Some((&old_seq, _)) = self.buffer.iter().next() {
self.buffer.remove(&old_seq);
}
}
self.buffer.insert(
seq,
MergeEntry {
payload,
first_path: path,
arrived_at: Instant::now(),
delivered: false,
},
);
self.stats[idx].packets_used += 1;
true
}
pub fn drain_ready(&mut self) -> Vec<(u16, Vec<u8>)> {
let delay = self.config.merge_delay;
let mut ready = Vec::new();
for (&seq, entry) in &mut self.buffer {
if !entry.delivered && entry.arrived_at.elapsed() >= delay {
entry.delivered = true;
ready.push((seq, entry.payload.clone()));
self.total_delivered += 1;
}
}
self.buffer
.retain(|_, e| e.arrived_at.elapsed() < delay * 4);
ready
}
#[must_use]
pub fn path_stats(&self, path: PathId) -> &PathStats {
&self.stats[Self::path_index(path)]
}
#[must_use]
pub fn active_path(&self) -> PathId {
let primary_health = self.stats[0].health_score();
let secondary_health = self.stats[1].health_score();
if self.config.prefer_primary && primary_health >= secondary_health * 0.9 {
PathId::Primary
} else if secondary_health > primary_health {
PathId::Secondary
} else {
PathId::Primary
}
}
#[must_use]
pub fn buffer_depth(&self) -> usize {
self.buffer.len()
}
#[must_use]
pub fn total_delivered(&self) -> u64 {
self.total_delivered
}
}
pub struct DualPathSender {
_config: St2022Config,
next_seq: u16,
sent_primary: u64,
sent_secondary: u64,
}
impl DualPathSender {
#[must_use]
pub fn new(config: St2022Config) -> Self {
Self {
_config: config,
next_seq: 0,
sent_primary: 0,
sent_secondary: 0,
}
}
pub fn prepare_send(&mut self, _payload: &[u8]) -> (u16, SocketAddr, SocketAddr) {
let seq = self.next_seq;
self.next_seq = self.next_seq.wrapping_add(1);
self.sent_primary += 1;
self.sent_secondary += 1;
(seq, self._config.primary_addr, self._config.secondary_addr)
}
#[must_use]
pub const fn send_counts(&self) -> (u64, u64) {
(self.sent_primary, self.sent_secondary)
}
}
use std::collections::HashMap;
use crate::rist::parse_rtp_header;
#[derive(Debug, Clone, Default)]
pub struct PathSwitchingStats {
pub path_a_packets: u64,
pub path_b_packets: u64,
pub duplicates_discarded: u64,
pub recovered_from_path_b: u64,
pub path_a_loss_rate: f32,
pub path_b_loss_rate: f32,
}
#[derive(Debug, Clone)]
pub struct St20227Config {
pub path_delay_limit_ms: u32,
pub sequence_history: usize,
}
impl Default for St20227Config {
fn default() -> Self {
Self {
path_delay_limit_ms: 100,
sequence_history: 256,
}
}
}
impl St20227Config {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub const fn with_path_delay_limit(mut self, ms: u32) -> Self {
self.path_delay_limit_ms = ms;
self
}
#[must_use]
pub const fn with_sequence_history(mut self, n: usize) -> Self {
self.sequence_history = n;
self
}
}
pub struct St20227Receiver {
config: St20227Config,
path_a: HashMap<u16, (Vec<u8>, Instant)>,
path_b: HashMap<u16, (Vec<u8>, Instant)>,
delivered: std::collections::HashSet<u16>,
seen_on_a: std::collections::HashSet<u16>,
next_deliver: u16,
initialised: bool,
stats: PathSwitchingStats,
}
impl St20227Receiver {
#[must_use]
pub fn new(config: St20227Config) -> Self {
Self {
config,
path_a: HashMap::new(),
path_b: HashMap::new(),
delivered: std::collections::HashSet::new(),
seen_on_a: std::collections::HashSet::new(),
next_deliver: 0,
initialised: false,
stats: PathSwitchingStats::default(),
}
}
fn extract_payload(data: &[u8]) -> Option<Vec<u8>> {
if data.len() < 12 {
return None;
}
Some(data[12..].to_vec())
}
pub fn receive_path_a(&mut self, data: &[u8]) -> Option<Vec<u8>> {
let hdr = parse_rtp_header(data).ok()?;
let seq = hdr.rtp_seq;
self.stats.path_a_packets += 1;
self.init_next_deliver(seq);
self.seen_on_a.insert(seq);
if self.delivered.contains(&seq) {
self.stats.duplicates_discarded += 1;
return None;
}
if self.path_b.contains_key(&seq) {
let age_ok = self
.path_b
.get(&seq)
.map(|(_, t)| {
t.elapsed() < Duration::from_millis(u64::from(self.config.path_delay_limit_ms))
})
.unwrap_or(false);
if age_ok {
self.stats.duplicates_discarded += 1;
}
}
self.path_a.insert(seq, (data.to_vec(), Instant::now()));
self.try_deliver()
}
pub fn receive_path_b(&mut self, data: &[u8]) -> Option<Vec<u8>> {
let hdr = parse_rtp_header(data).ok()?;
let seq = hdr.rtp_seq;
self.stats.path_b_packets += 1;
self.init_next_deliver(seq);
if self.delivered.contains(&seq) {
self.stats.duplicates_discarded += 1;
return None;
}
let path_a_seen = self.seen_on_a.contains(&seq) || self.path_a.contains_key(&seq);
self.path_b.insert(seq, (data.to_vec(), Instant::now()));
if !path_a_seen {
self.stats.recovered_from_path_b += 1;
} else {
let age_ok = self
.path_a
.get(&seq)
.map(|(_, t)| {
t.elapsed() < Duration::from_millis(u64::from(self.config.path_delay_limit_ms))
})
.unwrap_or(true); if age_ok {
self.stats.duplicates_discarded += 1;
}
}
self.try_deliver()
}
#[must_use]
pub fn stats(&self) -> &PathSwitchingStats {
&self.stats
}
#[must_use]
pub fn pending_count(&self) -> usize {
let mut all_seqs: std::collections::HashSet<u16> = self.path_a.keys().copied().collect();
for k in self.path_b.keys() {
all_seqs.insert(*k);
}
all_seqs.len()
}
fn init_next_deliver(&mut self, seq: u16) {
if !self.initialised {
self.next_deliver = seq;
self.initialised = true;
}
}
fn try_deliver(&mut self) -> Option<Vec<u8>> {
let seq = self.next_deliver;
let delay_limit = Duration::from_millis(u64::from(self.config.path_delay_limit_ms));
if let Some((data, _arrived)) = self.path_a.remove(&seq) {
self.path_b.remove(&seq);
self.delivered.insert(seq);
self.advance_next_deliver();
self.evict_stale();
return Self::extract_payload(&data);
}
if let Some((data, arrived)) = self.path_b.get(&seq) {
if arrived.elapsed() <= delay_limit {
let payload = Self::extract_payload(data);
self.path_b.remove(&seq);
self.delivered.insert(seq);
self.advance_next_deliver();
self.evict_stale();
return payload;
}
}
if let Some((_, arrived)) = self.path_a.get(&seq) {
if arrived.elapsed() > delay_limit {
self.path_a.remove(&seq);
self.path_b.remove(&seq);
self.delivered.insert(seq);
self.advance_next_deliver();
self.evict_stale();
}
}
None
}
fn advance_next_deliver(&mut self) {
self.next_deliver = self.next_deliver.wrapping_add(1);
let limit = self.config.sequence_history;
if self.path_a.len() > limit {
let to_remove: Vec<u16> = {
let mut keys: Vec<u16> = self.path_a.keys().copied().collect();
let nd = self.next_deliver;
keys.sort_by_key(|&k| k.wrapping_sub(nd));
keys.into_iter().skip(limit).collect()
};
for k in to_remove {
self.path_a.remove(&k);
}
}
if self.path_b.len() > limit {
let to_remove: Vec<u16> = {
let mut keys: Vec<u16> = self.path_b.keys().copied().collect();
let nd = self.next_deliver;
keys.sort_by_key(|&k| k.wrapping_sub(nd));
keys.into_iter().skip(limit).collect()
};
for k in to_remove {
self.path_b.remove(&k);
}
}
}
fn evict_stale(&mut self) {
let delay_limit = Duration::from_millis(u64::from(self.config.path_delay_limit_ms) * 4);
self.path_a.retain(|_, (_, t)| t.elapsed() < delay_limit);
self.path_b.retain(|_, (_, t)| t.elapsed() < delay_limit);
let limit = self.config.sequence_history;
if self.delivered.len() > limit {
let nd = self.next_deliver;
let mut keys: Vec<u16> = self.delivered.iter().copied().collect();
keys.sort_by_key(|&k| std::cmp::Reverse(nd.wrapping_sub(k)));
for k in keys
.into_iter()
.take(self.delivered.len().saturating_sub(limit))
{
self.delivered.remove(&k);
}
}
if self.seen_on_a.len() > limit * 2 {
let nd = self.next_deliver;
let mut keys: Vec<u16> = self.seen_on_a.iter().copied().collect();
keys.sort_by_key(|&k| std::cmp::Reverse(nd.wrapping_sub(k)));
for k in keys
.into_iter()
.take(self.seen_on_a.len().saturating_sub(limit * 2))
{
self.seen_on_a.remove(&k);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_config() -> St2022Config {
St2022Config::new().with_merge_delay(Duration::from_millis(10))
}
#[test]
fn test_path_id_name() {
assert!(PathId::Primary.name().contains("Primary"));
assert!(PathId::Secondary.name().contains("Secondary"));
}
#[test]
fn test_config_default() {
let cfg = St2022Config::default();
assert_eq!(cfg.merge_delay, Duration::from_millis(50));
assert!(cfg.prefer_primary);
}
#[test]
fn test_config_builder() {
let cfg = St2022Config::new()
.with_merge_delay(Duration::from_millis(100))
.with_max_buffer(256);
assert_eq!(cfg.merge_delay, Duration::from_millis(100));
assert_eq!(cfg.max_buffer_packets, 256);
}
#[test]
fn test_path_stats_default_health() {
let stats = PathStats::new();
assert!((stats.health_score() - 1.0).abs() < 1e-9);
}
#[test]
fn test_path_stats_health_with_losses() {
let stats = PathStats {
packets_received: 100,
late_arrivals: 10,
..PathStats::new()
};
let score = stats.health_score();
assert!(score < 1.0);
assert!(score >= 0.0);
}
#[test]
fn test_switcher_first_copy_accepted() {
let mut sw = ProtectionSwitcher::new(make_config());
let is_first = sw.receive(0, vec![0u8; 188], PathId::Primary);
assert!(is_first);
assert_eq!(sw.path_stats(PathId::Primary).packets_used, 1);
}
#[test]
fn test_switcher_duplicate_discarded() {
let cfg = St2022Config::new().with_merge_delay(Duration::from_secs(10));
let mut sw = ProtectionSwitcher::new(cfg);
sw.receive(0, vec![0u8; 188], PathId::Primary);
let is_first = sw.receive(0, vec![0u8; 188], PathId::Secondary);
assert!(!is_first);
assert_eq!(sw.path_stats(PathId::Secondary).duplicates_discarded, 1);
}
#[test]
fn test_switcher_different_seqs() {
let cfg = St2022Config::new().with_merge_delay(Duration::from_secs(10));
let mut sw = ProtectionSwitcher::new(cfg);
let a = sw.receive(0, vec![0u8], PathId::Primary);
let b = sw.receive(1, vec![1u8], PathId::Primary);
assert!(a && b);
}
#[test]
fn test_switcher_drain_after_delay() {
let cfg = St2022Config::new().with_merge_delay(Duration::from_millis(1));
let mut sw = ProtectionSwitcher::new(cfg);
sw.receive(0, vec![42u8], PathId::Primary);
std::thread::sleep(Duration::from_millis(5));
let ready = sw.drain_ready();
assert_eq!(ready.len(), 1);
assert_eq!(ready[0].0, 0); }
#[test]
fn test_switcher_buffer_depth() {
let cfg = St2022Config::new().with_merge_delay(Duration::from_secs(10));
let mut sw = ProtectionSwitcher::new(cfg);
sw.receive(0, vec![0u8], PathId::Primary);
sw.receive(1, vec![1u8], PathId::Primary);
assert_eq!(sw.buffer_depth(), 2);
}
#[test]
fn test_switcher_active_path_default() {
let sw = ProtectionSwitcher::new(make_config());
assert_eq!(sw.active_path(), PathId::Primary);
}
#[test]
fn test_switcher_total_delivered() {
let cfg = St2022Config::new().with_merge_delay(Duration::from_millis(1));
let mut sw = ProtectionSwitcher::new(cfg);
sw.receive(0, vec![0u8], PathId::Primary);
sw.receive(1, vec![1u8], PathId::Primary);
std::thread::sleep(Duration::from_millis(5));
let _ = sw.drain_ready();
assert_eq!(sw.total_delivered(), 2);
}
#[test]
fn test_dual_path_sender_seq() {
let mut sender = DualPathSender::new(St2022Config::default());
let (seq0, _, _) = sender.prepare_send(&[0u8; 188]);
let (seq1, _, _) = sender.prepare_send(&[0u8; 188]);
assert_eq!(seq0, 0);
assert_eq!(seq1, 1);
}
#[test]
fn test_dual_path_sender_counts() {
let mut sender = DualPathSender::new(St2022Config::default());
sender.prepare_send(&[0u8]);
sender.prepare_send(&[0u8]);
let (p, s) = sender.send_counts();
assert_eq!(p, 2);
assert_eq!(s, 2);
}
#[test]
fn test_switcher_buffer_eviction() {
let cfg = St2022Config::new()
.with_merge_delay(Duration::from_secs(60))
.with_max_buffer(3);
let mut sw = ProtectionSwitcher::new(cfg);
for i in 0..5u16 {
sw.receive(i, vec![i as u8], PathId::Primary);
}
assert!(sw.buffer_depth() <= 3);
}
#[test]
fn test_path_index() {
assert_eq!(ProtectionSwitcher::path_index(PathId::Primary), 0);
assert_eq!(ProtectionSwitcher::path_index(PathId::Secondary), 1);
}
use crate::rist::{build_rtp_header, RistPacketHeader};
fn make_rtp_pkt(seq: u16, ssrc: u32, payload: &[u8]) -> Vec<u8> {
let hdr = RistPacketHeader {
rtp_seq: seq,
ssrc,
timestamp: u32::from(seq) * 90,
payload_type: 33,
marker: false,
};
let mut v: Vec<u8> = build_rtp_header(&hdr).to_vec();
v.extend_from_slice(payload);
v
}
#[test]
fn test_st20227_config_defaults() {
let cfg = St20227Config::default();
assert_eq!(cfg.path_delay_limit_ms, 100);
assert_eq!(cfg.sequence_history, 256);
}
#[test]
fn test_st20227_config_builder() {
let cfg = St20227Config::new()
.with_path_delay_limit(50)
.with_sequence_history(128);
assert_eq!(cfg.path_delay_limit_ms, 50);
assert_eq!(cfg.sequence_history, 128);
}
#[test]
fn test_path_switching_stats_default() {
let s = PathSwitchingStats::default();
assert_eq!(s.path_a_packets, 0);
assert_eq!(s.path_b_packets, 0);
assert_eq!(s.duplicates_discarded, 0);
}
#[test]
fn test_st20227_receiver_path_a_delivers() {
let cfg = St20227Config::new().with_path_delay_limit(1000);
let mut rx = St20227Receiver::new(cfg);
let pkt = make_rtp_pkt(0, 1, &[0xAAu8; 4]);
let result = rx.receive_path_a(&pkt);
assert!(result.is_some());
assert_eq!(rx.stats().path_a_packets, 1);
}
#[test]
fn test_st20227_receiver_path_b_delivers() {
let cfg = St20227Config::new().with_path_delay_limit(1000);
let mut rx = St20227Receiver::new(cfg);
let pkt = make_rtp_pkt(0, 1, &[0xBBu8; 4]);
let result = rx.receive_path_b(&pkt);
assert!(result.is_some());
assert_eq!(rx.stats().path_b_packets, 1);
}
#[test]
fn test_st20227_receiver_duplicate_discarded() {
let cfg = St20227Config::new().with_path_delay_limit(1000);
let mut rx = St20227Receiver::new(cfg);
let pkt = make_rtp_pkt(0, 1, &[0u8; 4]);
let _ = rx.receive_path_a(&pkt);
let _ = rx.receive_path_b(&pkt);
assert_eq!(rx.stats().duplicates_discarded, 1);
}
#[test]
fn test_st20227_receiver_recovery_from_path_b() {
let cfg = St20227Config::new().with_path_delay_limit(1000);
let mut rx = St20227Receiver::new(cfg);
let pkt = make_rtp_pkt(0, 1, &[0u8; 4]);
let _ = rx.receive_path_b(&pkt);
assert_eq!(rx.stats().recovered_from_path_b, 1);
}
#[test]
fn test_st20227_receiver_stats_ref() {
let rx = St20227Receiver::new(St20227Config::default());
let s = rx.stats();
assert_eq!(s.path_a_packets, 0);
}
#[test]
fn test_st20227_receiver_pending_count() {
let cfg = St20227Config::new().with_path_delay_limit(1000);
let mut rx = St20227Receiver::new(cfg);
let p0 = make_rtp_pkt(0, 1, &[0u8; 4]);
let _ = rx.receive_path_a(&p0);
let p1 = make_rtp_pkt(1, 1, &[1u8; 4]);
let _ = rx.receive_path_b(&p1);
let _count = rx.pending_count();
}
#[test]
fn test_st20227_receiver_sequential_path_a() {
let cfg = St20227Config::new().with_path_delay_limit(1000);
let mut rx = St20227Receiver::new(cfg);
let mut delivered = 0usize;
for seq in 0..5u16 {
let pkt = make_rtp_pkt(seq, 1, &[seq as u8; 4]);
if rx.receive_path_a(&pkt).is_some() {
delivered += 1;
}
}
assert_eq!(delivered, 5);
}
#[test]
fn test_st20227_receiver_payload_preserved() {
let cfg = St20227Config::new().with_path_delay_limit(1000);
let mut rx = St20227Receiver::new(cfg);
let payload = [0xDE, 0xAD, 0xBE, 0xEF];
let pkt = make_rtp_pkt(0, 42, &payload);
let result = rx.receive_path_a(&pkt).expect("should deliver");
assert_eq!(result, payload);
}
}