1use std::{
17 collections::{HashMap, HashSet},
18 sync::Arc,
19 time::{Duration, Instant},
20};
21
22use hyphae::Gettable;
23use marshal_entities::Session;
24use myko::{core::item::Eventable, server::CellServerCtx, utils::downcast_item};
25
26pub const STALE_AFTER: Duration = Duration::from_secs(10);
28
29pub const TICK_INTERVAL: Duration = Duration::from_secs(3);
33
34pub async fn run_sweeper(ctx: CellServerCtx) {
36 let mut disconnected_since: HashMap<Arc<str>, Instant> = HashMap::new();
37 let mut interval = tokio::time::interval(TICK_INTERVAL);
38 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
39
40 loop {
41 interval.tick().await;
42 sweep_once(&ctx, &mut disconnected_since);
43 }
44}
45
46fn sweep_once(ctx: &CellServerCtx, disconnected_since: &mut HashMap<Arc<str>, Instant>) {
47 let Some(session_store) = ctx.registry.get(Session::ENTITY_NAME_STATIC) else {
48 return;
49 };
50 let Some(client_store) = ctx.registry.get("Client") else {
51 return;
55 };
56
57 let live_client_ids: HashSet<Arc<str>> = client_store
58 .entries()
59 .get()
60 .into_iter()
61 .map(|(id, _)| id)
62 .collect();
63
64 let now = Instant::now();
65 let mut to_delete: Vec<Arc<str>> = Vec::new();
66 let mut still_disconnected: HashSet<Arc<str>> = HashSet::new();
67
68 for (id, item) in session_store.entries().get() {
69 let Some(session) = downcast_item::<Session>(&item) else {
70 continue;
71 };
72 let bound_to_live_client = session
73 .client_id
74 .as_ref()
75 .map(|cid| live_client_ids.contains(&cid.0))
76 .unwrap_or(false);
77
78 if bound_to_live_client {
79 disconnected_since.remove(&id);
80 continue;
81 }
82
83 still_disconnected.insert(id.clone());
84 let first_seen = *disconnected_since.entry(id.clone()).or_insert(now);
85 if now.duration_since(first_seen) >= STALE_AFTER {
86 to_delete.push(id);
87 }
88 }
89
90 disconnected_since.retain(|id, _| still_disconnected.contains(id));
93
94 for id in to_delete {
95 log::info!(
96 "[cleanup] DELing session {} (disconnected ≥ {:?})",
97 id,
98 STALE_AFTER
99 );
100 if let Err(e) = ctx.del_by_id(Session::ENTITY_NAME_STATIC, &id) {
101 log::warn!("[cleanup] del session {} failed: {}", id, e);
102 continue;
103 }
104 disconnected_since.remove(&id);
105 }
106}