use crate::{
ApplicationResult,
domain::{DomainError, aggregates::stream_session::StreamSession, value_objects::SessionId},
};
use dashmap::DashMap;
use std::{sync::Arc, time::Duration};
use tokio::{
sync::RwLock,
time::{Instant as TokioInstant, interval},
};
/// Tunable settings for a [`SessionManager`].
#[derive(Debug, Clone)]
pub struct SessionManagerConfig {
    /// Period, in seconds, between runs of the background cleanup task.
    pub cleanup_interval_seconds: u64,
    /// Upper bound on the number of sessions the manager will accept.
    pub max_sessions: usize,
    /// Default session timeout in seconds.
    ///
    /// NOTE(review): not read anywhere in this file; presumably consumed by
    /// callers when building sessions — confirm.
    pub default_timeout_seconds: u64,
    /// Grace period in seconds.
    ///
    /// NOTE(review): not read anywhere in this file — confirm intended use.
    pub grace_period_seconds: u64,
}
impl Default for SessionManagerConfig {
fn default() -> Self {
Self {
cleanup_interval_seconds: 60, max_sessions: 10_000, default_timeout_seconds: 3600, grace_period_seconds: 300, }
}
}
/// Point-in-time counters describing a [`SessionManager`].
#[derive(Debug, Clone, Default)]
pub struct SessionManagerStats {
    /// Number of sessions held by the manager as of the last stats refresh.
    pub active_sessions: usize,
    /// Running total of sessions closed because they had expired.
    pub timeout_cleanups: u64,
    /// Running total of sessions removed through an explicit removal call.
    pub graceful_cleanups: u64,
    /// When a cleanup pass last recorded its results, if one ever has.
    pub last_cleanup_at: Option<TokioInstant>,
    /// Average session duration.
    ///
    /// NOTE(review): nothing in this file writes this field and its unit is
    /// not shown — confirm where (and whether) it is maintained.
    pub average_session_duration: f64,
}
/// Owns the set of live [`StreamSession`]s and, once started, a background
/// task that periodically evicts expired ones.
pub struct SessionManager {
    /// Sessions keyed by id; each session sits behind its own async lock.
    sessions: Arc<DashMap<SessionId, Arc<RwLock<StreamSession>>>>,
    /// Settings captured at construction time.
    config: SessionManagerConfig,
    /// Counters shared with the background cleanup task.
    stats: Arc<RwLock<SessionManagerStats>>,
    /// Handle of the background cleanup task; `None` while not running.
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
impl Default for SessionManager {
    /// Builds a manager from the default [`SessionManagerConfig`].
    fn default() -> Self {
        let config = SessionManagerConfig::default();
        Self::with_config(config)
    }
}
impl SessionManager {
pub fn new() -> Self {
Self::default()
}
pub fn with_config(config: SessionManagerConfig) -> Self {
Self {
sessions: Arc::new(DashMap::new()),
config,
stats: Arc::new(RwLock::new(SessionManagerStats::default())),
cleanup_handle: None,
}
}
pub async fn start(&mut self) -> ApplicationResult<()> {
if self.cleanup_handle.is_some() {
return Err(
DomainError::InternalError("Session manager already started".to_string()).into(),
);
}
let sessions = Arc::clone(&self.sessions);
let stats = Arc::clone(&self.stats);
let config = self.config.clone();
let handle = tokio::spawn(async move {
Self::cleanup_task(sessions, stats, config).await;
});
self.cleanup_handle = Some(handle);
Ok(())
}
pub async fn stop(&mut self) -> ApplicationResult<()> {
if let Some(handle) = self.cleanup_handle.take() {
handle.abort();
let _ = handle.await;
}
Ok(())
}
pub async fn add_session(&self, session: StreamSession) -> ApplicationResult<SessionId> {
let session_id = session.id();
if self.sessions.len() >= self.config.max_sessions {
return Err(DomainError::ResourceExhausted(format!(
"Maximum sessions limit reached: {}",
self.config.max_sessions
))
.into());
}
self.sessions
.insert(session_id, Arc::new(RwLock::new(session)));
let mut stats = self.stats.write().await;
stats.active_sessions = self.sessions.len();
Ok(session_id)
}
pub async fn get_session(&self, session_id: &SessionId) -> Option<Arc<RwLock<StreamSession>>> {
self.sessions
.get(session_id)
.map(|entry| Arc::clone(entry.value()))
}
pub async fn remove_session(&self, session_id: &SessionId) -> ApplicationResult<bool> {
let removed = self.sessions.remove(session_id).is_some();
if removed {
let mut stats = self.stats.write().await;
stats.active_sessions = self.sessions.len();
stats.graceful_cleanups += 1;
}
Ok(removed)
}
pub async fn cleanup_expired_sessions(&self) -> ApplicationResult<CleanupReport> {
let mut report = CleanupReport::default();
let mut sessions_to_remove = Vec::new();
for entry in self.sessions.iter() {
let session_id = *entry.key();
let session_arc = entry.value();
let mut session = session_arc.write().await;
if session.is_expired() {
match session.force_close_expired() {
Ok(was_closed) => {
if was_closed {
sessions_to_remove.push(session_id);
report.timeout_cleanups += 1;
}
}
Err(e) => {
report
.errors
.push(format!("Failed to close session {}: {}", session_id, e));
}
}
}
}
for session_id in sessions_to_remove {
self.sessions.remove(&session_id);
report.sessions_removed += 1;
}
let mut stats = self.stats.write().await;
stats.active_sessions = self.sessions.len();
stats.timeout_cleanups += report.timeout_cleanups;
stats.last_cleanup_at = Some(TokioInstant::now());
Ok(report)
}
pub async fn stats(&self) -> SessionManagerStats {
self.stats.read().await.clone()
}
async fn cleanup_task(
sessions: Arc<DashMap<SessionId, Arc<RwLock<StreamSession>>>>,
stats: Arc<RwLock<SessionManagerStats>>,
config: SessionManagerConfig,
) {
let mut interval = interval(Duration::from_secs(config.cleanup_interval_seconds));
loop {
interval.tick().await;
let mut cleanup_count = 0;
let mut sessions_to_remove = Vec::new();
for entry in sessions.iter() {
let session_id = *entry.key();
let session_arc = entry.value();
let mut session = session_arc.write().await;
if session.is_expired() {
match session.force_close_expired() {
Ok(was_closed) => {
if was_closed {
sessions_to_remove.push(session_id);
cleanup_count += 1;
}
}
Err(_) => {
sessions_to_remove.push(session_id);
}
}
}
}
for session_id in sessions_to_remove {
sessions.remove(&session_id);
}
if cleanup_count > 0 {
let mut stats_guard = stats.write().await;
stats_guard.active_sessions = sessions.len();
stats_guard.timeout_cleanups += cleanup_count;
stats_guard.last_cleanup_at = Some(TokioInstant::now());
}
}
}
}
impl Drop for SessionManager {
    /// Aborts the background cleanup task, if one is still registered, so it
    /// does not keep running after the manager is gone. The task is not
    /// awaited here.
    fn drop(&mut self) {
        let Some(task) = self.cleanup_handle.take() else {
            return;
        };
        task.abort();
    }
}
/// Outcome of a single manually triggered cleanup pass.
#[derive(Debug, Clone, Default)]
pub struct CleanupReport {
    /// Number of sessions taken out of the manager during the pass.
    pub sessions_removed: usize,
    /// Number of expired sessions that were force-closed during the pass.
    pub timeout_cleanups: u64,
    /// Human-readable messages for sessions that could not be closed.
    pub errors: Vec<String>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::aggregates::stream_session::SessionConfig;
    use tokio::time::{Duration, sleep};

