cellos-telemetry 0.5.0

In-guest telemetry agent for CellOS — runs as PID 2 inside Firecracker microVMs, emits CBOR-over-vsock observations. No signing key by design (ADR-0006).
Documentation
//! `cellos-telemetry` binary entry point — runs as guest PID 2.
//!
//! Phase F3a — implementation. ADR-0006 §5 is the operating model.
//!
//! Lifecycle:
//!   1. `cellos-init` invokes this binary BEFORE the workload exec. The
//!      binary calls `fork(2)`; the child becomes the long-lived telemetry
//!      agent (PID 2) and the parent returns 0 immediately so init can
//!      continue with the workload exec (workload becomes PID 3+).
//!   2. The child opens an AF_VSOCK stream to `VMADDR_CID_HOST=2`,
//!      `VSOCK_TELEMETRY_PORT=9001` — the supervisor bound this CID:port
//!      before the workload's first instruction (ADR-0006 §5).
//!   3. Probes are installed and the main loop drains them at a fixed
//!      cadence. Each [`ProbeEvent`] is CBOR-framed and `write(2)`-ed to
//!      vsock. Back-pressure is drop-with-counter: if the in-process
//!      bounded queue is full we increment a counter and discard the
//!      event; the host sees the gap as `cell.observability.guest.telemetry.dropped`
//!      via the supervisor's silence detector.
//!
//! Non-Linux builds: this binary is a no-op that prints a notice and exits 0
//! so the workspace builds on Windows/macOS dev boxes.
//!
//! TODO seccomp gate: the workload's seccomp profile must block
//! `kill(2)` / `tgkill(2)` / `ptrace(2)` against PIDs ≤ 2 (ADR-0006 §5.2,
//! Q5.2). That profile lives in `cellos-host-firecracker` (jailer +
//! seccomp BPF program); this binary cannot enforce it from inside the
//! guest. The TODO is sited HERE — at the agent fork point — because
//! that is the moment after which the seccomp invariant must already be
//! installed on the workload's process tree.

#![cfg_attr(target_os = "linux", allow(unsafe_code))]

#[cfg(not(target_os = "linux"))]
fn main() {
    // The agent needs fork(2) and AF_VSOCK, so on other hosts it only prints
    // a notice. Falling off the end of `main` exits with status 0, which keeps
    // non-Linux CI (Windows/macOS dev boxes) building and running cleanly.
    const NOTICE: &str = concat!(
        "cellos-telemetry: non-Linux target — no-op (Phase F3a; ",
        "see ADR-0006 / Plans/cellos-code-complete-roadmap.md)"
    );
    eprintln!("{}", NOTICE);
}

#[cfg(target_os = "linux")]
fn main() {
    use std::process::exit;

    // Split into two processes. The parent reports success to cellos-init
    // right away so init can carry on with the workload exec. The child stays
    // behind as the long-lived telemetry agent.
    //
    // SAFETY: fork() is always safe to call. No threads have been spawned
    // yet, so the child is single-threaded and avoids the well-known
    // async-signal-safety hazard of forking a multi-threaded process.
    let forked = unsafe { libc::fork() };

    match forked {
        0 => {
            // Child: run the agent loop. agent_main() returns Err only on an
            // unrecoverable channel failure. Exiting non-zero lets init's
            // silence detector move the cell into `fail-cell` immediately,
            // instead of waiting for the host-side keep-alive timeout.
            if let Err(e) = agent_main() {
                eprintln!("cellos-telemetry: agent_main failed: {e}");
                exit(3);
            }
        }
        child if child > 0 => {
            // Parent: exit 0 so init proceeds. The kernel reparents the child
            // to PID 1 (cellos-init), which keeps the agent alive for the life
            // of the cell. That is the structural reason workload seccomp
            // blocks kill/tgkill/ptrace against PIDs ≤ 2:
            //
            //   TODO seccomp gate — workload profile (config in
            //   cellos-host-firecracker) MUST install BPF that returns
            //   ERRNO=EPERM for kill/tgkill/ptrace targeting PID 1 or 2.
            //   Without that gate, a compromised workload can kill us before
            //   our channel-authenticity guarantee bites.
            exit(0);
        }
        _ => {
            // fork(2) failed. Hard-fail the launch: cellos-init's
            // onAgentFailure default is `fail-cell` under hardened
            // (ADR-0006 §5.7), and a non-zero status lets init turn this
            // into a cell teardown.
            //
            // SAFETY: errno is read on this thread right after the failing call.
            let errno = unsafe { *libc::__errno_location() };
            eprintln!("cellos-telemetry: fork failed errno={errno}");
            exit(2);
        }
    }
}

