// cellos-supervisor 0.5.1
//
// CellOS execution-cell runner — boots cells in Firecracker microVMs or gVisor, enforces narrow typed authority, emits signed CloudEvents.
// Documentation
//! SEC-22 Phase 2 — supervisor → cell-netns SNI proxy spawn helper.
//!
//! Mirrors [`crate::dns_proxy::spawn`] (W4 SEAM-1 Phase 2b). The caller
//! constructs a [`super::SniProxyConfig`], hands it here, and gets back a
//! [`SniProxyHandle`] tracking the OS thread that runs the proxy inside the
//! cell's network namespace.
//!
//! ## Why a dedicated OS thread + a single-threaded tokio runtime
//!
//! `setns(2)` changes the calling thread's namespace association. Tokio's
//! multi-threaded runtime parks worker threads in pools that we cannot
//! safely tag with a netns; if we ran the proxy as a `tokio::spawn` task,
//! the SNI proxy task would migrate between workers and we'd have to
//! `setns` every worker (or none, breaking isolation). Instead we allocate
//! a fresh `std::thread`, `setns` it once, build a `current_thread` tokio
//! runtime that pins all SNI-proxy work to *that* thread, and let the OS
//! reclaim the thread on exit. The thread never returns to a tokio worker.
//!
//! Unlike [`crate::dns_proxy`] (whose recv loop is blocking sync) the SNI
//! proxy is async-heavy (`accept` + `peek` + `copy_bidirectional`), so we
//! need a tokio runtime in-thread. `current_thread` keeps the runtime
//! itself single-threaded and our setns isolation tight.
//!
//! ## Shutdown coordination
//!
//! [`super::run_one_shot`] checks an `AtomicBool` between accept calls, and
//! `accept` blocks until either a TCP connection arrives or the listener
//! is closed. To stop the proxy, the caller first sets that flag (the
//! [`SniProxyHandle::shutdown`] field) and then calls
//! [`signal_sni_proxy_shutdown`], which connects-and-drops a TCP socket to
//! the listen address; `accept` returns the drop-stub connection, the loop
//! observes the flag, and exits.

use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;

use cellos_core::CloudEventV1;

#[cfg(target_os = "linux")]
use super::SniProxyConfig;
use super::{L7DecisionEmitter, ProxyStats};

/// Bridges the proxy's synchronous [`L7DecisionEmitter`] trait onto the
/// supervisor's async [`cellos_core::ports::EventSink`] pipeline.
///
/// Each emitted event is handed to the captured tokio runtime as a detached
/// task, so the emitting caller never waits on sink I/O. Mirrors
/// [`crate::dns_proxy::spawn::EventSinkEmitter`] one-for-one.
pub struct EventSinkEmitter {
    /// Runtime on which the sink-delivery tasks are spawned.
    pub runtime_handle: tokio::runtime::Handle,
    /// Primary event sink; every emitted event is delivered here.
    pub sink: Arc<dyn cellos_core::ports::EventSink>,
    /// Optional secondary JSONL sink that also receives every emitted event.
    pub jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
}

impl EventSinkEmitter {
    /// Construct an emitter bound to the tokio runtime the caller is
    /// currently running on.
    ///
    /// The handle is captured eagerly so that later `emit` calls can still
    /// spawn onto the supervisor's runtime even when they are made from a
    /// thread outside it (presumably the proxy's own OS thread). The
    /// supervisor satisfies the runtime requirement by calling this from
    /// inside `run()`.
    ///
    /// # Panics
    ///
    /// Panics if called outside a tokio runtime context.
    pub fn capture_current(
        sink: Arc<dyn cellos_core::ports::EventSink>,
        jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
    ) -> Self {
        let runtime_handle = tokio::runtime::Handle::try_current()
            .expect("EventSinkEmitter::capture_current called outside a tokio runtime context");
        Self {
            sink,
            jsonl_sink,
            runtime_handle,
        }
    }
}

