1use std::net::{SocketAddr, TcpListener};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6use crate::error::{Result, RtspError};
7use crate::media::Packetizer;
8use crate::media::h264::H264Packetizer;
9use crate::mount::{DEFAULT_MOUNT_PATH, MountRegistry};
10use crate::session::SessionManager;
11use crate::transport::UdpTransport;
12use crate::transport::tcp;
13
/// Tunable settings handed to a [`Server`] at construction time.
///
/// The type derives `PartialEq`/`Eq` so configurations can be compared
/// directly (for example in tests or when detecting configuration changes).
///
/// NOTE(review): the code that consumes these fields lives outside this file;
/// the per-field descriptions below are inferred from the field names and the
/// default values and should be confirmed against the SDP/RTSP response code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// Optional host override; `None` by default.
    ///
    /// Presumably the externally reachable host advertised to clients — TODO confirm.
    pub public_host: Option<String>,
    /// Optional port override; `None` by default.
    ///
    /// Presumably the externally reachable port advertised to clients — TODO confirm.
    pub public_port: Option<u16>,
    /// Defaults to `"-"`. Presumably the username of the SDP origin (`o=`) line — TODO confirm.
    pub sdp_username: String,
    /// Defaults to `"0"`. Presumably the session id of the SDP origin (`o=`) line — TODO confirm.
    pub sdp_session_id: String,
    /// Defaults to `"0"`. Presumably the session version of the SDP origin (`o=`) line —
    /// TODO confirm.
    pub sdp_session_version: String,
    /// Defaults to `"Stream"`. Presumably the SDP session name (`s=`) line — TODO confirm.
    pub sdp_session_name: String,
}
31
32impl Default for ServerConfig {
33 fn default() -> Self {
34 Self {
35 public_host: None,
36 public_port: None,
37 sdp_username: "-".to_string(),
38 sdp_session_id: "0".to_string(),
39 sdp_session_version: "0".to_string(),
40 sdp_session_name: "Stream".to_string(),
41 }
42 }
43}
44
/// RTSP server handle: owns the configuration, the mount registry, the session
/// manager and (once started) the UDP transport used to push RTP packets.
///
/// The TCP accept loop runs on a separate thread spawned by `start`; that
/// thread receives clones of `session_manager`, `mounts`, `config` and
/// `running`.
///
/// NOTE(review): the send methods look up sessions through this instance's
/// `session_manager`, so its clones presumably share state with the accept
/// thread — confirm against `SessionManager` / `MountRegistry`.
pub struct Server {
    /// Looks up sessions by id when sending RTP and when listing viewers.
    session_manager: SessionManager,
    /// Mount paths and their packetizers; also yields the subscribed session ids per mount.
    mounts: MountRegistry,
    /// Run flag shared with the accept thread: set by `start`, cleared by `stop`.
    running: Arc<AtomicBool>,
    /// Listen address as given by the caller; parsed as a `SocketAddr` in `start`.
    bind_addr: String,
    /// UDP transport used for outgoing RTP; `None` until `start` has bound one.
    udp: Option<UdpTransport>,
    /// Server configuration, reference-counted so it can be handed to the accept thread.
    config: Arc<ServerConfig>,
}
78
79impl Server {
80 pub fn new(bind_addr: &str) -> Self {
85 Self::with_config(bind_addr, ServerConfig::default())
86 }
87
88 pub fn new_with_mount_path(bind_addr: &str, mount_path: &str) -> Self {
93 let mounts = MountRegistry::new();
94 mounts.add(mount_path, Box::new(H264Packetizer::with_random_ssrc(96)));
95 mounts.set_default(mount_path);
96
97 Self {
98 session_manager: SessionManager::new(),
99 mounts,
100 running: Arc::new(AtomicBool::new(false)),
101 bind_addr: bind_addr.to_string(),
102 udp: None,
103 config: Arc::new(ServerConfig::default()),
104 }
105 }
106
107 pub fn with_config(bind_addr: &str, config: ServerConfig) -> Self {
110 let mounts = MountRegistry::new();
111 mounts.add(
112 DEFAULT_MOUNT_PATH,
113 Box::new(H264Packetizer::with_random_ssrc(96)),
114 );
115 mounts.set_default(DEFAULT_MOUNT_PATH);
116
117 Self {
118 session_manager: SessionManager::new(),
119 mounts,
120 running: Arc::new(AtomicBool::new(false)),
121 bind_addr: bind_addr.to_string(),
122 udp: None,
123 config: Arc::new(config),
124 }
125 }
126
127 pub fn with_packetizer(bind_addr: &str, packetizer: Box<dyn Packetizer>) -> Self {
129 Self::with_packetizer_and_config(bind_addr, packetizer, ServerConfig::default())
130 }
131
132 pub fn with_packetizer_and_config(
134 bind_addr: &str,
135 packetizer: Box<dyn Packetizer>,
136 config: ServerConfig,
137 ) -> Self {
138 let mounts = MountRegistry::new();
139 mounts.add(DEFAULT_MOUNT_PATH, packetizer);
140 mounts.set_default(DEFAULT_MOUNT_PATH);
141
142 Self {
143 session_manager: SessionManager::new(),
144 mounts,
145 running: Arc::new(AtomicBool::new(false)),
146 bind_addr: bind_addr.to_string(),
147 udp: None,
148 config: Arc::new(config),
149 }
150 }
151
152 pub fn add_mount(&self, path: &str, packetizer: Box<dyn Packetizer>) {
156 self.mounts.add(path, packetizer);
157 }
158
159 pub fn start(&mut self) -> Result<()> {
160 if self.running.load(Ordering::SeqCst) {
161 return Err(RtspError::AlreadyRunning);
162 }
163
164 let addr: SocketAddr = self.bind_addr.parse().map_err(|_| {
165 RtspError::InvalidBindAddress(format!(
166 "expected host:port with explicit port, got {:?}",
167 self.bind_addr
168 ))
169 })?;
170 if addr.port() == 0 {
171 return Err(RtspError::InvalidBindAddress(
172 "port must be explicit (non-zero)".to_string(),
173 ));
174 }
175
176 self.udp = Some(UdpTransport::bind()?);
177
178 let listener = TcpListener::bind(&self.bind_addr)?;
179 listener.set_nonblocking(true)?;
180
181 self.running.store(true, Ordering::SeqCst);
182
183 let running = self.running.clone();
184 let session_manager = self.session_manager.clone();
185 let mounts = self.mounts.clone();
186 let config = self.config.clone();
187
188 tracing::info!(addr = %self.bind_addr, "RTSP server listening");
189
190 thread::spawn(move || {
191 tcp::accept_loop(listener, session_manager, mounts, config, running);
192 });
193
194 Ok(())
195 }
196
197 pub fn stop(&mut self) {
198 self.running.store(false, Ordering::SeqCst);
199 tracing::info!("server stopping");
200 }
201
202 pub fn is_running(&self) -> bool {
203 self.running.load(Ordering::SeqCst)
204 }
205
206 pub fn send_frame(&self, data: &[u8], timestamp_increment: u32) -> Result<usize> {
211 self.send_frame_to(DEFAULT_MOUNT_PATH, data, timestamp_increment)
212 }
213
214 pub fn send_frame_to(
219 &self,
220 mount_path: &str,
221 data: &[u8],
222 timestamp_increment: u32,
223 ) -> Result<usize> {
224 let udp = self.udp.as_ref().ok_or(RtspError::NotStarted)?;
225 let mount = self
226 .mounts
227 .get(mount_path)
228 .ok_or_else(|| RtspError::MountNotFound(mount_path.to_string()))?;
229
230 let packets = mount.packetize(data, timestamp_increment);
231 let session_ids = mount.subscribed_session_ids();
232
233 let mut sent = 0;
234 for session_id in &session_ids {
235 let session = match self.session_manager.get_session(session_id) {
236 Some(s) if s.is_playing() => s,
237 _ => continue,
238 };
239 let transport = match session.get_transport() {
240 Some(t) => t,
241 None => continue,
242 };
243 for packet in &packets {
244 match udp.send_to(packet, transport.client_addr) {
245 Ok(_) => {}
246 Err(e) => {
247 tracing::warn!(
248 session_id,
249 addr = %transport.client_addr,
250 error = %e,
251 "failed to send RTP packet"
252 );
253 }
254 }
255 }
256 sent += 1;
257 }
258
259 Ok(sent)
260 }
261
262 pub fn send_rtp_packet(&self, session_id: &str, payload: &[u8]) -> Result<usize> {
264 let udp = self.udp.as_ref().ok_or(RtspError::NotStarted)?;
265 let session = self
266 .session_manager
267 .get_session(session_id)
268 .ok_or_else(|| RtspError::SessionNotFound(session_id.to_string()))?;
269 if !session.is_playing() {
270 return Err(RtspError::SessionNotPlaying(session_id.to_string()));
271 }
272 let transport = session
273 .get_transport()
274 .ok_or_else(|| RtspError::TransportNotConfigured(session_id.to_string()))?;
275 udp.send_to(payload, transport.client_addr)
276 }
277
278 pub fn broadcast_rtp_packet(&self, payload: &[u8]) -> Result<usize> {
281 let udp = self.udp.as_ref().ok_or(RtspError::NotStarted)?;
282 let mount = self
283 .mounts
284 .get(DEFAULT_MOUNT_PATH)
285 .ok_or_else(|| RtspError::MountNotFound(DEFAULT_MOUNT_PATH.to_string()))?;
286
287 let session_ids = mount.subscribed_session_ids();
288 let mut sent = 0;
289 for session_id in &session_ids {
290 let session = match self.session_manager.get_session(session_id) {
291 Some(s) if s.is_playing() => s,
292 _ => continue,
293 };
294 if let Some(transport) = session.get_transport() {
295 match udp.send_to(payload, transport.client_addr) {
296 Ok(_) => sent += 1,
297 Err(e) => {
298 tracing::warn!(
299 session_id,
300 addr = %transport.client_addr,
301 error = %e,
302 "failed to send RTP packet"
303 );
304 }
305 }
306 }
307 }
308 Ok(sent)
309 }
310
311 pub fn get_viewers(&self) -> Vec<Viewer> {
312 self.session_manager
313 .get_playing_sessions()
314 .iter()
315 .filter_map(|session| {
316 session.get_transport().map(|transport| Viewer {
317 session_id: session.id.clone(),
318 uri: session.uri.clone(),
319 client_addr: transport.client_addr.to_string(),
320 client_rtp_port: transport.client_rtp_port,
321 })
322 })
323 .collect()
324 }
325
326 pub fn session_manager(&self) -> &SessionManager {
327 &self.session_manager
328 }
329
330 pub fn mounts(&self) -> &MountRegistry {
332 &self.mounts
333 }
334
335 pub fn config(&self) -> Arc<ServerConfig> {
337 self.config.clone()
338 }
339}
340
/// Snapshot of one playing session, as produced by `Server::get_viewers`.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly (for example
/// to detect changes between two polls, or in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Viewer {
    /// Copy of the session's id.
    pub session_id: String,
    /// Copy of the session's `uri` field.
    pub uri: String,
    /// The transport's client address, rendered with `to_string()`.
    pub client_addr: String,
    /// The transport's `client_rtp_port` value.
    pub client_rtp_port: u16,
}
349
#[cfg(test)]
mod tests {
    use super::*;

