cellos-host-telemetry 0.5.1

Host-side telemetry receiver for CellOS — vsock listener that host-stamps and signs CloudEvents emitted by the in-guest cellos-telemetry agent.
Documentation
//! cgroup-v2 host probe (Slot F1a).
//!
//! Reads the three cgroup-v2 leaf files ADR-0006 §"What 1.0 ships" calls
//! out for Path B:
//!
//! - `memory.events` — flat key/value of `low|high|max|oom|oom_kill` event
//!   counters.
//! - `cpu.stat` — flat key/value of `usage_usec|user_usec|system_usec|...`
//!   counters.
//! - `pids.events` — flat key/value of `max` (and post-5.7 `max.imposed`)
//!   counters.
//!
//! All three files share the same trivial line format — `<key> <value>` per
//! line — so [`parse_flat_kv`] handles all three. The probe reports each
//! parsed map verbatim under `output.<file>`, with the `.` in the file name
//! replaced by `_` (`output.memory_events`, `output.cpu_stat`,
//! `output.pids_events`), so taudit / projector can attribute
//! "OOM-killed" / "throttled" etc. without re-parsing kernel text.
//!
//! Reading these files requires the supervisor's cgroup leaf to exist; the
//! probe takes the leaf path at construction time. Per ADR-0006, the
//! supervisor populates this from `CellHandle.cgroup_path` *after* the
//! workload's pid is written to `cgroup.procs`, so the probe sees the same
//! leaf the workload's accounting hits.
//!
//! Non-Linux: returns [`ProbeError::PlatformUnsupported`].

use std::collections::BTreeMap;
use std::path::{Path, PathBuf};

use async_trait::async_trait;

use super::{HostProbe, ProbeContext, ProbeError, ProbeReading};

/// Value stamped into [`ProbeReading::probe_source`] for every reading this
/// module produces.
pub const PROBE_SOURCE: &str = "host.cgroup";

/// Probe that scrapes cgroup-v2 leaf files for a single cell.
///
/// Every read looks for `memory.events`, `cpu.stat` and `pids.events`
/// directly beneath [`cgroup_leaf`](Self::cgroup_leaf).
#[derive(Clone, Debug)]
pub struct CgroupProbe {
    /// Path of the cell's cgroup-v2 leaf directory, expected to be absolute
    /// (for example `/sys/fs/cgroup/cellos/<safe-id>`).
    pub cgroup_leaf: PathBuf,
}

impl CgroupProbe {
    /// Build a probe that reads from `cgroup_leaf`.
    ///
    /// Accepts anything convertible into a [`PathBuf`] (`&str`, `String`,
    /// `PathBuf`, ...). The path is stored exactly as given; nothing is
    /// checked or touched on disk at construction time.
    pub fn new(cgroup_leaf: impl Into<PathBuf>) -> Self {
        let cgroup_leaf: PathBuf = cgroup_leaf.into();
        CgroupProbe { cgroup_leaf }
    }
}

#[async_trait]
impl HostProbe for CgroupProbe {
    /// Short identifier of this probe: `"cgroup"`.
    fn probe_name(&self) -> &'static str {
        "cgroup"
    }

    /// Scrape the three cgroup-v2 counter files under `cgroup_leaf`.
    ///
    /// Files are read one after another in the order `memory.events`,
    /// `cpu.stat`, `pids.events`. The first read or parse error is returned
    /// unchanged and no partial reading is produced. On success the parsed
    /// maps are reported under `output.memory_events`, `output.cpu_stat` and
    /// `output.pids_events`. The probe context is not consulted.
    async fn read(&self, _ctx: &ProbeContext) -> Result<ProbeReading, ProbeError> {
        // Single source of truth for both the files read and the `inputs.files` list.
        const FILES: [&str; 3] = ["memory.events", "cpu.stat", "pids.events"];
        let [memory_file, cpu_file, pids_file] = FILES;
        let leaf = &self.cgroup_leaf;

        let memory_events = read_flat_kv_at(&leaf.join(memory_file)).await?;
        let cpu_stat = read_flat_kv_at(&leaf.join(cpu_file)).await?;
        let pids_events = read_flat_kv_at(&leaf.join(pids_file)).await?;

        let output = serde_json::json!({
            "memory_events": memory_events,
            "cpu_stat": cpu_stat,
            "pids_events": pids_events,
        });
        let inputs = serde_json::json!({
            "cgroupLeaf": leaf.to_string_lossy(),
            "files": FILES,
        });
        Ok(ProbeReading::new(PROBE_SOURCE, inputs, output))
    }
}

