cellos-host-telemetry 0.5.0

Host-side telemetry receiver for CellOS — vsock listener that host-stamps and signs CloudEvents emitted by the in-guest cellos-telemetry agent.
Documentation
//! Keep-alive watcher and `agent_silenced` detection (ADR-0006 §7).
//!
//! Doctrine: silence is observable. When the in-guest agent has not delivered
//! a frame within the keep-alive window, the supervisor emits
//! `cell.observability.guest.agent_silenced` typed as `DeclaredAuthority`
//! *absence* — the absence is itself an Observed event from the host's
//! perspective.
//!
//! This module owns:
//!
//! - [`KeepAlive`] — a small async-friendly tracker the listener pokes on
//!   every successful frame receive. The receiver-side stays simple: read a
//!   frame, call `notify_frame()`, repeat. The keep-alive window enforcement
//!   runs as a separate async task driven by [`watch_for_silence`].
//!
//! - [`AgentSilencedTrigger`] — the single fire-once trigger that returns
//!   the `cell.observability.guest.agent_silenced` value. F1b owns the
//!   additive CloudEvent constructor; until that lands, this module produces
//!   the [`AgentSilencedSignal`] internal value type, and wiring it through
//!   to `cellos_core::events` remains **TODO(F1b)**.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Mutex;
use tokio::time::Instant;

/// Keep-alive window used when no other value is configured: ten seconds.
///
/// Phase F3b ships this as the per-cell listener default; F4 is expected to
/// expose it as a portable-profile knob next to `onAgentFailure`. It is
/// deliberately generous: with agent-side drop-with-counter back-pressure
/// (ADR-0006 §5.3), some frame should get through within seconds of the
/// workload doing anything observable.
pub const DEFAULT_KEEPALIVE_WINDOW: Duration = Duration::from_millis(10_000);

/// Last-frame tracker shared by the frame listener and the silence watcher.
///
/// Every clone points at the same `Arc<Mutex<...>>`, so cloning is cheap and
/// a `notify_frame()` made through one handle is seen by all the others. The
/// listener and the watcher each hold one handle.
#[derive(Clone, Debug)]
pub struct KeepAlive {
    inner: Arc<Mutex<KeepAliveInner>>,
}

/// State guarded by the [`KeepAlive`] lock.
#[derive(Debug)]
struct KeepAliveInner {
    /// When the most recent frame arrived (construction time until the first frame).
    last_frame_at: Instant,
    /// How long the tracker may go without a frame before it counts as silenced.
    window: Duration,
}

impl KeepAlive {
    /// Builds a tracker whose silence window is `window`.
    ///
    /// Construction counts as a frame, so a freshly booted cell is not
    /// reported as silenced until a full `window` has passed.
    pub fn new(window: Duration) -> Self {
        let state = KeepAliveInner {
            last_frame_at: Instant::now(),
            window,
        };
        Self {
            inner: Arc::new(Mutex::new(state)),
        }
    }

    /// Returns the silence window this tracker was built with.
    pub async fn window(&self) -> Duration {
        let state = self.inner.lock().await;
        state.window
    }

    /// Records that a frame just arrived, which restarts the silence window.
    pub async fn notify_frame(&self) {
        let mut state = self.inner.lock().await;
        state.last_frame_at = Instant::now();
    }

    /// Reports whether at least `window` has elapsed since the last frame.
    pub async fn is_silenced(&self) -> bool {
        let state = self.inner.lock().await;
        let quiet_for = state.last_frame_at.elapsed();
        quiet_for >= state.window
    }
}

/// Internal value type produced when a cell's in-guest agent is detected
/// as silenced.
///
/// F4b signs it; F1b will project it to a `CloudEventV1` via the additive
/// constructor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentSilencedSignal {
    /// Per-cell identifier (host-stamped).
    pub cell_id: String,
    /// Per-run identifier (host-stamped).
    pub run_id: String,
    /// Keep-alive window the producing trigger was configured with, in
    /// milliseconds.
    pub keepalive_window_ms: u64,
    /// Milliseconds elapsed since the last frame at fire time.
    ///
    /// This type does NOT enforce `elapsed_ms >= keepalive_window_ms`; it
    /// stores whatever the producer supplied. For signals produced by
    /// [`watch_for_silence`], the relation holds only when the `KeepAlive`
    /// tracker and the [`AgentSilencedTrigger`] were built with the same
    /// window. The watcher decides to fire against the tracker's window,
    /// but the signal reports the trigger's window.
    pub elapsed_ms: u64,
}

