use std::sync::Arc;
use std::time::{Duration, Instant};
use axum::response::sse::Event;
use dashmap::DashMap;
use once_cell::sync::Lazy;
use tokio::sync::mpsc;
/// How long a session may go without a `sender_for` lookup before `sweep_expired`
/// evicts it (five minutes). Eviction is lazy: it only happens when a sweep runs.
const SESSION_TTL: Duration = Duration::from_secs(5 * 60);
/// A registered event stream: the sending half of its channel plus the time it
/// was last touched.
#[derive(Debug, Clone)]
pub struct Session {
    /// Sending half of the unbounded channel whose receiver was handed out by `register`.
    pub sender: mpsc::UnboundedSender<Event>,
    /// Set when the session is registered and refreshed on every `sender_for` lookup;
    /// compared against `SESSION_TTL` when sweeping.
    pub last_seen: Instant,
}
/// Process-wide registry of live sessions keyed by session id, created lazily on
/// first access.
// NOTE(review): the `Arc` looks redundant for a `static` unless some caller outside
// this view clones it — confirm before simplifying the type.
static SESSIONS: Lazy<Arc<DashMap<String, Session>>> = Lazy::new(|| Arc::new(DashMap::new()));
/// Registers a new session under `session_id` and returns the receiving half of
/// its event channel.
///
/// Stale sessions are swept first, so registration doubles as the registry's
/// garbage-collection trigger. An existing entry with the same id is replaced.
pub fn register(session_id: String) -> mpsc::UnboundedReceiver<Event> {
    // Evict idle/closed sessions before adding the new one.
    sweep_expired();

    let (sender, receiver) = mpsc::unbounded_channel();
    let session = Session {
        sender,
        last_seen: Instant::now(),
    };
    SESSIONS.insert(session_id, session);

    receiver
}
/// Removes the session registered under `session_id`, if any.
///
/// Unknown ids are ignored. The registry's copy of the sender is dropped along
/// with the entry.
pub fn drop_session(session_id: &str) {
    // The evicted `(id, Session)` pair is not needed by callers; discard it explicitly.
    drop(SESSIONS.remove(session_id));
}
/// Looks up the sender for `session_id`, marking the session as freshly seen.
///
/// Returns `None` when no session is registered under that id. The lookup itself
/// does not check the TTL or whether the channel is closed; it only refreshes
/// `last_seen` and hands back a clone of the stored sender.
pub fn sender_for(session_id: &str) -> Option<mpsc::UnboundedSender<Event>> {
    SESSIONS.get_mut(session_id).map(|mut session| {
        // Touch the entry so the next sweep treats it as active.
        session.last_seen = Instant::now();
        session.sender.clone()
    })
}
/// Returns the number of entries currently held in the registry.
///
/// No sweep is performed here, so the count may include sessions that are
/// already past `SESSION_TTL` or whose channel is closed.
pub fn session_count() -> usize {
    SESSIONS.len()
}
/// Evicts every session that has been idle for at least `SESSION_TTL` or whose
/// channel is reported closed by its sender.
///
/// Called from `register`; nothing else in this module triggers a sweep.
fn sweep_expired() {
    let now = Instant::now();
    SESSIONS.retain(|_, session| {
        // `sender_for` on another thread may refresh `last_seen` after `now` was
        // read, making it later than `now`. Saturate to zero in that case rather
        // than relying on `duration_since`, which panicked on older toolchains
        // and is documented as free to do so again.
        let idle = now.saturating_duration_since(session.last_seen);
        idle < SESSION_TTL && !session.sender.is_closed()
    });
}
#[cfg(test)]
mod tests {
    use super::*;

    // The registry is a process-wide static shared by every test, so each test
    // uses a UUID-suffixed id of its own.

    /// A registered session can be looked up until it is explicitly dropped.
    #[test]
    fn register_and_lookup() {
        let session_id = format!("test-{}", uuid::Uuid::new_v4());
        let _receiver = register(session_id.clone());

        assert!(sender_for(&session_id).is_some());

        drop_session(&session_id);
        assert!(sender_for(&session_id).is_none());
    }

    /// Once the receiver is gone, the next registration sweeps the dead session.
    #[tokio::test]
    async fn closed_receiver_is_swept() {
        let session_id = format!("test-{}", uuid::Uuid::new_v4());

        let receiver = register(session_id.clone());
        assert!(sender_for(&session_id).is_some());
        // Dropping the receiver closes the channel.
        drop(receiver);

        // `register` sweeps before inserting, which should evict the closed session.
        let _keepalive = register(format!("test-other-{}", uuid::Uuid::new_v4()));
        assert!(sender_for(&session_id).is_none());
    }
}