cellos-host-telemetry 0.5.1

Host-side telemetry receiver for CellOS — vsock listener that host-stamps and signs CloudEvents emitted by the in-guest cellos-telemetry agent.
Documentation
//! TAP-link state host probe (Slot F1a).
//!
//! Per-cell TAP devices (`cfc-<short-id>`) live on the host and are the
//! kernel-side anchor of Firecracker's network path. Their `operstate`
//! (`/sys/class/net/<iface>/operstate`) plus `carrier`
//! (`/sys/class/net/<iface>/carrier`) are the canonical "is the cell's
//! network up?" signal — this probe exposes them so taudit / projector can
//! correlate "guest declares connect_attempted" with "host shows TAP up".
//!
//! Slot F1a's contract: the probe reads the two sysfs files plus the
//! interface's stats counters under `/sys/class/net/<iface>/statistics/`
//! (`rx_packets`, `tx_packets`, `rx_bytes`, `tx_bytes`) and reports the
//! whole tuple under `output`. Unlike cgroup-v2's flat-keyed stat files,
//! each statistic lives in its own sysfs file holding a single decimal
//! integer, so the probe reads and parses them individually.
//!
//! Non-Linux: returns [`ProbeError::PlatformUnsupported`].

use std::path::{Path, PathBuf};

use async_trait::async_trait;

use super::{HostProbe, ProbeContext, ProbeError, ProbeReading};

/// Probe-source identifier emitted in [`ProbeReading::probe_source`].
///
/// Surfaces as the `probeSource` field of the emitted event's data, which is
/// what the `emitted_event_carries_d12_fields` test pins.
pub const PROBE_SOURCE: &str = "host.tap_link";

/// Host-side probe reporting the link state of one cell's TAP device.
///
/// Holds the kernel interface name (for example `"cfc-abcd1234"`). On Linux,
/// reading the probe consults `/sys/class/net/<iface>/operstate`,
/// `/sys/class/net/<iface>/carrier` and the `rx_packets`, `tx_packets`,
/// `rx_bytes` and `tx_bytes` files under `statistics/`; on any other OS it
/// fails with [`ProbeError::PlatformUnsupported`].
#[derive(Clone, Debug)]
pub struct TapLinkProbe {
    /// Name of the host-side interface the supervisor created for the cell.
    pub iface: String,
}

impl TapLinkProbe {
    /// Create a probe bound to the interface called `iface`.
    pub fn new(iface: impl Into<String>) -> Self {
        let iface = iface.into();
        Self { iface }
    }

    /// Sysfs directory holding this interface's attributes
    /// (`/sys/class/net/<iface>`).
    fn sys_class_net(&self) -> PathBuf {
        Path::new("/sys/class/net").join(self.iface.as_str())
    }
}

#[async_trait]
impl HostProbe for TapLinkProbe {
    fn probe_name(&self) -> &'static str {
        "tap_link"
    }

    async fn read(&self, _ctx: &ProbeContext) -> Result<ProbeReading, ProbeError> {
        let sysfs = self.sys_class_net();
        let operstate = read_trimmed(&sysfs.join("operstate")).await?;
        let carrier = read_trimmed(&sysfs.join("carrier")).await?;
        let rx_packets = read_u64(&sysfs.join("statistics/rx_packets")).await?;
        let tx_packets = read_u64(&sysfs.join("statistics/tx_packets")).await?;
        let rx_bytes = read_u64(&sysfs.join("statistics/rx_bytes")).await?;
        let tx_bytes = read_u64(&sysfs.join("statistics/tx_bytes")).await?;

        let inputs = serde_json::json!({
            "iface": self.iface,
            "sysfs": sysfs.to_string_lossy(),
        });
        let output = serde_json::json!({
            "operstate": operstate,
            "carrier": carrier,
            "rxPackets": rx_packets,
            "txPackets": tx_packets,
            "rxBytes": rx_bytes,
            "txBytes": tx_bytes,
        });
        Ok(ProbeReading::new(PROBE_SOURCE, inputs, output))
    }
}

