use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
};
use hyphae::Gettable;
use marshal_entities::Session;
use myko::{core::item::Eventable, server::CellServerCtx, utils::downcast_item};
pub const STALE_AFTER: Duration = Duration::from_secs(10);
pub const TICK_INTERVAL: Duration = Duration::from_secs(3);
pub async fn run_sweeper(ctx: CellServerCtx) {
let mut disconnected_since: HashMap<Arc<str>, Instant> = HashMap::new();
let mut interval = tokio::time::interval(TICK_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
interval.tick().await;
sweep_once(&ctx, &mut disconnected_since);
}
}
fn sweep_once(ctx: &CellServerCtx, disconnected_since: &mut HashMap<Arc<str>, Instant>) {
let Some(session_store) = ctx.registry.get(Session::ENTITY_NAME_STATIC) else {
return;
};
let Some(client_store) = ctx.registry.get("Client") else {
return;
};
let live_client_ids: HashSet<Arc<str>> = client_store
.entries()
.get()
.into_iter()
.map(|(id, _)| id)
.collect();
let now = Instant::now();
let mut to_delete: Vec<Arc<str>> = Vec::new();
let mut still_disconnected: HashSet<Arc<str>> = HashSet::new();
for (id, item) in session_store.entries().get() {
let Some(session) = downcast_item::<Session>(&item) else {
continue;
};
let bound_to_live_client = session
.client_id
.as_ref()
.map(|cid| live_client_ids.contains(&cid.0))
.unwrap_or(false);
if bound_to_live_client {
disconnected_since.remove(&id);
continue;
}
still_disconnected.insert(id.clone());
let first_seen = *disconnected_since.entry(id.clone()).or_insert(now);
if now.duration_since(first_seen) >= STALE_AFTER {
to_delete.push(id);
}
}
disconnected_since.retain(|id, _| still_disconnected.contains(id));
for id in to_delete {
log::info!(
"[cleanup] DELing session {} (disconnected ≥ {:?})",
id,
STALE_AFTER
);
if let Err(e) = ctx.del_by_id(Session::ENTITY_NAME_STATIC, &id) {
log::warn!("[cleanup] del session {} failed: {}", id, e);
continue;
}
disconnected_since.remove(&id);
}
}