1use super::types::{Session, SessionId, IsolationLevel, User, UserId};
32use crate::{Error, Result};
33use dashmap::DashMap;
34use std::sync::{Arc, Mutex};
35use std::time::Instant;
36
37#[derive(Debug, Clone)]
42pub struct ResourceQuota {
43 pub max_sessions: usize,
45 pub max_queries: u64,
47 pub max_connections: u32,
49}
50
51impl Default for ResourceQuota {
52 fn default() -> Self {
53 Self {
54 max_sessions: 10,
55 max_queries: u64::MAX,
56 max_connections: 100,
57 }
58 }
59}
60
61pub struct SessionManager {
66 sessions: Arc<DashMap<SessionId, Arc<parking_lot::RwLock<Session>>>>,
68 session_timeout_secs: u64,
70 quota: ResourceQuota,
72 last_cleanup: Arc<Mutex<Instant>>,
74}
75
76impl SessionManager {
77 pub fn new() -> Self {
79 Self {
80 sessions: Arc::new(DashMap::new()),
81 session_timeout_secs: 3600,
82 quota: ResourceQuota::default(),
83 last_cleanup: Arc::new(Mutex::new(Instant::now())),
84 }
85 }
86
87 pub fn with_quota(max_sessions_per_user: usize) -> Self {
89 Self {
90 sessions: Arc::new(DashMap::new()),
91 session_timeout_secs: 3600,
92 quota: ResourceQuota {
93 max_sessions: max_sessions_per_user,
94 ..Default::default()
95 },
96 last_cleanup: Arc::new(Mutex::new(Instant::now())),
97 }
98 }
99
100 pub fn create_session(&self, user: &User, isolation: IsolationLevel) -> Result<SessionId> {
111 self.enforce_quota(&user.id, &self.quota)?;
113
114 let session = Session::new(user.id, isolation);
116 let session_id = session.id;
117
118 self.sessions.insert(session_id, Arc::new(parking_lot::RwLock::new(session)));
120
121 Ok(session_id)
122 }
123
124 pub fn destroy_session(&self, session_id: SessionId) -> Result<()> {
126 self.sessions.remove(&session_id)
127 .ok_or_else(|| Error::Generic(format!("Session {:?} not found", session_id)))?;
128 Ok(())
129 }
130
131 pub fn get_session(&self, session_id: SessionId) -> Result<Arc<parking_lot::RwLock<Session>>> {
133 self.sessions.get(&session_id)
134 .map(|entry| Arc::clone(entry.value()))
135 .ok_or_else(|| Error::Generic(format!("Session {:?} not found", session_id)))
136 }
137
138 pub fn list_active_sessions(&self) -> Vec<SessionId> {
140 self.sessions.iter()
141 .map(|entry| *entry.key())
142 .collect()
143 }
144
145 pub fn delete_session(&self, session_id: SessionId) -> Result<()> {
147 self.destroy_session(session_id)
148 }
149
150 pub fn list_sessions(&self) -> Vec<SessionId> {
152 self.list_active_sessions()
153 }
154
155 pub fn get_user_sessions(&self, user_id: &UserId) -> Vec<SessionId> {
157 self.sessions.iter()
158 .filter(|entry| {
159 let session = entry.value().read();
160 session.user_id == *user_id
161 })
162 .map(|entry| *entry.key())
163 .collect()
164 }
165
166 pub fn update_last_activity(&self, session_id: SessionId) -> Result<()> {
168 let session_lock = self.get_session(session_id)?;
169 let mut session = session_lock.write();
170 session.touch();
171 Ok(())
172 }
173
174 pub fn cleanup_inactive_sessions(&self, timeout_secs: u64) -> usize {
178 if let Ok(mut last_cleanup) = self.last_cleanup.lock() {
180 *last_cleanup = Instant::now();
181 }
182
183 let now = std::time::SystemTime::now()
184 .duration_since(std::time::UNIX_EPOCH)
185 .unwrap_or_default()
186 .as_secs();
187
188 let expired: Vec<SessionId> = self.sessions
189 .iter()
190 .filter_map(|entry| {
191 let session = entry.value().read();
192 if now - session.last_activity > timeout_secs {
193 Some(*entry.key())
194 } else {
195 None
196 }
197 })
198 .collect();
199
200 let count = expired.len();
201 for session_id in expired {
202 let _ = self.sessions.remove(&session_id);
203 }
204
205 count
206 }
207
208 pub fn cleanup_expired_sessions(&self) -> usize {
210 self.cleanup_inactive_sessions(self.session_timeout_secs)
211 }
212
213 pub fn enforce_quota(&self, user_id: &UserId, quota: &ResourceQuota) -> Result<()> {
223 let user_session_count = self.sessions
225 .iter()
226 .filter(|entry| {
227 let session = entry.value().read();
228 session.user_id == *user_id
229 })
230 .count();
231
232 if user_session_count >= quota.max_sessions {
233 return Err(Error::Generic(format!(
234 "Resource quota exceeded: user has {} sessions (max: {})",
235 user_session_count, quota.max_sessions
236 )));
237 }
238
239 Ok(())
240 }
241
242 pub fn session_count(&self) -> usize {
244 self.sessions.len()
245 }
246}
247
248impl Default for SessionManager {
249 fn default() -> Self {
250 Self::new()
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257
258 #[test]
259 fn test_create_session_success() {
260 let manager = SessionManager::new();
261 let user = User::new("alice", "password123");
262
263 let session_id = manager.create_session(&user, IsolationLevel::ReadCommitted)
264 .expect("Failed to create session");
265
266 assert!(manager.sessions.contains_key(&session_id));
267 }
268
269 #[test]
270 fn test_session_quota_enforcement() {
271 let manager = SessionManager::with_quota(1); let user = User::new("bob", "password456");
273
274 let _session1 = manager.create_session(&user, IsolationLevel::ReadCommitted)
275 .expect("First session should succeed");
276
277 let result = manager.create_session(&user, IsolationLevel::ReadCommitted);
278 assert!(result.is_err());
279 }
280
281 #[test]
282 fn test_concurrent_sessions_isolation() {
283 let manager = Arc::new(SessionManager::new());
284 let user1 = User::new("user1", "pass1");
285 let user2 = User::new("user2", "pass2");
286
287 let session1 = manager.create_session(&user1, IsolationLevel::ReadCommitted)
288 .expect("Failed to create session1");
289 let session2 = manager.create_session(&user2, IsolationLevel::ReadCommitted)
290 .expect("Failed to create session2");
291
292 assert_ne!(session1, session2);
294 assert_eq!(manager.list_active_sessions().len(), 2);
295 }
296
297 #[test]
298 fn test_list_sessions() {
299 let manager = SessionManager::new();
300 let user1 = User::new("alice", "pass");
301 let user2 = User::new("bob", "pass");
302
303 let id1 = manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
304 let id2 = manager.create_session(&user2, IsolationLevel::RepeatableRead).unwrap();
305
306 let sessions = manager.list_sessions();
307 assert_eq!(sessions.len(), 2);
308 assert!(sessions.contains(&id1));
309 assert!(sessions.contains(&id2));
310 }
311
312 #[test]
313 fn test_get_user_sessions() {
314 let manager = SessionManager::new();
315 let user1 = User::new("alice", "pass");
316 let user2 = User::new("bob", "pass");
317
318 manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
319 manager.create_session(&user2, IsolationLevel::RepeatableRead).unwrap();
320 manager.create_session(&user1, IsolationLevel::Serializable).unwrap();
321
322 let alice_sessions = manager.get_user_sessions(&user1.id);
323 let bob_sessions = manager.get_user_sessions(&user2.id);
324
325 assert_eq!(alice_sessions.len(), 2);
326 assert_eq!(bob_sessions.len(), 1);
327 }
328
329 #[test]
330 fn test_update_last_activity() {
331 let manager = SessionManager::new();
332 let user = User::new("alice", "pass");
333 let session_id = manager.create_session(&user, IsolationLevel::ReadCommitted).unwrap();
334
335 std::thread::sleep(std::time::Duration::from_millis(100));
337
338 let session_before = manager.get_session(session_id).unwrap();
339 let activity_before = session_before.read().last_activity;
340
341 std::thread::sleep(std::time::Duration::from_millis(100));
342
343 manager.update_last_activity(session_id).unwrap();
344
345 let session_after = manager.get_session(session_id).unwrap();
346 let activity_after = session_after.read().last_activity;
347
348 assert!(activity_after >= activity_before);
349 }
350
351 #[test]
352 fn test_cleanup_inactive_sessions() {
353 let manager = SessionManager::new();
354 let user1 = User::new("alice", "pass");
355 let user2 = User::new("bob", "pass");
356
357 manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
358 manager.create_session(&user2, IsolationLevel::RepeatableRead).unwrap();
359
360 std::thread::sleep(std::time::Duration::from_secs(2));
362
363 let removed = manager.cleanup_inactive_sessions(1);
365 assert_eq!(removed, 2);
366 assert_eq!(manager.session_count(), 0);
367 }
368
369 #[test]
370 fn test_cleanup_keeps_active_sessions() {
371 let manager = SessionManager::new();
372 let user = User::new("alice", "pass");
373 let session_id = manager.create_session(&user, IsolationLevel::ReadCommitted).unwrap();
374
375 std::thread::sleep(std::time::Duration::from_millis(500));
377 manager.update_last_activity(session_id).unwrap();
378
379 std::thread::sleep(std::time::Duration::from_millis(600));
380
381 let removed = manager.cleanup_inactive_sessions(1);
383 assert_eq!(removed, 0);
384 assert_eq!(manager.session_count(), 1);
385 }
386
387 #[test]
388 fn test_delete_session() {
389 let manager = SessionManager::new();
390 let user = User::new("alice", "pass");
391 let session_id = manager.create_session(&user, IsolationLevel::ReadCommitted).unwrap();
392
393 assert_eq!(manager.session_count(), 1);
394
395 manager.delete_session(session_id).unwrap();
396
397 assert_eq!(manager.session_count(), 0);
398 assert!(manager.get_session(session_id).is_err());
399 }
400
401 #[test]
402 fn test_enforce_quota() {
403 let manager = SessionManager::with_quota(2);
404 let user = User::new("alice", "pass");
405
406 manager.create_session(&user, IsolationLevel::ReadCommitted).unwrap();
408 manager.create_session(&user, IsolationLevel::RepeatableRead).unwrap();
409
410 let result = manager.create_session(&user, IsolationLevel::Serializable);
412 assert!(result.is_err());
413 assert!(result.unwrap_err().to_string().contains("quota exceeded"));
414 }
415
416 #[test]
417 fn test_quota_per_user() {
418 let manager = SessionManager::with_quota(1);
419 let user1 = User::new("alice", "pass");
420 let user2 = User::new("bob", "pass");
421
422 manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
424
425 manager.create_session(&user2, IsolationLevel::RepeatableRead)
427 .expect("Bob's session should succeed");
428
429 let result = manager.create_session(&user1, IsolationLevel::Serializable);
431 assert!(result.is_err());
432 }
433
434 #[test]
435 fn test_concurrent_session_creation() {
436 use std::thread;
437
438 let manager = Arc::new(SessionManager::new());
439 let mut handles = vec![];
440
441 for i in 0..10 {
443 let manager_clone = Arc::clone(&manager);
444 let handle = thread::spawn(move || {
445 let user = User::new(format!("user_{}", i), "pass");
446 manager_clone.create_session(&user, IsolationLevel::ReadCommitted)
447 });
448 handles.push(handle);
449 }
450
451 let mut session_ids = vec![];
453 for handle in handles {
454 let session_id = handle.join().unwrap().unwrap();
455 session_ids.push(session_id);
456 }
457
458 let original_len = session_ids.len();
460 session_ids.sort_by_key(|id| id.0);
461 session_ids.dedup();
462 assert_eq!(session_ids.len(), original_len);
463 assert_eq!(manager.session_count(), 10);
464 }
465
466 #[test]
467 fn test_concurrent_session_operations() {
468 use std::thread;
469
470 let manager = Arc::new(SessionManager::new());
471 let user1 = User::new("alice", "pass");
472 let user2 = User::new("bob", "pass");
473 let user3 = User::new("charlie", "pass");
474
475 let id1 = manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
477 let id2 = manager.create_session(&user2, IsolationLevel::RepeatableRead).unwrap();
478 let _id3 = manager.create_session(&user3, IsolationLevel::Serializable).unwrap();
479
480 let mut handles = vec![];
481
482 {
484 let manager_clone = Arc::clone(&manager);
485 handles.push(thread::spawn(move || {
486 for _ in 0..100 {
487 let _ = manager_clone.update_last_activity(id1);
488 }
489 }));
490 }
491
492 {
494 let manager_clone = Arc::clone(&manager);
495 handles.push(thread::spawn(move || {
496 for _ in 0..100 {
497 let _ = manager_clone.list_sessions();
498 }
499 }));
500 }
501
502 {
504 let manager_clone = Arc::clone(&manager);
505 handles.push(thread::spawn(move || {
506 for _ in 0..100 {
507 let _ = manager_clone.get_session(id2);
508 }
509 }));
510 }
511
512 {
514 let manager_clone = Arc::clone(&manager);
515 let user_id = user1.id;
516 handles.push(thread::spawn(move || {
517 for _ in 0..100 {
518 let _ = manager_clone.get_user_sessions(&user_id);
519 }
520 }));
521 }
522
523 for handle in handles {
525 handle.join().unwrap();
526 }
527
528 assert_eq!(manager.session_count(), 3);
530 }
531
532 #[test]
533 fn test_isolation_levels() {
534 let manager = SessionManager::new();
535 let user1 = User::new("alice", "pass");
536 let user2 = User::new("bob", "pass");
537 let user3 = User::new("charlie", "pass");
538
539 let id1 = manager.create_session(&user1, IsolationLevel::ReadCommitted).unwrap();
540 let id2 = manager.create_session(&user2, IsolationLevel::RepeatableRead).unwrap();
541 let id3 = manager.create_session(&user3, IsolationLevel::Serializable).unwrap();
542
543 let session1_lock = manager.get_session(id1).unwrap();
544 let session1 = session1_lock.read();
545 assert_eq!(session1.isolation_level, IsolationLevel::ReadCommitted);
546 drop(session1);
547
548 let session2_lock = manager.get_session(id2).unwrap();
549 let session2 = session2_lock.read();
550 assert_eq!(session2.isolation_level, IsolationLevel::RepeatableRead);
551 drop(session2);
552
553 let session3_lock = manager.get_session(id3).unwrap();
554 let session3 = session3_lock.read();
555 assert_eq!(session3.isolation_level, IsolationLevel::Serializable);
556 }
557}