cellos-host-telemetry 0.5.1

Host-side telemetry receiver for CellOS — vsock listener that host-stamps and signs CloudEvents emitted by the in-guest cellos-telemetry agent.
Documentation
//! Firecracker `/metrics` host probe (Slot F1a).
//!
//! Firecracker exposes a JSON metrics document over a unix socket (the same
//! API socket used for VM configuration). Per ADR-0006 §5.8, the 1.0 line
//! ships *only* the `/metrics` endpoint as the KVM-exit signal — debugfs
//! requires `CAP_SYS_ADMIN` and is post-1.0.
//!
//! This probe is read-only: it scrapes the metrics document and lifts the
//! whole JSON payload into the [`ProbeReading::output`] block. Downstream
//! consumers (taudit, cellos-projector) parse the parts they need; the
//! supervisor does not pre-extract a "vmm exits" gauge to avoid drifting
//! from upstream Firecracker's metric set.
//!
//! On non-Linux hosts the probe returns [`ProbeError::PlatformUnsupported`].
//! On Linux hosts it returns [`ProbeError::Io`] when no Firecracker socket has
//! been wired. Until the supervisor wiring slot lands, that is every Linux
//! host, because the socket scrape is still a stub. Both errors are safe to
//! drop on the floor per the F1a wiring contract: missing host probes don't
//! fail the cell, they widen the silence the projector flags.

use async_trait::async_trait;
use serde_json::Value;

use super::{HostProbe, ProbeContext, ProbeError, ProbeReading};

/// Identifier stamped into [`ProbeReading::probe_source`] for every reading
/// produced by this module.
pub const PROBE_SOURCE: &str = "host.fc_metrics";

/// Host probe that scrapes Firecracker's `/metrics` document.
///
/// The probe holds nothing but an endpoint label. That label is copied into
/// each reading's [`ProbeReading::inputs`] block, so it should name the
/// metrics source stably enough for an auditor to repeat the read against a
/// quiescent VM. On Linux hosts it is typically `unix://<api-socket>/metrics`.
#[derive(Clone, Debug)]
pub struct FcMetricsProbe {
    /// Endpoint identifier recorded in the audit trail, for example
    /// `"unix:///run/firecracker/cell-abc.sock/metrics"`.
    pub endpoint: String,
}

impl FcMetricsProbe {
    /// Create a probe that targets `endpoint`.
    ///
    /// The value is stored unchanged and later written verbatim into every
    /// reading's `inputs` block. Choose one that stays stable for the whole
    /// run.
    pub fn new(endpoint: impl Into<String>) -> Self {
        let endpoint: String = endpoint.into();
        Self { endpoint }
    }
}

#[async_trait]
impl HostProbe for FcMetricsProbe {
    /// Stable short name for this probe.
    fn probe_name(&self) -> &'static str {
        "fc_metrics"
    }

    /// Scrape the Firecracker metrics document and wrap it in a
    /// [`ProbeReading`].
    ///
    /// The reading's `inputs` echo the endpoint identifier plus a fixed
    /// `kind` tag. The parsed document is nested, untouched, under
    /// `output.metrics`; no individual gauges are extracted here.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying document read, and reports a
    /// document that is not valid JSON as [`ProbeError::Parse`].
    async fn read(&self, _ctx: &ProbeContext) -> Result<ProbeReading, ProbeError> {
        let document = read_metrics_document(&self.endpoint).await?;
        let metrics = serde_json::from_str::<Value>(&document)
            .map_err(|err| ProbeError::Parse(format!("fc /metrics not JSON: {err}")))?;

        Ok(ProbeReading::new(
            PROBE_SOURCE,
            serde_json::json!({
                "endpoint": self.endpoint,
                "kind": "firecracker.metrics",
            }),
            serde_json::json!({ "metrics": metrics }),
        ))
    }
}

