1pub mod transport;
24
25use parking_lot::RwLock;
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29
30use crate::error::Result;
31pub use transport::Transport;
32
33static SESSION_COUNTER: AtomicU64 = AtomicU64::new(0);
34
35const SERVER_PORT_MIN: u64 = 5000;
36const SERVER_PORT_MAX: u64 = 65534;
37
38pub const DEFAULT_SESSION_TIMEOUT_SECS: u64 = 60;
40
41#[derive(Debug, Clone, PartialEq, Eq)]
43pub enum SessionState {
44 Ready,
46 Playing,
48 Paused,
50}
51
52#[derive(Debug)]
57pub struct Session {
58 pub id: String,
60 pub uri: String,
62 pub transport: RwLock<Option<Transport>>,
64 pub state: RwLock<SessionState>,
66 pub timeout_secs: u64,
68}
69
70impl Session {
71 pub fn new(uri: &str) -> Self {
73 let id = SESSION_COUNTER.fetch_add(1, Ordering::SeqCst);
74 Session {
75 id: format!("{:016X}", id),
76 uri: uri.to_string(),
77 transport: RwLock::new(None),
78 state: RwLock::new(SessionState::Ready),
79 timeout_secs: DEFAULT_SESSION_TIMEOUT_SECS,
80 }
81 }
82
83 pub fn set_transport(&self, transport: Transport) {
85 tracing::debug!(session_id = %self.id, client_addr = %transport.client_addr, "transport configured");
86 *self.transport.write() = Some(transport);
87 }
88
89 pub fn get_transport(&self) -> Option<Transport> {
91 self.transport.read().clone()
92 }
93
94 pub fn set_state(&self, state: SessionState) {
96 tracing::debug!(session_id = %self.id, old_state = ?*self.state.read(), new_state = ?state, "state transition");
97 *self.state.write() = state;
98 }
99
100 pub fn get_state(&self) -> SessionState {
102 self.state.read().clone()
103 }
104
105 pub fn is_playing(&self) -> bool {
107 *self.state.read() == SessionState::Playing
108 }
109
110 pub fn session_header_value(&self) -> String {
114 format!("{};timeout={}", self.id, self.timeout_secs)
115 }
116}
117
118#[derive(Clone)]
123pub struct SessionManager {
124 sessions: Arc<RwLock<HashMap<String, Arc<Session>>>>,
125 next_server_port: Arc<AtomicU64>,
126}
127
128impl SessionManager {
129 pub fn new() -> Self {
130 SessionManager {
131 sessions: Arc::new(RwLock::new(HashMap::new())),
132 next_server_port: Arc::new(AtomicU64::new(SERVER_PORT_MIN)),
133 }
134 }
135
136 pub fn create_session(&self, uri: &str) -> Arc<Session> {
138 let session = Arc::new(Session::new(uri));
139 let id = session.id.clone();
140 self.sessions.write().insert(id.clone(), session.clone());
141
142 let total = self.sessions.read().len();
143 tracing::debug!(session_id = %id, uri, total_sessions = total, "session created");
144
145 session
146 }
147
148 pub fn get_session(&self, id: &str) -> Option<Arc<Session>> {
150 self.sessions.read().get(id).cloned()
151 }
152
153 pub fn remove_session(&self, id: &str) -> Option<Arc<Session>> {
155 let removed = self.sessions.write().remove(id);
156 if removed.is_some() {
157 let total = self.sessions.read().len();
158 tracing::debug!(session_id = %id, total_sessions = total, "session removed");
159 }
160 removed
161 }
162
163 pub fn remove_sessions(&self, ids: &[String]) -> usize {
165 let mut sessions = self.sessions.write();
166 let mut removed = 0;
167 for id in ids {
168 if sessions.remove(id).is_some() {
169 removed += 1;
170 }
171 }
172 if removed > 0 {
173 tracing::debug!(removed, remaining = sessions.len(), "batch session cleanup");
174 }
175 removed
176 }
177
178 pub fn allocate_server_ports(&self) -> Result<(u16, u16)> {
184 let rtp = self.next_server_port.fetch_add(2, Ordering::SeqCst);
185
186 if rtp > SERVER_PORT_MAX {
187 tracing::warn!(rtp, "port range exhausted, wrapping to {SERVER_PORT_MIN}");
188 self.next_server_port
189 .store(SERVER_PORT_MIN, Ordering::SeqCst);
190 let rtp = self.next_server_port.fetch_add(2, Ordering::SeqCst);
191 return Ok((rtp as u16, rtp as u16 + 1));
192 }
193
194 tracing::trace!(
195 rtp_port = rtp,
196 rtcp_port = rtp + 1,
197 "allocated server ports"
198 );
199 Ok((rtp as u16, rtp as u16 + 1))
200 }
201
202 pub fn get_playing_sessions(&self) -> Vec<Arc<Session>> {
204 self.sessions
205 .read()
206 .values()
207 .filter(|s| s.is_playing())
208 .cloned()
209 .collect()
210 }
211}
212
213impl Default for SessionManager {
214 fn default() -> Self {
215 Self::new()
216 }
217}