Skip to main content

rtsp/
server.rs

1use std::net::{SocketAddr, TcpListener};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6use crate::error::{Result, RtspError};
7use crate::media::Packetizer;
8use crate::media::h264::H264Packetizer;
9use crate::mount::{DEFAULT_MOUNT_PATH, MountRegistry};
10use crate::session::SessionManager;
11use crate::transport::UdpTransport;
12use crate::transport::tcp;
13
14/// Server-level configuration used by protocol handlers.
15#[derive(Debug, Clone)]
16pub struct ServerConfig {
17    /// Public host advertised in SDP `o=` and `c=` lines.
18    /// When `None`, host is inferred from request URI/client address.
19    pub public_host: Option<String>,
20    /// Public RTSP port for future URL-based headers (e.g. RTP-Info).
21    pub public_port: Option<u16>,
22    /// SDP origin username field (`o=<username> ...`).
23    pub sdp_username: String,
24    /// SDP origin session id field (`o=... <session-id> ...`).
25    pub sdp_session_id: String,
26    /// SDP origin session version field (`o=... ... <session-version> ...`).
27    pub sdp_session_version: String,
28    /// SDP session name (`s=`).
29    pub sdp_session_name: String,
30}
31
32impl Default for ServerConfig {
33    fn default() -> Self {
34        Self {
35            public_host: None,
36            public_port: None,
37            sdp_username: "-".to_string(),
38            sdp_session_id: "0".to_string(),
39            sdp_session_version: "0".to_string(),
40            sdp_session_name: "Stream".to_string(),
41        }
42    }
43}
44
45/// High-level RTSP server orchestrator.
46///
47/// Owns the mount registry, session manager, and transport layer.
48/// Delegates TCP connection handling to [`crate::transport::tcp`] and
49/// RTP delivery to [`UdpTransport`].
50///
51/// # Simple usage (single stream)
52///
53/// ```no_run
54/// use rtsp::Server;
55/// let mut server = Server::new("0.0.0.0:8554");
56/// server.start().unwrap();
57/// // server.send_frame(&h264_data, 3000).unwrap();
58/// ```
59///
60/// # Multi-mount usage
61///
62/// ```no_run
63/// use rtsp::Server;
64/// use rtsp::media::h264::H264Packetizer;
65/// let mut server = Server::new("0.0.0.0:8554");
66/// server.add_mount("/cam1", Box::new(H264Packetizer::with_random_ssrc(96)));
67/// server.start().unwrap();
68/// // server.send_frame_to("/cam1", &data, 3000).unwrap();
69/// ```
70pub struct Server {
71    session_manager: SessionManager,
72    mounts: MountRegistry,
73    running: Arc<AtomicBool>,
74    bind_addr: String,
75    udp: Option<UdpTransport>,
76    config: Arc<ServerConfig>,
77}
78
79impl Server {
80    /// Create a server with a default H.264 mount at `/stream`.
81    ///
82    /// `bind_addr` must be `host:port` with an explicit non-zero port (e.g. `127.0.0.1:8554`).
83    /// Port 0 is not allowed; validation happens in [`start`](Self::start).
84    pub fn new(bind_addr: &str) -> Self {
85        Self::with_config(bind_addr, ServerConfig::default())
86    }
87
88    /// Create a server with a single H.264 mount at the given path.
89    ///
90    /// Use this when the stream path is configurable (e.g. GStreamer `mount-path` property).
91    /// `bind_addr` must be `host:port` with an explicit non-zero port.
92    pub fn new_with_mount_path(bind_addr: &str, mount_path: &str) -> Self {
93        let mounts = MountRegistry::new();
94        mounts.add(mount_path, Box::new(H264Packetizer::with_random_ssrc(96)));
95        mounts.set_default(mount_path);
96
97        Self {
98            session_manager: SessionManager::new(),
99            mounts,
100            running: Arc::new(AtomicBool::new(false)),
101            bind_addr: bind_addr.to_string(),
102            udp: None,
103            config: Arc::new(ServerConfig::default()),
104        }
105    }
106
107    /// Create a server with custom protocol/SDP configuration.
108    /// A default H.264 mount at `/stream` is created automatically.
109    pub fn with_config(bind_addr: &str, config: ServerConfig) -> Self {
110        let mounts = MountRegistry::new();
111        mounts.add(
112            DEFAULT_MOUNT_PATH,
113            Box::new(H264Packetizer::with_random_ssrc(96)),
114        );
115        mounts.set_default(DEFAULT_MOUNT_PATH);
116
117        Self {
118            session_manager: SessionManager::new(),
119            mounts,
120            running: Arc::new(AtomicBool::new(false)),
121            bind_addr: bind_addr.to_string(),
122            udp: None,
123            config: Arc::new(config),
124        }
125    }
126
127    /// Create a server with a custom packetizer on the default mount.
128    pub fn with_packetizer(bind_addr: &str, packetizer: Box<dyn Packetizer>) -> Self {
129        Self::with_packetizer_and_config(bind_addr, packetizer, ServerConfig::default())
130    }
131
132    /// Create a server with a custom packetizer and protocol/SDP configuration.
133    pub fn with_packetizer_and_config(
134        bind_addr: &str,
135        packetizer: Box<dyn Packetizer>,
136        config: ServerConfig,
137    ) -> Self {
138        let mounts = MountRegistry::new();
139        mounts.add(DEFAULT_MOUNT_PATH, packetizer);
140        mounts.set_default(DEFAULT_MOUNT_PATH);
141
142        Self {
143            session_manager: SessionManager::new(),
144            mounts,
145            running: Arc::new(AtomicBool::new(false)),
146            bind_addr: bind_addr.to_string(),
147            udp: None,
148            config: Arc::new(config),
149        }
150    }
151
152    /// Register a named mount with its own packetizer.
153    ///
154    /// Must be called before [`start`](Self::start).
155    pub fn add_mount(&self, path: &str, packetizer: Box<dyn Packetizer>) {
156        self.mounts.add(path, packetizer);
157    }
158
159    pub fn start(&mut self) -> Result<()> {
160        if self.running.load(Ordering::SeqCst) {
161            return Err(RtspError::AlreadyRunning);
162        }
163
164        let addr: SocketAddr = self.bind_addr.parse().map_err(|_| {
165            RtspError::InvalidBindAddress(format!(
166                "expected host:port with explicit port, got {:?}",
167                self.bind_addr
168            ))
169        })?;
170        if addr.port() == 0 {
171            return Err(RtspError::InvalidBindAddress(
172                "port must be explicit (non-zero)".to_string(),
173            ));
174        }
175
176        self.udp = Some(UdpTransport::bind()?);
177
178        let listener = TcpListener::bind(&self.bind_addr)?;
179        listener.set_nonblocking(true)?;
180
181        self.running.store(true, Ordering::SeqCst);
182
183        let running = self.running.clone();
184        let session_manager = self.session_manager.clone();
185        let mounts = self.mounts.clone();
186        let config = self.config.clone();
187
188        tracing::info!(addr = %self.bind_addr, "RTSP server listening");
189
190        thread::spawn(move || {
191            tcp::accept_loop(listener, session_manager, mounts, config, running);
192        });
193
194        Ok(())
195    }
196
197    pub fn stop(&mut self) {
198        self.running.store(false, Ordering::SeqCst);
199        tracing::info!("server stopping");
200    }
201
202    pub fn is_running(&self) -> bool {
203        self.running.load(Ordering::SeqCst)
204    }
205
206    /// Send a raw encoded frame to the default mount (`/stream`).
207    ///
208    /// Packetizes the data into RTP packets and delivers them to all
209    /// subscribed playing sessions via UDP.
210    pub fn send_frame(&self, data: &[u8], timestamp_increment: u32) -> Result<usize> {
211        self.send_frame_to(DEFAULT_MOUNT_PATH, data, timestamp_increment)
212    }
213
214    /// Send a raw encoded frame to a specific mount.
215    ///
216    /// Packetizes the data using the mount's codec and delivers the
217    /// resulting RTP packets to all subscribed playing sessions.
218    pub fn send_frame_to(
219        &self,
220        mount_path: &str,
221        data: &[u8],
222        timestamp_increment: u32,
223    ) -> Result<usize> {
224        let udp = self.udp.as_ref().ok_or(RtspError::NotStarted)?;
225        let mount = self
226            .mounts
227            .get(mount_path)
228            .ok_or_else(|| RtspError::MountNotFound(mount_path.to_string()))?;
229
230        let packets = mount.packetize(data, timestamp_increment);
231        let session_ids = mount.subscribed_session_ids();
232
233        let mut sent = 0;
234        for session_id in &session_ids {
235            let session = match self.session_manager.get_session(session_id) {
236                Some(s) if s.is_playing() => s,
237                _ => continue,
238            };
239            let transport = match session.get_transport() {
240                Some(t) => t,
241                None => continue,
242            };
243            for packet in &packets {
244                match udp.send_to(packet, transport.client_addr) {
245                    Ok(_) => {}
246                    Err(e) => {
247                        tracing::warn!(
248                            session_id,
249                            addr = %transport.client_addr,
250                            error = %e,
251                            "failed to send RTP packet"
252                        );
253                    }
254                }
255            }
256            sent += 1;
257        }
258
259        Ok(sent)
260    }
261
262    /// Send a pre-packetized RTP packet to a specific session.
263    pub fn send_rtp_packet(&self, session_id: &str, payload: &[u8]) -> Result<usize> {
264        let udp = self.udp.as_ref().ok_or(RtspError::NotStarted)?;
265        let session = self
266            .session_manager
267            .get_session(session_id)
268            .ok_or_else(|| RtspError::SessionNotFound(session_id.to_string()))?;
269        if !session.is_playing() {
270            return Err(RtspError::SessionNotPlaying(session_id.to_string()));
271        }
272        let transport = session
273            .get_transport()
274            .ok_or_else(|| RtspError::TransportNotConfigured(session_id.to_string()))?;
275        udp.send_to(payload, transport.client_addr)
276    }
277
278    /// Broadcast a pre-packetized RTP packet to all playing sessions
279    /// on the default mount.
280    pub fn broadcast_rtp_packet(&self, payload: &[u8]) -> Result<usize> {
281        let udp = self.udp.as_ref().ok_or(RtspError::NotStarted)?;
282        let mount = self
283            .mounts
284            .get(DEFAULT_MOUNT_PATH)
285            .ok_or_else(|| RtspError::MountNotFound(DEFAULT_MOUNT_PATH.to_string()))?;
286
287        let session_ids = mount.subscribed_session_ids();
288        let mut sent = 0;
289        for session_id in &session_ids {
290            let session = match self.session_manager.get_session(session_id) {
291                Some(s) if s.is_playing() => s,
292                _ => continue,
293            };
294            if let Some(transport) = session.get_transport() {
295                match udp.send_to(payload, transport.client_addr) {
296                    Ok(_) => sent += 1,
297                    Err(e) => {
298                        tracing::warn!(
299                            session_id,
300                            addr = %transport.client_addr,
301                            error = %e,
302                            "failed to send RTP packet"
303                        );
304                    }
305                }
306            }
307        }
308        Ok(sent)
309    }
310
311    pub fn get_viewers(&self) -> Vec<Viewer> {
312        self.session_manager
313            .get_playing_sessions()
314            .iter()
315            .filter_map(|session| {
316                session.get_transport().map(|transport| Viewer {
317                    session_id: session.id.clone(),
318                    uri: session.uri.clone(),
319                    client_addr: transport.client_addr.to_string(),
320                    client_rtp_port: transport.client_rtp_port,
321                })
322            })
323            .collect()
324    }
325
326    pub fn session_manager(&self) -> &SessionManager {
327        &self.session_manager
328    }
329
330    /// Returns the mount registry (used by adapters that need mount access).
331    pub fn mounts(&self) -> &MountRegistry {
332        &self.mounts
333    }
334
335    /// Returns the server's protocol configuration.
336    pub fn config(&self) -> Arc<ServerConfig> {
337        self.config.clone()
338    }
339}
340
341/// Information about a connected viewer (client in PLAY state).
342#[derive(Debug, Clone)]
343pub struct Viewer {
344    pub session_id: String,
345    pub uri: String,
346    pub client_addr: String,
347    pub client_rtp_port: u16,
348}
349
350#[cfg(test)]
351mod tests {
352    use super::*;
353
354    #[test]
355    fn start_rejects_port_zero() {
356        let mut server = Server::new("127.0.0.1:0");
357        let err = server.start().unwrap_err();
358        match &err {
359            RtspError::InvalidBindAddress(msg) => assert!(msg.contains("non-zero"), "{}", msg),
360            _ => panic!("expected InvalidBindAddress, got {:?}", err),
361        }
362    }
363
364    #[test]
365    fn start_rejects_missing_port() {
366        let mut server = Server::new("127.0.0.1");
367        let err = server.start().unwrap_err();
368        match &err {
369            RtspError::InvalidBindAddress(_) => {}
370            _ => panic!("expected InvalidBindAddress, got {:?}", err),
371        }
372    }
373
374    #[test]
375    fn start_accepts_explicit_port() {
376        let mut server = Server::new("127.0.0.1:18555");
377        server.start().expect("explicit port should be accepted");
378        assert!(server.is_running());
379        server.stop();
380    }
381}