Skip to main content

daemon/
cleanup.rs

1//! Periodic sweep that DELs sessions whose bound client has been gone for
2//! more than `STALE_AFTER`.
3//!
4//! Sessions intentionally outlive their clients (Session no longer has a
5//! `belongs_to(Client)` cascade) so the roster can show a brief
6//! "disconnected" indicator across reconnect blips. Without this sweeper
7//! that grace period would last forever — abandoned sessions would
8//! accumulate in the registry and the on-disk log.
9//!
10//! Tracking is in-memory: a `disconnected_since` map records the first
11//! tick on which each session was observed disconnected. Daemon restart
12//! resets it, which is fine — every session loaded from disk gets a fresh
13//! 10-second grace window before it can be swept, giving its shim time to
14//! reconnect.
15
16use std::{
17    collections::{HashMap, HashSet},
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22use hyphae::Gettable;
23use marshal_entities::Session;
24use myko::{core::item::Eventable, server::CellServerCtx, utils::downcast_item};
25
26/// How long a session must be without a live client before it is DEL'd.
27pub const STALE_AFTER: Duration = Duration::from_secs(10);
28
29/// How often the sweeper wakes up to check for stale sessions. Anything
30/// roughly under `STALE_AFTER` is fine; the trade-off is reaction latency
31/// (lower) vs. wake-ups per minute (higher).
32pub const TICK_INTERVAL: Duration = Duration::from_secs(3);
33
34/// Run the sweeper forever. Spawn this on a tokio task and forget it.
35pub async fn run_sweeper(ctx: CellServerCtx) {
36    let mut disconnected_since: HashMap<Arc<str>, Instant> = HashMap::new();
37    let mut interval = tokio::time::interval(TICK_INTERVAL);
38    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
39
40    loop {
41        interval.tick().await;
42        sweep_once(&ctx, &mut disconnected_since);
43    }
44}
45
46fn sweep_once(ctx: &CellServerCtx, disconnected_since: &mut HashMap<Arc<str>, Instant>) {
47    let Some(session_store) = ctx.registry.get(Session::ENTITY_NAME_STATIC) else {
48        return;
49    };
50    let Some(client_store) = ctx.registry.get("Client") else {
51        // No Client store yet (server still warming up). Treat all sessions
52        // as "live" this tick — wait until we have the snapshot before
53        // making delete decisions.
54        return;
55    };
56
57    let live_client_ids: HashSet<Arc<str>> = client_store
58        .entries()
59        .get()
60        .into_iter()
61        .map(|(id, _)| id)
62        .collect();
63
64    let now = Instant::now();
65    let mut to_delete: Vec<Arc<str>> = Vec::new();
66    let mut still_disconnected: HashSet<Arc<str>> = HashSet::new();
67
68    for (id, item) in session_store.entries().get() {
69        let Some(session) = downcast_item::<Session>(&item) else {
70            continue;
71        };
72        let bound_to_live_client = session
73            .client_id
74            .as_ref()
75            .map(|cid| live_client_ids.contains(&cid.0))
76            .unwrap_or(false);
77
78        if bound_to_live_client {
79            disconnected_since.remove(&id);
80            continue;
81        }
82
83        still_disconnected.insert(id.clone());
84        let first_seen = *disconnected_since.entry(id.clone()).or_insert(now);
85        if now.duration_since(first_seen) >= STALE_AFTER {
86            to_delete.push(id);
87        }
88    }
89
90    // Drop tracking for sessions that no longer exist in the store (e.g.
91    // someone DEL'd them out from under us).
92    disconnected_since.retain(|id, _| still_disconnected.contains(id));
93
94    for id in to_delete {
95        log::info!(
96            "[cleanup] DELing session {} (disconnected ≥ {:?})",
97            id,
98            STALE_AFTER
99        );
100        if let Err(e) = ctx.del_by_id(Session::ENTITY_NAME_STATIC, &id) {
101            log::warn!("[cleanup] del session {} failed: {}", id, e);
102            continue;
103        }
104        disconnected_since.remove(&id);
105    }
106}