#![allow(dead_code)]
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RelayProtocol {
Rtmp,
Srt,
Hls,
Dash,
Rist,
WebRtc,
Rtp,
}
impl RelayProtocol {
#[must_use]
pub const fn name(&self) -> &'static str {
match self {
Self::Rtmp => "RTMP",
Self::Srt => "SRT",
Self::Hls => "HLS",
Self::Dash => "DASH",
Self::Rist => "RIST",
Self::WebRtc => "WebRTC",
Self::Rtp => "RTP",
}
}
#[must_use]
pub const fn default_port(&self) -> u16 {
match self {
Self::Rtmp => 1935,
Self::Srt => 9000,
Self::Hls => 80,
Self::Dash => 80,
Self::Rist => 5004,
Self::WebRtc => 443,
Self::Rtp => 5004,
}
}
#[must_use]
pub const fn is_push(&self) -> bool {
matches!(self, Self::Rtmp | Self::Srt | Self::Rist | Self::Rtp)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelayEndpoint {
pub protocol: RelayProtocol,
pub address: SocketAddr,
pub stream_key: Option<String>,
pub passphrase: Option<String>,
}
impl RelayEndpoint {
#[must_use]
pub fn new(protocol: RelayProtocol, address: SocketAddr) -> Self {
Self {
protocol,
address,
stream_key: None,
passphrase: None,
}
}
#[must_use]
pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
self.stream_key = Some(key.into());
self
}
#[must_use]
pub fn with_passphrase(mut self, pass: impl Into<String>) -> Self {
self.passphrase = Some(pass.into());
self
}
#[must_use]
pub fn to_url(&self) -> String {
let proto = self.protocol.name().to_lowercase();
let base = format!("{}://{}", proto, self.address);
match &self.stream_key {
Some(k) => format!("{}/{}", base, k),
None => base,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouteStatus {
Active,
Idle,
Reconnecting,
Error,
Stopped,
}
#[derive(Debug, Clone, Default)]
pub struct RouteStats {
pub bytes_relayed: u64,
pub frames_relayed: u64,
pub frames_dropped: u64,
pub reconnection_count: u32,
pub input_bitrate_bps: f64,
pub output_bitrate_bps: f64,
pub avg_relay_latency: Duration,
}
#[derive(Debug)]
pub struct RelayRoute {
pub id: u32,
pub input: RelayEndpoint,
pub outputs: Vec<RelayEndpoint>,
pub status: RouteStatus,
pub stats: RouteStats,
pub created_at: Instant,
pub stream_start: Option<Instant>,
pub max_queue_depth: usize,
pub buffer_on_disconnect: bool,
}
impl RelayRoute {
#[must_use]
pub fn new(id: u32, input: RelayEndpoint) -> Self {
Self {
id,
input,
outputs: Vec::new(),
status: RouteStatus::Idle,
stats: RouteStats::default(),
created_at: Instant::now(),
stream_start: None,
max_queue_depth: 120, buffer_on_disconnect: true,
}
}
pub fn add_output(&mut self, output: RelayEndpoint) {
self.outputs.push(output);
}
pub fn remove_output(&mut self, addr: SocketAddr) {
self.outputs.retain(|o| o.address != addr);
}
#[must_use]
pub fn output_count(&self) -> usize {
self.outputs.len()
}
pub fn mark_active(&mut self) {
self.status = RouteStatus::Active;
self.stream_start = Some(Instant::now());
}
#[must_use]
pub fn stream_duration(&self) -> Option<Duration> {
self.stream_start.map(|t| t.elapsed())
}
pub fn relay_frame(&mut self, frame_bytes: usize) -> usize {
if self.status != RouteStatus::Active {
self.stats.frames_dropped += 1;
return 0;
}
let count = self.outputs.len();
self.stats.bytes_relayed += frame_bytes as u64 * count as u64;
self.stats.frames_relayed += 1;
count
}
#[must_use]
pub fn drop_ratio(&self) -> f64 {
let total = self.stats.frames_relayed + self.stats.frames_dropped;
if total == 0 {
return 0.0;
}
self.stats.frames_dropped as f64 / total as f64
}
}
#[derive(Debug, Clone)]
pub struct ProtocolBridge {
pub from: RelayProtocol,
pub to: RelayProtocol,
pub requires_transcode: bool,
}
impl ProtocolBridge {
#[must_use]
pub fn new(from: RelayProtocol, to: RelayProtocol) -> Self {
let requires_transcode = matches!(
(from, to),
(RelayProtocol::Rtmp, RelayProtocol::Hls)
| (RelayProtocol::Rtmp, RelayProtocol::Dash)
| (RelayProtocol::Srt, RelayProtocol::Hls)
| (RelayProtocol::Srt, RelayProtocol::Dash)
| (RelayProtocol::WebRtc, RelayProtocol::Rtmp)
| (RelayProtocol::WebRtc, RelayProtocol::Hls)
);
Self {
from,
to,
requires_transcode,
}
}
#[must_use]
pub fn is_passthrough(&self) -> bool {
self.from == self.to
}
}
pub struct RelayServer {
routes: HashMap<u32, RelayRoute>,
next_id: u32,
max_routes: usize,
total_frames: u64,
}
impl RelayServer {
#[must_use]
pub fn new(max_routes: usize) -> Self {
Self {
routes: HashMap::new(),
next_id: 1,
max_routes,
total_frames: 0,
}
}
pub fn add_route(&mut self, input: RelayEndpoint) -> Option<u32> {
if self.routes.len() >= self.max_routes {
return None;
}
let id = self.next_id;
self.next_id += 1;
self.routes.insert(id, RelayRoute::new(id, input));
Some(id)
}
pub fn remove_route(&mut self, id: u32) -> bool {
self.routes.remove(&id).is_some()
}
#[must_use]
pub fn route(&self, id: u32) -> Option<&RelayRoute> {
self.routes.get(&id)
}
pub fn route_mut(&mut self, id: u32) -> Option<&mut RelayRoute> {
self.routes.get_mut(&id)
}
#[must_use]
pub fn active_route_count(&self) -> usize {
self.routes
.values()
.filter(|r| r.status == RouteStatus::Active)
.count()
}
#[must_use]
pub fn route_count(&self) -> usize {
self.routes.len()
}
pub fn process_frame(&mut self, route_id: u32, frame_bytes: usize) -> usize {
if let Some(route) = self.routes.get_mut(&route_id) {
self.total_frames += 1;
route.relay_frame(frame_bytes)
} else {
0
}
}
#[must_use]
pub fn aggregate_stats(&self) -> RouteStats {
let mut agg = RouteStats::default();
for r in self.routes.values() {
agg.bytes_relayed += r.stats.bytes_relayed;
agg.frames_relayed += r.stats.frames_relayed;
agg.frames_dropped += r.stats.frames_dropped;
agg.reconnection_count += r.stats.reconnection_count;
}
agg
}
}
#[derive(Debug, Clone)]
pub struct RelayFrame {
pub seq: u64,
pub timestamp_ms: u64,
pub data: Vec<u8>,
pub is_keyframe: bool,
}
pub struct StreamRelay {
input_queue: std::collections::VecDeque<RelayFrame>,
output_queue: std::collections::VecDeque<RelayFrame>,
max_depth: usize,
frames_relayed: u64,
frames_dropped: u64,
source_protocol: RelayProtocol,
dest_protocol: RelayProtocol,
}
impl StreamRelay {
#[must_use]
pub fn new(
source_protocol: RelayProtocol,
dest_protocol: RelayProtocol,
max_depth: usize,
) -> Self {
Self {
input_queue: std::collections::VecDeque::new(),
output_queue: std::collections::VecDeque::new(),
max_depth,
frames_relayed: 0,
frames_dropped: 0,
source_protocol,
dest_protocol,
}
}
pub fn push_input(&mut self, frame: RelayFrame) {
if self.input_queue.len() >= self.max_depth {
self.frames_dropped += 1;
return;
}
self.input_queue.push_back(frame);
}
pub fn process(&mut self) {
while let Some(frame) = self.input_queue.pop_front() {
if self.output_queue.len() < self.max_depth {
self.output_queue.push_back(frame);
self.frames_relayed += 1;
} else {
self.frames_dropped += 1;
}
}
}
pub fn pop_output(&mut self) -> Option<RelayFrame> {
self.output_queue.pop_front()
}
#[must_use]
pub fn protocol_bridge(&self) -> ProtocolBridge {
ProtocolBridge::new(self.source_protocol, self.dest_protocol)
}
#[must_use]
pub const fn frames_relayed(&self) -> u64 {
self.frames_relayed
}
#[must_use]
pub const fn frames_dropped(&self) -> u64 {
self.frames_dropped
}
#[must_use]
pub fn input_depth(&self) -> usize {
self.input_queue.len()
}
#[must_use]
pub fn output_depth(&self) -> usize {
self.output_queue.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_rtmp_endpoint() -> RelayEndpoint {
RelayEndpoint::new(
RelayProtocol::Rtmp,
"127.0.0.1:1935".parse().expect("valid addr"),
)
.with_stream_key("live/stream1")
}
fn make_srt_endpoint() -> RelayEndpoint {
RelayEndpoint::new(
RelayProtocol::Srt,
"127.0.0.1:9000".parse().expect("valid addr"),
)
.with_passphrase("secret")
}
#[test]
fn test_relay_protocol_names() {
assert_eq!(RelayProtocol::Rtmp.name(), "RTMP");
assert_eq!(RelayProtocol::Srt.name(), "SRT");
assert_eq!(RelayProtocol::Rtmp.default_port(), 1935);
assert_eq!(RelayProtocol::Srt.default_port(), 9000);
}
#[test]
fn test_relay_protocol_push_pull() {
assert!(RelayProtocol::Rtmp.is_push());
assert!(!RelayProtocol::Hls.is_push());
}
#[test]
fn test_endpoint_to_url() {
let ep = make_rtmp_endpoint();
let url = ep.to_url();
assert!(url.contains("rtmp"));
assert!(url.contains("live/stream1"));
}
#[test]
fn test_relay_route_new() {
let route = RelayRoute::new(1, make_rtmp_endpoint());
assert_eq!(route.id, 1);
assert_eq!(route.status, RouteStatus::Idle);
assert_eq!(route.output_count(), 0);
}
#[test]
fn test_relay_route_outputs() {
let mut route = RelayRoute::new(1, make_rtmp_endpoint());
let addr: SocketAddr = "192.168.1.1:1935".parse().expect("valid addr");
route.add_output(RelayEndpoint::new(RelayProtocol::Rtmp, addr));
assert_eq!(route.output_count(), 1);
route.remove_output(addr);
assert_eq!(route.output_count(), 0);
}
#[test]
fn test_relay_route_relay_frame() {
let mut route = RelayRoute::new(1, make_rtmp_endpoint());
let addr: SocketAddr = "192.168.1.1:1935".parse().expect("valid addr");
route.add_output(RelayEndpoint::new(RelayProtocol::Rtmp, addr));
route.mark_active();
let delivered = route.relay_frame(1000);
assert_eq!(delivered, 1);
assert_eq!(route.stats.frames_relayed, 1);
assert_eq!(route.stats.bytes_relayed, 1000);
}
#[test]
fn test_relay_route_idle_drops_frames() {
let mut route = RelayRoute::new(1, make_rtmp_endpoint());
route.relay_frame(100); assert_eq!(route.stats.frames_dropped, 1);
}
#[test]
fn test_relay_route_drop_ratio() {
let route = RelayRoute::new(1, make_rtmp_endpoint());
assert_eq!(route.drop_ratio(), 0.0);
}
#[test]
fn test_relay_route_stream_duration() {
let mut route = RelayRoute::new(1, make_rtmp_endpoint());
assert!(route.stream_duration().is_none());
route.mark_active();
assert!(route.stream_duration().is_some());
}
#[test]
fn test_relay_server_add_remove() {
let mut server = RelayServer::new(10);
let id = server.add_route(make_rtmp_endpoint()).expect("should add");
assert_eq!(server.route_count(), 1);
server.remove_route(id);
assert_eq!(server.route_count(), 0);
}
#[test]
fn test_relay_server_max_routes() {
let mut server = RelayServer::new(2);
server.add_route(make_rtmp_endpoint());
server.add_route(make_rtmp_endpoint());
let result = server.add_route(make_rtmp_endpoint());
assert!(result.is_none());
}
#[test]
fn test_relay_server_active_count() {
let mut server = RelayServer::new(10);
let id = server.add_route(make_rtmp_endpoint()).expect("should add");
server.route_mut(id).expect("should exist").mark_active();
assert_eq!(server.active_route_count(), 1);
}
#[test]
fn test_relay_server_process_frame() {
let mut server = RelayServer::new(10);
let id = server.add_route(make_rtmp_endpoint()).expect("should add");
let addr: SocketAddr = "192.168.1.1:1935".parse().expect("valid addr");
{
let route = server.route_mut(id).expect("should exist");
route.add_output(RelayEndpoint::new(RelayProtocol::Rtmp, addr));
route.mark_active();
}
let deliveries = server.process_frame(id, 500);
assert_eq!(deliveries, 1);
}
#[test]
fn test_relay_server_aggregate_stats() {
let mut server = RelayServer::new(10);
let id = server.add_route(make_rtmp_endpoint()).expect("should add");
let addr: SocketAddr = "192.168.1.1:1935".parse().expect("valid addr");
{
let route = server.route_mut(id).expect("should exist");
route.add_output(RelayEndpoint::new(RelayProtocol::Rtmp, addr));
route.mark_active();
}
server.process_frame(id, 100);
let stats = server.aggregate_stats();
assert_eq!(stats.frames_relayed, 1);
}
#[test]
fn test_protocol_bridge_passthrough() {
let bridge = ProtocolBridge::new(RelayProtocol::Rtmp, RelayProtocol::Rtmp);
assert!(bridge.is_passthrough());
}
#[test]
fn test_protocol_bridge_transcode() {
let bridge = ProtocolBridge::new(RelayProtocol::Rtmp, RelayProtocol::Hls);
assert!(bridge.requires_transcode);
}
#[test]
fn test_stream_relay_push_process() {
let mut relay = StreamRelay::new(RelayProtocol::Srt, RelayProtocol::Rtmp, 10);
relay.push_input(RelayFrame {
seq: 0,
timestamp_ms: 0,
data: vec![0u8; 188],
is_keyframe: true,
});
relay.process();
assert_eq!(relay.frames_relayed(), 1);
assert!(relay.pop_output().is_some());
}
#[test]
fn test_stream_relay_drop_when_full() {
let mut relay = StreamRelay::new(RelayProtocol::Srt, RelayProtocol::Rtmp, 1);
for i in 0..5u64 {
relay.push_input(RelayFrame {
seq: i,
timestamp_ms: i * 33,
data: vec![0u8; 4],
is_keyframe: false,
});
}
assert!(relay.frames_dropped() > 0);
}
#[test]
fn test_stream_relay_protocol_bridge() {
let relay = StreamRelay::new(RelayProtocol::Srt, RelayProtocol::Rtmp, 10);
let bridge = relay.protocol_bridge();
assert_eq!(bridge.from, RelayProtocol::Srt);
assert_eq!(bridge.to, RelayProtocol::Rtmp);
}
#[test]
fn test_endpoint_key_and_passphrase() {
let ep = make_srt_endpoint();
assert_eq!(ep.passphrase.as_deref(), Some("secret"));
let ep2 = make_rtmp_endpoint();
assert!(ep2.stream_key.is_some());
}
}