Skip to main content

heliosdb_nano/session/
manager.rs

1//! Session Manager implementation
2//!
3//! Manages concurrent user sessions with ACID guarantees, resource quotas,
4//! and automatic cleanup of inactive sessions.
5//!
6//! # Features
7//!
8//! - **Multi-user support**: Each user can have multiple concurrent sessions
9//! - **Resource quotas**: Configurable limits on sessions per user
10//! - **Automatic cleanup**: Expired sessions are automatically cleaned up
11//! - **Thread-safe**: All operations are safe for concurrent access
12//!
13//! # Example
14//!
15//! ```rust,no_run
16//! use heliosdb_nano::session::{SessionManager, IsolationLevel, User};
17//!
18//! let manager = SessionManager::new();
19//! let user = User::new("alice", "password");
20//!
21//! // Create a session
22//! let session_id = manager.create_session(&user, IsolationLevel::RepeatableRead)?;
23//!
24//! // Use the session...
25//!
26//! // Clean up
27//! manager.destroy_session(session_id)?;
28//! # Ok::<(), Box<dyn std::error::Error>>(())
29//! ```
30
31use super::types::{Session, SessionId, IsolationLevel, User, UserId};
32use crate::{Error, Result};
33use dashmap::DashMap;
34use std::sync::{Arc, Mutex};
35use std::time::Instant;
36
37/// Resource quota configuration for controlling user resource usage
38///
39/// Quotas help prevent resource exhaustion in multi-tenant environments
40/// by limiting how many sessions and queries each user can consume.
41#[derive(Debug, Clone)]
42pub struct ResourceQuota {
43    /// Maximum concurrent sessions allowed per user (default: 10)
44    pub max_sessions: usize,
45    /// Maximum queries per session before forced termination (default: unlimited)
46    pub max_queries: u64,
47    /// Maximum connections per session (default: 100)
48    pub max_connections: u32,
49}
50
51impl Default for ResourceQuota {
52    fn default() -> Self {
53        Self {
54            max_sessions: 10,
55            max_queries: u64::MAX,
56            max_connections: 100,
57        }
58    }
59}
60
61/// Session Manager
62///
63/// Coordinates multi-user access to the database with per-session
64/// isolation levels and resource quotas.
65pub struct SessionManager {
66    /// Active sessions indexed by SessionId
67    sessions: Arc<DashMap<SessionId, Arc<parking_lot::RwLock<Session>>>>,
68    /// Session timeout in seconds (default: 3600 = 1 hour)
69    session_timeout_secs: u64,
70    /// Resource quota configuration
71    quota: ResourceQuota,
72    /// Last cleanup timestamp
73    last_cleanup: Arc<Mutex<Instant>>,
74}
75
76impl SessionManager {
77    /// Create a new SessionManager with default settings
78    pub fn new() -> Self {
79        Self {
80            sessions: Arc::new(DashMap::new()),
81            session_timeout_secs: 3600,
82            quota: ResourceQuota::default(),
83            last_cleanup: Arc::new(Mutex::new(Instant::now())),
84        }
85    }
86
87    /// Create a SessionManager with custom quota limits
88    pub fn with_quota(max_sessions_per_user: usize) -> Self {
89        Self {
90            sessions: Arc::new(DashMap::new()),
91            session_timeout_secs: 3600,
92            quota: ResourceQuota {
93                max_sessions: max_sessions_per_user,
94                ..Default::default()
95            },
96            last_cleanup: Arc::new(Mutex::new(Instant::now())),
97        }
98    }
99
100    /// Create a new session for a user
101    ///
102    /// # Arguments
103    ///
104    /// * `user` - User credentials
105    /// * `isolation` - Desired isolation level
106    ///
107    /// # Returns
108    ///
109    /// SessionId for the newly created session
110    pub fn create_session(&self, user: &User, isolation: IsolationLevel) -> Result<SessionId> {
111        // Enforce resource quota
112        self.enforce_quota(&user.id, &self.quota)?;
113
114        // Create new session
115        let session = Session::new(user.id, isolation);
116        let session_id = session.id;
117
118        // Register session
119        self.sessions.insert(session_id, Arc::new(parking_lot::RwLock::new(session)));
120
121        Ok(session_id)
122    }
123
124    /// Destroy a session
125    pub fn destroy_session(&self, session_id: SessionId) -> Result<()> {
126        self.sessions.remove(&session_id)
127            .ok_or_else(|| Error::Generic(format!("Session {:?} not found", session_id)))?;
128        Ok(())
129    }
130
131    /// Get a session by ID
132    pub fn get_session(&self, session_id: SessionId) -> Result<Arc<parking_lot::RwLock<Session>>> {
133        self.sessions.get(&session_id)
134            .map(|entry| Arc::clone(entry.value()))
135            .ok_or_else(|| Error::Generic(format!("Session {:?} not found", session_id)))
136    }
137
138    /// List all active sessions
139    pub fn list_active_sessions(&self) -> Vec<SessionId> {
140        self.sessions.iter()
141            .map(|entry| *entry.key())
142            .collect()
143    }
144
145    /// Delete a session by ID
146    pub fn delete_session(&self, session_id: SessionId) -> Result<()> {
147        self.destroy_session(session_id)
148    }
149
150    /// List all active session IDs
151    pub fn list_sessions(&self) -> Vec<SessionId> {
152        self.list_active_sessions()
153    }
154
155    /// Get all sessions for a specific user
156    pub fn get_user_sessions(&self, user_id: &UserId) -> Vec<SessionId> {
157        self.sessions.iter()
158            .filter(|entry| {
159                let session = entry.value().read();
160                session.user_id == *user_id
161            })
162            .map(|entry| *entry.key())
163            .collect()
164    }
165
166    /// Update last activity timestamp for a session
167    pub fn update_last_activity(&self, session_id: SessionId) -> Result<()> {
168        let session_lock = self.get_session(session_id)?;
169        let mut session = session_lock.write();
170        session.touch();
171        Ok(())
172    }
173
174    /// Cleanup inactive sessions based on timeout
175    ///
176    /// Returns the number of sessions cleaned up
177    pub fn cleanup_inactive_sessions(&self, timeout_secs: u64) -> usize {
178        // Update last cleanup time
179        if let Ok(mut last_cleanup) = self.last_cleanup.lock() {
180            *last_cleanup = Instant::now();
181        }
182
183        let now = std::time::SystemTime::now()
184            .duration_since(std::time::UNIX_EPOCH)
185            .unwrap_or_default()
186            .as_secs();
187
188        let expired: Vec<SessionId> = self.sessions
189            .iter()
190            .filter_map(|entry| {
191                let session = entry.value().read();
192                if now - session.last_activity > timeout_secs {
193                    Some(*entry.key())
194                } else {
195                    None
196                }
197            })
198            .collect();
199
200        let count = expired.len();
201        for session_id in expired {
202            let _ = self.sessions.remove(&session_id);
203        }
204
205        count
206    }
207
208    /// Clean up expired sessions (uses default timeout)
209    pub fn cleanup_expired_sessions(&self) -> usize {
210        self.cleanup_inactive_sessions(self.session_timeout_secs)
211    }
212
213    /// Enforce resource quota for a user
214    ///
215    /// # Arguments
216    /// * `user_id` - The user identifier
217    /// * `quota` - The resource quota to enforce
218    ///
219    /// # Returns
220    /// * `Ok(())` - If quota is not exceeded
221    /// * `Err(Error)` - If quota would be exceeded
222    pub fn enforce_quota(&self, user_id: &UserId, quota: &ResourceQuota) -> Result<()> {
223        // Count user's active sessions
224        let user_session_count = self.sessions
225            .iter()
226            .filter(|entry| {
227                let session = entry.value().read();
228                session.user_id == *user_id
229            })
230            .count();
231
232        if user_session_count >= quota.max_sessions {
233            return Err(Error::Generic(format!(
234                "Resource quota exceeded: user has {} sessions (max: {})",
235                user_session_count, quota.max_sessions
236            )));
237        }
238
239        Ok(())
240    }
241
242    /// Get total number of active sessions
243    pub fn session_count(&self) -> usize {
244        self.sessions.len()
245    }
246}
247
248impl Default for SessionManager {
249    fn default() -> Self {
250        Self::new()
251    }
252}
253
254#[cfg(test)]
255mod tests {
256    use super::*;
257
258    #[test]
259    fn test_create_session_success() {
260        let manager = SessionManager::new();
261        let user = User::new("alice", "password123");
262
263        let session_id = manager.create_session(&user, IsolationLevel::ReadCommitted)
264            .expect("Failed to create session");
265
266        assert!(manager.sessions.contains_key(&session_id));
267    }
268
269    #[test]
270    fn test_session_quota_enforcement() {
271        let manager = SessionManager::with_quota(1); // Max 1 session
272        let user = User::new("bob", "password456");
273
274        let _session1 = manager.create_session(&user, IsolationLevel::ReadCommitted)
275            .expect("First session should succeed");
276
277        let result = manager.create_session(&user, IsolationLevel::ReadCommitted);
278        assert!(result.is_err());
279    }
280
281    #[test]
282    fn test_concurrent_sessions_isolation() {
283        let manager = Arc::new(SessionManager::new());
284        let user1 = User::new("user1", "pass1");
285        let user2 = User::new("user2", "pass2");
286
287        let session1 = manager.create_session(&user1, IsolationLevel::ReadCommitted)
288            .expect("Failed to create session1");
289        let session2 = manager.create_session(&user2, IsolationLevel::ReadCommitted)
290            .expect("Failed to create session2");
291
292        // Both sessions should be independent
293        assert_ne!(session1, session2);
294        assert_eq!(manager.list_active_sessions().len(), 2);
295    }
296
297    #[test]
298    fn test_list_sessions() {
299        let manager = SessionManager::new();
300        let user1 = User::new("alice", "pass");
301        let user2 = User::new("bob", "pass");
302
303        let id1 = manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
304        let id2 = manager.create_session(&user2, IsolationLevel::RepeatableRead).unwrap();
305
306        let sessions = manager.list_sessions();
307        assert_eq!(sessions.len(), 2);
308        assert!(sessions.contains(&id1));
309        assert!(sessions.contains(&id2));
310    }
311
312    #[test]
313    fn test_get_user_sessions() {
314        let manager = SessionManager::new();
315        let user1 = User::new("alice", "pass");
316        let user2 = User::new("bob", "pass");
317
318        manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
319        manager.create_session(&user2, IsolationLevel::RepeatableRead).unwrap();
320        manager.create_session(&user1, IsolationLevel::Serializable).unwrap();
321
322        let alice_sessions = manager.get_user_sessions(&user1.id);
323        let bob_sessions = manager.get_user_sessions(&user2.id);
324
325        assert_eq!(alice_sessions.len(), 2);
326        assert_eq!(bob_sessions.len(), 1);
327    }
328
329    #[test]
330    fn test_update_last_activity() {
331        let manager = SessionManager::new();
332        let user = User::new("alice", "pass");
333        let session_id = manager.create_session(&user, IsolationLevel::ReadCommitted).unwrap();
334
335        // Sleep to ensure time difference
336        std::thread::sleep(std::time::Duration::from_millis(100));
337
338        let session_before = manager.get_session(session_id).unwrap();
339        let activity_before = session_before.read().last_activity;
340
341        std::thread::sleep(std::time::Duration::from_millis(100));
342
343        manager.update_last_activity(session_id).unwrap();
344
345        let session_after = manager.get_session(session_id).unwrap();
346        let activity_after = session_after.read().last_activity;
347
348        assert!(activity_after >= activity_before);
349    }
350
351    #[test]
352    fn test_cleanup_inactive_sessions() {
353        let manager = SessionManager::new();
354        let user1 = User::new("alice", "pass");
355        let user2 = User::new("bob", "pass");
356
357        manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
358        manager.create_session(&user2, IsolationLevel::RepeatableRead).unwrap();
359
360        // Wait for sessions to become inactive
361        std::thread::sleep(std::time::Duration::from_secs(2));
362
363        // Cleanup with 1 second timeout
364        let removed = manager.cleanup_inactive_sessions(1);
365        assert_eq!(removed, 2);
366        assert_eq!(manager.session_count(), 0);
367    }
368
369    #[test]
370    fn test_cleanup_keeps_active_sessions() {
371        let manager = SessionManager::new();
372        let user = User::new("alice", "pass");
373        let session_id = manager.create_session(&user, IsolationLevel::ReadCommitted).unwrap();
374
375        // Keep session active
376        std::thread::sleep(std::time::Duration::from_millis(500));
377        manager.update_last_activity(session_id).unwrap();
378
379        std::thread::sleep(std::time::Duration::from_millis(600));
380
381        // Cleanup with 1 second timeout (should not remove)
382        let removed = manager.cleanup_inactive_sessions(1);
383        assert_eq!(removed, 0);
384        assert_eq!(manager.session_count(), 1);
385    }
386
387    #[test]
388    fn test_delete_session() {
389        let manager = SessionManager::new();
390        let user = User::new("alice", "pass");
391        let session_id = manager.create_session(&user, IsolationLevel::ReadCommitted).unwrap();
392
393        assert_eq!(manager.session_count(), 1);
394
395        manager.delete_session(session_id).unwrap();
396
397        assert_eq!(manager.session_count(), 0);
398        assert!(manager.get_session(session_id).is_err());
399    }
400
401    #[test]
402    fn test_enforce_quota() {
403        let manager = SessionManager::with_quota(2);
404        let user = User::new("alice", "pass");
405
406        // Create two sessions (at limit)
407        manager.create_session(&user, IsolationLevel::ReadCommitted).unwrap();
408        manager.create_session(&user, IsolationLevel::RepeatableRead).unwrap();
409
410        // Third session should fail
411        let result = manager.create_session(&user, IsolationLevel::Serializable);
412        assert!(result.is_err());
413        assert!(result.unwrap_err().to_string().contains("quota exceeded"));
414    }
415
416    #[test]
417    fn test_quota_per_user() {
418        let manager = SessionManager::with_quota(1);
419        let user1 = User::new("alice", "pass");
420        let user2 = User::new("bob", "pass");
421
422        // Alice creates one session
423        manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
424
425        // Bob can still create a session (quota is per-user)
426        manager.create_session(&user2, IsolationLevel::RepeatableRead)
427            .expect("Bob's session should succeed");
428
429        // Alice's second session should fail
430        let result = manager.create_session(&user1, IsolationLevel::Serializable);
431        assert!(result.is_err());
432    }
433
434    #[test]
435    fn test_concurrent_session_creation() {
436        use std::thread;
437
438        let manager = Arc::new(SessionManager::new());
439        let mut handles = vec![];
440
441        // Spawn 10 threads, each creating a session
442        for i in 0..10 {
443            let manager_clone = Arc::clone(&manager);
444            let handle = thread::spawn(move || {
445                let user = User::new(format!("user_{}", i), "pass");
446                manager_clone.create_session(&user, IsolationLevel::ReadCommitted)
447            });
448            handles.push(handle);
449        }
450
451        // Wait for all threads to complete
452        let mut session_ids = vec![];
453        for handle in handles {
454            let session_id = handle.join().unwrap().unwrap();
455            session_ids.push(session_id);
456        }
457
458        // All session IDs should be unique
459        let original_len = session_ids.len();
460        session_ids.sort_by_key(|id| id.0);
461        session_ids.dedup();
462        assert_eq!(session_ids.len(), original_len);
463        assert_eq!(manager.session_count(), 10);
464    }
465
466    #[test]
467    fn test_concurrent_session_operations() {
468        use std::thread;
469
470        let manager = Arc::new(SessionManager::new());
471        let user1 = User::new("alice", "pass");
472        let user2 = User::new("bob", "pass");
473        let user3 = User::new("charlie", "pass");
474
475        // Create initial sessions
476        let id1 = manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
477        let id2 = manager.create_session(&user2, IsolationLevel::RepeatableRead).unwrap();
478        let _id3 = manager.create_session(&user3, IsolationLevel::Serializable).unwrap();
479
480        let mut handles = vec![];
481
482        // Thread 1: Update activity
483        {
484            let manager_clone = Arc::clone(&manager);
485            handles.push(thread::spawn(move || {
486                for _ in 0..100 {
487                    let _ = manager_clone.update_last_activity(id1);
488                }
489            }));
490        }
491
492        // Thread 2: List sessions
493        {
494            let manager_clone = Arc::clone(&manager);
495            handles.push(thread::spawn(move || {
496                for _ in 0..100 {
497                    let _ = manager_clone.list_sessions();
498                }
499            }));
500        }
501
502        // Thread 3: Get sessions
503        {
504            let manager_clone = Arc::clone(&manager);
505            handles.push(thread::spawn(move || {
506                for _ in 0..100 {
507                    let _ = manager_clone.get_session(id2);
508                }
509            }));
510        }
511
512        // Thread 4: Get user sessions
513        {
514            let manager_clone = Arc::clone(&manager);
515            let user_id = user1.id;
516            handles.push(thread::spawn(move || {
517                for _ in 0..100 {
518                    let _ = manager_clone.get_user_sessions(&user_id);
519                }
520            }));
521        }
522
523        // Wait for all threads
524        for handle in handles {
525            handle.join().unwrap();
526        }
527
528        // All sessions should still exist
529        assert_eq!(manager.session_count(), 3);
530    }
531
532    #[test]
533    fn test_isolation_levels() {
534        let manager = SessionManager::new();
535        let user1 = User::new("alice", "pass");
536        let user2 = User::new("bob", "pass");
537        let user3 = User::new("charlie", "pass");
538
539        let id1 = manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
540        let id2 = manager.create_session(&user2, IsolationLevel::RepeatableRead).unwrap();
541        let id3 = manager.create_session(&user3, IsolationLevel::Serializable).unwrap();
542
543        let session1_lock = manager.get_session(id1).unwrap();
544        let session1 = session1_lock.read();
545        assert_eq!(session1.isolation_level, IsolationLevel::ReadCommitted);
546        drop(session1);
547
548        let session2_lock = manager.get_session(id2).unwrap();
549        let session2 = session2_lock.read();
550        assert_eq!(session2.isolation_level, IsolationLevel::RepeatableRead);
551        drop(session2);
552
553        let session3_lock = manager.get_session(id3).unwrap();
554        let session3 = session3_lock.read();
555        assert_eq!(session3.isolation_level, IsolationLevel::Serializable);
556    }
557}