use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
};
use chrono::Utc;
use hyphae::Gettable;
use marshal_entities::Session;
use myko::{core::item::Eventable, server::CellServerCtx, utils::downcast_item};
pub const STALE_AFTER: Duration = Duration::from_secs(10);
pub const HOOK_BACKSTOP: Duration = Duration::from_secs(60 * 60);
pub const TICK_INTERVAL: Duration = Duration::from_secs(3);
pub async fn run_sweeper(ctx: CellServerCtx) {
let mut disconnected_since: HashMap<Arc<str>, Instant> = HashMap::new();
let mut interval = tokio::time::interval(TICK_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
interval.tick().await;
sweep_once(&ctx, &mut disconnected_since);
}
}
fn sweep_once(ctx: &CellServerCtx, disconnected_since: &mut HashMap<Arc<str>, Instant>) {
let Some(session_store) = ctx.registry.get(Session::ENTITY_NAME_STATIC) else {
return;
};
let Some(client_store) = ctx.registry.get("Client") else {
return;
};
let live_client_ids: HashSet<Arc<str>> = client_store
.entries()
.get()
.into_iter()
.map(|(id, _)| id)
.collect();
let now = Instant::now();
let now_ms = Utc::now().timestamp_millis();
let backstop_ms = HOOK_BACKSTOP.as_millis() as i64;
let mut to_delete: Vec<Arc<str>> = Vec::new();
let mut still_disconnected: HashSet<Arc<str>> = HashSet::new();
for (id, item) in session_store.entries().get() {
let Some(session) = downcast_item::<Session>(&item) else {
continue;
};
match session.client_id.as_ref() {
None => {
let last = session.last_activity_at.unwrap_or(session.connected_at);
if now_ms.saturating_sub(last) >= backstop_ms {
to_delete.push(id);
}
}
Some(cid) if live_client_ids.contains(&cid.0) => {
disconnected_since.remove(&id);
}
Some(_) => {
still_disconnected.insert(id.clone());
let first_seen = *disconnected_since.entry(id.clone()).or_insert(now);
if now.duration_since(first_seen) >= STALE_AFTER {
to_delete.push(id);
}
}
}
}
disconnected_since.retain(|id, _| still_disconnected.contains(id));
for id in to_delete {
log::info!("[cleanup] DELing abandoned session {}", id);
if let Err(e) = ctx.del_by_id(Session::ENTITY_NAME_STATIC, &id) {
log::warn!("[cleanup] del session {} failed: {}", id, e);
continue;
}
disconnected_since.remove(&id);
}
}