    /// Builds a session configured with a one-second session timeout.
    fn short_lived_session() -> StreamSession {
        StreamSession::new(SessionConfig {
            session_timeout_seconds: 1,
            ..SessionConfig::default()
        })
    }

    /// A freshly created manager reports no sessions and no cleanups.
    #[tokio::test]
    async fn test_session_manager_creation() {
        let manager = SessionManager::new();
        let initial = manager.stats().await;
        assert_eq!(initial.active_sessions, 0);
        assert_eq!(initial.timeout_cleanups, 0);
    }

    /// Adding, looking up and removing a session keeps the stats in step.
    #[tokio::test]
    async fn test_add_and_remove_session() {
        let manager = SessionManager::new();
        let session = StreamSession::new(SessionConfig::default());
        let expected_id = session.id();

        let returned_id = manager.add_session(session).await.unwrap();
        assert_eq!(returned_id, expected_id);

        let after_add = manager.stats().await;
        assert_eq!(after_add.active_sessions, 1);

        let lookup = manager.get_session(&expected_id).await;
        assert!(lookup.is_some());

        let was_removed = manager.remove_session(&expected_id).await.unwrap();
        assert!(was_removed);

        let after_remove = manager.stats().await;
        assert_eq!(after_remove.active_sessions, 0);
    }

    /// A manual cleanup pass evicts a session whose timeout has elapsed.
    #[tokio::test]
    async fn test_cleanup_expired_sessions() {
        let manager = SessionManager::new();
        let session = short_lived_session();
        let _session_id = session.id();
        manager.add_session(session).await.unwrap();

        // Wait past the one-second session timeout.
        sleep(Duration::from_secs(2)).await;

        let report = manager.cleanup_expired_sessions().await.unwrap();
        assert_eq!(report.sessions_removed, 1);
        assert_eq!(report.timeout_cleanups, 1);
        assert!(report.errors.is_empty());

        let after_cleanup = manager.stats().await;
        assert_eq!(after_cleanup.active_sessions, 0);
    }

    /// The background task evicts an expired session without manual calls.
    #[tokio::test]
    async fn test_session_manager_automatic_cleanup() {
        let mut manager = SessionManager::with_config(SessionManagerConfig {
            cleanup_interval_seconds: 1,
            ..Default::default()
        });
        manager.start().await.unwrap();

        manager.add_session(short_lived_session()).await.unwrap();

        // Long enough for the session to expire and a cleanup tick to fire.
        sleep(Duration::from_secs(3)).await;

        let after_wait = manager.stats().await;
        assert_eq!(after_wait.active_sessions, 0);
        assert!(after_wait.timeout_cleanups > 0);

        manager.stop().await.unwrap();
    }

    /// Adding a session beyond `max_sessions` is rejected.
    #[tokio::test]
    async fn test_session_capacity_limit() {
        let manager = SessionManager::with_config(SessionManagerConfig {
            max_sessions: 2,
            ..Default::default()
        });

        for _ in 0..2 {
            let accepted = StreamSession::new(SessionConfig::default());
            manager.add_session(accepted).await.unwrap();
        }

        let overflow = StreamSession::new(SessionConfig::default());
        let outcome = manager.add_session(overflow).await;
        assert!(outcome.is_err());
    }
}