/// Long-lived telemetry loop run by the forked child.
///
/// Connects to the host over vsock, then repeatedly:
/// 1. polls every probe into a bounded queue,
/// 2. CBOR-frames each queued event and writes it to the vsock stream,
/// 3. sleeps for `POLL_PERIOD`.
///
/// # Errors
///
/// Returns `Err` only if the *initial* vsock connect fails. Once connected,
/// the loop never returns. Later channel failures are absorbed by counting
/// the lost events and reconnecting.
#[cfg(target_os = "linux")]
fn agent_main() -> Result<(), String> {
    use std::io::Write;
    use std::time::Duration;

    use cellos_telemetry::probes::{
        capability::CapabilityProbe, inotify::InotifyProbe, net_connect::NetConnectProbe,
        process::ProcWalker,
    };
    use cellos_telemetry::{
        encode_frame, probe_source, ProbeEvent, MAX_FRAME_BODY_BYTES, VMADDR_CID_HOST,
        VSOCK_TELEMETRY_PORT,
    };

    // Bounded in-process queue. At 1024 events the agent's resident set
    // stays well under 256 KiB even before we account for drops. The number
    // is conservative; Phase K bench will tune it.
    const QUEUE_CAP: usize = 1024;
    // Pause between probe sweeps; also reused as the back-off after a failed
    // reconnect.
    const POLL_PERIOD: Duration = Duration::from_millis(100);

    let mut vsock = open_vsock(VMADDR_CID_HOST, VSOCK_TELEMETRY_PORT)?;

    let mut proc_walker = ProcWalker::new();
    let mut cap_probe = CapabilityProbe::new();
    let mut net_probe = NetConnectProbe::new();
    // Inotify is best-effort: if no path is configured (Phase F3a doesn't
    // wire spec ingestion yet) we skip it without failing the agent.
    let mut inotify_probe: Option<InotifyProbe> = None;
    if let Ok(path) = std::env::var("CELLOS_TELEMETRY_INOTIFY_PATH") {
        match InotifyProbe::open(&path, "cellos-telemetry") {
            Ok(p) => inotify_probe = Some(p),
            Err(e) => eprintln!("cellos-telemetry: inotify open({path}) failed: {e}"),
        }
    }

    // Drop the initial `/proc` snapshot — those are processes that existed
    // before the agent started, not events we want to declare.
    let _ = proc_walker.poll();

    // Total events lost to queue overflow, encode failure, or a broken channel.
    let mut dropped: u64 = 0;
    let mut queue: std::collections::VecDeque<ProbeEvent> = std::collections::VecDeque::new();

    loop {
        // Drain probes into the bounded queue; overflow is counted, not queued.
        for ev in proc_walker.poll() {
            push_or_drop(&mut queue, ev, QUEUE_CAP, &mut dropped);
        }
        for ev in cap_probe.poll() {
            push_or_drop(&mut queue, ev, QUEUE_CAP, &mut dropped);
        }
        for ev in net_probe.poll() {
            push_or_drop(&mut queue, ev, QUEUE_CAP, &mut dropped);
        }
        if let Some(p) = inotify_probe.as_mut() {
            for ev in p.poll() {
                push_or_drop(&mut queue, ev, QUEUE_CAP, &mut dropped);
            }
        }

        // Flush the queue to vsock. The socket is in blocking mode, so
        // `write_all` may stall this loop while the host drains. Only a
        // write *error* takes the drop path below. Every event lost there is
        // folded into the counter, and the host's silence detector picks up
        // the channel loss separately.
        while let Some(ev) = queue.pop_front() {
            let frame = match encode_frame(&ev) {
                Ok(f) => f,
                Err(_) => {
                    // The encoder rejected our own event (presumably because
                    // it grew past MAX_FRAME_BODY_BYTES). Count it and move on.
                    dropped = dropped.saturating_add(1);
                    continue;
                }
            };
            if vsock.write_all(&frame).is_err() {
                // Channel broke. The failing frame AND everything still queued
                // behind it are discarded, so count all of them. Otherwise
                // the drop counter would under-report the loss. `usize` is at
                // most 64 bits on every Linux target, so the cast is lossless.
                let lost = queue.len().saturating_add(1) as u64;
                dropped = dropped.saturating_add(lost);
                queue.clear();
                match open_vsock(VMADDR_CID_HOST, VSOCK_TELEMETRY_PORT) {
                    Ok(s) => vsock = s,
                    // Keep the broken handle. The next failed write retries
                    // the reconnect. Back off first so an unreachable host is
                    // not hammered on every sweep.
                    Err(_) => std::thread::sleep(POLL_PERIOD),
                }
                // The queue is empty now; nothing left to flush this sweep.
                break;
            }
        }

        // TODO(ADR-0006 §5.3): `dropped` is accumulated but NOT yet emitted.
        // The intended design is a periodic self-declaration carrying the
        // drop count, so the supervisor's projector can render
        // `cell.observability.guest.telemetry.dropped`. Until that event has
        // its own probe_source, the two reads below exist only to keep the
        // imports in use (silencing dead-code lints).
        let _ = MAX_FRAME_BODY_BYTES;
        let _ = probe_source::PROCESS_EXITED;

        std::thread::sleep(POLL_PERIOD);
    }
}

