Skip to main content

cellos_supervisor/sni_proxy/
spawn.rs

1//! SEC-22 Phase 2 — supervisor → cell-netns SNI proxy spawn helper.
2//!
3//! Mirrors [`crate::dns_proxy::spawn`] (W4 SEAM-1 Phase 2b). The caller
4//! constructs a [`super::SniProxyConfig`], hands it here, and gets back a
5//! [`SniProxyHandle`] tracking the OS thread that runs the proxy inside the
6//! cell's network namespace.
7//!
8//! ## Why a dedicated OS thread + a single-threaded tokio runtime
9//!
10//! `setns(2)` changes the calling thread's namespace association. Tokio's
11//! multi-threaded runtime parks worker threads in pools that we cannot
12//! safely tag with a netns; if we ran the proxy as a `tokio::spawn` task,
13//! the SNI proxy task would migrate between workers and we'd have to
14//! `setns` every worker (or none, breaking isolation). Instead we allocate
15//! a fresh `std::thread`, `setns` it once, build a `current_thread` tokio
16//! runtime that pins all SNI-proxy work to *that* thread, and let the OS
17//! reclaim the thread on exit. The thread never returns to a tokio worker.
18//!
19//! Unlike [`crate::dns_proxy`] (whose recv loop is blocking sync) the SNI
20//! proxy is async-heavy (`accept` + `peek` + `copy_bidirectional`), so we
21//! need a tokio runtime in-thread. `current_thread` keeps the runtime
22//! itself single-threaded and our setns isolation tight.
23//!
24//! ## Shutdown coordination
25//!
//! [`super::run_one_shot`] checks an `AtomicBool` between accept calls, and
//! `accept` blocks until either a TCP connection arrives or the listener
//! is closed. To stop the proxy, the supervisor first sets the shared flag
//! and then calls [`signal_sni_proxy_shutdown`], which connects-and-drops a
//! TCP socket to the listen address (it does not touch the flag itself);
//! `accept` returns the drop-stub connection, the loop observes the flag,
//! and exits.
31
32use std::net::SocketAddr;
33use std::sync::atomic::AtomicBool;
34use std::sync::Arc;
35use std::time::Duration;
36
37use cellos_core::CloudEventV1;
38
39#[cfg(target_os = "linux")]
40use super::SniProxyConfig;
41use super::{L7DecisionEmitter, ProxyStats};
42
43/// Adapter from the proxy's synchronous [`L7DecisionEmitter`] trait to the
44/// supervisor's async [`cellos_core::ports::EventSink`] pipeline. Mirrors
45/// [`crate::dns_proxy::spawn::EventSinkEmitter`] one-for-one.
46pub struct EventSinkEmitter {
47    pub runtime_handle: tokio::runtime::Handle,
48    pub sink: Arc<dyn cellos_core::ports::EventSink>,
49    pub jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
50}
51
52impl EventSinkEmitter {
53    /// Build an emitter capturing the **current** tokio runtime handle.
54    /// MUST be called from inside an async context (the supervisor
55    /// satisfies this by constructing the emitter from inside `run()`).
56    pub fn capture_current(
57        sink: Arc<dyn cellos_core::ports::EventSink>,
58        jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
59    ) -> Self {
60        let handle = tokio::runtime::Handle::try_current()
61            .expect("EventSinkEmitter::capture_current called outside a tokio runtime context");
62        Self {
63            runtime_handle: handle,
64            sink,
65            jsonl_sink,
66        }
67    }
68}
69
70impl L7DecisionEmitter for EventSinkEmitter {
71    fn emit(&self, event: CloudEventV1) {
72        let sink = self.sink.clone();
73        let jsonl = self.jsonl_sink.clone();
74        let event_for_jsonl = event.clone();
75        self.runtime_handle.spawn(async move {
76            if let Err(e) = sink.emit(&event).await {
77                tracing::warn!(
78                    target: "cellos.supervisor.sni_proxy",
79                    error = %e,
80                    "primary sink emit failed for l7_egress_decision event"
81                );
82            }
83        });
84        if let Some(j) = jsonl {
85            self.runtime_handle.spawn(async move {
86                if let Err(e) = j.emit(&event_for_jsonl).await {
87                    tracing::warn!(
88                        target: "cellos.supervisor.sni_proxy",
89                        error = %e,
90                        "jsonl sink emit failed for l7_egress_decision event"
91                    );
92                }
93            });
94        }
95    }
96}
97
98/// Shutdown coordination handle returned by [`spawn_sni_proxy_in_netns`]
99/// — held by the supervisor and signalled at cell destroy time.
100pub struct SniProxyHandle {
101    /// Shared shutdown flag. Set to `true` before [`signal_sni_proxy_shutdown`]
102    /// to coordinate clean exit.
103    pub shutdown: Arc<AtomicBool>,
104    /// Address the proxy listener is bound to inside the cell's netns.
105    pub listen_addr: SocketAddr,
106    /// Join handle for the proxy OS thread. `Some` until `join()` consumes it.
107    #[cfg(target_os = "linux")]
108    pub thread: Option<std::thread::JoinHandle<ProxyStats>>,
109}
110
111impl SniProxyHandle {
112    /// Signal-flag-set + bounded join. Caller must already have called
113    /// [`signal_sni_proxy_shutdown`] (or otherwise woken `accept`) before
114    /// this returns. Mirrors `DnsProxyHandle::join` semantics.
115    #[cfg(target_os = "linux")]
116    pub fn join(&mut self) -> Option<ProxyStats> {
117        let handle = self.thread.take()?;
118        match handle.join() {
119            Ok(stats) => Some(stats),
120            Err(_panic) => {
121                tracing::warn!(
122                    target: "cellos.supervisor.sni_proxy",
123                    "SNI proxy thread panicked on join"
124                );
125                None
126            }
127        }
128    }
129
130    /// Stub join on non-Linux platforms — no thread was ever spawned.
131    #[cfg(not(target_os = "linux"))]
132    pub fn join(&mut self) -> Option<ProxyStats> {
133        None
134    }
135}
136
137/// Wake the proxy's `accept()` by opening + immediately dropping a TCP
138/// connection to `listen_addr`. The connection is unbuffered, the proxy's
139/// per-connection task observes the shutdown flag, drops the stream, and
140/// returns; the accept loop then re-checks the flag and exits.
141///
142/// Best-effort: on any I/O error the supervisor falls back to letting the
143/// proxy thread exit on its next accept timeout (which, since `accept` has
144/// no native timeout in tokio, would only happen if a real connection
145/// arrives — so the wake nudge is the primary mechanism).
146pub fn signal_sni_proxy_shutdown(listen_addr: SocketAddr) {
147    match std::net::TcpStream::connect_timeout(&listen_addr, Duration::from_millis(500)) {
148        Ok(stream) => {
149            // Drop without writing; accept returns to the proxy and the
150            // shutdown flag breaks the loop.
151            drop(stream);
152        }
153        Err(e) => {
154            tracing::debug!(
155                target: "cellos.supervisor.sni_proxy",
156                error = %e,
157                addr = %listen_addr,
158                "wake-up TCP connection failed; falling back to natural exit"
159            );
160        }
161    }
162}
163
164/// Linux-only: spawn the SNI proxy on a dedicated OS thread, `setns(2)` into
165/// `/proc/<child_pid>/ns/net`, bind a TCP listener at `cfg.bind_addr` in
166/// that netns, and run [`super::run_one_shot`] until `shutdown` is set.
167///
168/// Returns a [`SniProxyHandle`] carrying the join handle + the address the
169/// listener is bound to. Mirrors `dns_proxy::spawn::spawn_proxy_in_netns`
170/// — the only structural differences are:
171///
172/// - The bound socket is TCP (not UDP).
173/// - The proxy thread runs a `current_thread` tokio runtime to drive the
174///   async accept loop.
175/// - There is no separate upstream socket — `copy_bidirectional` opens
176///   per-connection upstream connections on demand.
177///
178/// **Errors:** open `/proc/<pid>/ns/net`, `setns`, or `bind` failures are
179/// returned to the caller as `std::io::Error`. The supervisor turns these
180/// into a single `l7_egress_decision` event with
181/// `reasonCode: l7_unknown_protocol` so the audit trail surfaces the gap.
182#[cfg(target_os = "linux")]
183pub fn spawn_sni_proxy_in_netns(
184    child_pid: u32,
185    cfg: SniProxyConfig,
186    emitter: Arc<dyn L7DecisionEmitter>,
187    shutdown: Arc<AtomicBool>,
188) -> std::io::Result<SniProxyHandle> {
189    use std::fs::File;
190    use std::os::unix::io::AsRawFd;
191
192    // Open the netns FD on the calling thread first so transient errors
193    // (child gone, /proc inaccessible) surface synchronously to the
194    // supervisor rather than from the spawned thread.
195    let netns_path = format!("/proc/{child_pid}/ns/net");
196    let netns_file = File::open(&netns_path)
197        .map_err(|e| std::io::Error::new(e.kind(), format!("open netns at {netns_path}: {e}")))?;
198
199    let bind_addr = cfg.bind_addr;
200    let shutdown_for_thread = shutdown.clone();
201
202    let (ready_tx, ready_rx) = std::sync::mpsc::channel::<std::io::Result<SocketAddr>>();
203
204    let thread = std::thread::Builder::new()
205        .name(format!("cellos-sni-proxy-{child_pid}"))
206        .spawn(move || {
207            // SAFETY: setns is the documented Linux syscall for moving the
208            // calling thread into the namespace referenced by `fd`. We
209            // hold `netns_file` for the lifetime of the thread; the thread
210            // never returns to a tokio worker so the netns pollution is
211            // intentional and isolated to this thread alone.
212            let setns_rc = unsafe { libc::setns(netns_file.as_raw_fd(), libc::CLONE_NEWNET) };
213            if setns_rc != 0 {
214                let err = std::io::Error::last_os_error();
215                let _ = ready_tx.send(Err(std::io::Error::new(
216                    err.kind(),
217                    format!("setns(CLONE_NEWNET) for pid={child_pid}: {err}"),
218                )));
219                return ProxyStats::default();
220            }
221
222            let runtime = match tokio::runtime::Builder::new_current_thread()
223                .enable_all()
224                .build()
225            {
226                Ok(rt) => rt,
227                Err(e) => {
228                    let _ = ready_tx.send(Err(e));
229                    return ProxyStats::default();
230                }
231            };
232
233            runtime.block_on(async move {
234                let listener = match tokio::net::TcpListener::bind(bind_addr).await {
235                    Ok(l) => l,
236                    Err(e) => {
237                        let _ = ready_tx.send(Err(std::io::Error::new(
238                            e.kind(),
239                            format!("bind sni-proxy listener at {bind_addr} in cell netns: {e}"),
240                        )));
241                        return ProxyStats::default();
242                    }
243                };
244                let actual_listen = match listener.local_addr() {
245                    Ok(a) => a,
246                    Err(e) => {
247                        let _ = ready_tx.send(Err(e));
248                        return ProxyStats::default();
249                    }
250                };
251                if ready_tx.send(Ok(actual_listen)).is_err() {
252                    return ProxyStats::default();
253                }
254                match super::run_one_shot(&cfg, listener, emitter, shutdown_for_thread).await {
255                    Ok(stats) => stats,
256                    Err(e) => {
257                        tracing::warn!(
258                            target: "cellos.supervisor.sni_proxy",
259                            error = %e,
260                            "sni-proxy run_one_shot returned with I/O error"
261                        );
262                        ProxyStats::default()
263                    }
264                }
265            })
266        })?;
267
268    let bound_addr = ready_rx
269        .recv_timeout(Duration::from_secs(2))
270        .map_err(|e| std::io::Error::other(format!("sni-proxy ready timeout: {e}")))??;
271
272    Ok(SniProxyHandle {
273        shutdown,
274        listen_addr: bound_addr,
275        thread: Some(thread),
276    })
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// A wake-up aimed at a port with no listener must be swallowed:
    /// `connect_timeout` fails (ECONNREFUSED or similar) and the helper
    /// logs and returns instead of panicking.
    #[test]
    fn signal_shutdown_does_not_panic_on_unbound_addr() {
        let nobody_listening = SocketAddr::from(([127, 0, 0, 1], 1));
        signal_sni_proxy_shutdown(nobody_listening);
    }

    /// With no thread attached, `join()` yields `None`, and the shared
    /// shutdown flag stays usable afterwards.
    #[test]
    fn handle_join_returns_none_when_no_thread() {
        let mut handle = SniProxyHandle {
            shutdown: Arc::new(AtomicBool::new(false)),
            listen_addr: SocketAddr::from(([127, 0, 0, 1], 0)),
            #[cfg(target_os = "linux")]
            thread: None,
        };
        let joined = handle.join();
        assert!(joined.is_none());

        handle.shutdown.store(true, Ordering::SeqCst);
        let flag_now = handle.shutdown.load(Ordering::SeqCst);
        assert!(flag_now);
    }
}