cellos-host-telemetry 0.5.0

Host-side telemetry receiver for CellOS — vsock listener that host-stamps and signs CloudEvents emitted by the in-guest cellos-telemetry agent.
Documentation
//! Per-cell vsock UDS listener — Phase F3b.
//!
//! Mirrors the [`listen_for_exit_code`] pattern in
//! [`cellos-host-firecracker`] (`crates/cellos-host-firecracker/src/lib.rs:1669`):
//! Firecracker proxies the guest's vsock connection to a per-cell Unix
//! Domain Socket at `<vsock_uds_path>_<port>`, and the host crate binds the
//! UDS BEFORE the VM boots so no frame is missed. For F3b we mirror that
//! shape for `VSOCK_TELEMETRY_PORT = 9001`.
//!
//! Wire format (ADR-0006 §12 wire-schema versioning):
//!
//! ```text
//! ┌──────────────┬────────────────────────────────────┐
//! │ u32 LE len   │ CBOR map (the frame body)          │
//! └──────────────┴────────────────────────────────────┘
//! ```
//!
//! Frame body (CBOR map) MUST contain a `content_version: u16` key whose
//! high byte is the major version. The host compares that major byte
//! (`(content_version >> 8) as u8`) against the high byte of its own
//! `WIRE_CONTENT_VERSION_MAJOR` constant (1 today, i.e. major 0). If the
//! two majors differ, the host **rejects** the frame with
//! [`TelemetryError::UnsupportedVersion`] — this is the schema-versioning
//! gate ADR-0006 §12 calls out.
//!
//! Frame body fields (after `content_version`): `probe_source` (text),
//! `guest_pid` (u32), `guest_comm` (text), `guest_monotonic_ns` (u64).
//! **Anything else is dropped at decode-time** — the host cannot let the
//! guest forge attribution by stuffing extra keys into the map.

use std::path::{Path, PathBuf};
use std::time::SystemTime;

use serde::Deserialize;
use tokio::io::AsyncReadExt;
use tokio::net::{UnixListener, UnixStream};

use crate::host_stamp;
use crate::keepalive::KeepAlive;
use crate::{
    GuestDeclaration, HostStamp, StampedDeclaration, TelemetryError, VSOCK_TELEMETRY_PORT,
    WIRE_CONTENT_VERSION_MAJOR,
};

/// Upper bound, in bytes, on the CBOR frame body the host is willing to read.
///
/// The guest declaration carries only four fields, so 64 KiB leaves ample
/// headroom; a length prefix above this bound is treated as a malformed
/// wire rather than honoured. This is the host-side back-pressure floor —
/// the in-guest queue is covered by agent-side drop-with-counter
/// (ADR-0006 §5.3).
pub const MAX_FRAME_BYTES: u32 = 1 << 16;

/// Per-cell vsock telemetry listener.
///
/// Construction binds the per-cell UDS at `<base>_<VSOCK_TELEMETRY_PORT>`.
/// Bind happens BEFORE the VM boots — `cellos-host-firecracker` calls
/// [`Self::bind_for_cell`] in the same pre-boot section that already owns
/// the exit-code listener, so the channel-authenticity invariant
/// (ADR-0006 §5: "the host trusts WHICH CID:port the bytes arrived on")
/// holds from the workload's first instruction.
pub struct VsockUdsListener {
    /// Full path of the bound socket (`<base>_<VSOCK_TELEMETRY_PORT>`),
    /// kept for error messages and for [`Self::socket_path`].
    socket_path: PathBuf,
    /// The bound Tokio Unix listener that guest connections are accepted on.
    listener: UnixListener,
}

impl VsockUdsListener {
    /// Bind the per-cell UDS at `<vsock_uds_base>_<VSOCK_TELEMETRY_PORT>`.
    ///
    /// `vsock_uds_base` is the same base path
    /// `cellos-host-firecracker::FirecrackerCell::start` uses for its
    /// vsock UDS family — the supervisor passes that base through so the
    /// telemetry UDS sits in the same socket-dir as the exit-code UDS
    /// (`_9000`), making per-cell teardown a single `remove_dir_all` on
    /// the socket-dir entry.
    ///
    /// The `_<port>` suffix is appended to the raw `OsStr` of the base.
    /// Going through `Path::display` would be lossy for non-UTF-8 paths and
    /// could bind a path that is not byte-for-byte `<base>_<port>`.
    ///
    /// # Errors
    ///
    /// Returns [`TelemetryError::Bind`] when the socket cannot be bound.
    pub fn bind_for_cell(vsock_uds_base: &Path) -> Result<Self, TelemetryError> {
        // Lossless suffixing: copy the base's OS string and push the port
        // suffix onto it instead of formatting the path as text.
        let mut raw_path = vsock_uds_base.as_os_str().to_os_string();
        raw_path.push(format!("_{VSOCK_TELEMETRY_PORT}"));
        let socket_path = PathBuf::from(raw_path);

        // Mirror `listen_for_exit_code`: bind before any consumer connects.
        // We do NOT remove a stale socket at this path — that decision belongs
        // to the supervisor's per-cell socket-dir lifecycle, not the listener.
        let listener = UnixListener::bind(&socket_path).map_err(|e| {
            TelemetryError::Bind(format!(
                "bind telemetry UDS at {}: {e}",
                socket_path.display()
            ))
        })?;
        Ok(Self {
            socket_path,
            listener,
        })
    }

