use crate::error::{NetError, NetResult};
use std::collections::HashMap;
use std::time::{Duration, Instant, SystemTime};
#[derive(Debug, Clone)]
pub struct IceServerConfig {
pub urls: Vec<String>,
pub username: Option<String>,
pub credential: Option<String>,
pub credential_type: Option<String>,
}
impl IceServerConfig {
#[must_use]
pub fn stun(url: impl Into<String>) -> Self {
Self {
urls: vec![url.into()],
username: None,
credential: None,
credential_type: None,
}
}
#[must_use]
pub fn turn(
url: impl Into<String>,
username: impl Into<String>,
credential: impl Into<String>,
) -> Self {
Self {
urls: vec![url.into()],
username: Some(username.into()),
credential: Some(credential.into()),
credential_type: Some("password".to_owned()),
}
}
#[must_use]
pub fn to_link_header(&self) -> String {
let mut parts = Vec::new();
for url in &self.urls {
let mut link = format!("<{url}>; rel=\"ice-server\"");
if let Some(ref user) = self.username {
link.push_str(&format!("; username=\"{user}\""));
}
if let Some(ref cred) = self.credential {
link.push_str(&format!("; credential=\"{cred}\""));
}
if let Some(ref ct) = self.credential_type {
link.push_str(&format!("; credential-type=\"{ct}\""));
}
parts.push(link);
}
parts.join(", ")
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WhipState {
WaitingOffer,
Negotiating,
Active,
Terminated,
}
impl WhipState {
#[must_use]
pub const fn name(&self) -> &'static str {
match self {
Self::WaitingOffer => "waiting_offer",
Self::Negotiating => "negotiating",
Self::Active => "active",
Self::Terminated => "terminated",
}
}
}
#[derive(Debug, Clone)]
pub struct WhipSession {
pub session_id: String,
pub state: WhipState,
pub offer_sdp: Option<String>,
pub answer_sdp: Option<String>,
pub auth_token: Option<String>,
pub trickle_candidates: Vec<String>,
pub created_at: Instant,
pub etag: String,
pub extensions: HashMap<String, String>,
}
impl WhipSession {
#[must_use]
pub fn new(session_id: impl Into<String>) -> Self {
let id = session_id.into();
let etag = format!("W/\"{}\"", simple_hash(&id));
Self {
session_id: id,
state: WhipState::WaitingOffer,
offer_sdp: None,
answer_sdp: None,
auth_token: None,
trickle_candidates: Vec::new(),
created_at: Instant::now(),
etag,
extensions: HashMap::new(),
}
}
pub fn process_offer(&mut self, offer_sdp: &str) -> NetResult<String> {
if self.state != WhipState::WaitingOffer && self.state != WhipState::Negotiating {
return Err(NetError::invalid_state(format!(
"Cannot process offer in state: {}",
self.state.name()
)));
}
self.offer_sdp = Some(offer_sdp.to_owned());
let answer = generate_sdp_answer(offer_sdp);
self.answer_sdp = Some(answer.clone());
self.state = WhipState::Negotiating;
Ok(answer)
}
pub fn add_trickle_candidates(&mut self, sdp_fragment: &str) -> NetResult<()> {
if self.state == WhipState::Terminated {
return Err(NetError::invalid_state("Session terminated"));
}
for line in sdp_fragment.lines() {
let trimmed = line.trim();
if trimmed.starts_with("a=candidate:") || trimmed.starts_with("a=end-of-candidates") {
self.trickle_candidates.push(trimmed.to_owned());
}
}
if sdp_fragment.contains("end-of-candidates") {
self.state = WhipState::Active;
}
Ok(())
}
pub fn terminate(&mut self) {
self.state = WhipState::Terminated;
}
#[must_use]
pub fn is_active(&self) -> bool {
self.state == WhipState::Active
}
#[must_use]
pub fn duration(&self) -> Duration {
self.created_at.elapsed()
}
#[must_use]
pub fn resource_path(&self) -> String {
format!("/whip/resource/{}", self.session_id)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WhepState {
WaitingOffer,
Negotiating,
Active,
Terminated,
}
impl WhepState {
#[must_use]
pub const fn name(&self) -> &'static str {
match self {
Self::WaitingOffer => "waiting_offer",
Self::Negotiating => "negotiating",
Self::Active => "active",
Self::Terminated => "terminated",
}
}
}
#[derive(Debug, Clone)]
pub struct WhepSession {
pub session_id: String,
pub state: WhepState,
pub offer_sdp: Option<String>,
pub answer_sdp: Option<String>,
pub auth_token: Option<String>,
pub trickle_candidates: Vec<String>,
pub created_at: Instant,
pub etag: String,
pub stream_key: Option<String>,
pub selected_layer: Option<LayerSelection>,
}
impl WhepSession {
#[must_use]
pub fn new(session_id: impl Into<String>) -> Self {
let id = session_id.into();
let etag = format!("W/\"{}\"", simple_hash(&id));
Self {
session_id: id,
state: WhepState::WaitingOffer,
offer_sdp: None,
answer_sdp: None,
auth_token: None,
trickle_candidates: Vec::new(),
created_at: Instant::now(),
etag,
stream_key: None,
selected_layer: None,
}
}
pub fn process_offer(&mut self, offer_sdp: &str) -> NetResult<String> {
if self.state != WhepState::WaitingOffer && self.state != WhepState::Negotiating {
return Err(NetError::invalid_state(format!(
"Cannot process offer in state: {}",
self.state.name()
)));
}
self.offer_sdp = Some(offer_sdp.to_owned());
let answer = generate_whep_answer(offer_sdp);
self.answer_sdp = Some(answer.clone());
self.state = WhepState::Negotiating;
Ok(answer)
}
pub fn add_trickle_candidates(&mut self, sdp_fragment: &str) -> NetResult<()> {
if self.state == WhepState::Terminated {
return Err(NetError::invalid_state("Session terminated"));
}
for line in sdp_fragment.lines() {
let trimmed = line.trim();
if trimmed.starts_with("a=candidate:") || trimmed.starts_with("a=end-of-candidates") {
self.trickle_candidates.push(trimmed.to_owned());
}
}
if sdp_fragment.contains("end-of-candidates") {
self.state = WhepState::Active;
}
Ok(())
}
pub fn select_layer(&mut self, layer: LayerSelection) -> NetResult<()> {
if self.state == WhepState::Terminated {
return Err(NetError::invalid_state("Session terminated"));
}
self.selected_layer = Some(layer);
Ok(())
}
pub fn terminate(&mut self) {
self.state = WhepState::Terminated;
}
#[must_use]
pub fn is_active(&self) -> bool {
self.state == WhepState::Active
}
#[must_use]
pub fn resource_path(&self) -> String {
format!("/whep/resource/{}", self.session_id)
}
}
#[derive(Debug, Clone)]
pub struct LayerSelection {
pub encoding_id: Option<String>,
pub spatial_layer: Option<u8>,
pub temporal_layer: Option<u8>,
pub max_width: Option<u32>,
pub max_height: Option<u32>,
pub max_bitrate: Option<u64>,
pub max_framerate: Option<f64>,
}
impl LayerSelection {
#[must_use]
pub fn encoding(id: impl Into<String>) -> Self {
Self {
encoding_id: Some(id.into()),
spatial_layer: None,
temporal_layer: None,
max_width: None,
max_height: None,
max_bitrate: None,
max_framerate: None,
}
}
#[must_use]
pub fn svc(spatial: u8, temporal: u8) -> Self {
Self {
encoding_id: None,
spatial_layer: Some(spatial),
temporal_layer: Some(temporal),
max_width: None,
max_height: None,
max_bitrate: None,
max_framerate: None,
}
}
#[must_use]
pub fn with_max_resolution(mut self, width: u32, height: u32) -> Self {
self.max_width = Some(width);
self.max_height = Some(height);
self
}
}
#[derive(Debug, Clone)]
pub struct EndpointConfig {
pub base_url: String,
pub whip_path: String,
pub whep_path: String,
pub ice_servers: Vec<IceServerConfig>,
pub require_auth: bool,
pub max_sessions: usize,
pub session_timeout: Duration,
}
impl Default for EndpointConfig {
fn default() -> Self {
Self {
base_url: "http://localhost:8080".to_owned(),
whip_path: "/whip".to_owned(),
whep_path: "/whep".to_owned(),
ice_servers: vec![IceServerConfig::stun("stun:stun.l.google.com:19302")],
require_auth: false,
max_sessions: 100,
session_timeout: Duration::from_secs(300),
}
}
}
#[derive(Debug)]
pub struct WhipWhepEndpoint {
config: EndpointConfig,
whip_sessions: HashMap<String, WhipSession>,
whep_sessions: HashMap<String, WhepSession>,
session_counter: u64,
}
impl WhipWhepEndpoint {
#[must_use]
pub fn new(config: EndpointConfig) -> Self {
Self {
config,
whip_sessions: HashMap::new(),
whep_sessions: HashMap::new(),
session_counter: 0,
}
}
pub fn create_whip_session(
&mut self,
offer_sdp: &str,
auth_token: Option<&str>,
) -> NetResult<(String, String)> {
if self.whip_sessions.len() >= self.config.max_sessions {
return Err(NetError::connection("Maximum sessions reached"));
}
if self.config.require_auth && auth_token.is_none() {
return Err(NetError::authentication("Bearer token required"));
}
let session_id = self.generate_session_id();
let mut session = WhipSession::new(&session_id);
session.auth_token = auth_token.map(|s| s.to_owned());
let answer = session.process_offer(offer_sdp)?;
let resource_path = session.resource_path();
self.whip_sessions.insert(session_id, session);
Ok((resource_path, answer))
}
pub fn create_whep_session(
&mut self,
offer_sdp: &str,
stream_key: Option<&str>,
auth_token: Option<&str>,
) -> NetResult<(String, String)> {
if self.whep_sessions.len() >= self.config.max_sessions {
return Err(NetError::connection("Maximum sessions reached"));
}
if self.config.require_auth && auth_token.is_none() {
return Err(NetError::authentication("Bearer token required"));
}
let session_id = self.generate_session_id();
let mut session = WhepSession::new(&session_id);
session.auth_token = auth_token.map(|s| s.to_owned());
session.stream_key = stream_key.map(|s| s.to_owned());
let answer = session.process_offer(offer_sdp)?;
let resource_path = session.resource_path();
self.whep_sessions.insert(session_id, session);
Ok((resource_path, answer))
}
pub fn trickle_whip(&mut self, session_id: &str, sdp_fragment: &str) -> NetResult<()> {
let session = self
.whip_sessions
.get_mut(session_id)
.ok_or_else(|| NetError::not_found(format!("WHIP session not found: {session_id}")))?;
session.add_trickle_candidates(sdp_fragment)
}
pub fn trickle_whep(&mut self, session_id: &str, sdp_fragment: &str) -> NetResult<()> {
let session = self
.whep_sessions
.get_mut(session_id)
.ok_or_else(|| NetError::not_found(format!("WHEP session not found: {session_id}")))?;
session.add_trickle_candidates(sdp_fragment)
}
pub fn delete_whip_session(&mut self, session_id: &str) -> NetResult<()> {
let session = self
.whip_sessions
.get_mut(session_id)
.ok_or_else(|| NetError::not_found(format!("WHIP session not found: {session_id}")))?;
session.terminate();
Ok(())
}
pub fn delete_whep_session(&mut self, session_id: &str) -> NetResult<()> {
let session = self
.whep_sessions
.get_mut(session_id)
.ok_or_else(|| NetError::not_found(format!("WHEP session not found: {session_id}")))?;
session.terminate();
Ok(())
}
#[must_use]
pub fn active_whip_count(&self) -> usize {
self.whip_sessions
.values()
.filter(|s| s.state != WhipState::Terminated)
.count()
}
#[must_use]
pub fn active_whep_count(&self) -> usize {
self.whep_sessions
.values()
.filter(|s| s.state != WhepState::Terminated)
.count()
}
pub fn cleanup(&mut self) {
let timeout = self.config.session_timeout;
self.whip_sessions
.retain(|_, s| s.state != WhipState::Terminated && s.created_at.elapsed() < timeout);
self.whep_sessions
.retain(|_, s| s.state != WhepState::Terminated && s.created_at.elapsed() < timeout);
}
#[must_use]
pub fn ice_server_headers(&self) -> Vec<String> {
self.config
.ice_servers
.iter()
.map(|s| s.to_link_header())
.collect()
}
#[must_use]
pub fn get_whip_session(&self, id: &str) -> Option<&WhipSession> {
self.whip_sessions.get(id)
}
#[must_use]
pub fn get_whep_session(&self, id: &str) -> Option<&WhepSession> {
self.whep_sessions.get(id)
}
fn generate_session_id(&mut self) -> String {
self.session_counter += 1;
let ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_millis())
.unwrap_or(0);
format!("{ts:x}-{:04x}", self.session_counter)
}
}
fn generate_sdp_answer(offer: &str) -> String {
let mut answer = String::with_capacity(offer.len());
answer.push_str("v=0\r\n");
answer.push_str("o=- 0 0 IN IP4 0.0.0.0\r\n");
answer.push_str("s=-\r\n");
answer.push_str("t=0 0\r\n");
answer.push_str("a=group:BUNDLE 0\r\n");
for line in offer.lines() {
let trimmed = line.trim();
if trimmed.starts_with("m=") {
answer.push_str(&format!("{trimmed}\r\n"));
answer.push_str("c=IN IP4 0.0.0.0\r\n");
answer.push_str("a=recvonly\r\n");
answer.push_str("a=rtcp-mux\r\n");
} else if trimmed.starts_with("a=ice-ufrag:") || trimmed.starts_with("a=ice-pwd:") {
answer.push_str(&format!("{trimmed}\r\n"));
} else if trimmed.starts_with("a=fingerprint:") {
answer.push_str(&format!("{trimmed}\r\n"));
}
}
answer
}
fn generate_whep_answer(offer: &str) -> String {
let mut answer = String::with_capacity(offer.len());
answer.push_str("v=0\r\n");
answer.push_str("o=- 0 0 IN IP4 0.0.0.0\r\n");
answer.push_str("s=-\r\n");
answer.push_str("t=0 0\r\n");
answer.push_str("a=group:BUNDLE 0\r\n");
for line in offer.lines() {
let trimmed = line.trim();
if trimmed.starts_with("m=") {
answer.push_str(&format!("{trimmed}\r\n"));
answer.push_str("c=IN IP4 0.0.0.0\r\n");
answer.push_str("a=sendonly\r\n");
answer.push_str("a=rtcp-mux\r\n");
} else if trimmed.starts_with("a=ice-ufrag:") || trimmed.starts_with("a=ice-pwd:") {
answer.push_str(&format!("{trimmed}\r\n"));
} else if trimmed.starts_with("a=fingerprint:") {
answer.push_str(&format!("{trimmed}\r\n"));
}
}
answer
}
fn simple_hash(s: &str) -> u64 {
let mut hash: u64 = 5381;
for byte in s.bytes() {
hash = hash.wrapping_mul(33).wrapping_add(u64::from(byte));
}
hash
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_offer() -> &'static str {
"v=0\r\n\
o=- 0 0 IN IP4 0.0.0.0\r\n\
s=-\r\n\
t=0 0\r\n\
a=ice-ufrag:abc123\r\n\
a=ice-pwd:secret\r\n\
a=fingerprint:sha-256 AA:BB:CC\r\n\
m=video 9 UDP/TLS/RTP/SAVPF 96\r\n\
a=sendonly\r\n\
a=rtcp-mux\r\n"
}
fn sample_candidates() -> &'static str {
"a=candidate:1 1 udp 2130706431 192.168.1.1 50000 typ host\r\n\
a=end-of-candidates\r\n"
}
#[test]
fn test_ice_server_stun() {
let cfg = IceServerConfig::stun("stun:stun.example.com:3478");
assert_eq!(cfg.urls.len(), 1);
assert!(cfg.username.is_none());
}
#[test]
fn test_ice_server_turn() {
let cfg = IceServerConfig::turn("turn:turn.example.com", "user", "pass");
assert!(cfg.username.is_some());
assert!(cfg.credential.is_some());
}
#[test]
fn test_ice_server_link_header() {
let cfg = IceServerConfig::stun("stun:stun.example.com:3478");
let header = cfg.to_link_header();
assert!(header.contains("ice-server"));
assert!(header.contains("stun:stun.example.com"));
}
#[test]
fn test_turn_link_header() {
let cfg = IceServerConfig::turn("turn:t.example.com", "user", "pass");
let header = cfg.to_link_header();
assert!(header.contains("username=\"user\""));
assert!(header.contains("credential=\"pass\""));
}
#[test]
fn test_whip_state_names() {
assert_eq!(WhipState::WaitingOffer.name(), "waiting_offer");
assert_eq!(WhipState::Active.name(), "active");
assert_eq!(WhipState::Terminated.name(), "terminated");
}
#[test]
fn test_whip_session_new() {
let session = WhipSession::new("test-session");
assert_eq!(session.state, WhipState::WaitingOffer);
assert_eq!(session.session_id, "test-session");
assert!(!session.is_active());
}
#[test]
fn test_whip_process_offer() {
let mut session = WhipSession::new("test");
let result = session.process_offer(sample_offer());
assert!(result.is_ok());
assert_eq!(session.state, WhipState::Negotiating);
assert!(session.offer_sdp.is_some());
assert!(session.answer_sdp.is_some());
}
#[test]
fn test_whip_answer_recvonly() {
let mut session = WhipSession::new("test");
let answer = session.process_offer(sample_offer()).expect("should work");
assert!(answer.contains("recvonly"));
assert!(answer.contains("rtcp-mux"));
}
#[test]
fn test_whip_trickle_candidates() {
let mut session = WhipSession::new("test");
session.process_offer(sample_offer()).expect("should work");
session
.add_trickle_candidates(sample_candidates())
.expect("should work");
assert_eq!(session.trickle_candidates.len(), 2);
assert_eq!(session.state, WhipState::Active);
}
#[test]
fn test_whip_terminate() {
let mut session = WhipSession::new("test");
session.terminate();
assert_eq!(session.state, WhipState::Terminated);
}
#[test]
fn test_whip_resource_path() {
let session = WhipSession::new("abc123");
assert_eq!(session.resource_path(), "/whip/resource/abc123");
}
#[test]
fn test_whep_session_new() {
let session = WhepSession::new("test-whep");
assert_eq!(session.state, WhepState::WaitingOffer);
assert!(!session.is_active());
}
#[test]
fn test_whep_process_offer() {
let mut session = WhepSession::new("test");
let result = session.process_offer(sample_offer());
assert!(result.is_ok());
assert_eq!(session.state, WhepState::Negotiating);
}
#[test]
fn test_whep_answer_sendonly() {
let mut session = WhepSession::new("test");
let answer = session.process_offer(sample_offer()).expect("should work");
assert!(answer.contains("sendonly"));
}
#[test]
fn test_whep_layer_selection() {
let mut session = WhepSession::new("test");
session.process_offer(sample_offer()).expect("should work");
let layer = LayerSelection::encoding("mid").with_max_resolution(1920, 1080);
session.select_layer(layer).expect("should work");
assert!(session.selected_layer.is_some());
}
#[test]
fn test_whep_svc_layer() {
let layer = LayerSelection::svc(2, 1);
assert_eq!(layer.spatial_layer, Some(2));
assert_eq!(layer.temporal_layer, Some(1));
}
#[test]
fn test_whep_resource_path() {
let session = WhepSession::new("xyz789");
assert_eq!(session.resource_path(), "/whep/resource/xyz789");
}
#[test]
fn test_endpoint_config_default() {
let cfg = EndpointConfig::default();
assert_eq!(cfg.whip_path, "/whip");
assert_eq!(cfg.whep_path, "/whep");
assert!(!cfg.require_auth);
assert_eq!(cfg.max_sessions, 100);
}
#[test]
fn test_endpoint_create_whip() {
let mut endpoint = WhipWhepEndpoint::new(EndpointConfig::default());
let (path, answer) = endpoint
.create_whip_session(sample_offer(), None)
.expect("should work");
assert!(path.starts_with("/whip/resource/"));
assert!(!answer.is_empty());
assert_eq!(endpoint.active_whip_count(), 1);
}
#[test]
fn test_endpoint_create_whep() {
let mut endpoint = WhipWhepEndpoint::new(EndpointConfig::default());
let (path, _answer) = endpoint
.create_whep_session(sample_offer(), Some("stream1"), None)
.expect("should work");
assert!(path.starts_with("/whep/resource/"));
assert_eq!(endpoint.active_whep_count(), 1);
}
#[test]
fn test_endpoint_auth_required() {
let mut cfg = EndpointConfig::default();
cfg.require_auth = true;
let mut endpoint = WhipWhepEndpoint::new(cfg);
let result = endpoint.create_whip_session(sample_offer(), None);
assert!(result.is_err());
}
#[test]
fn test_endpoint_auth_with_token() {
let mut cfg = EndpointConfig::default();
cfg.require_auth = true;
let mut endpoint = WhipWhepEndpoint::new(cfg);
let result = endpoint.create_whip_session(sample_offer(), Some("my-token"));
assert!(result.is_ok());
}
#[test]
fn test_endpoint_max_sessions() {
let mut cfg = EndpointConfig::default();
cfg.max_sessions = 1;
let mut endpoint = WhipWhepEndpoint::new(cfg);
endpoint
.create_whip_session(sample_offer(), None)
.expect("should work");
let result = endpoint.create_whip_session(sample_offer(), None);
assert!(result.is_err());
}
#[test]
fn test_endpoint_trickle_whip() {
let mut endpoint = WhipWhepEndpoint::new(EndpointConfig::default());
let (path, _) = endpoint
.create_whip_session(sample_offer(), None)
.expect("should work");
let session_id = path.trim_start_matches("/whip/resource/").to_owned();
endpoint
.trickle_whip(&session_id, sample_candidates())
.expect("should work");
}
#[test]
fn test_endpoint_delete_whip() {
let mut endpoint = WhipWhepEndpoint::new(EndpointConfig::default());
let (path, _) = endpoint
.create_whip_session(sample_offer(), None)
.expect("should work");
let session_id = path.trim_start_matches("/whip/resource/").to_owned();
endpoint
.delete_whip_session(&session_id)
.expect("should work");
assert_eq!(endpoint.active_whip_count(), 0);
}
#[test]
fn test_endpoint_cleanup() {
let mut endpoint = WhipWhepEndpoint::new(EndpointConfig::default());
endpoint
.create_whip_session(sample_offer(), None)
.expect("should work");
endpoint.cleanup();
assert_eq!(endpoint.active_whip_count(), 1);
}
#[test]
fn test_endpoint_ice_headers() {
let endpoint = WhipWhepEndpoint::new(EndpointConfig::default());
let headers = endpoint.ice_server_headers();
assert!(!headers.is_empty());
assert!(headers[0].contains("ice-server"));
}
#[test]
fn test_endpoint_session_lookup() {
let mut endpoint = WhipWhepEndpoint::new(EndpointConfig::default());
let (path, _) = endpoint
.create_whip_session(sample_offer(), None)
.expect("should work");
let session_id = path.trim_start_matches("/whip/resource/");
assert!(endpoint.get_whip_session(session_id).is_some());
assert!(endpoint.get_whep_session("nonexistent").is_none());
}
#[test]
fn test_whip_offer_after_terminate() {
let mut session = WhipSession::new("test");
session.terminate();
let result = session.process_offer(sample_offer());
assert!(result.is_err());
}
#[test]
fn test_whep_trickle_after_terminate() {
let mut session = WhepSession::new("test");
session.terminate();
let result = session.add_trickle_candidates(sample_candidates());
assert!(result.is_err());
}
#[test]
fn test_whip_session_etag() {
let session = WhipSession::new("test");
assert!(session.etag.starts_with("W/\""));
}
#[test]
fn test_whep_stream_key() {
let mut session = WhepSession::new("test");
session.stream_key = Some("live/stream1".to_owned());
assert_eq!(session.stream_key.as_deref(), Some("live/stream1"));
}
#[test]
fn test_endpoint_not_found() {
let mut endpoint = WhipWhepEndpoint::new(EndpointConfig::default());
assert!(endpoint.trickle_whip("nonexistent", "").is_err());
assert!(endpoint.delete_whep_session("nonexistent").is_err());
}
#[test]
fn test_simple_hash() {
let h1 = simple_hash("test");
let h2 = simple_hash("test");
assert_eq!(h1, h2);
assert_ne!(simple_hash("a"), simple_hash("b"));
}
}