pjson_rs/infrastructure/services/
session_manager.rs1use crate::{
7 ApplicationResult,
8 domain::{DomainError, aggregates::stream_session::StreamSession, value_objects::SessionId},
9};
10use dashmap::DashMap;
11use std::{sync::Arc, time::Duration};
12use tokio::{
13 sync::RwLock,
14 time::{Instant as TokioInstant, interval},
15};
16
/// Tunable settings for [`SessionManager`].
#[derive(Clone, Debug)]
pub struct SessionManagerConfig {
    /// Period, in seconds, between runs of the background cleanup task.
    pub cleanup_interval_seconds: u64,
    /// Number of tracked sessions at which `add_session` starts rejecting
    /// new sessions.
    pub max_sessions: usize,
    /// NOTE(review): not read anywhere in this file; presumably the default
    /// session lifetime in seconds — confirm against callers.
    pub default_timeout_seconds: u64,
    /// NOTE(review): not read anywhere in this file; presumably a shutdown or
    /// expiry grace window in seconds — confirm against callers.
    pub grace_period_seconds: u64,
}
29
30impl Default for SessionManagerConfig {
31 fn default() -> Self {
32 Self {
33 cleanup_interval_seconds: 60, max_sessions: 10_000, default_timeout_seconds: 3600, grace_period_seconds: 300, }
38 }
39}
40
41#[derive(Debug, Clone, Default)]
43pub struct SessionManagerStats {
44 pub active_sessions: usize,
46 pub timeout_cleanups: u64,
48 pub graceful_cleanups: u64,
50 pub last_cleanup_at: Option<TokioInstant>,
52 pub average_session_duration: f64,
54}
55
56pub struct SessionManager {
58 sessions: Arc<DashMap<SessionId, Arc<RwLock<StreamSession>>>>,
60 config: SessionManagerConfig,
62 stats: Arc<RwLock<SessionManagerStats>>,
64 cleanup_handle: Option<tokio::task::JoinHandle<()>>,
66}
67
68impl Default for SessionManager {
69 fn default() -> Self {
70 Self::with_config(SessionManagerConfig::default())
71 }
72}
73
74impl SessionManager {
75 pub fn new() -> Self {
77 Self::default()
78 }
79
80 pub fn with_config(config: SessionManagerConfig) -> Self {
82 Self {
83 sessions: Arc::new(DashMap::new()),
84 config,
85 stats: Arc::new(RwLock::new(SessionManagerStats::default())),
86 cleanup_handle: None,
87 }
88 }
89
90 pub async fn start(&mut self) -> ApplicationResult<()> {
92 if self.cleanup_handle.is_some() {
93 return Err(
94 DomainError::InternalError("Session manager already started".to_string()).into(),
95 );
96 }
97
98 let sessions = Arc::clone(&self.sessions);
99 let stats = Arc::clone(&self.stats);
100 let config = self.config.clone();
101
102 let handle = tokio::spawn(async move {
103 Self::cleanup_task(sessions, stats, config).await;
104 });
105
106 self.cleanup_handle = Some(handle);
107 Ok(())
108 }
109
110 pub async fn stop(&mut self) -> ApplicationResult<()> {
112 if let Some(handle) = self.cleanup_handle.take() {
113 handle.abort();
114 let _ = handle.await;
115 }
116 Ok(())
117 }
118
119 pub async fn add_session(&self, session: StreamSession) -> ApplicationResult<SessionId> {
121 let session_id = session.id();
122
123 if self.sessions.len() >= self.config.max_sessions {
125 return Err(DomainError::ResourceExhausted(format!(
126 "Maximum sessions limit reached: {}",
127 self.config.max_sessions
128 ))
129 .into());
130 }
131
132 self.sessions
133 .insert(session_id, Arc::new(RwLock::new(session)));
134
135 let mut stats = self.stats.write().await;
137 stats.active_sessions = self.sessions.len();
138
139 Ok(session_id)
140 }
141
142 pub async fn get_session(&self, session_id: &SessionId) -> Option<Arc<RwLock<StreamSession>>> {
144 self.sessions
145 .get(session_id)
146 .map(|entry| Arc::clone(entry.value()))
147 }
148
149 pub async fn remove_session(&self, session_id: &SessionId) -> ApplicationResult<bool> {
151 let removed = self.sessions.remove(session_id).is_some();
152
153 if removed {
154 let mut stats = self.stats.write().await;
155 stats.active_sessions = self.sessions.len();
156 stats.graceful_cleanups += 1;
157 }
158
159 Ok(removed)
160 }
161
162 pub async fn cleanup_expired_sessions(&self) -> ApplicationResult<CleanupReport> {
164 let mut report = CleanupReport::default();
165 let mut sessions_to_remove = Vec::new();
166
167 for entry in self.sessions.iter() {
169 let session_id = *entry.key();
170 let session_arc = entry.value();
171
172 let mut session = session_arc.write().await;
173
174 if session.is_expired() {
175 match session.force_close_expired() {
177 Ok(was_closed) => {
178 if was_closed {
179 sessions_to_remove.push(session_id);
180 report.timeout_cleanups += 1;
181 }
182 }
183 Err(e) => {
184 report
185 .errors
186 .push(format!("Failed to close session {}: {}", session_id, e));
187 }
188 }
189 }
190 }
191
192 for session_id in sessions_to_remove {
194 self.sessions.remove(&session_id);
195 report.sessions_removed += 1;
196 }
197
198 let mut stats = self.stats.write().await;
200 stats.active_sessions = self.sessions.len();
201 stats.timeout_cleanups += report.timeout_cleanups;
202 stats.last_cleanup_at = Some(TokioInstant::now());
203
204 Ok(report)
205 }
206
207 pub async fn stats(&self) -> SessionManagerStats {
209 self.stats.read().await.clone()
210 }
211
212 async fn cleanup_task(
214 sessions: Arc<DashMap<SessionId, Arc<RwLock<StreamSession>>>>,
215 stats: Arc<RwLock<SessionManagerStats>>,
216 config: SessionManagerConfig,
217 ) {
218 let mut interval = interval(Duration::from_secs(config.cleanup_interval_seconds));
219
220 loop {
221 interval.tick().await;
222
223 let mut cleanup_count = 0;
224 let mut sessions_to_remove = Vec::new();
225
226 for entry in sessions.iter() {
228 let session_id = *entry.key();
229 let session_arc = entry.value();
230
231 let mut session = session_arc.write().await;
232
233 if session.is_expired() {
234 match session.force_close_expired() {
235 Ok(was_closed) => {
236 if was_closed {
237 sessions_to_remove.push(session_id);
238 cleanup_count += 1;
239 }
240 }
241 Err(_) => {
242 sessions_to_remove.push(session_id);
244 }
245 }
246 }
247 }
248
249 for session_id in sessions_to_remove {
251 sessions.remove(&session_id);
252 }
253
254 if cleanup_count > 0 {
256 let mut stats_guard = stats.write().await;
257 stats_guard.active_sessions = sessions.len();
258 stats_guard.timeout_cleanups += cleanup_count;
259 stats_guard.last_cleanup_at = Some(TokioInstant::now());
260 }
261 }
262 }
263}
264
265impl Drop for SessionManager {
266 fn drop(&mut self) {
267 if let Some(handle) = self.cleanup_handle.take() {
268 handle.abort();
269 }
270 }
271}
272
/// Outcome of a single `cleanup_expired_sessions` pass.
#[derive(Clone, Debug, Default)]
pub struct CleanupReport {
    /// Number of sessions taken out of the session map by this pass.
    pub sessions_removed: usize,
    /// Number of expired sessions that were closed by this pass.
    pub timeout_cleanups: u64,
    /// One human-readable message per session that could not be closed.
    pub errors: Vec<String>,
}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::aggregates::stream_session::SessionConfig;
    use tokio::time::{Duration, sleep};

    /// Builds a session configured with a one-second timeout.
    fn short_lived_session() -> StreamSession {
        StreamSession::new(SessionConfig {
            session_timeout_seconds: 1,
            ..SessionConfig::default()
        })
    }

    /// A freshly constructed manager reports zeroed counters.
    #[tokio::test]
    async fn test_session_manager_creation() {
        let manager = SessionManager::new();
        let snapshot = manager.stats().await;

        assert_eq!(snapshot.active_sessions, 0);
        assert_eq!(snapshot.timeout_cleanups, 0);
    }

    /// Adding, fetching and removing a session keeps the stats in step.
    #[tokio::test]
    async fn test_add_and_remove_session() {
        let manager = SessionManager::new();
        let session = StreamSession::new(SessionConfig::default());
        let expected_id = session.id();

        // Add: the returned id matches the session's own id.
        let returned_id = manager.add_session(session).await.unwrap();
        assert_eq!(returned_id, expected_id);
        assert_eq!(manager.stats().await.active_sessions, 1);

        // Lookup: the session is reachable by id.
        assert!(manager.get_session(&expected_id).await.is_some());

        // Remove: reported as removed and no longer counted as active.
        let was_removed = manager.remove_session(&expected_id).await.unwrap();
        assert!(was_removed);
        assert_eq!(manager.stats().await.active_sessions, 0);
    }

    /// A manual cleanup pass removes a session once its timeout has elapsed.
    #[tokio::test]
    async fn test_cleanup_expired_sessions() {
        let manager = SessionManager::new();

        let session = short_lived_session();
        let _session_id = session.id();
        manager.add_session(session).await.unwrap();

        // Wait past the one-second session timeout.
        sleep(Duration::from_secs(2)).await;

        let report = manager.cleanup_expired_sessions().await.unwrap();

        assert_eq!(report.sessions_removed, 1);
        assert_eq!(report.timeout_cleanups, 1);
        assert!(report.errors.is_empty());

        let snapshot = manager.stats().await;
        assert_eq!(snapshot.active_sessions, 0);
    }

    /// The background task removes expired sessions without manual calls.
    #[tokio::test]
    async fn test_session_manager_automatic_cleanup() {
        let mut manager = SessionManager::with_config(SessionManagerConfig {
            cleanup_interval_seconds: 1,
            ..Default::default()
        });
        manager.start().await.unwrap();

        manager.add_session(short_lived_session()).await.unwrap();

        // Long enough for the session to expire and a cleanup tick to fire.
        sleep(Duration::from_secs(3)).await;

        let snapshot = manager.stats().await;
        assert_eq!(snapshot.active_sessions, 0);
        assert!(snapshot.timeout_cleanups > 0);

        manager.stop().await.unwrap();
    }

    /// Adding beyond `max_sessions` is rejected.
    #[tokio::test]
    async fn test_session_capacity_limit() {
        let manager = SessionManager::with_config(SessionManagerConfig {
            max_sessions: 2,
            ..Default::default()
        });

        // Fill the manager to capacity.
        for _ in 0..2 {
            let session = StreamSession::new(SessionConfig::default());
            manager.add_session(session).await.unwrap();
        }

        // One more must fail.
        let overflow = StreamSession::new(SessionConfig::default());
        let outcome = manager.add_session(overflow).await;
        assert!(outcome.is_err());
    }
}