Skip to main content

rtsp/session/
mod.rs

1//! RTSP session management (RFC 2326 §3, §12.37).
2//!
3//! An RTSP session is a server-side state object created during SETUP and
4//! destroyed by TEARDOWN (or timeout). It tracks:
5//!
6//! - A unique session ID (hex string, returned in the `Session` header).
7//! - The playback state: Ready -> Playing <-> Paused.
8//! - Transport parameters (client/server UDP ports) negotiated during SETUP.
9//! - A timeout (default 60s, per RFC 2326 §12.37) — the client must send
10//!   a request (e.g. GET_PARAMETER) before the timeout expires.
11//!
12//! ## Session lifecycle (RFC 2326 §A.1)
13//!
14//! ```text
15//! SETUP         -> Ready
16//! PLAY          -> Playing
17//! PAUSE         -> Paused   (from Playing)
18//! PLAY          -> Playing  (from Paused)
19//! TEARDOWN      -> (removed)
20//! TCP disconnect -> (removed, via cleanup)
21//! ```
22
23pub mod transport;
24
25use parking_lot::RwLock;
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29
30use crate::error::Result;
31pub use transport::Transport;
32
/// Process-wide counter supplying the numeric value behind each new session ID.
static SESSION_COUNTER: AtomicU64 = AtomicU64::new(0);

/// First RTP port handed out by the server port allocator.
const SERVER_PORT_MIN: u64 = 5_000;
/// Highest RTP port the allocator hands out before wrapping; its RTCP
/// companion is then `65535`.
const SERVER_PORT_MAX: u64 = 65_534;