/// Read a sysfs attribute and strip surrounding whitespace (sysfs values end
/// in a newline).
///
/// Any I/O failure is reported as [`ProbeError::Io`] naming the path.
#[cfg(target_os = "linux")]
async fn read_trimmed(path: &Path) -> Result<String, ProbeError> {
    let contents = tokio::fs::read_to_string(path).await.map_err(|err| {
        ProbeError::Io(format!(
            "tap-link read failed at {}: {err}",
            path.display()
        ))
    })?;
    Ok(contents.trim().to_owned())
}

/// Read a sysfs counter file and parse its trimmed contents as a `u64`.
///
/// Read failures propagate from [`read_trimmed`]. Non-numeric contents
/// become [`ProbeError::Parse`], which quotes the offending text.
#[cfg(target_os = "linux")]
async fn read_u64(path: &Path) -> Result<u64, ProbeError> {
    let text = read_trimmed(path).await?;
    match text.parse::<u64>() {
        Ok(value) => Ok(value),
        Err(err) => Err(ProbeError::Parse(format!(
            "tap-link counter at {} not u64 ({text:?}): {err}",
            path.display()
        ))),
    }
}

/// Non-Linux fallback: there is no `/sys/class/net`, so every read reports
/// [`ProbeError::PlatformUnsupported`].
#[cfg(not(target_os = "linux"))]
async fn read_trimmed(_: &Path) -> Result<String, ProbeError> {
    Err(ProbeError::PlatformUnsupported)
}

/// Non-Linux fallback: counters cannot be read without sysfs, so this always
/// reports [`ProbeError::PlatformUnsupported`].
#[cfg(not(target_os = "linux"))]
async fn read_u64(_: &Path) -> Result<u64, ProbeError> {
    Err(ProbeError::PlatformUnsupported)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use cellos_core::ports::EventSink;

    use super::super::test_support::CapturingSink;
    use super::super::{emit_reading, ProbeContext, ProbeReading};
    use super::PROBE_SOURCE;

    /// A reading shaped like the one the TAP-link probe builds for an
    /// interface that is up with carrier present.
    fn sample_reading() -> ProbeReading {
        let inputs = serde_json::json!({
            "iface": "cfc-abcd1234",
            "sysfs": "/sys/class/net/cfc-abcd1234",
        });
        let output = serde_json::json!({
            "operstate": "up",
            "carrier": "1",
            "rxPackets": 7,
            "txPackets": 12,
            "rxBytes": 840,
            "txBytes": 1500,
        });
        ProbeReading::new(PROBE_SOURCE, inputs, output)
    }

    #[tokio::test]
    async fn emitted_event_carries_d12_fields() {
        let capture = Arc::new(CapturingSink::new());
        let sink: Arc<dyn EventSink> = capture.clone();
        let ctx = ProbeContext::new("cell-tap-1", "run-tap-1", "sha256:tap");

        emit_reading(&sink, &ctx, sample_reading())
            .await
            .expect("emit");

        let events = capture.snapshot().await;
        assert_eq!(events.len(), 1);
        let event = &events[0];
        let data = event.data.as_ref().expect("data");

        // Envelope fields stamped by the host.
        assert_eq!(data["probeSource"], PROBE_SOURCE);
        assert_eq!(data["cellId"], "cell-tap-1");
        assert_eq!(data["runId"], "run-tap-1");
        assert!(data["hostReceivedAt"].is_string());

        // Probe payload passed through untouched.
        assert_eq!(data["inputs"]["iface"], "cfc-abcd1234");
        assert_eq!(data["output"]["operstate"], "up");
        assert_eq!(data["output"]["carrier"], "1");
        assert_eq!(data["output"]["txBytes"], 1500);

        assert!(event.ty.ends_with(".cell.observability.host.v1.tap_link"));
    }
}