use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use crate::error::{Result, RtspError};
use crate::media::Packetizer;
use crate::media::h264::H264Packetizer;
use crate::mount::{DEFAULT_MOUNT_PATH, MountRegistry};
use crate::session::SessionManager;
use crate::transport::UdpTransport;
use crate::transport::tcp;
/// User-tunable settings carried by a [`Server`] and handed (behind an `Arc`)
/// to the TCP accept loop when the server starts.
///
/// NOTE(review): this file only stores these values; the per-field meanings
/// below are inferred from the field names and should be confirmed against the
/// code that builds the SDP / RTSP responses.
#[derive(Debug, Clone)]
pub struct ServerConfig {
/// Presumably the host name or address advertised to clients instead of the
/// bind address. `None` by default.
pub public_host: Option<String>,
/// Presumably the port advertised to clients instead of the bind port.
/// `None` by default.
pub public_port: Option<u16>,
/// Presumably the username part of the SDP origin (`o=`) line. Defaults to `"-"`.
pub sdp_username: String,
/// Presumably the session id part of the SDP origin (`o=`) line. Defaults to `"0"`.
pub sdp_session_id: String,
/// Presumably the session version part of the SDP origin (`o=`) line. Defaults to `"0"`.
pub sdp_session_version: String,
/// Presumably the SDP session name (`s=`) line. Defaults to `"Stream"`.
pub sdp_session_name: String,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
public_host: None,
public_port: None,
sdp_username: "-".to_string(),
sdp_session_id: "0".to_string(),
sdp_session_version: "0".to_string(),
sdp_session_name: "Stream".to_string(),
}
}
}
/// RTSP server front-end: owns the session and mount registries, the UDP
/// transport used for outgoing RTP, and the flag that controls the TCP accept
/// thread.
pub struct Server {
/// Session lookup used when sending; a clone is handed to the accept loop on start.
session_manager: SessionManager,
/// Registry of mount paths and their packetizers; a clone is handed to the
/// accept loop on start.
mounts: MountRegistry,
/// `true` between a successful `start()` and the next `stop()`. The same
/// `Arc` is passed to the accept loop thread.
running: Arc<AtomicBool>,
/// Address string given at construction; parsed as `host:port` in `start()`.
bind_addr: String,
/// Outgoing UDP transport. `None` until `start()` binds it; the send methods
/// return `RtspError::NotStarted` while it is `None`.
udp: Option<UdpTransport>,
/// Shared configuration; a clone of the `Arc` is handed to the accept loop on start.
config: Arc<ServerConfig>,
}
impl Server {
    /// Creates a stopped server for `bind_addr` with the default
    /// [`ServerConfig`] and an H.264 packetizer registered (and set as
    /// default) at `DEFAULT_MOUNT_PATH`.
    ///
    /// `bind_addr` is only stored here; it is validated by [`Server::start`].
    pub fn new(bind_addr: &str) -> Self {
        Self::with_config(bind_addr, ServerConfig::default())
    }

    /// Creates a stopped server whose only (and default) mount is
    /// `mount_path`, backed by an H.264 packetizer, using the default
    /// [`ServerConfig`].
    ///
    /// NOTE(review): [`Server::send_frame`] and
    /// [`Server::broadcast_rtp_packet`] always look up `DEFAULT_MOUNT_PATH`.
    /// Unless `mount_path` equals that constant (or `MountRegistry::get`
    /// falls back to the default mount — confirm), those two methods will
    /// report `MountNotFound` on a server built this way; use
    /// [`Server::send_frame_to`] with `mount_path` instead.
    pub fn new_with_mount_path(bind_addr: &str, mount_path: &str) -> Self {
        Self::with_default_mount(
            bind_addr,
            mount_path,
            // 96 is presumably the dynamic RTP payload type — confirm against
            // `H264Packetizer::with_random_ssrc`.
            Box::new(H264Packetizer::with_random_ssrc(96)),
            ServerConfig::default(),
        )
    }