    /// Path the listener bound to.
    pub fn socket_path(&self) -> &Path {
        &self.socket_path
    }

    /// Accept the next guest connection. The supervisor calls this once per
    /// run (the agent maintains a single long-lived connection — there is
    /// no second-connection retry path; reconnection is a v1.x consideration).
    ///
    /// # Errors
    ///
    /// An accept failure is reported under the [`TelemetryError::Bind`]
    /// variant, with the socket path included in the message.
    pub async fn accept(&self) -> Result<VsockUdsStream, TelemetryError> {
        let (stream, _addr) = self.listener.accept().await.map_err(|e| {
            TelemetryError::Bind(format!(
                "accept telemetry connection at {}: {e}",
                self.socket_path.display()
            ))
        })?;
        Ok(VsockUdsStream { stream })
    }
}

/// One accepted guest connection. Wraps the framing + decode loop.
pub struct VsockUdsStream {
    /// The accepted Unix stream that length-prefixed CBOR frames are read from.
    stream: UnixStream,
}

impl VsockUdsStream {
    /// Read one CBOR-framed guest declaration, decode it, host-stamp it,
    /// poke the keep-alive tracker, and return the stamped value.
    ///
    /// On `Ok(None)`: the guest closed the stream cleanly (EOF before any
    /// byte of the next length prefix). The supervisor treats this as
    /// end-of-run for the telemetry channel and lets the keep-alive watcher
    /// decide whether to emit `agent_silenced` — clean close after a short
    /// window is itself a silenced signal. This matches the doctrine:
    /// silence is observable.
    ///
    /// # Errors
    ///
    /// Propagates every error of [`Self::recv_guest_declaration`].
    pub async fn recv_stamped(
        &mut self,
        stamp: &HostStamp,
        keepalive: &KeepAlive,
    ) -> Result<Option<StampedDeclaration>, TelemetryError> {
        let guest = match self.recv_guest_declaration().await? {
            Some(g) => g,
            None => return Ok(None),
        };
        // Sample the receive instant right after the frame is read, before
        // awaiting the keep-alive tracker, so time spent in that await does
        // not skew `host_received_at`.
        let host_received_at = SystemTime::now();
        keepalive.notify_frame().await;
        // Re-stamp `host_received_at` per-frame so the receive instant is
        // accurate; the rest of the stamp (cell_id/run_id/spec_signature_hash)
        // is per-run and does not change.
        let per_frame_stamp = HostStamp {
            cell_id: stamp.cell_id.clone(),
            run_id: stamp.run_id.clone(),
            host_received_at,
            spec_signature_hash: stamp.spec_signature_hash.clone(),
        };
        Ok(Some(host_stamp::stamp(guest, per_frame_stamp)))
    }

    /// Read one frame and return the four guest-fillable fields. The
    /// `content_version` major check happens in [`decode_frame`] — a frame
    /// that fails it is rejected before the value reaches the stamping layer.
    ///
    /// Returns `Ok(None)` only when the stream ends before any byte of the
    /// length prefix has arrived (a clean close between frames).
    ///
    /// # Errors
    ///
    /// Returns [`TelemetryError::Wire`] when:
    /// - the stream ends part-way through the 4-byte length prefix
    ///   (a truncated frame, not a clean close),
    /// - reading the prefix or the body fails,
    /// - the declared length is 0 or exceeds [`MAX_FRAME_BYTES`].
    ///
    /// Decode and version errors from [`decode_frame`] are passed through.
    pub async fn recv_guest_declaration(
        &mut self,
    ) -> Result<Option<GuestDeclaration>, TelemetryError> {
        // Length prefix. Read it by hand instead of `read_exact` so a
        // zero-byte EOF (clean close) can be told apart from an EOF that
        // arrives after only part of the prefix (truncated frame).
        let mut len_buf = [0u8; 4];
        let mut filled = 0usize;
        while filled < len_buf.len() {
            match self.stream.read(&mut len_buf[filled..]).await {
                Ok(0) if filled == 0 => return Ok(None),
                Ok(0) => {
                    return Err(TelemetryError::Wire(format!(
                        "read length prefix: stream closed after {filled} of {} bytes",
                        len_buf.len()
                    )))
                }
                Ok(n) => filled += n,
                Err(e) => return Err(TelemetryError::Wire(format!("read length prefix: {e}"))),
            }
        }
        let len = u32::from_le_bytes(len_buf);
        if len == 0 || len > MAX_FRAME_BYTES {
            return Err(TelemetryError::Wire(format!(
                "frame length {len} out of bounds (max {MAX_FRAME_BYTES})"
            )));
        }

        // Body. `len` is bounded by MAX_FRAME_BYTES above, so the allocation
        // is capped no matter what the guest claims.
        let mut body = vec![0u8; len as usize];
        self.stream
            .read_exact(&mut body)
            .await
            .map_err(|e| TelemetryError::Wire(format!("read frame body: {e}")))?;

        decode_frame(&body).map(Some)
    }
}