    /// Asks the OS for a loopback TCP port that is free right now.
    ///
    /// `Server::start` rejects port 0, so the test cannot bind an ephemeral port
    /// directly. Probing first avoids failures caused by a fixed port number
    /// already being in use on the machine running the tests. The probe socket
    /// is closed before this function returns.
    fn free_port() -> u16 {
        std::net::TcpListener::bind("127.0.0.1:0")
            .and_then(|probe| probe.local_addr())
            .map(|addr| addr.port())
            .expect("failed to reserve a free loopback port")
    }

    #[test]
    fn start_rejects_port_zero() {
        let mut server = Server::new("127.0.0.1:0");
        let err = server.start().unwrap_err();
        match &err {
            RtspError::InvalidBindAddress(msg) => assert!(msg.contains("non-zero"), "{}", msg),
            _ => panic!("expected InvalidBindAddress, got {:?}", err),
        }
    }

    #[test]
    fn start_rejects_missing_port() {
        let mut server = Server::new("127.0.0.1");
        let err = server.start().unwrap_err();
        match &err {
            RtspError::InvalidBindAddress(_) => {}
            _ => panic!("expected InvalidBindAddress, got {:?}", err),
        }
    }

    #[test]
    fn start_accepts_explicit_port() {
        let bind_addr = format!("127.0.0.1:{}", free_port());
        let mut server = Server::new(&bind_addr);
        server.start().expect("explicit port should be accepted");
        assert!(server.is_running());
        server.stop();
    }

    #[test]
    fn new_server_is_not_running() {
        // Construction alone must not bind anything or set the run flag.
        let server = Server::new("127.0.0.1:18555");
        assert!(!server.is_running());
    }

    #[test]
    fn send_frame_before_start_is_rejected() {
        let server = Server::new("127.0.0.1:18555");
        let err = server.send_frame(&[0u8; 4], 0).unwrap_err();
        match &err {
            RtspError::NotStarted => {}
            _ => panic!("expected NotStarted, got {:?}", err),
        }
    }
}