impl AgentSilencedSignal {
    /// CloudEvent type URN this signal will project to (F1b).
    ///
    /// The URN is fixed here so admission-side validators can pre-register
    /// the schema name without waiting on the F1b commit.
    ///
    /// TODO(F1b): once the additive event constructor lands in
    /// `cellos_core::events`, replace `to_cloudevent_stub()` with a call to
    /// `events::observability_guest_agent_silenced_data_v1(...)`.
    pub const CLOUDEVENT_TYPE: &'static str =
        "dev.cellos.events.cell.observability.v1.guest.agent_silenced";

    /// Stub projection until F1b lands the additive builder.
    ///
    /// Returns a `serde_json::Value` object with the keys `cellId`, `runId`,
    /// `keepaliveWindowMs` and `elapsedMs` copied from this signal, plus the
    /// fixed markers `epistemicStatus`, `ruleClass` and `_todo`. This lets
    /// downstream wiring be tested end-to-end against the JSON shape. Once
    /// F1b ships, this becomes a thin wrapper over the F1b builder, or is
    /// deleted in favour of direct `CloudEventV1` construction in the
    /// supervisor loop.
    pub fn to_cloudevent_stub(&self) -> serde_json::Value {
        // TODO(F1b): replace with
        // `cellos_core::events::observability_guest_agent_silenced_data_v1(...)`.
        serde_json::json!({
            "cellId": self.cell_id,
            "runId": self.run_id,
            "keepaliveWindowMs": self.keepalive_window_ms,
            "elapsedMs": self.elapsed_ms,
            "epistemicStatus": "DECLARED",
            "ruleClass": "GUEST_AGENT_DECLARATION",
            "_todo": "F1b: wire to cellos_core::events::observability_guest_agent_silenced_data_v1",
        })
    }
}

/// One-shot trigger for the silence watcher, bound to a single cell/run.
///
/// The listener loop holds the trigger; the watcher fires it exactly once
/// when silence is detected.
///
/// Fire-once is structural: F4b should not emit two `agent_silenced`
/// events for the same run, even under spurious wakeups. The invariant is
/// maintained by [`AgentSilencedTrigger::fire`], which atomically flips a
/// flag and returns `None` to every caller after the first.
#[derive(Debug)]
pub struct AgentSilencedTrigger {
    /// Set by the first `fire`; never cleared. An atomic is sufficient for a
    /// single one-way flag and avoids an async lock.
    fired: std::sync::atomic::AtomicBool,
    cell_id: String,
    run_id: String,
    keepalive_window: Duration,
}

impl AgentSilencedTrigger {
    /// Constructs a trigger bound to a specific cell/run.
    ///
    /// `keepalive_window` is only reported in the emitted signal; it does not
    /// affect when the trigger fires.
    pub fn new(
        cell_id: impl Into<String>,
        run_id: impl Into<String>,
        keepalive_window: Duration,
    ) -> Self {
        Self {
            fired: std::sync::atomic::AtomicBool::new(false),
            cell_id: cell_id.into(),
            run_id: run_id.into(),
            keepalive_window,
        }
    }

    /// Fires the trigger. Returns `Some(signal)` the first time and `None`
    /// thereafter, including under concurrent callers.
    ///
    /// `elapsed` is recorded as given. Both durations are converted to whole
    /// milliseconds, saturating at `u64::MAX` instead of wrapping.
    ///
    /// This function remains `async` for API compatibility; it never suspends.
    pub async fn fire(&self, elapsed: Duration) -> Option<AgentSilencedSignal> {
        use std::sync::atomic::Ordering;

        // `swap` returns the previous value, so exactly one caller sees `false`.
        if self.fired.swap(true, Ordering::SeqCst) {
            return None;
        }
        Some(AgentSilencedSignal {
            cell_id: self.cell_id.clone(),
            run_id: self.run_id.clone(),
            keepalive_window_ms: Self::saturating_millis(self.keepalive_window),
            elapsed_ms: Self::saturating_millis(elapsed),
        })
    }

