cellos-supervisor 0.5.1

CellOS execution-cell runner — boots cells in Firecracker microVMs or gVisor, enforces narrow typed authority, emits signed CloudEvents.
Documentation
//! Command runner — the `run_cell_command` / `run_cell_command_tokio` pair
//! split out from `supervisor.rs` (P0-2).
//!
//! `run_cell_command` is the cfg-gated dispatcher: on Linux, when any of the
//! optional isolation knobs are configured (cgroup attach, unshare, seccomp),
//! it forwards to [`crate::linux_isolation::linux_run_cell_command_isolated`];
//! otherwise it falls through to [`run_cell_command_tokio`], the cleared-env
//! tokio shim used on every non-isolated path (Linux without any isolation
//! knob configured, other unix hosts, and non-unix targets such as Windows).
//!
//! Originally a pure code-move from `supervisor.rs` with no logic changes.

use std::time::{Duration, Instant};

use cellos_core::ports::CellHandle;
use cellos_core::{EgressRule, RunSpec, SecretView};

use crate::runtime_secret::RuntimeSecretSession;
use crate::supervisor::{DnsProxyActivation, SniProxyActivation};
use crate::supervisor_helpers::{effective_run_timeout, run_timeout_message, RunTimeoutSource};

/// MVP subprocess shim: **cleared environment** — no inherited secrets; see `docs/red-lines.md`.
///
/// On **Linux**, optional cgroup v2 (`CellHandle.cgroup_path` and/or
/// [`crate::linux_isolation::linux_subprocess_cgroup_parent`]),
/// [`crate::linux_isolation::linux_subprocess_unshare_flags`], and/or seccomp
/// ([`crate::linux_seccomp::load_seccomp_program_from_env`]): cgroup
/// `cgroup.procs` is written **after** `spawn` from the parent; `unshare`
/// (+ loopback ioctl when `net`, private workspace when `mnt`) runs in the
/// child `pre_exec`, then optional seccomp before `execve`; nft rules
/// applied via `nsenter` after spawn when `CLONE_NEWNET` is active.
#[cfg(unix)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn run_cell_command(
    run: &RunSpec,
    ttl_seconds: u64,
    handle: &CellHandle,
    egress_rules: &[EgressRule],
    dns_authority: Option<&cellos_core::DnsAuthority>,
    secrets: &[SecretView],
    runtime_secret_session: Option<&RuntimeSecretSession>,
    dns_proxy_activation: Option<DnsProxyActivation>,
    dns_proxy_emitter: Option<std::sync::Arc<dyn crate::dns_proxy::DnsQueryEmitter>>,
    sni_proxy_activation: Option<SniProxyActivation>,
    sni_proxy_emitter: Option<std::sync::Arc<dyn crate::sni_proxy::L7DecisionEmitter>>,
    // T3.C / E7 — per-flow real-time listener: (event_sink, run_id).
    // None when the supervisor doesn't have a sink wired or the run_id
    // wasn't materialised yet. Threaded through `linux_run_cell_command_isolated`
    // so the closure can spawn the in-netns listener alongside dns_proxy /
    // sni_proxy when `CELLOS_FIRECRACKER_PER_FLOW_EBPF=1` is set.
    per_flow_realtime: Option<(std::sync::Arc<dyn cellos_core::ports::EventSink>, String)>,
    // L5-15 — shared FlowAccumulator the in-netns nflog listener records
    // into. The supervisor builds the accumulator once per run, hands a
    // clone to the listener, and reads `unique_flow_count()` after the
    // workload exits to stamp `exercised_egress_connections` on the
    // homeostasis signal. `None` when `CELLOS_PER_FLOW_REALTIME=1` is off
    // — the listener degrades to its prior behaviour.
    flow_accumulator: Option<
        std::sync::Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
    >,
) -> (
    i32,
    u64,
    Option<String>,
    Option<bool>,
    Option<String>,
    Option<String>,
    Vec<crate::nft_counters::NftCounterRow>,
) {
    let run_timeout = match effective_run_timeout(run, ttl_seconds) {
        Ok(t) => t,
        Err(msg) => return (-1, 0, Some(msg), None, None, None, Vec::new()),
    };
    #[cfg(target_os = "linux")]
    {
        let attach = crate::linux_isolation::linux_cgroup_attach_for_run(handle);
        let unshare = crate::linux_isolation::linux_subprocess_unshare_flags();
        let seccomp_program = match crate::linux_seccomp::load_seccomp_program_from_env() {
            Ok(p) => p,
            Err(e) => {
                return (
                    -1,
                    0,
                    Some(format!("seccomp: {e}")),
                    None,
                    None,
                    None,
                    Vec::new(),
                );
            }
        };
        if !matches!(attach, crate::linux_isolation::LinuxCgroupAttach::None)
            || unshare.is_some()
            || seccomp_program.is_some()
        {
            return crate::linux_isolation::linux_run_cell_command_isolated(
                run,
                &handle.cell_id,
                attach,
                unshare,
                egress_rules,
                dns_authority.cloned(),
                secrets,
                runtime_secret_session,
                run_timeout,
                seccomp_program,
                dns_proxy_activation,
                dns_proxy_emitter,
                sni_proxy_activation,
                sni_proxy_emitter,
                per_flow_realtime,
                flow_accumulator,
            )
            .await;
        }
    }
    #[cfg(not(target_os = "linux"))]
    {
        crate::linux_isolation::maybe_warn_linux_isolation_env_on_non_linux();
        let _ = handle;
        let _ = egress_rules;
        let _ = dns_authority;
        let _ = dns_proxy_activation;
        let _ = dns_proxy_emitter;
        let _ = sni_proxy_activation;
        let _ = sni_proxy_emitter;
        let _ = per_flow_realtime;
        let _ = flow_accumulator;
        // FC-38 Phase 1 is Linux-only — nft does not exist on macOS / Windows.
        // When the per-flow flag is set on a non-Linux supervisor host, log a
        // single tracing::warn so operators see WHY no events are being emitted.
        if std::env::var("CELLOS_PER_FLOW_ENFORCEMENT_EVENTS").as_deref() == Ok("1") {
            tracing::warn!(
                target: "cellos.supervisor.per_flow_enforcement",
                "CELLOS_PER_FLOW_ENFORCEMENT_EVENTS=1 is set but supervisor host is not Linux — \
                 per-flow network_flow_decision events require nft and are not emitted on this platform"
            );
        }
    }
    let (code, ms, err) =
        run_cell_command_tokio(run, run_timeout, secrets, runtime_secret_session).await;
    (code, ms, err, None, None, None, Vec::new())
}