impl L7DecisionEmitter for EventSinkEmitter {
    fn emit(&self, event: CloudEventV1) {
        let sink = self.sink.clone();
        let jsonl = self.jsonl_sink.clone();
        let event_for_jsonl = event.clone();
        self.runtime_handle.spawn(async move {
            if let Err(e) = sink.emit(&event).await {
                tracing::warn!(
                    target: "cellos.supervisor.sni_proxy",
                    error = %e,
                    "primary sink emit failed for l7_egress_decision event"
                );
            }
        });
        if let Some(j) = jsonl {
            self.runtime_handle.spawn(async move {
                if let Err(e) = j.emit(&event_for_jsonl).await {
                    tracing::warn!(
                        target: "cellos.supervisor.sni_proxy",
                        error = %e,
                        "jsonl sink emit failed for l7_egress_decision event"
                    );
                }
            });
        }
    }
}

/// Handle to a running SNI proxy, returned by [`spawn_sni_proxy_in_netns`].
///
/// The supervisor holds this for the lifetime of the cell and uses it at
/// cell-destroy time to stop the proxy and collect its [`ProxyStats`].
pub struct SniProxyHandle {
    /// Cooperative shutdown flag shared with the proxy's accept loop. Store
    /// `true` here *before* calling [`signal_sni_proxy_shutdown`] so the loop
    /// exits once `accept` is woken.
    pub shutdown: Arc<AtomicBool>,
    /// Address the proxy listener is bound to inside the cell's netns. When
    /// built by [`spawn_sni_proxy_in_netns`] this is the listener's reported
    /// `local_addr`.
    pub listen_addr: SocketAddr,
    /// Join handle for the proxy OS thread; `Some` until the first call to
    /// `join()` takes it.
    #[cfg(target_os = "linux")]
    pub thread: Option<std::thread::JoinHandle<ProxyStats>>,
}

impl SniProxyHandle {
    /// Wait for the proxy thread to exit and return its final [`ProxyStats`].
    ///
    /// This is a plain [`std::thread::JoinHandle::join`]: it blocks until the
    /// thread finishes and is **not** time-bounded. Before calling it, set
    /// [`SniProxyHandle::shutdown`] to `true` and wake the accept loop with
    /// [`signal_sni_proxy_shutdown`] (or otherwise wake `accept`). If you skip
    /// that, the proxy can stay parked in `accept` and this call will not
    /// return. Intended to mirror `DnsProxyHandle::join`.
    ///
    /// Returns `None` in two cases:
    /// - the thread was already joined, because the handle is taken on the
    ///   first call;
    /// - the thread panicked. The panic is logged at `warn`, together with
    ///   its message when the payload is a string.
    #[cfg(target_os = "linux")]
    pub fn join(&mut self) -> Option<ProxyStats> {
        let handle = self.thread.take()?;
        match handle.join() {
            Ok(stats) => Some(stats),
            Err(payload) => {
                // `panic!("literal")` yields a `&'static str` payload and
                // `panic!("{fmt}")` yields a `String`; anything else is opaque.
                let reason = payload
                    .downcast_ref::<&str>()
                    .copied()
                    .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
                    .unwrap_or("<non-string panic payload>");
                tracing::warn!(
                    target: "cellos.supervisor.sni_proxy",
                    panic_message = reason,
                    "SNI proxy thread panicked on join"
                );
                None
            }
        }
    }

    /// Non-Linux stub: no proxy thread is ever spawned on these platforms,
    /// so there is nothing to join and this always returns `None`.
    #[cfg(not(target_os = "linux"))]
    pub fn join(&mut self) -> Option<ProxyStats> {
        None
    }
}

/// Wake the SNI proxy's blocked `accept()` so its loop can observe the
/// shutdown flag.
///
/// Opens a TCP connection to `listen_addr` (500 ms connect timeout) and
/// drops it immediately without writing any bytes. `accept` returns that
/// stub connection, and the accept loop then re-checks the shutdown flag and
/// exits. The flag ([`SniProxyHandle::shutdown`]) must already be `true`
/// when this is called.
///
/// If `listen_addr` is a wildcard address (`0.0.0.0` or `::`), the matching
/// loopback address is dialled instead. A wildcard is valid to bind, but it
/// is not a portable connect destination: Linux treats it as the local host,
/// while some other platforms reject it.
///
/// Best-effort: connection failures are logged at `debug` and swallowed.
/// Tokio's `accept` has no built-in timeout, so if this wake fails the proxy
/// exits only when some other connection happens to arrive. The wake nudge
/// is therefore the primary shutdown mechanism.
///
/// NOTE(review): the connection is dialled from the *calling* thread's
/// network namespace, while the listener lives in the cell's netns. If
/// `listen_addr` is only reachable inside the cell (e.g. a loopback address
/// there), the wake will miss. Confirm the supervisor's bind address is
/// reachable from the namespace this is called in.
pub fn signal_sni_proxy_shutdown(listen_addr: SocketAddr) {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

    let mut target = listen_addr;
    if target.ip().is_unspecified() {
        target.set_ip(if target.is_ipv4() {
            IpAddr::V4(Ipv4Addr::LOCALHOST)
        } else {
            IpAddr::V6(Ipv6Addr::LOCALHOST)
        });
    }

    match std::net::TcpStream::connect_timeout(&target, Duration::from_millis(500)) {
        Ok(stream) => {
            // Drop without writing; accept returns to the proxy and the
            // shutdown flag breaks the loop.
            drop(stream);
        }
        Err(e) => {
            tracing::debug!(
                target: "cellos.supervisor.sni_proxy",
                error = %e,
                addr = %target,
                "wake-up TCP connection failed; falling back to natural exit"
            );
        }
    }
}