    /// Creates a stopped server with a caller-supplied configuration and an
    /// H.264 packetizer registered (and set as default) at
    /// `DEFAULT_MOUNT_PATH`.
    pub fn with_config(bind_addr: &str, config: ServerConfig) -> Self {
        Self::with_default_mount(
            bind_addr,
            DEFAULT_MOUNT_PATH,
            // 96 is presumably the dynamic RTP payload type — confirm against
            // `H264Packetizer::with_random_ssrc`.
            Box::new(H264Packetizer::with_random_ssrc(96)),
            config,
        )
    }

    /// Creates a stopped server that uses `packetizer` for
    /// `DEFAULT_MOUNT_PATH`, with the default [`ServerConfig`].
    pub fn with_packetizer(bind_addr: &str, packetizer: Box<dyn Packetizer>) -> Self {
        Self::with_packetizer_and_config(bind_addr, packetizer, ServerConfig::default())
    }

    /// Creates a stopped server that uses `packetizer` for
    /// `DEFAULT_MOUNT_PATH`, with a caller-supplied configuration.
    pub fn with_packetizer_and_config(
        bind_addr: &str,
        packetizer: Box<dyn Packetizer>,
        config: ServerConfig,
    ) -> Self {
        Self::with_default_mount(bind_addr, DEFAULT_MOUNT_PATH, packetizer, config)
    }

    /// Shared constructor body: registers `packetizer` at `mount_path`, marks
    /// that path as the registry default, and assembles a stopped server.
    fn with_default_mount(
        bind_addr: &str,
        mount_path: &str,
        packetizer: Box<dyn Packetizer>,
        config: ServerConfig,
    ) -> Self {
        let mounts = MountRegistry::new();
        mounts.add(mount_path, packetizer);
        mounts.set_default(mount_path);
        Self {
            session_manager: SessionManager::new(),
            mounts,
            running: Arc::new(AtomicBool::new(false)),
            bind_addr: bind_addr.to_string(),
            udp: None,
            config: Arc::new(config),
        }
    }

    /// Registers an additional mount at `path` served by `packetizer`.
    pub fn add_mount(&self, path: &str, packetizer: Box<dyn Packetizer>) {
        self.mounts.add(path, packetizer);
    }

    /// Binds the UDP transport and the TCP listener, then spawns a detached
    /// thread running the TCP accept loop.
    ///
    /// # Errors
    ///
    /// * `RtspError::AlreadyRunning` if the running flag is already set.
    /// * `RtspError::InvalidBindAddress` if the bind address does not parse
    ///   as a `SocketAddr` (e.g. the port is missing) or its port is `0`.
    /// * Whatever `UdpTransport::bind`, `TcpListener::bind` or
    ///   `set_nonblocking` report.
    ///
    /// On any error the server is left untouched: neither the running flag
    /// nor the UDP transport is modified.
    pub fn start(&mut self) -> Result<()> {
        if self.running.load(Ordering::SeqCst) {
            return Err(RtspError::AlreadyRunning);
        }
        let addr: SocketAddr = self.bind_addr.parse().map_err(|_| {
            RtspError::InvalidBindAddress(format!(
                "expected host:port with explicit port, got {:?}",
                self.bind_addr
            ))
        })?;
        if addr.port() == 0 {
            return Err(RtspError::InvalidBindAddress(
                "port must be explicit (non-zero)".to_string(),
            ));
        }

        // Acquire every resource into locals first so that a failure part-way
        // through cannot leave `self.udp` populated on a server that never
        // actually started.
        let udp = UdpTransport::bind()?;
        // Bind the address that was just validated rather than resolving the
        // string a second time.
        let listener = TcpListener::bind(addr)?;
        listener.set_nonblocking(true)?;

        // All fallible steps succeeded: commit the new state.
        self.udp = Some(udp);
        self.running.store(true, Ordering::SeqCst);

        let running = self.running.clone();
        let session_manager = self.session_manager.clone();
        let mounts = self.mounts.clone();
        let config = self.config.clone();
        tracing::info!(addr = %self.bind_addr, "RTSP server listening");
        // The join handle is dropped, so the accept thread is detached; it is
        // given the shared `running` flag that `stop()` clears.
        thread::spawn(move || {
            tcp::accept_loop(listener, session_manager, mounts, config, running);
        });
        Ok(())
    }