/// CBOR-decoded wire frame, before the major-version check rejects unknowns.
///
/// The whole map is deserialized into this struct first; [`decode_frame`]
/// then inspects `content_version` and rejects an unknown major, so the
/// decoded value of such a frame never reaches the stamping layer.
///
/// The derive carries no `deny_unknown_fields`, so map keys other than the
/// five below are ignored during decoding rather than causing an error.
#[derive(Debug, Deserialize)]
struct WireFrame {
    /// Wire-schema version; its high byte is compared as the major.
    content_version: u16,
    /// Guest-declared probe source (text).
    probe_source: String,
    /// Guest-declared process id.
    guest_pid: u32,
    /// Guest-declared command name (text).
    guest_comm: String,
    /// Guest-declared monotonic timestamp in nanoseconds.
    guest_monotonic_ns: u64,
}

/// Decode a CBOR frame body into a [`GuestDeclaration`].
///
/// Pure function, no I/O — exposed at module scope so the framing tests
/// can exercise the decode path with hand-crafted bytes.
pub fn decode_frame(body: &[u8]) -> Result<GuestDeclaration, TelemetryError> {
    let frame: WireFrame = ciborium::de::from_reader(body)
        .map_err(|e| TelemetryError::Wire(format!("CBOR decode: {e}")))?;

    // Major-version check (ADR-0006 §12). The major byte is the high byte of
    // the u16 — `(v >> 8) as u8`. Today: major == 0, content_version == 1.
    let frame_major = (frame.content_version >> 8) as u8;
    let host_major = (WIRE_CONTENT_VERSION_MAJOR >> 8) as u8;
    if frame_major != host_major {
        return Err(TelemetryError::UnsupportedVersion(frame.content_version));
    }

    Ok(GuestDeclaration {
        probe_source: frame.probe_source,
        guest_pid: frame.guest_pid,
        guest_comm: frame.guest_comm,
        guest_monotonic_ns: frame.guest_monotonic_ns,
    })
}