/// Non-unix counterpart of the unix `run_cell_command` dispatcher.
///
/// No Linux isolation exists on these targets, so every run goes straight to
/// [`run_cell_command_tokio`]. The handle, egress, DNS/SNI proxy, and per-flow arguments
/// keep the unix signature but are ignored here.
///
/// NOTE(review): `_runtime_secret_session` is ignored as well. The tokio shim is called with
/// `None`, so `secrets` are injected directly as env vars even when a session was supplied.
/// Confirm this fallback is intended on non-unix hosts.
///
/// Returns `(exit_code, duration_ms, error, None, None, None, vec![])`. If the run timeout
/// cannot be resolved, the result is `(-1, 0, Some(reason), None, None, None, vec![])`.
#[cfg(not(unix))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn run_cell_command(
    run: &RunSpec,
    ttl_seconds: u64,
    _handle: &CellHandle,
    _egress_rules: &[EgressRule],
    _dns_authority: Option<&cellos_core::DnsAuthority>,
    secrets: &[SecretView],
    _runtime_secret_session: Option<&RuntimeSecretSession>,
    _dns_proxy_activation: Option<DnsProxyActivation>,
    _dns_proxy_emitter: Option<std::sync::Arc<dyn crate::dns_proxy::DnsQueryEmitter>>,
    _sni_proxy_activation: Option<SniProxyActivation>,
    _sni_proxy_emitter: Option<std::sync::Arc<dyn crate::sni_proxy::L7DecisionEmitter>>,
    _per_flow_realtime: Option<(std::sync::Arc<dyn cellos_core::ports::EventSink>, String)>,
    _flow_accumulator: Option<
        std::sync::Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
    >,
) -> (
    i32,
    u64,
    Option<String>,
    Option<bool>,
    Option<String>,
    Option<String>,
    Vec<crate::nft_counters::NftCounterRow>,
) {
    let run_timeout = match effective_run_timeout(run, ttl_seconds) {
        Err(msg) => return (-1, 0, Some(msg), None, None, None, Vec::new()),
        Ok(budget) => budget,
    };

    // Per-flow enforcement events need nft; tell the operator once why none will appear.
    let per_flow_events_requested =
        std::env::var("CELLOS_PER_FLOW_ENFORCEMENT_EVENTS").as_deref() == Ok("1");
    if per_flow_events_requested {
        tracing::warn!(
            target: "cellos.supervisor.per_flow_enforcement",
            "CELLOS_PER_FLOW_ENFORCEMENT_EVENTS=1 is set but supervisor host is not unix/linux — \
             per-flow network_flow_decision events require nft and are not emitted on this platform"
        );
    }

    let (exit_code, elapsed_ms, error) =
        run_cell_command_tokio(run, run_timeout, secrets, None).await;
    (exit_code, elapsed_ms, error, None, None, None, Vec::new())
}