    /// Clears the running flag and logs that the server is stopping.
    ///
    /// This does not join the accept thread and does not drop the UDP
    /// transport.
    pub fn stop(&mut self) {
        self.running.store(false, Ordering::SeqCst);
        tracing::info!("server stopping");
    }

    /// Returns the current value of the running flag.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    /// Packetizes `data` on the `DEFAULT_MOUNT_PATH` mount and sends it to
    /// that mount's playing subscribers. See [`Server::send_frame_to`].
    pub fn send_frame(&self, data: &[u8], timestamp_increment: u32) -> Result<usize> {
        self.send_frame_to(DEFAULT_MOUNT_PATH, data, timestamp_increment)
    }

    /// Packetizes `data` with the packetizer of `mount_path` and sends the
    /// resulting packets over UDP to every subscribed session that is playing
    /// and has a transport configured.
    ///
    /// Send failures are logged and do not abort the call; the remaining
    /// packets and sessions are still attempted.
    ///
    /// Returns the number of sessions for which every packet of the frame was
    /// sent without error.
    ///
    /// # Errors
    ///
    /// * `RtspError::NotStarted` if no UDP transport has been bound yet.
    /// * `RtspError::MountNotFound` if `mount_path` is not registered.
    pub fn send_frame_to(
        &self,
        mount_path: &str,
        data: &[u8],
        timestamp_increment: u32,
    ) -> Result<usize> {
        let udp = self.udp.as_ref().ok_or(RtspError::NotStarted)?;
        let mount = self
            .mounts
            .get(mount_path)
            .ok_or_else(|| RtspError::MountNotFound(mount_path.to_string()))?;
        // Packetize unconditionally, even if nobody is subscribed, exactly as
        // before: the packetizer is given `timestamp_increment` on every frame.
        let packets = mount.packetize(data, timestamp_increment);
        let session_ids = mount.subscribed_session_ids();
        let mut sent = 0;
        for session_id in &session_ids {
            let session = match self.session_manager.get_session(session_id) {
                Some(s) if s.is_playing() => s,
                _ => continue,
            };
            let transport = match session.get_transport() {
                Some(t) => t,
                None => continue,
            };
            // A session only counts as served when none of its packets failed;
            // previously it was counted even if every send had errored.
            let mut delivered = true;
            for packet in &packets {
                if let Err(e) = udp.send_to(packet, transport.client_addr) {
                    delivered = false;
                    tracing::warn!(
                        session_id,
                        addr = %transport.client_addr,
                        error = %e,
                        "failed to send RTP packet"
                    );
                }
            }
            if delivered {
                sent += 1;
            }
        }
        Ok(sent)
    }

    /// Sends one already-built packet to the client of `session_id`.
    ///
    /// Returns whatever the UDP transport's `send_to` returns.
    ///
    /// # Errors
    ///
    /// * `RtspError::NotStarted` if no UDP transport has been bound yet.
    /// * `RtspError::SessionNotFound` if the session id is unknown.
    /// * `RtspError::SessionNotPlaying` if the session is not playing.
    /// * `RtspError::TransportNotConfigured` if the session has no transport.
    /// * Any error reported by the UDP send itself.
    pub fn send_rtp_packet(&self, session_id: &str, payload: &[u8]) -> Result<usize> {
        let udp = self.udp.as_ref().ok_or(RtspError::NotStarted)?;
        let session = self
            .session_manager
            .get_session(session_id)
            .ok_or_else(|| RtspError::SessionNotFound(session_id.to_string()))?;
        if !session.is_playing() {
            return Err(RtspError::SessionNotPlaying(session_id.to_string()));
        }
        let transport = session
            .get_transport()
            .ok_or_else(|| RtspError::TransportNotConfigured(session_id.to_string()))?;
        udp.send_to(payload, transport.client_addr)
    }

