// pjson_rs/infrastructure/services/session_manager.rs

//! Session management service with timeout monitoring and cleanup
//!
//! This service provides centralized session lifecycle management with
//! automatic timeout detection and cleanup capabilities.
5
6use crate::{
7    ApplicationResult,
8    domain::{DomainError, aggregates::stream_session::StreamSession, value_objects::SessionId},
9};
10use dashmap::DashMap;
11use std::{sync::Arc, time::Duration};
12use tokio::{
13    sync::RwLock,
14    time::{Instant as TokioInstant, interval},
15};
16
/// Tunable settings for a `SessionManager`.
///
/// All durations are expressed in whole seconds.
#[derive(Clone, Debug)]
pub struct SessionManagerConfig {
    /// Period, in seconds, between two runs of the background expiry sweep.
    pub cleanup_interval_seconds: u64,
    /// Upper bound on the number of sessions held in memory at once.
    pub max_sessions: usize,
    /// Timeout, in seconds, intended for sessions that do not specify their own.
    // NOTE(review): not read by the code visible in this file — confirm where it is applied.
    pub default_timeout_seconds: u64,
    /// Grace period, in seconds, before a session is forcibly cleaned up.
    // NOTE(review): not read by the code visible in this file — confirm where it is applied.
    pub grace_period_seconds: u64,
}
29
30impl Default for SessionManagerConfig {
31    fn default() -> Self {
32        Self {
33            cleanup_interval_seconds: 60,  // Check every minute
34            max_sessions: 10_000,          // 10K concurrent sessions max
35            default_timeout_seconds: 3600, // 1 hour default
36            grace_period_seconds: 300,     // 5 minute grace period
37        }
38    }
39}
40
41/// Statistics about session management
42#[derive(Debug, Clone, Default)]
43pub struct SessionManagerStats {
44    /// Total number of active sessions
45    pub active_sessions: usize,
46    /// Number of sessions cleaned up due to timeout
47    pub timeout_cleanups: u64,
48    /// Number of sessions cleaned up gracefully
49    pub graceful_cleanups: u64,
50    /// Last cleanup timestamp
51    pub last_cleanup_at: Option<TokioInstant>,
52    /// Average session duration in seconds
53    pub average_session_duration: f64,
54}
55
56/// Session management service with timeout monitoring
57pub struct SessionManager {
58    /// Active sessions storage
59    sessions: Arc<DashMap<SessionId, Arc<RwLock<StreamSession>>>>,
60    /// Configuration
61    config: SessionManagerConfig,
62    /// Statistics
63    stats: Arc<RwLock<SessionManagerStats>>,
64    /// Cleanup task handle
65    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
66}
67
68impl Default for SessionManager {
69    fn default() -> Self {
70        Self::with_config(SessionManagerConfig::default())
71    }
72}
73
74impl SessionManager {
75    /// Create new session manager with default config
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Create new session manager with custom config
81    pub fn with_config(config: SessionManagerConfig) -> Self {
82        Self {
83            sessions: Arc::new(DashMap::new()),
84            config,
85            stats: Arc::new(RwLock::new(SessionManagerStats::default())),
86            cleanup_handle: None,
87        }
88    }
89
90    /// Start the session manager with automatic cleanup
91    pub async fn start(&mut self) -> ApplicationResult<()> {
92        if self.cleanup_handle.is_some() {
93            return Err(
94                DomainError::InternalError("Session manager already started".to_string()).into(),
95            );
96        }
97
98        let sessions = Arc::clone(&self.sessions);
99        let stats = Arc::clone(&self.stats);
100        let config = self.config.clone();
101
102        let handle = tokio::spawn(async move {
103            Self::cleanup_task(sessions, stats, config).await;
104        });
105
106        self.cleanup_handle = Some(handle);
107        Ok(())
108    }
109
110    /// Stop the session manager
111    pub async fn stop(&mut self) -> ApplicationResult<()> {
112        if let Some(handle) = self.cleanup_handle.take() {
113            handle.abort();
114            let _ = handle.await;
115        }
116        Ok(())
117    }
118
119    /// Add session to management
120    pub async fn add_session(&self, session: StreamSession) -> ApplicationResult<SessionId> {
121        let session_id = session.id();
122
123        // Check capacity
124        if self.sessions.len() >= self.config.max_sessions {
125            return Err(DomainError::ResourceExhausted(format!(
126                "Maximum sessions limit reached: {}",
127                self.config.max_sessions
128            ))
129            .into());
130        }
131
132        self.sessions
133            .insert(session_id, Arc::new(RwLock::new(session)));
134
135        // Update stats
136        let mut stats = self.stats.write().await;
137        stats.active_sessions = self.sessions.len();
138
139        Ok(session_id)
140    }
141
142    /// Get session by ID
143    pub async fn get_session(&self, session_id: &SessionId) -> Option<Arc<RwLock<StreamSession>>> {
144        self.sessions
145            .get(session_id)
146            .map(|entry| Arc::clone(entry.value()))
147    }
148
149    /// Remove session from management
150    pub async fn remove_session(&self, session_id: &SessionId) -> ApplicationResult<bool> {
151        let removed = self.sessions.remove(session_id).is_some();
152
153        if removed {
154            let mut stats = self.stats.write().await;
155            stats.active_sessions = self.sessions.len();
156            stats.graceful_cleanups += 1;
157        }
158
159        Ok(removed)
160    }
161
162    /// Force cleanup of expired sessions
163    pub async fn cleanup_expired_sessions(&self) -> ApplicationResult<CleanupReport> {
164        let mut report = CleanupReport::default();
165        let mut sessions_to_remove = Vec::new();
166
167        // Collect expired sessions
168        for entry in self.sessions.iter() {
169            let session_id = *entry.key();
170            let session_arc = entry.value();
171
172            let mut session = session_arc.write().await;
173
174            if session.is_expired() {
175                // Try to force close with cleanup
176                match session.force_close_expired() {
177                    Ok(was_closed) => {
178                        if was_closed {
179                            sessions_to_remove.push(session_id);
180                            report.timeout_cleanups += 1;
181                        }
182                    }
183                    Err(e) => {
184                        report
185                            .errors
186                            .push(format!("Failed to close session {}: {}", session_id, e));
187                    }
188                }
189            }
190        }
191
192        // Remove cleaned up sessions
193        for session_id in sessions_to_remove {
194            self.sessions.remove(&session_id);
195            report.sessions_removed += 1;
196        }
197
198        // Update stats
199        let mut stats = self.stats.write().await;
200        stats.active_sessions = self.sessions.len();
201        stats.timeout_cleanups += report.timeout_cleanups;
202        stats.last_cleanup_at = Some(TokioInstant::now());
203
204        Ok(report)
205    }
206
207    /// Get current statistics
208    pub async fn stats(&self) -> SessionManagerStats {
209        self.stats.read().await.clone()
210    }
211
212    /// Internal cleanup task
213    async fn cleanup_task(
214        sessions: Arc<DashMap<SessionId, Arc<RwLock<StreamSession>>>>,
215        stats: Arc<RwLock<SessionManagerStats>>,
216        config: SessionManagerConfig,
217    ) {
218        let mut interval = interval(Duration::from_secs(config.cleanup_interval_seconds));
219
220        loop {
221            interval.tick().await;
222
223            let mut cleanup_count = 0;
224            let mut sessions_to_remove = Vec::new();
225
226            // Check each session for expiration
227            for entry in sessions.iter() {
228                let session_id = *entry.key();
229                let session_arc = entry.value();
230
231                let mut session = session_arc.write().await;
232
233                if session.is_expired() {
234                    match session.force_close_expired() {
235                        Ok(was_closed) => {
236                            if was_closed {
237                                sessions_to_remove.push(session_id);
238                                cleanup_count += 1;
239                            }
240                        }
241                        Err(_) => {
242                            // Log error but continue cleanup
243                            sessions_to_remove.push(session_id);
244                        }
245                    }
246                }
247            }
248
249            // Remove expired sessions
250            for session_id in sessions_to_remove {
251                sessions.remove(&session_id);
252            }
253
254            // Update stats
255            if cleanup_count > 0 {
256                let mut stats_guard = stats.write().await;
257                stats_guard.active_sessions = sessions.len();
258                stats_guard.timeout_cleanups += cleanup_count;
259                stats_guard.last_cleanup_at = Some(TokioInstant::now());
260            }
261        }
262    }
263}
264
265impl Drop for SessionManager {
266    fn drop(&mut self) {
267        if let Some(handle) = self.cleanup_handle.take() {
268            handle.abort();
269        }
270    }
271}
272
/// Outcome of a single cleanup pass.
#[derive(Clone, Debug, Default)]
pub struct CleanupReport {
    /// How many sessions were taken out of the manager.
    pub sessions_removed: usize,
    /// How many sessions were closed because they had timed out.
    pub timeout_cleanups: u64,
    /// Human-readable descriptions of failures hit during the pass.
    pub errors: Vec<String>,
}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::aggregates::stream_session::SessionConfig;
    use tokio::time::{Duration, sleep};

    /// A freshly built manager reports empty statistics.
    #[tokio::test]
    async fn test_session_manager_creation() {
        let manager = SessionManager::new();
        let snapshot = manager.stats().await;

        assert_eq!(snapshot.active_sessions, 0);
        assert_eq!(snapshot.timeout_cleanups, 0);
    }

    /// A session can be added, looked up and removed, and the active count follows.
    #[tokio::test]
    async fn test_add_and_remove_session() {
        let manager = SessionManager::new();
        let session = StreamSession::new(SessionConfig::default());
        let expected_id = session.id();

        // Adding returns the session's own id and bumps the active count.
        let returned_id = manager.add_session(session).await.unwrap();
        assert_eq!(returned_id, expected_id);
        assert_eq!(manager.stats().await.active_sessions, 1);

        // The stored session can be fetched back.
        let lookup = manager.get_session(&expected_id).await;
        assert!(lookup.is_some());

        // Removing reports success and brings the count back to zero.
        let was_removed = manager.remove_session(&expected_id).await.unwrap();
        assert!(was_removed);
        assert_eq!(manager.stats().await.active_sessions, 0);
    }

    /// A manual cleanup pass evicts a session whose timeout has elapsed.
    #[tokio::test]
    async fn test_cleanup_expired_sessions() {
        let manager = SessionManager::new();

        // One-second timeout so the session expires during the sleep below.
        let short_lived = SessionConfig {
            session_timeout_seconds: 1,
            ..SessionConfig::default()
        };

        let session = StreamSession::new(short_lived);
        let _session_id = session.id();

        manager.add_session(session).await.unwrap();

        // Let the timeout elapse.
        sleep(Duration::from_secs(2)).await;

        let outcome = manager.cleanup_expired_sessions().await.unwrap();

        assert_eq!(outcome.sessions_removed, 1);
        assert_eq!(outcome.timeout_cleanups, 1);
        assert!(outcome.errors.is_empty());

        let snapshot = manager.stats().await;
        assert_eq!(snapshot.active_sessions, 0);
    }

    /// The background task evicts an expired session without manual intervention.
    #[tokio::test]
    async fn test_session_manager_automatic_cleanup() {
        // Sweep every second.
        let manager_config = SessionManagerConfig {
            cleanup_interval_seconds: 1,
            ..Default::default()
        };

        let mut manager = SessionManager::with_config(manager_config);
        manager.start().await.unwrap();

        // Session that expires after one second.
        let short_lived = SessionConfig {
            session_timeout_seconds: 1,
            ..SessionConfig::default()
        };

        manager
            .add_session(StreamSession::new(short_lived))
            .await
            .unwrap();

        // Give the background sweep time to run after the session expired.
        sleep(Duration::from_secs(3)).await;

        let snapshot = manager.stats().await;
        assert_eq!(snapshot.active_sessions, 0);
        assert!(snapshot.timeout_cleanups > 0);

        manager.stop().await.unwrap();
    }

    /// Adding beyond `max_sessions` is rejected.
    #[tokio::test]
    async fn test_session_capacity_limit() {
        let manager_config = SessionManagerConfig {
            max_sessions: 2,
            ..Default::default()
        };

        let manager = SessionManager::with_config(manager_config);

        // Fill the manager up to its limit.
        for _ in 0..2 {
            manager
                .add_session(StreamSession::new(SessionConfig::default()))
                .await
                .unwrap();
        }

        // One more must be refused.
        let overflow = manager
            .add_session(StreamSession::new(SessionConfig::default()))
            .await;
        assert!(overflow.is_err());
    }
}