daemon/cleanup.rs
1//! Periodic sweep that DELs abandoned sessions.
2//!
3//! Two session kinds, two liveness rules:
4//!
5//! - **WS shim sessions** (`client_id: Some`): live while that client id
6//! is in the live `Client` store. When the client drops, a `STALE_AFTER`
7//! grace covers reconnect blips, then the session is DEL'd. Tracking is
8//! in-memory (`disconnected_since`); daemon restart resets it, giving
9//! every reloaded session a fresh grace window.
10//!
11//! - **Pull/hook sessions** (`client_id: None`): an HTTP-MCP agent that
12//! registered via the `/hook/session-start` endpoint has no WS client at
13//! all — it would be swept instantly by the client-id rule. Its liveness
14//! is instead its `last_activity_at` (the hook bumps it every turn) plus
15//! a long `HOOK_BACKSTOP` grace. The clean teardown path is the explicit
16//! `/hook/session-end` DEL; this grace is only a backstop for a client
17//! that crashed without firing SessionEnd. Because `last_activity_at` is
18//! wall-clock and persisted, the backstop survives daemon restarts.
19
20use std::{
21 collections::{HashMap, HashSet},
22 sync::Arc,
23 time::{Duration, Instant},
24};
25
26use chrono::Utc;
27use hyphae::Gettable;
28use marshal_entities::Session;
29use myko::{core::item::Eventable, server::CellServerCtx, utils::downcast_item};
30
/// Reconnect grace for WS-shim sessions: once a session's client has been
/// missing from the live client set for at least this long, the sweeper
/// DELs the session. 10 s.
pub const STALE_AFTER: Duration = Duration::from_secs(10);

/// Backstop for pull/hook sessions (those with no WS client): the longest
/// such a session may sit without hook activity before the sweeper DELs it.
/// Deliberately generous — a session that was merely idle re-registers on
/// its next turn, so the only job of this limit is to eventually reclaim
/// sessions whose client crashed without firing `/hook/session-end`.
/// 3 600 s = 60 min.
pub const HOOK_BACKSTOP: Duration = Duration::from_secs(3_600);

/// Period between sweeper wake-ups. Any value comfortably below
/// `STALE_AFTER` works: a shorter period reacts to stale sessions sooner,
/// a longer one means fewer wake-ups per minute. 3 s.
pub const TICK_INTERVAL: Duration = Duration::from_millis(3_000);
45
46/// Run the sweeper forever. Spawn this on a tokio task and forget it.
47pub async fn run_sweeper(ctx: CellServerCtx) {
48 let mut disconnected_since: HashMap<Arc<str>, Instant> = HashMap::new();
49 let mut interval = tokio::time::interval(TICK_INTERVAL);
50 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
51
52 loop {
53 interval.tick().await;
54 sweep_once(&ctx, &mut disconnected_since);
55 }
56}
57
58fn sweep_once(ctx: &CellServerCtx, disconnected_since: &mut HashMap<Arc<str>, Instant>) {
59 let Some(session_store) = ctx.registry.get(Session::ENTITY_NAME_STATIC) else {
60 return;
61 };
62 let Some(client_store) = ctx.registry.get("Client") else {
63 // No Client store yet (server still warming up). Treat all sessions
64 // as "live" this tick — wait until we have the snapshot before
65 // making delete decisions.
66 return;
67 };
68
69 let live_client_ids: HashSet<Arc<str>> = client_store
70 .entries()
71 .get()
72 .into_iter()
73 .map(|(id, _)| id)
74 .collect();
75
76 let now = Instant::now();
77 let now_ms = Utc::now().timestamp_millis();
78 let backstop_ms = HOOK_BACKSTOP.as_millis() as i64;
79 let mut to_delete: Vec<Arc<str>> = Vec::new();
80 let mut still_disconnected: HashSet<Arc<str>> = HashSet::new();
81
82 for (id, item) in session_store.entries().get() {
83 let Some(session) = downcast_item::<Session>(&item) else {
84 continue;
85 };
86
87 match session.client_id.as_ref() {
88 // Pull/hook session: no WS client by design. Liveness is hook
89 // activity + the long backstop; the SessionEnd hook is the
90 // clean DEL path. Not subject to the WS reconnect grace.
91 None => {
92 let last = session.last_activity_at.unwrap_or(session.connected_at);
93 if now_ms.saturating_sub(last) >= backstop_ms {
94 to_delete.push(id);
95 }
96 }
97 // WS shim session bound to a live client: healthy.
98 Some(cid) if live_client_ids.contains(&cid.0) => {
99 disconnected_since.remove(&id);
100 }
101 // WS shim session whose client has gone: reconnect grace.
102 Some(_) => {
103 still_disconnected.insert(id.clone());
104 let first_seen = *disconnected_since.entry(id.clone()).or_insert(now);
105 if now.duration_since(first_seen) >= STALE_AFTER {
106 to_delete.push(id);
107 }
108 }
109 }
110 }
111
112 // Drop tracking for sessions that no longer exist in the store (e.g.
113 // someone DEL'd them out from under us).
114 disconnected_since.retain(|id, _| still_disconnected.contains(id));
115
116 for id in to_delete {
117 log::info!("[cleanup] DELing abandoned session {}", id);
118 if let Err(e) = ctx.del_by_id(Session::ENTITY_NAME_STATIC, &id) {
119 log::warn!("[cleanup] del session {} failed: {}", id, e);
120 continue;
121 }
122 disconnected_since.remove(&id);
123 }
124}