/// Fetch the raw Firecracker metrics document for `endpoint` (Linux build).
///
/// This is deliberately a stub. Slot F1a wires the trait and the reading
/// envelope; the unix-socket scrape itself lands with the supervisor wiring
/// slot. Until then every call fails with [`ProbeError::Io`].
///
/// NOTE(review): upstream Firecracker configures metrics via `PUT /metrics`
/// (a `metrics_path` FIFO or file) rather than serving them on the API
/// socket. Confirm the scrape mechanism when this is wired.
#[cfg(target_os = "linux")]
async fn read_metrics_document(_endpoint: &str) -> Result<String, ProbeError> {
    // The supervisor wiring slot owns api-socket path discipline (cgroup-scoped
    // temp dirs, jailer uid), so the real read is deferred to it. Failing with
    // Io instead of returning a placeholder document means a probe built
    // against an un-wired endpoint surfaces a genuine failure, not a synthetic
    // success.
    const NOT_WIRED: &str =
        "fc /metrics socket scrape not yet wired (Phase F1a — pending supervisor integration)";
    Err(ProbeError::Io(NOT_WIRED.into()))
}

/// Fetch the raw Firecracker metrics document (non-Linux build).
///
/// Non-Linux hosts are hard-[`ProbeError::PlatformUnsupported`] per
/// `d9f3b7a`. `_endpoint` is never inspected.
#[cfg(not(target_os = "linux"))]
async fn read_metrics_document(_endpoint: &str) -> Result<String, ProbeError> {
    let unsupported = ProbeError::PlatformUnsupported;
    Err(unsupported)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use cellos_core::ports::EventSink;

    use super::super::test_support::CapturingSink;
    use super::super::{emit_reading, HostProbe, ProbeContext, ProbeError, ProbeReading};
    use super::{FcMetricsProbe, PROBE_SOURCE};

    /// Endpoint label shared by the tests below.
    const TEST_ENDPOINT: &str = "unix:///run/firecracker/test.sock/metrics";

    #[tokio::test]
    async fn emitted_event_carries_d12_fields() {
        // Hand-construct a reading instead of calling read() so the test
        // exercises the dispatch / envelope shape on every platform (the
        // real read() path requires Linux + a wired unix socket).
        let reading = ProbeReading::new(
            PROBE_SOURCE,
            serde_json::json!({
                "endpoint": TEST_ENDPOINT,
                "kind": "firecracker.metrics",
            }),
            serde_json::json!({
                "metrics": {"vmm": {"reads": 7, "writes": 2}},
            }),
        );
        let ctx = ProbeContext::new("cell-fc-1", "run-fc-1", "sha256:fc");

        // Hold the sink as `Arc<CapturingSink>` so we can both dispatch via
        // `Arc<dyn EventSink>` and read `snapshot()` afterwards.
        let snapshot_handle = Arc::new(CapturingSink::new());
        let probe_sink: Arc<dyn EventSink> = snapshot_handle.clone();
        emit_reading(&probe_sink, &ctx, reading)
            .await
            .expect("emit");

        let events = snapshot_handle.snapshot().await;
        assert_eq!(events.len(), 1);
        let data = events[0].data.as_ref().expect("data");
        assert_eq!(data["probeSource"], PROBE_SOURCE);
        assert_eq!(data["cellId"], "cell-fc-1");
        assert_eq!(data["runId"], "run-fc-1");
        assert!(data["hostReceivedAt"].is_string());
        // Pin the payload itself, not just its shape: the envelope must carry
        // the reading's inputs and output through unchanged.
        assert_eq!(data["inputs"]["endpoint"], TEST_ENDPOINT);
        assert_eq!(data["inputs"]["kind"], "firecracker.metrics");
        assert_eq!(
            data["output"]["metrics"],
            serde_json::json!({"vmm": {"reads": 7, "writes": 2}})
        );
        assert!(events[0]
            .ty
            .ends_with(".cell.observability.host.v1.fc_metrics"));
    }

    #[tokio::test]
    async fn read_reports_failure_while_scrape_is_unwired() {
        // The Linux scrape is still a stub and non-Linux is unsupported, so
        // read() must surface an error instead of a synthetic reading. Update
        // the Linux expectation when the supervisor wiring slot lands.
        let probe = FcMetricsProbe::new(TEST_ENDPOINT);
        assert_eq!(probe.probe_name(), "fc_metrics");

        let ctx = ProbeContext::new("cell-fc-2", "run-fc-2", "sha256:fc");
        let result = probe.read(&ctx).await;

        #[cfg(target_os = "linux")]
        assert!(matches!(result, Err(ProbeError::Io(_))));
        #[cfg(not(target_os = "linux"))]
        assert!(matches!(result, Err(ProbeError::PlatformUnsupported)));
    }
}