Skip to main content

daemon/
cleanup.rs

1//! Periodic sweep that DELs abandoned sessions.
2//!
3//! Two session kinds, two liveness rules:
4//!
5//! - **WS shim sessions** (`client_id: Some`): live while that client id
6//!   is in the live `Client` store. When the client drops, a `STALE_AFTER`
7//!   grace covers reconnect blips, then the session is DEL'd. Tracking is
8//!   in-memory (`disconnected_since`); daemon restart resets it, giving
9//!   every reloaded session a fresh grace window.
10//!
11//! - **Pull/hook sessions** (`client_id: None`): an HTTP-MCP agent that
12//!   registered via the `/hook/session-start` endpoint has no WS client at
13//!   all — it would be swept instantly by the client-id rule. Its liveness
14//!   is instead its `last_activity_at` (the hook bumps it every turn) plus
15//!   a long `HOOK_BACKSTOP` grace. The clean teardown path is the explicit
16//!   `/hook/session-end` DEL; this grace is only a backstop for a client
17//!   that crashed without firing SessionEnd. Because `last_activity_at` is
18//!   wall-clock and persisted, the backstop survives daemon restarts.
19
20use std::{
21    collections::{HashMap, HashSet},
22    sync::Arc,
23    time::{Duration, Instant},
24};
25
26use chrono::Utc;
27use hyphae::Gettable;
28use marshal_entities::Session;
29use myko::{core::item::Eventable, server::CellServerCtx, utils::downcast_item};
30
/// Reconnect grace for WS-shim sessions: the minimum time a session's
/// client must have been absent from the live client set before the
/// sweeper DELs the session (10 s).
pub const STALE_AFTER: Duration = Duration::from_millis(10_000);
33
/// Backstop grace for pull/hook sessions (those with no WS client): the
/// longest such a session may go without hook activity before the sweeper
/// DELs it. 60 minutes.
///
/// Deliberately generous. A session that is merely idle re-registers on
/// its next turn, so the only job of this limit is to eventually reclaim
/// sessions whose client crashed without firing `/hook/session-end`.
pub const HOOK_BACKSTOP: Duration = Duration::from_secs(3_600);
40
/// Period between sweeper wake-ups (3 s).
///
/// Any value roughly below `STALE_AFTER` works. A shorter period reacts
/// to stale sessions sooner; a longer one means fewer wake-ups per minute.
pub const TICK_INTERVAL: Duration = Duration::from_millis(3_000);
45
46/// Run the sweeper forever. Spawn this on a tokio task and forget it.
47pub async fn run_sweeper(ctx: CellServerCtx) {
48    let mut disconnected_since: HashMap<Arc<str>, Instant> = HashMap::new();
49    let mut interval = tokio::time::interval(TICK_INTERVAL);
50    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
51
52    loop {
53        interval.tick().await;
54        sweep_once(&ctx, &mut disconnected_since);
55    }
56}
57
58fn sweep_once(ctx: &CellServerCtx, disconnected_since: &mut HashMap<Arc<str>, Instant>) {
59    let Some(session_store) = ctx.registry.get(Session::ENTITY_NAME_STATIC) else {
60        return;
61    };
62    let Some(client_store) = ctx.registry.get("Client") else {
63        // No Client store yet (server still warming up). Treat all sessions
64        // as "live" this tick — wait until we have the snapshot before
65        // making delete decisions.
66        return;
67    };
68
69    let live_client_ids: HashSet<Arc<str>> = client_store
70        .entries()
71        .get()
72        .into_iter()
73        .map(|(id, _)| id)
74        .collect();
75
76    let now = Instant::now();
77    let now_ms = Utc::now().timestamp_millis();
78    let backstop_ms = HOOK_BACKSTOP.as_millis() as i64;
79    let mut to_delete: Vec<Arc<str>> = Vec::new();
80    let mut still_disconnected: HashSet<Arc<str>> = HashSet::new();
81
82    for (id, item) in session_store.entries().get() {
83        let Some(session) = downcast_item::<Session>(&item) else {
84            continue;
85        };
86
87        match session.client_id.as_ref() {
88            // Pull/hook session: no WS client by design. Liveness is hook
89            // activity + the long backstop; the SessionEnd hook is the
90            // clean DEL path. Not subject to the WS reconnect grace.
91            None => {
92                let last = session.last_activity_at.unwrap_or(session.connected_at);
93                if now_ms.saturating_sub(last) >= backstop_ms {
94                    to_delete.push(id);
95                }
96            }
97            // WS shim session bound to a live client: healthy.
98            Some(cid) if live_client_ids.contains(&cid.0) => {
99                disconnected_since.remove(&id);
100            }
101            // WS shim session whose client has gone: reconnect grace.
102            Some(_) => {
103                still_disconnected.insert(id.clone());
104                let first_seen = *disconnected_since.entry(id.clone()).or_insert(now);
105                if now.duration_since(first_seen) >= STALE_AFTER {
106                    to_delete.push(id);
107                }
108            }
109        }
110    }
111
112    // Drop tracking for sessions that no longer exist in the store (e.g.
113    // someone DEL'd them out from under us).
114    disconnected_since.retain(|id, _| still_disconnected.contains(id));
115
116    for id in to_delete {
117        log::info!("[cleanup] DELing abandoned session {}", id);
118        if let Err(e) = ctx.del_by_id(Session::ENTITY_NAME_STATIC, &id) {
119            log::warn!("[cleanup] del session {} failed: {}", id, e);
120            continue;
121        }
122        disconnected_since.remove(&id);
123    }
124}