/// Linux: load the cgroup-v2 file at `path` and hand its contents to
/// [`parse_flat_kv`].
///
/// A failed read is reported as [`ProbeError::Io`] naming the offending
/// path. Parse failures are returned unchanged from [`parse_flat_kv`].
#[cfg(target_os = "linux")]
async fn read_flat_kv_at(path: &Path) -> Result<BTreeMap<String, u64>, ProbeError> {
    let contents = tokio::fs::read_to_string(path).await.map_err(|e| {
        ProbeError::Io(format!(
            "cgroup read failed at {}: {e}",
            path.display()
        ))
    })?;
    parse_flat_kv(&contents)
}

/// Non-Linux stand-in that always fails with
/// [`ProbeError::PlatformUnsupported`], because cgroup-v2 is a Linux-only
/// kernel interface. The path argument is ignored.
#[cfg(not(target_os = "linux"))]
async fn read_flat_kv_at(_file: &Path) -> Result<BTreeMap<String, u64>, ProbeError> {
    Err(ProbeError::PlatformUnsupported)
}

/// Parse cgroup-v2 flat-keyed text into a sorted `key -> counter` map.
///
/// The expected format is exactly one `<key> <value>` pair per line,
/// separated by whitespace. This is the layout shared by `memory.events`,
/// `cpu.stat` and `pids.events`. Blank lines and leading/trailing whitespace
/// are tolerated. If a key appears more than once, the last value wins.
///
/// # Errors
///
/// Returns [`ProbeError::Parse`] (with a 1-based line number) when a
/// non-blank line:
/// - has a key but no value,
/// - has more than one token after the key (e.g. a nested-keyed line such
///   as `8:0 rbytes=1 wbytes=2`), or
/// - has a value that is not an integer fitting in `u64`.
pub fn parse_flat_kv(text: &str) -> Result<BTreeMap<String, u64>, ProbeError> {
    let mut out = BTreeMap::new();
    for (idx, raw) in text.lines().enumerate() {
        let lineno = idx + 1;
        let mut tokens = raw.split_whitespace();
        // `split_whitespace` yields nothing for an empty or all-whitespace
        // line, so a missing first token is exactly the blank-line case.
        let key = match tokens.next() {
            Some(key) => key,
            None => continue,
        };
        let val = tokens.next().ok_or_else(|| {
            ProbeError::Parse(format!("flat-kv line {lineno} has no value"))
        })?;
        // Extra tokens mean this is not a flat-keyed line; refuse to guess
        // which token is the counter.
        if let Some(extra) = tokens.next() {
            return Err(ProbeError::Parse(format!(
                "flat-kv line {lineno} has unexpected trailing token {extra:?}"
            )));
        }
        let parsed: u64 = val.parse().map_err(|e| {
            ProbeError::Parse(format!(
                "flat-kv line {lineno} value {val:?} not u64: {e}"
            ))
        })?;
        out.insert(key.to_owned(), parsed);
    }
    Ok(out)
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;

    use cellos_core::ports::EventSink;

    use super::super::test_support::CapturingSink;
    use super::super::{emit_reading, HostProbe, ProbeContext, ProbeReading};
    use super::{parse_flat_kv, CgroupProbe, PROBE_SOURCE};

    #[test]
    fn parse_flat_kv_handles_canonical_memory_events() {
        let text = "low 0\nhigh 0\nmax 12\noom 1\noom_kill 1\n";
        let parsed = parse_flat_kv(text).expect("parse");
        assert_eq!(parsed.len(), 5);
        assert_eq!(parsed["low"], 0);
        assert_eq!(parsed["high"], 0);
        assert_eq!(parsed["max"], 12);
        assert_eq!(parsed["oom"], 1);
        assert_eq!(parsed["oom_kill"], 1);
    }

    /// The parser documents tolerance for blank lines and surrounding
    /// whitespace; pin that down.
    #[test]
    fn parse_flat_kv_tolerates_blank_lines_and_surrounding_whitespace() {
        let text = "\n  usage_usec 100  \n\n\tuser_usec 60\nsystem_usec 40\n\n";
        let parsed = parse_flat_kv(text).expect("parse");
        assert_eq!(parsed.len(), 3);
        assert_eq!(parsed["usage_usec"], 100);
        assert_eq!(parsed["user_usec"], 60);
        assert_eq!(parsed["system_usec"], 40);
    }

    #[test]
    fn parse_flat_kv_accepts_empty_input() {
        assert!(parse_flat_kv("").expect("parse").is_empty());
        assert!(parse_flat_kv("\n \n\t\n").expect("parse").is_empty());
    }

    #[test]
    fn parse_flat_kv_rejects_non_numeric_value() {
        let text = "low not-a-number\n";
        let err = parse_flat_kv(text).expect_err("rejects bad value");
        let msg = format!("{err}");
        assert!(msg.contains("flat-kv"), "expected parse error, got {msg}");
    }

    #[test]
    fn parse_flat_kv_rejects_key_without_value() {
        let err = parse_flat_kv("max 0\noom\n").expect_err("rejects missing value");
        let msg = format!("{err}");
        assert!(msg.contains("line 2"), "expected line number in error, got {msg}");
    }

    #[test]
    fn probe_reports_its_name_and_leaf() {
        let probe = CgroupProbe::new("/sys/fs/cgroup/cellos/test");
        assert_eq!(probe.probe_name(), "cgroup");
        assert_eq!(probe.cgroup_leaf, PathBuf::from("/sys/fs/cgroup/cellos/test"));
    }

    /// A missing leaf must surface as an I/O error naming the first file
    /// read (`memory.events`), not as a parse error or a silent success.
    #[cfg(target_os = "linux")]
    #[tokio::test]
    async fn read_surfaces_io_error_for_missing_leaf() {
        use super::super::ProbeError;

        let probe = CgroupProbe::new("/nonexistent/cellos-cgroup-probe-test-leaf");
        let ctx = ProbeContext::new("cell-cg-2", "run-cg-2", "sha256:cg");
        match probe.read(&ctx).await {
            Err(ProbeError::Io(msg)) => {
                assert!(msg.contains("memory.events"), "expected file name in error, got {msg}")
            }
            Err(other) => panic!("expected an Io error, got {other}"),
            Ok(_) => panic!("expected an error for a missing cgroup leaf"),
        }
    }

    #[tokio::test]
    async fn emitted_event_carries_d12_fields() {
        let snapshot_handle = Arc::new(CapturingSink::new());
        let probe_sink: Arc<dyn EventSink> = snapshot_handle.clone();
        let ctx = ProbeContext::new("cell-cg-1", "run-cg-1", "sha256:cg");
        let reading = ProbeReading::new(
            PROBE_SOURCE,
            serde_json::json!({
                "cgroupLeaf": "/sys/fs/cgroup/cellos/test",
                "files": ["memory.events", "cpu.stat", "pids.events"],
            }),
            serde_json::json!({
                "memory_events": {"oom": 1, "oom_kill": 1},
                "cpu_stat": {"usage_usec": 12345},
                "pids_events": {"max": 0},
            }),
        );
        emit_reading(&probe_sink, &ctx, reading)
            .await
            .expect("emit");

        let events = snapshot_handle.snapshot().await;
        assert_eq!(events.len(), 1);
        let data = events[0].data.as_ref().expect("data");
        assert_eq!(data["probeSource"], PROBE_SOURCE);
        assert_eq!(data["cellId"], "cell-cg-1");
        assert_eq!(data["runId"], "run-cg-1");
        assert!(data["hostReceivedAt"].is_string());
        assert!(data["inputs"]["cgroupLeaf"].is_string());
        assert_eq!(data["output"]["memory_events"]["oom_kill"], 1);
        assert!(events[0].ty.ends_with(".cell.observability.host.v1.cgroup"));
    }
}