use crate::catalog::manager::CatalogManager;
use crate::session::models::{SessionPermissionCache, UserSession};
use crate::storage::StorageManager;
use crate::txn::TransactionManager;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
thread_local! {
pub(crate) static SESSION_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create runtime for session operations");
}
pub struct SessionManager {
sessions: RwLock<HashMap<String, Arc<RwLock<UserSession>>>>,
transaction_manager: Arc<TransactionManager>,
storage_manager: Arc<StorageManager>,
catalog_manager: Arc<RwLock<CatalogManager>>,
}
impl SessionManager {
pub fn new(
transaction_manager: Arc<TransactionManager>,
storage_manager: Arc<StorageManager>,
catalog_manager: Arc<RwLock<CatalogManager>>,
) -> Self {
Self {
sessions: RwLock::new(HashMap::new()),
transaction_manager,
storage_manager,
catalog_manager,
}
}
pub fn get_storage_manager(&self) -> Arc<StorageManager> {
self.storage_manager.clone()
}
pub fn get_catalog_manager(&self) -> Arc<RwLock<CatalogManager>> {
self.catalog_manager.clone()
}
pub fn get_transaction_manager(&self) -> Arc<TransactionManager> {
self.transaction_manager.clone()
}
pub fn create_session(
&self,
username: String,
roles: Vec<String>,
permissions: SessionPermissionCache,
) -> Result<String, String> {
let user_session = UserSession::new(
username,
roles,
permissions,
self.transaction_manager.clone(),
);
let session_id = user_session.session_id.clone();
let session_arc = Arc::new(RwLock::new(user_session));
{
let mut sessions = self
.sessions
.write()
.map_err(|_| "Failed to acquire sessions write lock")?;
sessions.insert(session_id.clone(), session_arc);
}
Ok(session_id)
}
pub fn get_session(&self, session_id: &str) -> Option<Arc<RwLock<UserSession>>> {
let sessions = self.sessions.read().ok()?;
sessions.get(session_id).cloned()
}
pub fn remove_session(&self, session_id: &str) -> Result<(), String> {
let mut sessions = self
.sessions
.write()
.map_err(|_| "Failed to acquire sessions write lock")?;
if let Some(session_arc) = sessions.remove(session_id) {
if let Ok(mut session) = session_arc.write() {
session.deactivate();
}
}
if let Ok(catalog_manager) = self.catalog_manager.write() {
let persist_result =
SESSION_RUNTIME.with(|rt| rt.block_on(catalog_manager.persist_all()));
if let Err(e) = persist_result {
log::warn!("Failed to persist catalogs during session removal: {}", e);
}
}
Ok(())
}
pub fn get_active_session_ids(&self) -> Vec<String> {
let sessions = match self.sessions.read() {
Ok(sessions) => sessions,
Err(_) => return Vec::new(), };
sessions.keys().cloned().collect()
}
pub fn cleanup_expired_sessions(&self) -> Result<usize, String> {
let expired_ids: Vec<String> = {
let sessions = self
.sessions
.read()
.map_err(|_| "Failed to acquire sessions read lock")?;
sessions
.iter()
.filter_map(|(id, session_arc)| {
if let Ok(session) = session_arc.read() {
if session.is_expired() {
Some(id.clone())
} else {
None
}
} else {
None
}
})
.collect()
};
let count = expired_ids.len();
for session_id in expired_ids {
self.remove_session(&session_id)?;
}
Ok(count)
}
pub fn session_count(&self) -> usize {
self.sessions
.read()
.map(|sessions| sessions.len())
.unwrap_or(0)
}
pub fn create_anonymous_session(&self) -> Result<String, String> {
let permissions = SessionPermissionCache::new();
self.create_session(
"anonymous".to_string(),
vec!["user".to_string()],
permissions,
)
}
pub fn invalidate_sessions_for_graph(&self, graph_name: &str) -> usize {
let sessions_guard = match self.sessions.read() {
Ok(guard) => guard,
Err(_) => {
log::error!("Failed to acquire sessions read lock for graph invalidation");
return 0;
}
};
let mut invalidated_count = 0;
for (session_id, session_arc) in sessions_guard.iter() {
let mut session = match session_arc.write() {
Ok(guard) => guard,
Err(_) => {
log::warn!("Failed to acquire write lock for session {}", session_id);
continue;
}
};
if let Some(current_graph) = &session.current_graph {
let current_graph_clone = current_graph.clone();
let matches = current_graph == graph_name
|| current_graph == &format!("/{}", graph_name)
|| current_graph.strip_prefix('/') == Some(graph_name);
if matches {
session.current_graph = None;
log::info!(
"Invalidated session {} using dropped graph '{}' (was: '{}')",
session_id,
graph_name,
current_graph_clone
);
invalidated_count += 1;
} else {
log::debug!(
"Session {} using different graph '{}', not invalidating for '{}'",
session_id,
current_graph_clone,
graph_name
);
}
}
}
if invalidated_count > 0 {
log::info!(
"Successfully invalidated {} sessions using dropped graph '{}'",
invalidated_count,
graph_name
);
}
invalidated_count
}
pub fn shutdown(&self) -> Result<(), String> {
log::info!("SessionManager shutting down gracefully...");
if let Ok(catalog_manager) = self.catalog_manager.write() {
let persist_result =
SESSION_RUNTIME.with(|rt| rt.block_on(catalog_manager.persist_all()));
if let Err(e) = persist_result {
log::error!("Failed to persist catalogs during shutdown: {}", e);
return Err(format!("Failed to persist catalogs during shutdown: {}", e));
}
log::info!("Successfully persisted all catalogs during shutdown");
}
let session_ids: Vec<String> = {
if let Ok(sessions) = self.sessions.read() {
sessions.keys().cloned().collect()
} else {
Vec::new()
}
};
for session_id in session_ids {
if let Ok(mut sessions) = self.sessions.write() {
if let Some(session_arc) = sessions.remove(&session_id) {
if let Ok(mut session) = session_arc.write() {
session.deactivate();
}
}
}
}
if let Err(e) = self.storage_manager.shutdown() {
log::error!("Failed to shutdown storage manager: {}", e);
return Err(format!("Failed to shutdown storage manager: {}", e));
}
log::info!("SessionManager shutdown completed");
Ok(())
}
}
pub static SESSION_MANAGER: Lazy<RwLock<Option<Arc<SessionManager>>>> =
Lazy::new(|| RwLock::new(None));
pub fn get_session_manager() -> Option<Arc<SessionManager>> {
SESSION_MANAGER.read().ok()?.clone()
}
#[allow(dead_code)] pub fn clear_session_manager() -> Result<(), String> {
if let Ok(mut global_manager) = SESSION_MANAGER.write() {
*global_manager = None;
Ok(())
} else {
Err("Failed to acquire write lock on session manager".to_string())
}
}
pub fn set_session_manager(manager: Arc<SessionManager>) -> Result<(), String> {
if let Ok(mut global_manager) = SESSION_MANAGER.write() {
*global_manager = Some(manager);
log::info!("✅ Global session manager initialized");
Ok(())
} else {
Err("Failed to acquire write lock on session manager".to_string())
}
}
pub fn get_session(session_id: &str) -> Option<Arc<RwLock<UserSession>>> {
get_session_manager()?.get_session(session_id)
}
#[allow(dead_code)] pub fn create_session(
username: String,
roles: Vec<String>,
permissions: SessionPermissionCache,
) -> Result<String, String> {
get_session_manager()
.ok_or("Session manager not initialized")?
.create_session(username, roles, permissions)
}
#[allow(dead_code)] pub fn create_anonymous_session() -> Result<String, String> {
get_session_manager()
.ok_or("Session manager not initialized")?
.create_anonymous_session()
}
#[allow(dead_code)] pub fn shutdown_session_manager() -> Result<(), String> {
if let Some(session_manager) = get_session_manager() {
session_manager.shutdown()
} else {
log::warn!("No session manager to shutdown");
Ok(())
}
}