/// Linux-only: spawn the SNI proxy on a dedicated OS thread inside a child
/// process's network namespace.
///
/// The new thread does the following, in order:
/// 1. `setns(2)`s into `/proc/<child_pid>/ns/net`.
/// 2. Builds a `current_thread` tokio runtime.
/// 3. Binds a TCP listener at `cfg.bind_addr` in that netns.
/// 4. Runs [`super::run_one_shot`] until `shutdown` is set and `accept` is
///    woken (see [`signal_sni_proxy_shutdown`]).
///
/// This call blocks for up to two seconds waiting for the thread to report
/// whether the listener bound. It returns a [`SniProxyHandle`] carrying the
/// join handle and the address the listener actually bound (its
/// `local_addr`). Mirrors `dns_proxy::spawn::spawn_proxy_in_netns`; the only
/// structural differences are:
///
/// - The bound socket is TCP (not UDP).
/// - The proxy thread runs a `current_thread` tokio runtime to drive the
///   async accept loop.
/// - There is no separate upstream socket — `copy_bidirectional` opens
///   per-connection upstream connections on demand.
///
/// # Errors
///
/// Returns `std::io::Error` when:
/// - opening `/proc/<child_pid>/ns/net` fails;
/// - the OS thread cannot be spawned;
/// - `setns`, runtime construction, `bind`, or `local_addr` fails on the
///   proxy thread (that error is forwarded);
/// - the thread does not report readiness within two seconds
///   ([`std::io::ErrorKind::TimedOut`]);
/// - the thread exits without reporting readiness, which only happens if it
///   panicked.
///
/// The supervisor turns these into a single `l7_egress_decision` event with
/// `reasonCode: l7_unknown_protocol` so the audit trail surfaces the gap.
#[cfg(target_os = "linux")]
pub fn spawn_sni_proxy_in_netns(
    child_pid: u32,
    cfg: SniProxyConfig,
    emitter: Arc<dyn L7DecisionEmitter>,
    shutdown: Arc<AtomicBool>,
) -> std::io::Result<SniProxyHandle> {
    use std::fs::File;
    use std::os::unix::io::AsRawFd;
    use std::sync::mpsc::RecvTimeoutError;

    // How long to wait for the proxy thread to report its bound listener.
    const READY_TIMEOUT: Duration = Duration::from_secs(2);

    // Open the netns FD on the calling thread first so transient errors
    // (child gone, /proc inaccessible) surface synchronously to the
    // supervisor rather than from the spawned thread.
    let netns_path = format!("/proc/{child_pid}/ns/net");
    let netns_file = File::open(&netns_path)
        .map_err(|e| std::io::Error::new(e.kind(), format!("open netns at {netns_path}: {e}")))?;

    let bind_addr = cfg.bind_addr;
    let shutdown_for_thread = shutdown.clone();

    // Rendezvous (zero-capacity) channel: a `send` completes only when this
    // thread actually receives the value. If we give up waiting below,
    // `ready_rx` is dropped, so any pending or later `send` fails. The proxy
    // thread then takes its early-return path instead of serving traffic with
    // no handle left to shut it down. With a buffered channel, a send landing
    // just after the timeout would succeed unseen.
    let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel::<std::io::Result<SocketAddr>>(0);

    let thread = std::thread::Builder::new()
        .name(format!("cellos-sni-proxy-{child_pid}"))
        .spawn(move || {
            // SAFETY: setns is the documented Linux syscall for moving the
            // calling thread into the namespace referenced by `fd`. The fd
            // comes from `netns_file`, which this closure owns, so it stays
            // open for the duration of the call. The thread never returns to
            // a tokio worker pool, so the netns change is intentional and
            // isolated to this thread alone.
            let setns_rc = unsafe { libc::setns(netns_file.as_raw_fd(), libc::CLONE_NEWNET) };
            if setns_rc != 0 {
                let err = std::io::Error::last_os_error();
                let _ = ready_tx.send(Err(std::io::Error::new(
                    err.kind(),
                    format!("setns(CLONE_NEWNET) for pid={child_pid}: {err}"),
                )));
                return ProxyStats::default();
            }

            let runtime = match tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
            {
                Ok(rt) => rt,
                Err(e) => {
                    let _ = ready_tx.send(Err(e));
                    return ProxyStats::default();
                }
            };

            runtime.block_on(async move {
                let listener = match tokio::net::TcpListener::bind(bind_addr).await {
                    Ok(l) => l,
                    Err(e) => {
                        let _ = ready_tx.send(Err(std::io::Error::new(
                            e.kind(),
                            format!("bind sni-proxy listener at {bind_addr} in cell netns: {e}"),
                        )));
                        return ProxyStats::default();
                    }
                };
                let actual_listen = match listener.local_addr() {
                    Ok(a) => a,
                    Err(e) => {
                        let _ = ready_tx.send(Err(e));
                        return ProxyStats::default();
                    }
                };
                // Fails only if the spawner stopped waiting (timeout); in
                // that case nobody holds a handle, so do not start serving.
                if ready_tx.send(Ok(actual_listen)).is_err() {
                    return ProxyStats::default();
                }
                match super::run_one_shot(&cfg, listener, emitter, shutdown_for_thread).await {
                    Ok(stats) => stats,
                    Err(e) => {
                        tracing::warn!(
                            target: "cellos.supervisor.sni_proxy",
                            error = %e,
                            "sni-proxy run_one_shot returned with I/O error"
                        );
                        ProxyStats::default()
                    }
                }
            })
        })?;

    let bound_addr = match ready_rx.recv_timeout(READY_TIMEOUT) {
        Ok(ready) => ready?,
        Err(e @ RecvTimeoutError::Timeout) => {
            return Err(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                format!("sni-proxy ready timeout: {e}"),
            ));
        }
        Err(RecvTimeoutError::Disconnected) => {
            // Every normal exit path of the thread reports a result first,
            // so a dropped sender with no message means the thread panicked.
            return Err(std::io::Error::other(
                "sni-proxy thread exited before reporting readiness (likely panicked)",
            ));
        }
    };

    Ok(SniProxyHandle {
        shutdown,
        listen_addr: bound_addr,
        thread: Some(thread),
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    #[test]
    fn signal_shutdown_does_not_panic_on_unbound_addr() {
        // Sending a wake to a port nothing is listening on must not panic
        // — connect_timeout returns ECONNREFUSED (or similar) and we
        // log-and-continue.
        let addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
        signal_sni_proxy_shutdown(addr);
    }

    #[test]
    fn signal_shutdown_delivers_wake_connection() {
        // A live listener must see the stub connection the wake opens. The
        // handshake completes into the accept backlog, so `accept` still
        // returns it even though the client has already hung up.
        let listener =
            std::net::TcpListener::bind("127.0.0.1:0").expect("bind loopback listener");
        let addr = listener.local_addr().expect("listener local_addr");
        signal_sni_proxy_shutdown(addr);
        let (_stream, peer) = listener
            .accept()
            .expect("wake connection should be queued on the listener");
        assert!(peer.ip().is_loopback());
    }

    #[test]
    fn handle_join_returns_none_when_no_thread() {
        let mut h = SniProxyHandle {
            shutdown: Arc::new(AtomicBool::new(false)),
            listen_addr: "127.0.0.1:0".parse().unwrap(),
            #[cfg(target_os = "linux")]
            thread: None,
        };
        assert!(h.join().is_none());
        // A second join on an already-drained handle must also be a no-op.
        assert!(h.join().is_none());
        h.shutdown.store(true, Ordering::SeqCst);
        assert!(h.shutdown.load(Ordering::SeqCst));
    }
}