    /// Whether the trigger has already fired.
    pub async fn has_fired(&self) -> bool {
        self.fired.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Converts `d` to whole milliseconds, clamping at `u64::MAX`.
    /// A bare `as u64` on the `u128` from `as_millis()` would wrap instead.
    fn saturating_millis(d: Duration) -> u64 {
        // Lossless after the clamp: the value is now within u64's range.
        d.as_millis().min(u128::from(u64::MAX)) as u64
    }
}

/// Watcher loop: polls the keep-alive tracker every `poll_interval` and fires
/// the trigger when silence is observed.
///
/// Runs until the trigger has fired (by this watcher or by any other holder
/// of the same `Arc<AgentSilencedTrigger>`), or until the caller drops the
/// future. The awaits are the `sleep`, the tracker's `Mutex::lock`, and the
/// trigger's `has_fired`/`fire`. Cancelling at any of them leaves no partial
/// state behind.
///
/// Returns `Some(signal)` when this watcher observes silence and wins the
/// fire-once race. Returns `None` if the trigger had already fired, e.g. by a
/// sibling watcher in a degenerate topology. In normal use this function
/// fires exactly once and returns `Some`.
///
/// The fire decision compares against `keepalive`'s window, while the
/// emitted signal reports the window `trigger` was built with. Build both
/// with the same window so the reported numbers agree.
pub async fn watch_for_silence(
    keepalive: KeepAlive,
    trigger: Arc<AgentSilencedTrigger>,
    poll_interval: Duration,
) -> Option<AgentSilencedSignal> {
    loop {
        tokio::time::sleep(poll_interval).await;
        // If someone else already fired the shared trigger, this watcher has
        // nothing left to emit. Stop now instead of polling until this
        // tracker happens to go silent, which may never happen if frames
        // resume.
        if trigger.has_fired().await {
            return None;
        }
        // Single critical section: read elapsed and decide-to-fire under the
        // same lock so a concurrent `notify_frame()` cannot land between the
        // silenced-check and the elapsed read. Without this, a frame
        // arriving exactly at the window boundary could yield an
        // elapsed_ms < keepalive_window_ms in the emitted signal — the
        // doctrine says elapsed >= window when we fire.
        let elapsed = {
            let g = keepalive.inner.lock().await;
            let elapsed = g.last_frame_at.elapsed();
            if elapsed < g.window {
                continue;
            }
            elapsed
        };
        return trigger.fire(elapsed).await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A tracker that has just been built must not report silence.
    #[tokio::test(start_paused = true)]
    async fn keepalive_fresh_is_not_silenced() {
        let tracker = KeepAlive::new(Duration::from_millis(100));
        let silenced = tracker.is_silenced().await;
        assert!(!silenced);
    }

    /// Moving the paused clock past the window flips the tracker to silenced.
    #[tokio::test(start_paused = true)]
    async fn keepalive_after_window_is_silenced() {
        let window = Duration::from_millis(100);
        let tracker = KeepAlive::new(window);
        tokio::time::advance(window + Duration::from_millis(50)).await;
        assert!(tracker.is_silenced().await);
    }

    /// A frame part-way through the window restarts the countdown.
    #[tokio::test(start_paused = true)]
    async fn notify_frame_resets_timer() {
        let tracker = KeepAlive::new(Duration::from_millis(100));
        let step = Duration::from_millis(80);
        tokio::time::advance(step).await;
        tracker.notify_frame().await;
        tokio::time::advance(step).await;
        // 160ms since construction, but only 80ms since the last frame,
        // which is still inside the 100ms window.
        assert!(!tracker.is_silenced().await);
    }

    /// Only the first `fire` yields a signal, and it carries the bound identifiers.
    #[tokio::test]
    async fn trigger_fires_exactly_once() {
        let trigger = AgentSilencedTrigger::new("c", "r", Duration::from_millis(100));
        let first = trigger.fire(Duration::from_millis(150)).await;
        let second = trigger.fire(Duration::from_millis(160)).await;
        assert!(second.is_none());
        let signal = first.expect("first fire must yield a signal");
        let expected = AgentSilencedSignal {
            cell_id: "c".to_string(),
            run_id: "r".to_string(),
            keepalive_window_ms: 100,
            elapsed_ms: 150,
        };
        assert_eq!(signal, expected);
    }

    /// With no frames at all, the spawned watcher fires once the window lapses.
    #[tokio::test(start_paused = true)]
    async fn watcher_fires_after_window() {
        let window = Duration::from_millis(50);
        let tracker = KeepAlive::new(window);
        let trigger = Arc::new(AgentSilencedTrigger::new("cell-1", "run-1", window));
        let watcher = tokio::spawn(watch_for_silence(
            tracker.clone(),
            Arc::clone(&trigger),
            Duration::from_millis(10),
        ));
        // notify_frame is never called: the agent stays silent for the whole run.
        tokio::time::advance(Duration::from_millis(200)).await;
        let outcome = watcher.await.expect("task panicked");
        assert!(outcome.is_some());
        assert!(trigger.has_fired().await);
    }
}