#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SrtConnectionMode {
Caller {
target_host: String,
target_port: u16,
},
Listener {
bind_port: u16,
backlog: u32,
},
Rendezvous {
local_port: u16,
remote_host: String,
remote_port: u16,
},
}
impl SrtConnectionMode {
#[must_use]
pub fn mode_name(&self) -> &str {
match self {
Self::Caller { .. } => "caller",
Self::Listener { .. } => "listener",
Self::Rendezvous { .. } => "rendezvous",
}
}
#[must_use]
pub fn is_outbound(&self) -> bool {
matches!(self, Self::Caller { .. } | Self::Rendezvous { .. })
}
pub fn validate(&self) -> Result<(), String> {
match self {
Self::Caller { target_port, .. } => {
if *target_port == 0 {
return Err("Caller: target_port must be greater than 0".to_owned());
}
}
Self::Listener { bind_port, backlog } => {
if *bind_port == 0 {
return Err("Listener: bind_port must be greater than 0".to_owned());
}
if *backlog == 0 {
return Err("Listener: backlog must be greater than 0".to_owned());
}
}
Self::Rendezvous {
local_port,
remote_port,
..
} => {
if *local_port == 0 {
return Err("Rendezvous: local_port must be greater than 0".to_owned());
}
if *remote_port == 0 {
return Err("Rendezvous: remote_port must be greater than 0".to_owned());
}
}
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct SrtConnectionConfig {
pub mode: SrtConnectionMode,
pub latency_ms: u32,
pub max_bandwidth_bps: Option<u64>,
pub passphrase: Option<String>,
pub stream_id: Option<String>,
pub connect_timeout_ms: u32,
}
impl SrtConnectionConfig {
fn new_with_mode(mode: SrtConnectionMode) -> Self {
Self {
mode,
latency_ms: 120,
max_bandwidth_bps: None,
passphrase: None,
stream_id: None,
connect_timeout_ms: 3000,
}
}
#[must_use]
pub fn caller(host: impl Into<String>, port: u16) -> Self {
Self::new_with_mode(SrtConnectionMode::Caller {
target_host: host.into(),
target_port: port,
})
}
#[must_use]
pub fn listener(port: u16) -> Self {
Self::new_with_mode(SrtConnectionMode::Listener {
bind_port: port,
backlog: 5,
})
}
#[must_use]
pub fn rendezvous(local_port: u16, remote_host: impl Into<String>, remote_port: u16) -> Self {
Self::new_with_mode(SrtConnectionMode::Rendezvous {
local_port,
remote_host: remote_host.into(),
remote_port,
})
}
#[must_use]
pub fn with_latency(mut self, ms: u32) -> Self {
self.latency_ms = ms;
self
}
#[must_use]
pub fn with_passphrase(mut self, pass: impl Into<String>) -> Self {
self.passphrase = Some(pass.into());
self
}
#[must_use]
pub fn with_stream_id(mut self, id: impl Into<String>) -> Self {
self.stream_id = Some(id.into());
self
}
#[must_use]
pub fn with_max_bandwidth(mut self, bps: u64) -> Self {
self.max_bandwidth_bps = Some(bps);
self
}
#[must_use]
pub fn build(self) -> SrtConnection {
SrtConnection::new(self)
}
pub fn validate(&self) -> Result<(), String> {
self.mode.validate()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SrtState {
Disconnected,
Connecting,
Connected,
Broken,
Closed,
}
pub struct SrtConnection {
config: SrtConnectionConfig,
state: SrtState,
bytes_sent: u64,
bytes_received: u64,
packets_lost: u64,
}
impl SrtConnection {
#[must_use]
pub fn new(config: SrtConnectionConfig) -> Self {
Self {
config,
state: SrtState::Disconnected,
bytes_sent: 0,
bytes_received: 0,
packets_lost: 0,
}
}
pub fn connect(&mut self) -> Result<(), String> {
match self.state {
SrtState::Disconnected | SrtState::Broken => {}
SrtState::Connecting => {
return Err("already connecting".to_owned());
}
SrtState::Connected => {
return Err("already connected".to_owned());
}
SrtState::Closed => {
return Err("connection is closed; create a new SrtConnection".to_owned());
}
}
self.config.validate()?;
self.state = SrtState::Connecting;
self.state = SrtState::Connected;
Ok(())
}
pub fn disconnect(&mut self) {
self.state = SrtState::Closed;
}
#[must_use]
pub fn state(&self) -> &SrtState {
&self.state
}
#[must_use]
pub fn config(&self) -> &SrtConnectionConfig {
&self.config
}
pub fn simulate_send(&mut self, bytes: u64) {
self.bytes_sent = self.bytes_sent.saturating_add(bytes);
}
pub fn simulate_receive(&mut self, bytes: u64) {
self.bytes_received = self.bytes_received.saturating_add(bytes);
}
pub fn simulate_loss(&mut self, packets: u64) {
self.packets_lost = self.packets_lost.saturating_add(packets);
}
#[must_use]
pub fn bytes_sent(&self) -> u64 {
self.bytes_sent
}
#[must_use]
pub fn bytes_received(&self) -> u64 {
self.bytes_received
}
#[must_use]
pub fn packet_loss_rate(&self) -> f64 {
const MTU: u64 = 1316;
let sent_pkts = self.bytes_sent.saturating_add(MTU - 1) / MTU;
let recv_pkts = self.bytes_received.saturating_add(MTU - 1) / MTU;
let total = sent_pkts + recv_pkts;
if total == 0 {
return 0.0;
}
self.packets_lost as f64 / total as f64
}
}
#[derive(Debug, Clone)]
pub struct SrtStreamConfig {
inner: SrtConnectionConfig,
}
impl SrtStreamConfig {
#[must_use]
pub fn with_mode(mode: SrtConnectionMode) -> Self {
Self {
inner: SrtConnectionConfig::new_with_mode(mode),
}
}
#[must_use]
pub fn with_latency(mut self, ms: u32) -> Self {
self.inner.latency_ms = ms;
self
}
#[must_use]
pub fn with_passphrase(mut self, pass: impl Into<String>) -> Self {
self.inner.passphrase = Some(pass.into());
self
}
#[must_use]
pub fn with_stream_id(mut self, id: impl Into<String>) -> Self {
self.inner.stream_id = Some(id.into());
self
}
#[must_use]
pub fn config(&self) -> &SrtConnectionConfig {
&self.inner
}
#[must_use]
pub fn build(self) -> SrtConnection {
self.inner.build()
}
#[must_use]
pub fn connection_string(&self) -> String {
connection_string(&self.inner)
}
}
#[must_use]
pub fn connection_string(config: &SrtConnectionConfig) -> String {
let (host, port, mode_name) = match &config.mode {
SrtConnectionMode::Caller {
target_host,
target_port,
} => (target_host.as_str(), *target_port, "caller"),
SrtConnectionMode::Listener { bind_port, .. } => ("0.0.0.0", *bind_port, "listener"),
SrtConnectionMode::Rendezvous {
remote_host,
remote_port,
..
} => (remote_host.as_str(), *remote_port, "rendezvous"),
};
let mut url = format!(
"srt://{host}:{port}?mode={mode_name}&latency={}",
config.latency_ms
);
if let Some(ref pass) = config.passphrase {
url.push_str(&format!("&passphrase={pass}"));
}
if let Some(ref sid) = config.stream_id {
url.push_str(&format!("&streamid={sid}"));
}
url
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_mode_name_caller() {
let m = SrtConnectionMode::Caller {
target_host: "host".to_owned(),
target_port: 9000,
};
assert_eq!(m.mode_name(), "caller");
}
#[test]
fn test_mode_name_listener() {
let m = SrtConnectionMode::Listener {
bind_port: 9000,
backlog: 5,
};
assert_eq!(m.mode_name(), "listener");
}
#[test]
fn test_mode_name_rendezvous() {
let m = SrtConnectionMode::Rendezvous {
local_port: 9000,
remote_host: "peer".to_owned(),
remote_port: 9001,
};
assert_eq!(m.mode_name(), "rendezvous");
}
#[test]
fn test_validate_caller_zero_port() {
let m = SrtConnectionMode::Caller {
target_host: "host".to_owned(),
target_port: 0,
};
assert!(m.validate().is_err());
}
#[test]
fn test_validate_listener_zero_port() {
let m = SrtConnectionMode::Listener {
bind_port: 0,
backlog: 5,
};
assert!(m.validate().is_err());
}
#[test]
fn test_validate_listener_zero_backlog() {
let m = SrtConnectionMode::Listener {
bind_port: 9000,
backlog: 0,
};
assert!(m.validate().is_err());
}
#[test]
fn test_validate_rendezvous_zero_local() {
let m = SrtConnectionMode::Rendezvous {
local_port: 0,
remote_host: "h".to_owned(),
remote_port: 9001,
};
assert!(m.validate().is_err());
}
#[test]
fn test_caller_is_outbound() {
let m = SrtConnectionMode::Caller {
target_host: "h".to_owned(),
target_port: 9000,
};
assert!(m.is_outbound());
}
#[test]
fn test_listener_is_not_outbound() {
let m = SrtConnectionMode::Listener {
bind_port: 9000,
backlog: 5,
};
assert!(!m.is_outbound());
}
#[test]
fn test_rendezvous_is_outbound() {
let m = SrtConnectionMode::Rendezvous {
local_port: 9000,
remote_host: "h".to_owned(),
remote_port: 9001,
};
assert!(m.is_outbound());
}
#[test]
fn test_connection_starts_disconnected() {
let conn = SrtConnectionConfig::caller("host", 9000).build();
assert_eq!(conn.state(), &SrtState::Disconnected);
}
#[test]
fn test_connect_transitions() {
let mut conn = SrtConnectionConfig::caller("host", 9000).build();
conn.connect().expect("connect should succeed");
assert_eq!(conn.state(), &SrtState::Connected);
}
#[test]
fn test_disconnect() {
let mut conn = SrtConnectionConfig::caller("host", 9000).build();
conn.connect().expect("connect");
conn.disconnect();
assert_eq!(conn.state(), &SrtState::Closed);
}
#[test]
fn test_simulate_send() {
let mut conn = SrtConnectionConfig::caller("host", 9000).build();
conn.simulate_send(1316);
conn.simulate_send(1316);
assert_eq!(conn.bytes_sent(), 2632);
}
#[test]
fn test_simulate_receive() {
let mut conn = SrtConnectionConfig::listener(9000).build();
conn.simulate_receive(4096);
assert_eq!(conn.bytes_received(), 4096);
}
#[test]
fn test_packet_loss_rate_no_traffic() {
let conn = SrtConnectionConfig::caller("host", 9000).build();
assert!((conn.packet_loss_rate() - 0.0).abs() < f64::EPSILON);
}
#[test]
fn test_packet_loss_rate() {
let mut conn = SrtConnectionConfig::rendezvous(9000, "peer", 9001).build();
conn.simulate_send(1316); conn.simulate_loss(1);
let rate = conn.packet_loss_rate();
assert!((rate - 1.0).abs() < 1e-9);
}
#[test]
fn test_with_latency() {
let cfg = SrtConnectionConfig::caller("host", 9000).with_latency(200);
assert_eq!(cfg.latency_ms, 200);
}
#[test]
fn test_with_passphrase() {
let cfg = SrtConnectionConfig::caller("host", 9000).with_passphrase("secret");
assert_eq!(cfg.passphrase.as_deref(), Some("secret"));
}
#[test]
fn test_connect_on_closed_errors() {
let mut conn = SrtConnectionConfig::caller("host", 9000).build();
conn.disconnect();
let result = conn.connect();
assert!(result.is_err());
}
#[test]
fn test_connection_string_caller() {
let cfg = SrtConnectionConfig::caller("stream.example.com", 9000).with_latency(120);
let url = connection_string(&cfg);
assert!(url.starts_with("srt://stream.example.com:9000"));
assert!(url.contains("mode=caller"));
assert!(url.contains("latency=120"));
}
#[test]
fn test_connection_string_listener() {
let cfg = SrtConnectionConfig::listener(9000).with_latency(80);
let url = connection_string(&cfg);
assert!(url.starts_with("srt://0.0.0.0:9000"));
assert!(url.contains("mode=listener"));
assert!(url.contains("latency=80"));
}
#[test]
fn test_connection_string_rendezvous() {
let cfg = SrtConnectionConfig::rendezvous(9000, "peer.example.com", 9001).with_latency(200);
let url = connection_string(&cfg);
assert!(url.contains("mode=rendezvous"));
assert!(url.contains("peer.example.com:9001"));
}
#[test]
fn test_connection_string_passphrase() {
let cfg = SrtConnectionConfig::caller("host", 9000).with_passphrase("secret123");
let url = connection_string(&cfg);
assert!(url.contains("passphrase=secret123"));
}
#[test]
fn test_connection_string_stream_id() {
let cfg = SrtConnectionConfig::caller("host", 9000)
.with_stream_id("#!::r=live/stream1,m=publish");
let url = connection_string(&cfg);
assert!(url.contains("streamid="));
}
#[test]
fn test_srt_stream_config_with_mode_caller() {
let mode = SrtConnectionMode::Caller {
target_host: "ingest.example.com".to_owned(),
target_port: 5000,
};
let cfg = SrtStreamConfig::with_mode(mode).with_latency(150);
let url = cfg.connection_string();
assert!(url.contains("ingest.example.com:5000"));
assert!(url.contains("mode=caller"));
assert!(url.contains("latency=150"));
}
#[test]
fn test_srt_stream_config_build() {
let mode = SrtConnectionMode::Listener {
bind_port: 9000,
backlog: 5,
};
let conn = SrtStreamConfig::with_mode(mode).build();
assert_eq!(conn.state(), &SrtState::Disconnected);
}
#[test]
fn test_connection_string_no_extras() {
let cfg = SrtConnectionConfig::caller("host", 9000);
let url = connection_string(&cfg);
assert!(!url.contains("passphrase="));
assert!(!url.contains("streamid="));
}
}