/// Test helper: build a length-prefixed, `WireFrame`-shaped CBOR frame.
///
/// The function is `pub` (hidden from rustdoc) so the integration test
/// under `tests/` can produce a frame without re-deriving the schema. The
/// result is the u32 little-endian body length followed by the CBOR map.
///
/// # Panics
///
/// Panics if CBOR encoding of the map fails.
#[doc(hidden)]
pub fn encode_test_frame(
    content_version: u16,
    probe_source: &str,
    guest_pid: u32,
    guest_comm: &str,
    guest_monotonic_ns: u64,
) -> Vec<u8> {
    let payload = serde_json::json!({
        "content_version": content_version,
        "probe_source": probe_source,
        "guest_pid": guest_pid,
        "guest_comm": guest_comm,
        "guest_monotonic_ns": guest_monotonic_ns,
    });

    let mut encoded: Vec<u8> = Vec::new();
    ciborium::ser::into_writer(&payload, &mut encoded).expect("CBOR encode");

    // Prefix first, then the body, joined into one buffer.
    let prefix = (encoded.len() as u32).to_le_bytes();
    [prefix.as_slice(), encoded.as_slice()].concat()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    use tokio::io::AsyncWriteExt;
    use tokio::net::UnixStream;

    /// A frame whose major byte differs from the host's must be rejected
    /// with `UnsupportedVersion` carrying the offending version.
    #[test]
    fn decode_rejects_unknown_major() {
        // Major 1, content_version = 0x0100. Host major today is 0; reject.
        let mut cbor_bytes: Vec<u8> = Vec::new();
        let body = serde_json::json!({
            "content_version": 0x0100u16,
            "probe_source": "process.spawned",
            "guest_pid": 1u32,
            "guest_comm": "x",
            "guest_monotonic_ns": 0u64,
        });
        ciborium::ser::into_writer(&body, &mut cbor_bytes).unwrap();
        match decode_frame(&cbor_bytes) {
            Err(TelemetryError::UnsupportedVersion(v)) => assert_eq!(v, 0x0100),
            other => panic!("expected UnsupportedVersion, got {other:?}"),
        }
    }

    /// A known-major frame decodes, and keys outside the wire schema do not
    /// survive into the decoded value.
    #[test]
    fn decode_accepts_known_major_and_drops_unknown_fields() {
        // Same major (0), content_version = 1. Extra "cell_id" the
        // attacker stuffed in is dropped at decode-time.
        let mut cbor_bytes: Vec<u8> = Vec::new();
        let body = serde_json::json!({
            "content_version": 1u16,
            "probe_source": "process.spawned",
            "guest_pid": 7u32,
            "guest_comm": "workload",
            "guest_monotonic_ns": 42u64,
            "cell_id": "FORGED-CELL-ID",
            "run_id": "FORGED-RUN-ID",
        });
        ciborium::ser::into_writer(&body, &mut cbor_bytes).unwrap();
        let g = decode_frame(&cbor_bytes).expect("decode");
        assert_eq!(g.probe_source, "process.spawned");
        assert_eq!(g.guest_pid, 7);
        assert_eq!(g.guest_comm, "workload");
        assert_eq!(g.guest_monotonic_ns, 42);
        // The struct itself has no `cell_id` field — type-system dropped it.
    }

    /// Bytes that are not a valid CBOR frame surface as a `Wire` error.
    #[test]
    fn decode_rejects_garbage() {
        let result = decode_frame(&[0xff, 0xff, 0xff]);
        assert!(matches!(result, Err(TelemetryError::Wire(_))));
    }

    /// Binding creates the socket file at `<base>_<VSOCK_TELEMETRY_PORT>`.
    #[tokio::test]
    async fn bind_creates_uds_at_expected_path() {
        let dir = tempdir().unwrap();
        let base = dir.path().join("cellos-vsock-test.socket");
        let listener = VsockUdsListener::bind_for_cell(&base).expect("bind");
        let expected = PathBuf::from(format!("{}_{}", base.display(), VSOCK_TELEMETRY_PORT));
        assert_eq!(listener.socket_path(), expected.as_path());
        assert!(expected.exists(), "UDS file should exist after bind");
    }

    /// Full path: bind, accept, read one frame that carries forged
    /// attribution keys, and check the host-supplied stamp is what ends up
    /// on the stamped declaration.
    #[tokio::test]
    async fn end_to_end_frame_round_trip_stamps_attribution() {
        let dir = tempdir().unwrap();
        let base = dir.path().join("cellos-vsock-rt.socket");
        let listener = VsockUdsListener::bind_for_cell(&base).expect("bind");
        let socket_path = listener.socket_path().to_path_buf();

        let server = tokio::spawn(async move {
            let mut stream = listener.accept().await.expect("accept");
            let stamp = HostStamp {
                cell_id: "cell-A".into(),
                run_id: "run-1".into(),
                host_received_at: SystemTime::now(),
                spec_signature_hash: "sha256:abc".into(),
            };
            let ka = KeepAlive::new(std::time::Duration::from_secs(10));
            stream
                .recv_stamped(&stamp, &ka)
                .await
                .expect("recv_stamped")
        });

        let mut client = UnixStream::connect(&socket_path).await.expect("connect");
        // Attacker stuffs attribution keys into the map; the host must
        // ignore them and apply its own stamp. The frame is built by hand
        // because `encode_test_frame` only emits the five schema keys.
        let forged_body = serde_json::json!({
            "content_version": 1u16,
            "probe_source": "process.spawned",
            "guest_pid": 9u32,
            "guest_comm": "evil",
            "guest_monotonic_ns": 7u64,
            "cell_id": "FORGED-CELL-ID",
            "run_id": "FORGED-RUN-ID",
            "spec_signature_hash": "sha256:forged",
        });
        let mut cbor_bytes: Vec<u8> = Vec::new();
        ciborium::ser::into_writer(&forged_body, &mut cbor_bytes).unwrap();
        let mut frame = (cbor_bytes.len() as u32).to_le_bytes().to_vec();
        frame.extend_from_slice(&cbor_bytes);
        client.write_all(&frame).await.expect("write");
        client.shutdown().await.ok();

        let stamped = server.await.expect("task").expect("got frame");
        // Host-stamped attribution wins over the forged keys.
        assert_eq!(stamped.cell_id, "cell-A");
        assert_eq!(stamped.run_id, "run-1");
        assert_eq!(stamped.spec_signature_hash, "sha256:abc");
        // Guest-fillable preserved.
        assert_eq!(stamped.probe_source, "process.spawned");
        assert_eq!(stamped.guest_pid, 9);
        assert_eq!(stamped.guest_comm, "evil");
    }
}