cellos_supervisor/sni_proxy/spawn.rs
1//! SEC-22 Phase 2 — supervisor → cell-netns SNI proxy spawn helper.
2//!
3//! Mirrors [`crate::dns_proxy::spawn`] (W4 SEAM-1 Phase 2b). The caller
4//! constructs a [`super::SniProxyConfig`], hands it here, and gets back a
5//! [`SniProxyHandle`] tracking the OS thread that runs the proxy inside the
6//! cell's network namespace.
7//!
8//! ## Why a dedicated OS thread + a single-threaded tokio runtime
9//!
10//! `setns(2)` changes the calling thread's namespace association. Tokio's
11//! multi-threaded runtime parks worker threads in pools that we cannot
12//! safely tag with a netns; if we ran the proxy as a `tokio::spawn` task,
13//! the SNI proxy task would migrate between workers and we'd have to
14//! `setns` every worker (or none, breaking isolation). Instead we allocate
15//! a fresh `std::thread`, `setns` it once, build a `current_thread` tokio
16//! runtime that pins all SNI-proxy work to *that* thread, and let the OS
17//! reclaim the thread on exit. The thread never returns to a tokio worker.
18//!
19//! Unlike [`crate::dns_proxy`] (whose recv loop is blocking sync) the SNI
20//! proxy is async-heavy (`accept` + `peek` + `copy_bidirectional`), so we
21//! need a tokio runtime in-thread. `current_thread` keeps the runtime
22//! itself single-threaded and our setns isolation tight.
23//!
24//! ## Shutdown coordination
25//!
//! [`super::run_one_shot`] checks an `AtomicBool` between accept calls, and
//! `accept` blocks until either a TCP connection arrives or the listener
//! is closed. To stop the proxy, the supervisor first sets the flag (see
//! [`SniProxyHandle::shutdown`]) and then calls [`signal_sni_proxy_shutdown`],
//! which only connects-and-drops a TCP socket to the listen address — it
//! does not touch the flag itself. `accept` returns the drop-stub
//! connection, the loop observes the flag, and exits.
31
32use std::net::SocketAddr;
33use std::sync::atomic::AtomicBool;
34use std::sync::Arc;
35use std::time::Duration;
36
37use cellos_core::CloudEventV1;
38
39#[cfg(target_os = "linux")]
40use super::SniProxyConfig;
41use super::{L7DecisionEmitter, ProxyStats};
42
43/// Adapter from the proxy's synchronous [`L7DecisionEmitter`] trait to the
44/// supervisor's async [`cellos_core::ports::EventSink`] pipeline. Mirrors
45/// [`crate::dns_proxy::spawn::EventSinkEmitter`] one-for-one.
46pub struct EventSinkEmitter {
47 pub runtime_handle: tokio::runtime::Handle,
48 pub sink: Arc<dyn cellos_core::ports::EventSink>,
49 pub jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
50}
51
52impl EventSinkEmitter {
53 /// Build an emitter capturing the **current** tokio runtime handle.
54 /// MUST be called from inside an async context (the supervisor
55 /// satisfies this by constructing the emitter from inside `run()`).
56 pub fn capture_current(
57 sink: Arc<dyn cellos_core::ports::EventSink>,
58 jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
59 ) -> Self {
60 let handle = tokio::runtime::Handle::try_current()
61 .expect("EventSinkEmitter::capture_current called outside a tokio runtime context");
62 Self {
63 runtime_handle: handle,
64 sink,
65 jsonl_sink,
66 }
67 }
68}
69
70impl L7DecisionEmitter for EventSinkEmitter {
71 fn emit(&self, event: CloudEventV1) {
72 let sink = self.sink.clone();
73 let jsonl = self.jsonl_sink.clone();
74 let event_for_jsonl = event.clone();
75 self.runtime_handle.spawn(async move {
76 if let Err(e) = sink.emit(&event).await {
77 tracing::warn!(
78 target: "cellos.supervisor.sni_proxy",
79 error = %e,
80 "primary sink emit failed for l7_egress_decision event"
81 );
82 }
83 });
84 if let Some(j) = jsonl {
85 self.runtime_handle.spawn(async move {
86 if let Err(e) = j.emit(&event_for_jsonl).await {
87 tracing::warn!(
88 target: "cellos.supervisor.sni_proxy",
89 error = %e,
90 "jsonl sink emit failed for l7_egress_decision event"
91 );
92 }
93 });
94 }
95 }
96}
97
98/// Shutdown coordination handle returned by [`spawn_sni_proxy_in_netns`]
99/// — held by the supervisor and signalled at cell destroy time.
100pub struct SniProxyHandle {
101 /// Shared shutdown flag. Set to `true` before [`signal_sni_proxy_shutdown`]
102 /// to coordinate clean exit.
103 pub shutdown: Arc<AtomicBool>,
104 /// Address the proxy listener is bound to inside the cell's netns.
105 pub listen_addr: SocketAddr,
106 /// Join handle for the proxy OS thread. `Some` until `join()` consumes it.
107 #[cfg(target_os = "linux")]
108 pub thread: Option<std::thread::JoinHandle<ProxyStats>>,
109}
110
111impl SniProxyHandle {
112 /// Signal-flag-set + bounded join. Caller must already have called
113 /// [`signal_sni_proxy_shutdown`] (or otherwise woken `accept`) before
114 /// this returns. Mirrors `DnsProxyHandle::join` semantics.
115 #[cfg(target_os = "linux")]
116 pub fn join(&mut self) -> Option<ProxyStats> {
117 let handle = self.thread.take()?;
118 match handle.join() {
119 Ok(stats) => Some(stats),
120 Err(_panic) => {
121 tracing::warn!(
122 target: "cellos.supervisor.sni_proxy",
123 "SNI proxy thread panicked on join"
124 );
125 None
126 }
127 }
128 }
129
130 /// Stub join on non-Linux platforms — no thread was ever spawned.
131 #[cfg(not(target_os = "linux"))]
132 pub fn join(&mut self) -> Option<ProxyStats> {
133 None
134 }
135}
136
137/// Wake the proxy's `accept()` by opening + immediately dropping a TCP
138/// connection to `listen_addr`. The connection is unbuffered, the proxy's
139/// per-connection task observes the shutdown flag, drops the stream, and
140/// returns; the accept loop then re-checks the flag and exits.
141///
142/// Best-effort: on any I/O error the supervisor falls back to letting the
143/// proxy thread exit on its next accept timeout (which, since `accept` has
144/// no native timeout in tokio, would only happen if a real connection
145/// arrives — so the wake nudge is the primary mechanism).
146pub fn signal_sni_proxy_shutdown(listen_addr: SocketAddr) {
147 match std::net::TcpStream::connect_timeout(&listen_addr, Duration::from_millis(500)) {
148 Ok(stream) => {
149 // Drop without writing; accept returns to the proxy and the
150 // shutdown flag breaks the loop.
151 drop(stream);
152 }
153 Err(e) => {
154 tracing::debug!(
155 target: "cellos.supervisor.sni_proxy",
156 error = %e,
157 addr = %listen_addr,
158 "wake-up TCP connection failed; falling back to natural exit"
159 );
160 }
161 }
162}
163
164/// Linux-only: spawn the SNI proxy on a dedicated OS thread, `setns(2)` into
165/// `/proc/<child_pid>/ns/net`, bind a TCP listener at `cfg.bind_addr` in
166/// that netns, and run [`super::run_one_shot`] until `shutdown` is set.
167///
168/// Returns a [`SniProxyHandle`] carrying the join handle + the address the
169/// listener is bound to. Mirrors `dns_proxy::spawn::spawn_proxy_in_netns`
170/// — the only structural differences are:
171///
172/// - The bound socket is TCP (not UDP).
173/// - The proxy thread runs a `current_thread` tokio runtime to drive the
174/// async accept loop.
175/// - There is no separate upstream socket — `copy_bidirectional` opens
176/// per-connection upstream connections on demand.
177///
178/// **Errors:** open `/proc/<pid>/ns/net`, `setns`, or `bind` failures are
179/// returned to the caller as `std::io::Error`. The supervisor turns these
180/// into a single `l7_egress_decision` event with
181/// `reasonCode: l7_unknown_protocol` so the audit trail surfaces the gap.
182#[cfg(target_os = "linux")]
183pub fn spawn_sni_proxy_in_netns(
184 child_pid: u32,
185 cfg: SniProxyConfig,
186 emitter: Arc<dyn L7DecisionEmitter>,
187 shutdown: Arc<AtomicBool>,
188) -> std::io::Result<SniProxyHandle> {
189 use std::fs::File;
190 use std::os::unix::io::AsRawFd;
191
192 // Open the netns FD on the calling thread first so transient errors
193 // (child gone, /proc inaccessible) surface synchronously to the
194 // supervisor rather than from the spawned thread.
195 let netns_path = format!("/proc/{child_pid}/ns/net");
196 let netns_file = File::open(&netns_path)
197 .map_err(|e| std::io::Error::new(e.kind(), format!("open netns at {netns_path}: {e}")))?;
198
199 let bind_addr = cfg.bind_addr;
200 let shutdown_for_thread = shutdown.clone();
201
202 let (ready_tx, ready_rx) = std::sync::mpsc::channel::<std::io::Result<SocketAddr>>();
203
204 let thread = std::thread::Builder::new()
205 .name(format!("cellos-sni-proxy-{child_pid}"))
206 .spawn(move || {
207 // SAFETY: setns is the documented Linux syscall for moving the
208 // calling thread into the namespace referenced by `fd`. We
209 // hold `netns_file` for the lifetime of the thread; the thread
210 // never returns to a tokio worker so the netns pollution is
211 // intentional and isolated to this thread alone.
212 let setns_rc = unsafe { libc::setns(netns_file.as_raw_fd(), libc::CLONE_NEWNET) };
213 if setns_rc != 0 {
214 let err = std::io::Error::last_os_error();
215 let _ = ready_tx.send(Err(std::io::Error::new(
216 err.kind(),
217 format!("setns(CLONE_NEWNET) for pid={child_pid}: {err}"),
218 )));
219 return ProxyStats::default();
220 }
221
222 let runtime = match tokio::runtime::Builder::new_current_thread()
223 .enable_all()
224 .build()
225 {
226 Ok(rt) => rt,
227 Err(e) => {
228 let _ = ready_tx.send(Err(e));
229 return ProxyStats::default();
230 }
231 };
232
233 runtime.block_on(async move {
234 let listener = match tokio::net::TcpListener::bind(bind_addr).await {
235 Ok(l) => l,
236 Err(e) => {
237 let _ = ready_tx.send(Err(std::io::Error::new(
238 e.kind(),
239 format!("bind sni-proxy listener at {bind_addr} in cell netns: {e}"),
240 )));
241 return ProxyStats::default();
242 }
243 };
244 let actual_listen = match listener.local_addr() {
245 Ok(a) => a,
246 Err(e) => {
247 let _ = ready_tx.send(Err(e));
248 return ProxyStats::default();
249 }
250 };
251 if ready_tx.send(Ok(actual_listen)).is_err() {
252 return ProxyStats::default();
253 }
254 match super::run_one_shot(&cfg, listener, emitter, shutdown_for_thread).await {
255 Ok(stats) => stats,
256 Err(e) => {
257 tracing::warn!(
258 target: "cellos.supervisor.sni_proxy",
259 error = %e,
260 "sni-proxy run_one_shot returned with I/O error"
261 );
262 ProxyStats::default()
263 }
264 }
265 })
266 })?;
267
268 let bound_addr = ready_rx
269 .recv_timeout(Duration::from_secs(2))
270 .map_err(|e| std::io::Error::other(format!("sni-proxy ready timeout: {e}")))??;
271
272 Ok(SniProxyHandle {
273 shutdown,
274 listen_addr: bound_addr,
275 thread: Some(thread),
276 })
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// Waking an address with no listener must be a silent no-op:
    /// `connect_timeout` fails (ECONNREFUSED or similar) and the helper
    /// logs and returns instead of panicking.
    #[test]
    fn signal_shutdown_does_not_panic_on_unbound_addr() {
        let unbound = SocketAddr::from(([127, 0, 0, 1], 1));
        signal_sni_proxy_shutdown(unbound);
    }

    /// A handle that carries no thread yields `None` from `join()`, and its
    /// shutdown flag remains an ordinary, usable atomic afterwards.
    #[test]
    fn handle_join_returns_none_when_no_thread() {
        let flag = Arc::new(AtomicBool::new(false));
        let mut handle = SniProxyHandle {
            shutdown: flag,
            listen_addr: SocketAddr::from(([127, 0, 0, 1], 0)),
            #[cfg(target_os = "linux")]
            thread: None,
        };

        assert!(handle.join().is_none());

        handle.shutdown.store(true, Ordering::SeqCst);
        assert!(handle.shutdown.load(Ordering::SeqCst));
    }
}
305}