cellos-host-telemetry 0.5.1

Host-side telemetry receiver for CellOS — vsock listener that host-stamps and signs CloudEvents emitted by the in-guest cellos-telemetry agent.
Documentation
//! nftables byte/packet counter host probe (Slot F1a).
//!
//! Per ADR-0006 §"What 1.0 ships", Path B includes per-cell nftables
//! counters: how many packets and bytes did each rule in the cell's
//! ruleset see. The supervisor already shells out to
//! `nft list ruleset --json` post-run for `network_flow_decision` events
//! (see [`crates/cellos-supervisor/src/nft_counters.rs`]); this probe
//! complements that with a per-tick scrape so taudit gets a counter series
//! rather than a single end-of-run snapshot.
//!
//! Slot F1a's contract: the probe takes the cell's nftables table name at
//! construction; each `read()` returns the parsed counter rows under
//! `output.rows` (an array of `{family, table, chain, handle, packets,
//! bytes, ruleRepr}` objects matching the supervisor's [`NftCounterRow`]
//! shape). The supervisor wiring slot owns the actual subprocess call to
//! `nft -j list table inet <table>`; F1a tests the trait + envelope on
//! every platform via [`CapturingSink`].
//!
//! Non-Linux: returns [`ProbeError::PlatformUnsupported`].

use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use super::{HostProbe, ProbeContext, ProbeError, ProbeReading};

/// Probe-source identifier emitted in [`ProbeReading::probe_source`].
pub const PROBE_SOURCE: &str = "host.nftables";

/// One counter row in the per-cell nftables ruleset. Mirrors the supervisor's
/// `NftCounterRow` one-for-one — this probe re-declares the shape locally so
/// the host-telemetry crate doesn't take a path dep on the supervisor (the
/// dep arrow points the other way per ADR-0006).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NftCounterRow {
    /// nft family, e.g. `"inet"`, `"ip"`, `"ip6"`.
    pub family: String,
    /// Per-cell table name (e.g. `"cellos_<safe-id>"`).
    pub table: String,
    /// Chain within the table.
    pub chain: String,
    /// nft handle (stable per rule for the lifetime of the table).
    pub handle: u64,
    /// Cumulative packets seen by this rule.
    pub packets: u64,
    /// Cumulative bytes seen by this rule.
    pub bytes: u64,
    /// Synthesized text representation of the rule's expression.
    pub rule_repr: String,
}

/// Per-cell nftables counter probe.
///
/// `table_name` is the per-cell table (e.g. `"cellos_<safe-id>"`); the
/// supervisor names tables deterministically per cell so the audit trail
/// can attribute the counter series back to a single cell.
#[derive(Debug, Clone)]
pub struct NftablesProbe {
    /// nft family hosting the per-cell table (typically `"inet"`).
    pub family: String,
    /// Per-cell table name.
    pub table_name: String,
}

impl NftablesProbe {
    /// Build a probe pointed at `family`/`table_name`.
    pub fn new(family: impl Into<String>, table_name: impl Into<String>) -> Self {
        Self {
            family: family.into(),
            table_name: table_name.into(),
        }
    }
}

#[async_trait]
impl HostProbe for NftablesProbe {
    fn probe_name(&self) -> &'static str {
        "nftables"
    }

    async fn read(&self, _ctx: &ProbeContext) -> Result<ProbeReading, ProbeError> {
        let rows = scrape_counter_rows(&self.family, &self.table_name).await?;
        let inputs = serde_json::json!({
            "family": self.family,
            "table": self.table_name,
            "command": format!("nft -j list table {} {}", self.family, self.table_name),
        });
        let output = serde_json::json!({
            "rows": rows,
        });
        Ok(ProbeReading::new(PROBE_SOURCE, inputs, output))
    }
}

/// Linux: scrape counter rows for `family`/`table`. Slot F1a defers the
/// actual `nft` subprocess invocation to the supervisor wiring slot — that
/// slot owns the `nsenter`/jailer-uid discipline. Returning `Io` here keeps
/// the trait shape honest until wiring lands.
#[cfg(target_os = "linux")]
async fn scrape_counter_rows(
    _family: &str,
    _table: &str,
) -> Result<Vec<NftCounterRow>, ProbeError> {
    Err(ProbeError::Io(
        "nftables counter scrape not yet wired (Phase F1a — pending supervisor integration)".into(),
    ))
}

#[cfg(not(target_os = "linux"))]
async fn scrape_counter_rows(
    _family: &str,
    _table: &str,
) -> Result<Vec<NftCounterRow>, ProbeError> {
    Err(ProbeError::PlatformUnsupported)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use cellos_core::ports::EventSink;

    use super::super::test_support::CapturingSink;
    use super::super::{emit_reading, ProbeContext, ProbeReading};
    use super::{NftCounterRow, PROBE_SOURCE};

    #[tokio::test]
    async fn emitted_event_carries_d12_fields() {
        let snapshot_handle = Arc::new(CapturingSink::new());
        let probe_sink: Arc<dyn EventSink> = snapshot_handle.clone();

        let rows = vec![NftCounterRow {
            family: "inet".into(),
            table: "cellos_test".into(),
            chain: "egress".into(),
            handle: 4,
            packets: 12,
            bytes: 1284,
            rule_repr: "ip daddr 1.2.3.4 tcp dport 443 accept".into(),
        }];

        let reading = ProbeReading::new(
            PROBE_SOURCE,
            serde_json::json!({
                "family": "inet",
                "table": "cellos_test",
                "command": "nft -j list table inet cellos_test",
            }),
            serde_json::json!({"rows": rows}),
        );
        let ctx = ProbeContext::new("cell-nft-1", "run-nft-1", "sha256:nft");
        emit_reading(&probe_sink, &ctx, reading)
            .await
            .expect("emit");

        let events = snapshot_handle.snapshot().await;
        assert_eq!(events.len(), 1);
        let data = events[0].data.as_ref().expect("data");
        assert_eq!(data["probeSource"], PROBE_SOURCE);
        assert_eq!(data["cellId"], "cell-nft-1");
        assert_eq!(data["runId"], "run-nft-1");
        assert!(data["hostReceivedAt"].is_string());
        assert_eq!(data["inputs"]["table"], "cellos_test");
        let out_rows = data["output"]["rows"].as_array().expect("rows is array");
        assert_eq!(out_rows.len(), 1);
        assert_eq!(out_rows[0]["packets"], 12);
        assert_eq!(out_rows[0]["bytes"], 1284);
        assert!(events[0]
            .ty
            .ends_with(".cell.observability.host.v1.nftables"));
    }
}