/// Append `ev` to `queue` while it holds fewer than `cap` events.
///
/// Once the queue is at capacity the event is discarded and `dropped` is
/// bumped (saturating), which implements the agent's drop-with-counter
/// back-pressure policy.
#[cfg(target_os = "linux")]
fn push_or_drop(
    queue: &mut std::collections::VecDeque<cellos_telemetry::ProbeEvent>,
    ev: cellos_telemetry::ProbeEvent,
    cap: usize,
    dropped: &mut u64,
) {
    let has_room = queue.len() < cap;
    if has_room {
        queue.push_back(ev);
    } else {
        *dropped = dropped.saturating_add(1);
    }
}

/// Open a blocking AF_VSOCK SOCK_STREAM connection to `cid:port`.
///
/// The connected descriptor is returned wrapped in a `File` so callers get
/// `std::io::Write` ergonomics without bringing in a vsock crate.
///
/// - **Blocking mode.** No `O_NONBLOCK` is set, so writes on the handle wait
///   for the peer instead of failing with `WouldBlock`.
/// - **Close-on-exec.** The socket is opened close-on-exec, so the host
///   channel cannot leak into any program this process might later exec.
///
/// # Errors
///
/// Returns a message carrying the raw errno when `socket(2)` or
/// `connect(2)` fails. On a `connect(2)` failure the descriptor is closed
/// before returning.
#[cfg(target_os = "linux")]
fn open_vsock(cid: u32, port: u32) -> Result<std::fs::File, String> {
    use std::os::unix::io::FromRawFd;

    /// `AF_VSOCK` address family (value 40 in `<linux/socket.h>`).
    const AF_VSOCK: libc::c_int = 40;

    /// Layout matches `struct sockaddr_vm` from `<linux/vm_sockets.h>`.
    /// Size must be exactly 16 bytes (`sizeof(struct sockaddr)`).
    #[repr(C)]
    struct SockaddrVm {
        svm_family: u16,
        svm_reserved1: u16,
        svm_port: u32,
        svm_cid: u32,
        svm_zero: [u8; 4],
    }

    // Enforce the 16-byte layout at compile time rather than trusting a comment.
    const _: () = assert!(core::mem::size_of::<SockaddrVm>() == 16);

    // Read the calling thread's errno without dereferencing the raw
    // thread-local pointer.
    let last_errno = || std::io::Error::last_os_error().raw_os_error().unwrap_or(0);

    // SAFETY: socket() with a constant family/type has no memory-safety
    // preconditions.
    let fd = unsafe { libc::socket(AF_VSOCK, libc::SOCK_STREAM | libc::SOCK_CLOEXEC, 0) };
    if fd < 0 {
        return Err(format!("socket(AF_VSOCK): errno {}", last_errno()));
    }

    let addr = SockaddrVm {
        svm_family: AF_VSOCK as u16,
        svm_reserved1: 0,
        svm_port: port,
        svm_cid: cid,
        svm_zero: [0u8; 4],
    };

    // SAFETY: addr is a live, correctly laid out sockaddr_vm (size checked
    // above); fd was just returned by socket().
    let ret = unsafe {
        libc::connect(
            fd,
            &addr as *const SockaddrVm as *const libc::sockaddr,
            core::mem::size_of::<SockaddrVm>() as libc::socklen_t,
        )
    };
    if ret < 0 {
        // Capture errno before close() has a chance to overwrite it.
        let errno = last_errno();
        // SAFETY: close on a fd we own and never hand out on this path.
        unsafe { libc::close(fd) };
        return Err(format!(
            "vsock connect(CID={cid}, port={port}): errno {errno}"
        ));
    }

    // SAFETY: we own `fd` exclusively from here; File takes the descriptor.
    Ok(unsafe { std::fs::File::from_raw_fd(fd) })
}