/// Spawn `run.argv` as a plain tokio child process with a **cleared environment** and wait
/// for it to finish.
///
/// The child sees only a fixed `PATH` (unix / windows) plus the cell's secrets. If a
/// `runtime_secret_session` is supplied, its `env_pairs()` are injected. Otherwise each
/// [`SecretView`] in `secrets` is injected as `key=value`. If `run.working_directory` is
/// set, the child starts there.
///
/// When `run_timeout` is `Some`, the child is killed and reaped once the duration elapses.
/// The error then carries [`run_timeout_message`]. The child is also killed if this future
/// is dropped before the child exits, so a cancelled run cannot leave the workload running.
///
/// # Returns
/// `(exit_code, duration_ms, error)`:
/// - `exit_code`: the child's exit code, or `-1` in any of these cases:
///   - `argv` is empty;
///   - spawning or waiting failed;
///   - the run timed out;
///   - the child exited without a code (on unix, e.g. terminated by a signal).
/// - `duration_ms`: wall-clock milliseconds since this function was entered (saturating).
/// - `error`: `None` when the child ran to completion (whatever its exit code); otherwise
///   a human-readable reason.
pub(crate) async fn run_cell_command_tokio(
    run: &RunSpec,
    run_timeout: Option<(Duration, RunTimeoutSource)>,
    secrets: &[SecretView],
    runtime_secret_session: Option<&RuntimeSecretSession>,
) -> (i32, u64, Option<String>) {
    let start = Instant::now();
    // Saturate instead of silently truncating the u128 millisecond count.
    let elapsed_ms = || u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

    // An empty argv has no program to exec; report it instead of panicking on `argv[0]`.
    let (program, args) = match run.argv.split_first() {
        Some(split) => split,
        None => {
            return (
                -1,
                elapsed_ms(),
                Some("run.argv is empty: no program to execute".to_string()),
            );
        }
    };

    let mut cmd = tokio::process::Command::new(program);
    cmd.args(args);
    // Start from an empty environment: nothing from the supervisor's env reaches the cell.
    cmd.env_clear();
    // Fixed PATH — do not inherit host PATH (search-order hijack); matches red-lines / SEC-06.
    #[cfg(unix)]
    cmd.env("PATH", "/usr/bin:/bin:/usr/local/bin");
    #[cfg(windows)]
    cmd.env("PATH", r"C:\Windows\System32;C:\Windows");
    // A runtime secret session, when present, supersedes the static secret list.
    if let Some(session) = runtime_secret_session {
        for (key, value) in session.env_pairs() {
            cmd.env(key, value);
        }
    } else {
        for s in secrets {
            cmd.env(&s.key, s.value.as_str());
        }
    }
    if let Some(wd) = &run.working_directory {
        cmd.current_dir(wd);
    }
    // If the caller drops this future (e.g. the run is cancelled) before the child has been
    // reaped, kill it rather than leaving an orphaned workload running past its TTL.
    cmd.kill_on_drop(true);

    let mut child = match cmd.spawn() {
        Ok(child) => child,
        Err(e) => return (-1, elapsed_ms(), Some(e.to_string())),
    };

    let (code, error) = match run_timeout {
        Some((timeout, source)) => {
            tokio::select! {
                status = child.wait() => tokio_wait_outcome(status),
                _ = tokio::time::sleep(timeout) => {
                    // Best-effort kill, then reap so no zombie is left behind.
                    let _ = child.start_kill();
                    let _ = child.wait().await;
                    (-1, Some(run_timeout_message(timeout, source)))
                }
            }
        }
        None => tokio_wait_outcome(child.wait().await),
    };
    (code, elapsed_ms(), error)
}

/// Map a child's wait result to `(exit_code, error)`. A missing exit code becomes `-1` with
/// no error; a wait failure becomes `-1` with its message.
fn tokio_wait_outcome(
    status: std::io::Result<std::process::ExitStatus>,
) -> (i32, Option<String>) {
    match status {
        Ok(status) => (status.code().unwrap_or(-1), None),
        Err(e) => (-1, Some(e.to_string())),
    }
}