#![allow(dead_code)]
use std::collections::HashMap;
pub type SinkId = String;
pub type RelaySourceId = String;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropPolicy {
DropOldest,
DropNewest,
Block,
}
impl Default for DropPolicy {
fn default() -> Self {
Self::DropOldest
}
}
#[derive(Debug, Clone)]
pub struct RelayFrame {
pub source_id: RelaySourceId,
pub seq: u64,
pub pts_us: u64,
pub data: Vec<u8>,
}
#[derive(Debug, Clone, Default)]
pub struct SinkStats {
pub frames_delivered: u64,
pub frames_dropped: u64,
pub bytes_delivered: u64,
}
#[derive(Debug)]
pub struct RelaySink {
pub id: SinkId,
pub buffer_capacity: usize,
pub drop_policy: DropPolicy,
pub buffer: std::collections::VecDeque<RelayFrame>,
pub stats: SinkStats,
pub subscribed_sources: Option<Vec<RelaySourceId>>,
pub active: bool,
}
impl RelaySink {
#[must_use]
pub fn new(id: SinkId, buffer_capacity: usize, drop_policy: DropPolicy) -> Self {
Self {
id,
buffer_capacity,
drop_policy,
buffer: std::collections::VecDeque::new(),
stats: SinkStats::default(),
subscribed_sources: None,
active: true,
}
}
pub fn subscribe_to(mut self, sources: Vec<RelaySourceId>) -> Self {
self.subscribed_sources = Some(sources);
self
}
#[must_use]
pub fn is_subscribed_to(&self, source_id: &str) -> bool {
match &self.subscribed_sources {
None => true,
Some(subs) => subs.iter().any(|s| s == source_id),
}
}
pub fn push(&mut self, frame: RelayFrame) -> bool {
if self.buffer.len() >= self.buffer_capacity {
match self.drop_policy {
DropPolicy::DropOldest => {
self.buffer.pop_front();
self.stats.frames_dropped += 1;
}
DropPolicy::DropNewest => {
self.stats.frames_dropped += 1;
return false;
}
DropPolicy::Block => {
self.buffer.pop_front();
self.stats.frames_dropped += 1;
}
}
}
let byte_count = frame.data.len() as u64;
self.buffer.push_back(frame);
self.stats.frames_delivered += 1;
self.stats.bytes_delivered += byte_count;
true
}
pub fn pop(&mut self) -> Option<RelayFrame> {
self.buffer.pop_front()
}
#[must_use]
pub fn buffered_count(&self) -> usize {
self.buffer.len()
}
}
#[derive(Debug, Clone, Default)]
pub struct SourceStats {
pub frames_received: u64,
pub frames_relayed: u64,
pub bytes_received: u64,
}
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum RelayError {
#[error("sink '{0}' already registered")]
SinkAlreadyExists(String),
#[error("sink '{0}' not found")]
SinkNotFound(String),
#[error("relay is shut down")]
ShutDown,
}
pub type RelayResult<T> = Result<T, RelayError>;
#[derive(Debug, Default)]
pub struct StreamRelay {
sinks: HashMap<SinkId, RelaySink>,
source_stats: HashMap<RelaySourceId, SourceStats>,
running: bool,
total_relayed: u64,
}
impl StreamRelay {
#[must_use]
pub fn new() -> Self {
Self {
sinks: HashMap::new(),
source_stats: HashMap::new(),
running: true,
total_relayed: 0,
}
}
pub fn add_sink(&mut self, sink: RelaySink) -> RelayResult<()> {
if self.sinks.contains_key(&sink.id) {
return Err(RelayError::SinkAlreadyExists(sink.id.clone()));
}
self.sinks.insert(sink.id.clone(), sink);
Ok(())
}
pub fn remove_sink(&mut self, sink_id: &str) -> RelayResult<()> {
self.sinks
.remove(sink_id)
.map(|_| ())
.ok_or_else(|| RelayError::SinkNotFound(sink_id.to_owned()))
}
pub fn get_sink(&self, sink_id: &str) -> RelayResult<&RelaySink> {
self.sinks
.get(sink_id)
.ok_or_else(|| RelayError::SinkNotFound(sink_id.to_owned()))
}
pub fn get_sink_mut(&mut self, sink_id: &str) -> RelayResult<&mut RelaySink> {
self.sinks
.get_mut(sink_id)
.ok_or_else(|| RelayError::SinkNotFound(sink_id.to_owned()))
}
pub fn relay_frame(&mut self, frame: RelayFrame) -> RelayResult<usize> {
if !self.running {
return Err(RelayError::ShutDown);
}
let stats = self
.source_stats
.entry(frame.source_id.clone())
.or_default();
stats.frames_received += 1;
stats.bytes_received += frame.data.len() as u64;
let mut delivered = 0usize;
let source_id = frame.source_id.clone();
for sink in self.sinks.values_mut() {
if !sink.active || !sink.is_subscribed_to(&source_id) {
continue;
}
if sink.push(frame.clone()) {
delivered += 1;
}
}
if delivered > 0 {
let stats = self.source_stats.entry(source_id).or_default();
stats.frames_relayed += 1;
self.total_relayed += 1;
}
Ok(delivered)
}
pub fn shutdown(&mut self) {
self.running = false;
}
#[must_use]
pub fn is_running(&self) -> bool {
self.running
}
#[must_use]
pub fn total_relayed(&self) -> u64 {
self.total_relayed
}
#[must_use]
pub fn sink_count(&self) -> usize {
self.sinks.len()
}
#[must_use]
pub fn source_stats(&self, source_id: &str) -> Option<&SourceStats> {
self.source_stats.get(source_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_frame(source_id: &str, seq: u64, data_len: usize) -> RelayFrame {
RelayFrame {
source_id: source_id.to_owned(),
seq,
pts_us: seq * 16666,
data: vec![0u8; data_len],
}
}
#[test]
fn test_relay_basic() {
let mut relay = StreamRelay::new();
let sink = RelaySink::new("sink1".into(), 10, DropPolicy::DropOldest);
relay.add_sink(sink).expect("relay is not shut down");
let delivered = relay
.relay_frame(make_frame("cam1", 0, 100))
.expect("relay is not shut down");
assert_eq!(delivered, 1);
assert_eq!(relay.total_relayed(), 1);
}
#[test]
fn test_relay_to_multiple_sinks() {
let mut relay = StreamRelay::new();
for i in 0..3 {
let sink = RelaySink::new(format!("sink{i}"), 10, DropPolicy::DropOldest);
relay.add_sink(sink).expect("relay is not shut down");
}
let delivered = relay
.relay_frame(make_frame("cam1", 0, 50))
.expect("relay is not shut down");
assert_eq!(delivered, 3);
}
#[test]
fn test_source_subscription_filter() {
let mut relay = StreamRelay::new();
let sink = RelaySink::new("sink1".into(), 10, DropPolicy::DropOldest)
.subscribe_to(vec!["cam1".to_owned()]);
relay.add_sink(sink).expect("relay is not shut down");
let d1 = relay
.relay_frame(make_frame("cam1", 0, 10))
.expect("relay is not shut down");
assert_eq!(d1, 1);
let d2 = relay
.relay_frame(make_frame("cam2", 0, 10))
.expect("relay is not shut down");
assert_eq!(d2, 0);
}
#[test]
fn test_drop_oldest_policy() {
let sink = RelaySink::new("s".into(), 2, DropPolicy::DropOldest);
let mut relay = StreamRelay::new();
relay.add_sink(sink).expect("relay is not shut down");
relay
.relay_frame(make_frame("c", 0, 10))
.expect("relay is not shut down");
relay
.relay_frame(make_frame("c", 1, 10))
.expect("relay is not shut down");
relay
.relay_frame(make_frame("c", 2, 10)) .expect("relay is not shut down");
let sink = relay.get_sink("s").expect("sink 's' was added");
assert_eq!(sink.buffered_count(), 2);
assert_eq!(sink.stats.frames_dropped, 1);
}
#[test]
fn test_drop_newest_policy() {
let sink = RelaySink::new("s".into(), 2, DropPolicy::DropNewest);
let mut relay = StreamRelay::new();
relay.add_sink(sink).expect("relay is not shut down");
relay
.relay_frame(make_frame("c", 0, 10))
.expect("relay is not shut down");
relay
.relay_frame(make_frame("c", 1, 10))
.expect("relay is not shut down");
let delivered = relay
.relay_frame(make_frame("c", 2, 10))
.expect("relay is not shut down");
assert_eq!(delivered, 0);
let sink = relay.get_sink("s").expect("sink 's' was added");
assert_eq!(sink.buffered_count(), 2);
assert_eq!(sink.stats.frames_dropped, 1);
}
#[test]
fn test_pop_from_sink() {
let mut relay = StreamRelay::new();
relay
.add_sink(RelaySink::new("s".into(), 5, DropPolicy::DropOldest))
.expect("relay is not shut down");
relay
.relay_frame(make_frame("c", 42, 20))
.expect("relay is not shut down");
let frame = relay
.get_sink_mut("s")
.expect("sink 's' was added")
.pop()
.expect("one frame was relayed");
assert_eq!(frame.seq, 42);
}
#[test]
fn test_remove_sink() {
let mut relay = StreamRelay::new();
relay
.add_sink(RelaySink::new("s".into(), 5, DropPolicy::DropOldest))
.expect("relay is not shut down");
relay.remove_sink("s").expect("sink 's' was added");
assert_eq!(relay.sink_count(), 0);
}
#[test]
fn test_shutdown_prevents_relay() {
let mut relay = StreamRelay::new();
relay.shutdown();
let result = relay.relay_frame(make_frame("c", 0, 10));
assert!(matches!(result, Err(RelayError::ShutDown)));
}
#[test]
fn test_duplicate_sink_rejected() {
let mut relay = StreamRelay::new();
relay
.add_sink(RelaySink::new("s".into(), 5, DropPolicy::DropOldest))
.expect("relay is not shut down");
let result = relay.add_sink(RelaySink::new("s".into(), 5, DropPolicy::DropOldest));
assert!(matches!(result, Err(RelayError::SinkAlreadyExists(_))));
}
#[test]
fn test_source_stats_updated() {
let mut relay = StreamRelay::new();
relay
.add_sink(RelaySink::new("s".into(), 5, DropPolicy::DropOldest))
.expect("relay is not shut down");
relay
.relay_frame(make_frame("cam", 0, 500))
.expect("relay is not shut down");
let stats = relay
.source_stats("cam")
.expect("source 'cam' has frames relayed");
assert_eq!(stats.frames_received, 1);
assert_eq!(stats.bytes_received, 500);
}
}