Skip to main content

cellos_host_telemetry/
keepalive.rs

1//! Keep-alive watcher and `agent_silenced` detection (ADR-0006 §7).
2//!
3//! Doctrine: silence is observable. When the in-guest agent has not delivered
4//! a frame within the keep-alive window, the supervisor emits
5//! `cell.observability.guest.agent_silenced` typed as `DeclaredAuthority`
6//! *absence* — the absence is itself an Observed event from the host's
7//! perspective.
8//!
9//! This module owns:
10//!
11//! - [`KeepAlive`] — a small async-friendly tracker the listener pokes on
12//!   every successful frame receive. The receiver-side stays simple: read a
13//!   frame, call `notify_frame()`, repeat. The keep-alive window enforcement
14//!   runs as a separate async task driven by [`watch_for_silence`].
15//!
16//! - [`AgentSilencedTrigger`] — the single fire-once trigger that returns
17//!   the `cell.observability.guest.agent_silenced` value. F1b owns the
18//!   additive CloudEvent constructor; until that lands, this module produces
19//!   the [`AgentSilencedSignal`] internal value type and **TODO(F1b)** wires
20//!   it through to `cellos_core::events`.
21
22use std::sync::Arc;
23use std::time::Duration;
24
25use tokio::sync::Mutex;
26use tokio::time::Instant;
27
/// Keep-alive window used when nothing else is configured: 10 seconds.
///
/// Phase F3b ships this as the default for the per-cell listener; F4 is
/// planned to expose it as a portable-profile knob next to
/// `onAgentFailure`. Ten seconds is deliberately generous: with the
/// agent-side drop-with-counter back-pressure of ADR-0006 §5.3, a frame is
/// expected to get through within seconds of the workload doing anything
/// observable.
pub const DEFAULT_KEEPALIVE_WINDOW: Duration = Duration::from_secs(10);
37
38/// Tracker the listener pokes on every successful frame receive. Cheap to
39/// clone (`Arc<Mutex<...>>`); listener and watcher hold one each.
40#[derive(Debug, Clone)]
41pub struct KeepAlive {
42    inner: Arc<Mutex<KeepAliveInner>>,
43}
44
45#[derive(Debug)]
46struct KeepAliveInner {
47    last_frame_at: Instant,
48    window: Duration,
49}
50
51impl KeepAlive {
52    /// Construct with the given window. The first frame is "now" — i.e.
53    /// the watcher does not fire on a cell that just booted.
54    pub fn new(window: Duration) -> Self {
55        Self {
56            inner: Arc::new(Mutex::new(KeepAliveInner {
57                last_frame_at: Instant::now(),
58                window,
59            })),
60        }
61    }
62
63    /// Window the watcher uses.
64    pub async fn window(&self) -> Duration {
65        self.inner.lock().await.window
66    }
67
68    /// Reset on every successful frame receive.
69    pub async fn notify_frame(&self) {
70        let mut g = self.inner.lock().await;
71        g.last_frame_at = Instant::now();
72    }
73
74    /// True iff the elapsed time since the last frame exceeds the window.
75    pub async fn is_silenced(&self) -> bool {
76        let g = self.inner.lock().await;
77        g.last_frame_at.elapsed() >= g.window
78    }
79}
80
/// Internal value describing one "agent went silent" detection for a cell.
///
/// F4b signs it; F1b will project it to a `CloudEventV1` via the additive
/// constructor.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AgentSilencedSignal {
    /// Per-cell identifier (host-stamped).
    pub cell_id: String,
    /// Per-run identifier (host-stamped).
    pub run_id: String,
    /// Window the watcher was configured with, in milliseconds.
    pub keepalive_window_ms: u64,
    /// Milliseconds elapsed since the last frame at fire time.
    ///
    /// Expected to be >= `keepalive_window_ms` when the tracker and the
    /// trigger are configured with the same window; the type itself does
    /// not enforce this.
    pub elapsed_ms: u64,
}
96
97impl AgentSilencedSignal {
98    /// CloudEvent type URN this signal will project to (F1b).
99    ///
100    /// TODO(F1b): once the additive event constructor lands in
101    /// `cellos_core::events`, replace `to_cloudevent_stub()` with a call to
102    /// `events::observability_guest_agent_silenced_data_v1(...)`. The URN
103    /// here is locked so admission-side validators can pre-register the
104    /// schema name without waiting on the F1b commit.
105    pub const CLOUDEVENT_TYPE: &'static str =
106        "dev.cellos.events.cell.observability.v1.guest.agent_silenced";
107
108    /// Stub projection until F1b lands the additive builder.
109    ///
110    /// Returns a `serde_json::Value` shaped like the eventual schema so
111    /// downstream wiring can be tested end-to-end against the JSON shape.
112    /// Once F1b ships, this becomes a thin wrapper over the F1b builder
113    /// (or is deleted entirely in favour of direct `CloudEventV1`
114    /// construction in the supervisor loop).
115    pub fn to_cloudevent_stub(&self) -> serde_json::Value {
116        // TODO(F1b): replace with
117        // `cellos_core::events::observability_guest_agent_silenced_data_v1(...)`.
118        serde_json::json!({
119            "cellId": self.cell_id,
120            "runId": self.run_id,
121            "keepaliveWindowMs": self.keepalive_window_ms,
122            "elapsedMs": self.elapsed_ms,
123            "epistemicStatus": "DECLARED",
124            "ruleClass": "GUEST_AGENT_DECLARATION",
125            "_todo": "F1b: wire to cellos_core::events::observability_guest_agent_silenced_data_v1",
126        })
127    }
128}
129
130/// One-shot trigger for the watcher. The listener loop holds the trigger;
131/// the watcher fires it exactly once when silence is detected.
132///
133/// Fire-once is structural: F4b should not emit two `agent_silenced`
134/// events for the same run, even under spurious wakeups. The fire-once
135/// invariant is maintained by [`AgentSilencedTrigger::fire`] returning
136/// `None` on the second call.
137#[derive(Debug)]
138pub struct AgentSilencedTrigger {
139    fired: tokio::sync::Mutex<bool>,
140    cell_id: String,
141    run_id: String,
142    keepalive_window: Duration,
143}
144
145impl AgentSilencedTrigger {
146    /// Construct. The trigger is bound to a specific cell/run.
147    pub fn new(
148        cell_id: impl Into<String>,
149        run_id: impl Into<String>,
150        keepalive_window: Duration,
151    ) -> Self {
152        Self {
153            fired: tokio::sync::Mutex::new(false),
154            cell_id: cell_id.into(),
155            run_id: run_id.into(),
156            keepalive_window,
157        }
158    }
159
160    /// Fire — returns `Some(signal)` the first time, `None` thereafter.
161    pub async fn fire(&self, elapsed: Duration) -> Option<AgentSilencedSignal> {
162        let mut fired = self.fired.lock().await;
163        if *fired {
164            return None;
165        }
166        *fired = true;
167        Some(AgentSilencedSignal {
168            cell_id: self.cell_id.clone(),
169            run_id: self.run_id.clone(),
170            keepalive_window_ms: self.keepalive_window.as_millis() as u64,
171            elapsed_ms: elapsed.as_millis() as u64,
172        })
173    }
174
175    /// Whether the trigger has already fired.
176    pub async fn has_fired(&self) -> bool {
177        *self.fired.lock().await
178    }
179}
180
181/// Watcher loop: poll the keep-alive tracker every `poll_interval` and fire
182/// the trigger when silence is observed. Runs until either the trigger
183/// fires or the caller drops the futures (cancel-safe — every `await` is a
184/// `sleep` or a `Mutex::lock`).
185///
186/// Returns `Some(signal)` when silence is detected, `None` only if the
187/// trigger had already fired (e.g. by a sibling watcher in a degenerate
188/// topology) — in normal use this function fires exactly once and returns
189/// `Some`.
190pub async fn watch_for_silence(
191    keepalive: KeepAlive,
192    trigger: Arc<AgentSilencedTrigger>,
193    poll_interval: Duration,
194) -> Option<AgentSilencedSignal> {
195    loop {
196        tokio::time::sleep(poll_interval).await;
197        // Single critical section: read elapsed and decide-to-fire under the
198        // same lock so a concurrent `notify_frame()` cannot land between the
199        // silenced-check and the elapsed read. Without this, a frame
200        // arriving exactly at the window boundary could yield an
201        // elapsed_ms < keepalive_window_ms in the emitted signal — the
202        // doctrine says elapsed >= window when we fire.
203        let elapsed = {
204            let g = keepalive.inner.lock().await;
205            let elapsed = g.last_frame_at.elapsed();
206            if elapsed < g.window {
207                continue;
208            }
209            elapsed
210        };
211        return trigger.fire(elapsed).await;
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the millisecond durations used throughout these tests.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    /// A freshly built tracker starts its timer at "now".
    #[tokio::test(start_paused = true)]
    async fn keepalive_fresh_is_not_silenced() {
        let tracker = KeepAlive::new(ms(100));
        let silenced = tracker.is_silenced().await;
        assert!(!silenced);
    }

    /// Letting more than one window pass with no frame reports silence.
    #[tokio::test(start_paused = true)]
    async fn keepalive_after_window_is_silenced() {
        let tracker = KeepAlive::new(ms(100));
        tokio::time::advance(ms(150)).await;
        let silenced = tracker.is_silenced().await;
        assert!(silenced);
    }

    /// A frame notification restarts the silence timer.
    #[tokio::test(start_paused = true)]
    async fn notify_frame_resets_timer() {
        let tracker = KeepAlive::new(ms(100));
        tokio::time::advance(ms(80)).await;
        tracker.notify_frame().await;
        tokio::time::advance(ms(80)).await;
        // Only 80ms have passed since the last notify and the window is
        // 100ms, so the tracker must not report silence.
        let silenced = tracker.is_silenced().await;
        assert!(!silenced);
    }

    /// The first `fire` yields the signal; the second yields nothing.
    #[tokio::test]
    async fn trigger_fires_exactly_once() {
        let trigger = AgentSilencedTrigger::new("c", "r", ms(100));
        let first = trigger.fire(ms(150)).await;
        let second = trigger.fire(ms(160)).await;
        assert!(first.is_some());
        assert!(second.is_none());
        let signal = first.unwrap();
        assert_eq!(signal.cell_id, "c");
        assert_eq!(signal.run_id, "r");
        assert_eq!(signal.keepalive_window_ms, 100);
        assert_eq!(signal.elapsed_ms, 150);
    }

    /// With no frames at all, the watcher fires once the window has passed.
    #[tokio::test(start_paused = true)]
    async fn watcher_fires_after_window() {
        let tracker = KeepAlive::new(ms(50));
        let trigger = Arc::new(AgentSilencedTrigger::new("cell-1", "run-1", ms(50)));
        let watcher = tokio::spawn(watch_for_silence(
            tracker.clone(),
            Arc::clone(&trigger),
            ms(10),
        ));
        // notify_frame is never called: the whole run is silent.
        tokio::time::advance(ms(200)).await;
        let outcome = watcher.await.expect("task panicked");
        assert!(outcome.is_some());
        assert!(trigger.has_fired().await);
    }
}