/// Timeout, in seconds, given to newly created sessions (RFC 2326 §12.37).
pub const DEFAULT_SESSION_TIMEOUT_SECS: u64 = 60;
40
/// Playback state of an RTSP session (RFC 2326 §A.1).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SessionState {
    /// SETUP has completed, but media delivery has not started yet.
    Ready,
    /// RTP packets are being delivered to the client.
    Playing,
    /// Delivery is suspended and may be resumed with PLAY.
    Paused,
}
51
52/// A single RTSP session (RFC 2326 §3).
53///
54/// Created during SETUP, destroyed by TEARDOWN or TCP disconnect.
55/// Interior mutability via `RwLock` allows shared references across threads.
56#[derive(Debug)]
57pub struct Session {
58    /// Unique session identifier (16-char hex string).
59    pub id: String,
60    /// The RTSP URI this session was created for (from the SETUP request).
61    pub uri: String,
62    /// Transport parameters negotiated during SETUP (RFC 2326 §12.39).
63    pub transport: RwLock<Option<Transport>>,
64    /// Current playback state.
65    pub state: RwLock<SessionState>,
66    /// Session timeout in seconds (included in the `Session` response header).
67    pub timeout_secs: u64,
68}
69
70impl Session {
71    /// Create a new session with a unique auto-incrementing ID.
72    pub fn new(uri: &str) -> Self {
73        let id = SESSION_COUNTER.fetch_add(1, Ordering::SeqCst);
74        Session {
75            id: format!("{:016X}", id),
76            uri: uri.to_string(),
77            transport: RwLock::new(None),
78            state: RwLock::new(SessionState::Ready),
79            timeout_secs: DEFAULT_SESSION_TIMEOUT_SECS,
80        }
81    }
82
83    /// Set the transport parameters (called during SETUP).
84    pub fn set_transport(&self, transport: Transport) {
85        tracing::debug!(session_id = %self.id, client_addr = %transport.client_addr, "transport configured");
86        *self.transport.write() = Some(transport);
87    }
88
89    /// Returns a clone of the transport parameters, if configured.
90    pub fn get_transport(&self) -> Option<Transport> {
91        self.transport.read().clone()
92    }
93
94    /// Transition to a new playback state.
95    pub fn set_state(&self, state: SessionState) {
96        tracing::debug!(session_id = %self.id, old_state = ?*self.state.read(), new_state = ?state, "state transition");
97        *self.state.write() = state;
98    }
99
100    /// Returns the current playback state.
101    pub fn get_state(&self) -> SessionState {
102        self.state.read().clone()
103    }
104
105    /// Whether this session is actively receiving media.
106    pub fn is_playing(&self) -> bool {
107        *self.state.read() == SessionState::Playing
108    }
109
110    /// Format the `Session` response header value per RFC 2326 §12.37.
111    ///
112    /// Example: `"0000000000000001;timeout=60"`
113    pub fn session_header_value(&self) -> String {
114        format!("{};timeout={}", self.id, self.timeout_secs)
115    }
116}
117
118/// Thread-safe registry of active sessions.
119///
120/// Backed by `parking_lot::RwLock` for fast concurrent reads. Session
121/// lookups happen on every RTP delivery cycle, so read performance matters.
122#[derive(Clone)]
123pub struct SessionManager {
124    sessions: Arc<RwLock<HashMap<String, Arc<Session>>>>,
125    next_server_port: Arc<AtomicU64>,
126}
127
128impl SessionManager {
129    pub fn new() -> Self {
130        SessionManager {
131            sessions: Arc::new(RwLock::new(HashMap::new())),
132            next_server_port: Arc::new(AtomicU64::new(SERVER_PORT_MIN)),
133        }
134    }
135
136    /// Create a new session for the given URI and register it.
137    pub fn create_session(&self, uri: &str) -> Arc<Session> {
138        let session = Arc::new(Session::new(uri));
139        let id = session.id.clone();
140        self.sessions.write().insert(id.clone(), session.clone());
141
142        let total = self.sessions.read().len();
143        tracing::debug!(session_id = %id, uri, total_sessions = total, "session created");
144
145        session
146    }
147
148    /// Look up a session by ID.
149    pub fn get_session(&self, id: &str) -> Option<Arc<Session>> {
150        self.sessions.read().get(id).cloned()
151    }
152
153    /// Remove and return a session by ID (used by TEARDOWN).
154    pub fn remove_session(&self, id: &str) -> Option<Arc<Session>> {
155        let removed = self.sessions.write().remove(id);
156        if removed.is_some() {
157            let total = self.sessions.read().len();
158            tracing::debug!(session_id = %id, total_sessions = total, "session removed");
159        }
160        removed
161    }
162
163    /// Remove multiple sessions at once (used during TCP disconnect cleanup).
164    pub fn remove_sessions(&self, ids: &[String]) -> usize {
165        let mut sessions = self.sessions.write();
166        let mut removed = 0;
167        for id in ids {
168            if sessions.remove(id).is_some() {
169                removed += 1;
170            }
171        }
172        if removed > 0 {
173            tracing::debug!(removed, remaining = sessions.len(), "batch session cleanup");
174        }
175        removed
176    }
177
178    /// Allocate a pair of (RTP, RTCP) server ports.
179    ///
180    /// Ports are allocated from a monotonic counter starting at 5000.
181    /// When the range is exhausted (> 65534), it wraps back to 5000.
182    /// Per RFC 3550 §11, RTP ports should be even and RTCP = RTP + 1.
183    pub fn allocate_server_ports(&self) -> Result<(u16, u16)> {
184        let rtp = self.next_server_port.fetch_add(2, Ordering::SeqCst);
185
186        if rtp > SERVER_PORT_MAX {
187            tracing::warn!(rtp, "port range exhausted, wrapping to {SERVER_PORT_MIN}");
188            self.next_server_port
189                .store(SERVER_PORT_MIN, Ordering::SeqCst);
190            let rtp = self.next_server_port.fetch_add(2, Ordering::SeqCst);
191            return Ok((rtp as u16, rtp as u16 + 1));
192        }
193
194        tracing::trace!(
195            rtp_port = rtp,
196            rtcp_port = rtp + 1,
197            "allocated server ports"
198        );
199        Ok((rtp as u16, rtp as u16 + 1))
200    }
201
202    /// Returns all sessions currently in the [`SessionState::Playing`] state.
203    pub fn get_playing_sessions(&self) -> Vec<Arc<Session>> {
204        self.sessions
205            .read()
206            .values()
207            .filter(|s| s.is_playing())
208            .cloned()
209            .collect()
210    }
211}
212
213impl Default for SessionManager {
214    fn default() -> Self {
215        Self::new()
216    }
217}