    /// Sends one already-built packet to every playing subscriber of the
    /// `DEFAULT_MOUNT_PATH` mount that has a transport configured.
    ///
    /// Send failures are logged and skipped. Returns the number of sessions
    /// the packet was sent to without error.
    ///
    /// # Errors
    ///
    /// * `RtspError::NotStarted` if no UDP transport has been bound yet.
    /// * `RtspError::MountNotFound` if `DEFAULT_MOUNT_PATH` is not registered.
    pub fn broadcast_rtp_packet(&self, payload: &[u8]) -> Result<usize> {
        let udp = self.udp.as_ref().ok_or(RtspError::NotStarted)?;
        let mount = self
            .mounts
            .get(DEFAULT_MOUNT_PATH)
            .ok_or_else(|| RtspError::MountNotFound(DEFAULT_MOUNT_PATH.to_string()))?;
        let session_ids = mount.subscribed_session_ids();
        let mut sent = 0;
        for session_id in &session_ids {
            let session = match self.session_manager.get_session(session_id) {
                Some(s) if s.is_playing() => s,
                _ => continue,
            };
            if let Some(transport) = session.get_transport() {
                match udp.send_to(payload, transport.client_addr) {
                    Ok(_) => sent += 1,
                    Err(e) => {
                        tracing::warn!(
                            session_id,
                            addr = %transport.client_addr,
                            error = %e,
                            "failed to send RTP packet"
                        );
                    }
                }
            }
        }
        Ok(sent)
    }

    /// Returns one [`Viewer`] per playing session that has a transport
    /// configured; playing sessions without a transport are omitted.
    pub fn get_viewers(&self) -> Vec<Viewer> {
        self.session_manager
            .get_playing_sessions()
            .iter()
            .filter_map(|session| {
                session.get_transport().map(|transport| Viewer {
                    session_id: session.id.clone(),
                    uri: session.uri.clone(),
                    client_addr: transport.client_addr.to_string(),
                    client_rtp_port: transport.client_rtp_port,
                })
            })
            .collect()
    }

    /// Borrows the session manager used by this server.
    pub fn session_manager(&self) -> &SessionManager {
        &self.session_manager
    }

    /// Borrows the mount registry used by this server.
    pub fn mounts(&self) -> &MountRegistry {
        &self.mounts
    }

    /// Returns another handle to the shared configuration (a reference-count
    /// bump, not a copy of the config).
    pub fn config(&self) -> Arc<ServerConfig> {
        Arc::clone(&self.config)
    }
}
/// Snapshot of one playing session, as produced by `Server::get_viewers`.
#[derive(Debug, Clone)]
pub struct Viewer {
/// Copy of the session's `id`.
pub session_id: String,
/// Copy of the session's `uri`.
pub uri: String,
/// The session transport's `client_addr`, rendered with `to_string()`.
pub client_addr: String,
/// The session transport's `client_rtp_port`.
pub client_rtp_port: u16,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts a default server on `bind_addr` and returns the error that
    /// `start()` is expected to produce.
    fn start_failure(bind_addr: &str) -> RtspError {
        let mut server = Server::new(bind_addr);
        server.start().unwrap_err()
    }

    /// Port 0 parses as a socket address but must be refused explicitly.
    #[test]
    fn start_rejects_port_zero() {
        let err = start_failure("127.0.0.1:0");
        if let RtspError::InvalidBindAddress(msg) = &err {
            assert!(msg.contains("non-zero"), "{}", msg);
        } else {
            panic!("expected InvalidBindAddress, got {:?}", err);
        }
    }

    /// A bare host without a port does not parse as a socket address.
    #[test]
    fn start_rejects_missing_port() {
        let err = start_failure("127.0.0.1");
        if !matches!(&err, RtspError::InvalidBindAddress(_)) {
            panic!("expected InvalidBindAddress, got {:?}", err);
        }
    }

    /// Uses a fixed loopback port because `start()` refuses port 0; the port
    /// must be free on the machine running the test.
    #[test]
    fn start_accepts_explicit_port() {
        let mut server = Server::new("127.0.0.1:18555");
        let outcome = server.start();
        outcome.expect("explicit port should be accepted");
        assert!(server.is_running());
        server.stop();
    }
}