cellos_supervisor/dns_proxy/spawn.rs
1//! SEAM-1 Phase 2b — supervisor → cell-netns DNS proxy spawn.
2//!
3//! This module owns the *placement* of [`super::run_one_shot`] inside the
//! cell's network namespace. The proxy module itself ([`super`]) is
5//! platform-neutral and contains no `setns` / `nsenter` calls; the supervisor
6//! pre-binds sockets and hands them to `run_one_shot`. Phase 2b's job is to
7//! make those sockets exist in `/proc/<child_pid>/ns/net` rather than the host
8//! netns.
9//!
10//! ## Why a dedicated OS thread
11//!
12//! `setns(2)` changes the *calling thread*'s namespace association. A
13//! `tokio::task::spawn_blocking` task does not give us a stable thread —
14//! tokio's blocking pool may reuse the worker for other tasks once we
15//! return. Even if the proxy ran sync inside `spawn_blocking`, polluting
16//! that worker's netns with the cell's namespace would corrupt every
17//! subsequent task that landed on the same worker. We therefore allocate a
18//! fresh `std::thread`, do the `setns` there, run the loop to completion,
19//! and let the OS reclaim the thread on exit. The thread never returns to
20//! a tokio worker.
21//!
22//! ## Bridging the sync emitter to async [`EventSink`]
23//!
24//! The proxy's [`super::DnsQueryEmitter`] is synchronous (it must not block
25//! the recv loop). The supervisor's [`cellos_core::ports::EventSink`] is
26//! async. [`EventSinkEmitter`] captures a [`tokio::runtime::Handle`] **before**
27//! the OS thread starts and uses `Handle::spawn` to fire-and-forget every
28//! event onto the runtime. This preserves the per-query event audit trail
29//! without blocking the recv loop on emit completion.
30//!
31//! ## Shutdown coordination
32//!
33//! `run_one_shot` already checks an `AtomicBool` between iterations and
34//! the listener has a short read_timeout. The teardown helper
35//! [`signal_proxy_shutdown`] sets the flag *and* sends a 0-byte UDP
36//! packet to the listen address — the recv loop wakes immediately,
37//! observes the flag, and returns. Without the wake packet, the loop
38//! would sit in `recv_from` for up to `read_timeout` after teardown
39//! requested shutdown.
40
41use std::net::SocketAddr;
42use std::sync::atomic::AtomicBool;
43use std::sync::Arc;
44
45use cellos_core::CloudEventV1;
46
47#[cfg(target_os = "linux")]
48use super::DnsProxyConfig;
49use super::{DnsProxyStats, DnsQueryEmitter};
50
/// Read timeout applied to the proxy's UDP listener socket.
///
/// Bounds the worst-case shutdown latency when the wake packet is lost or
/// never sent (e.g. teardown did not reach `signal_proxy_shutdown`): a
/// blocked receive gives up after this interval, so the loop gets a chance
/// to re-check the shutdown `AtomicBool` at a predictable cadence. Kept
/// short for that reason.
#[cfg(target_os = "linux")]
const LISTENER_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(200);
57
/// Adapter from the proxy's synchronous [`DnsQueryEmitter`] trait to the
/// supervisor's async [`cellos_core::ports::EventSink`] pipeline.
///
/// **Tokio-context constraint:** `runtime_handle` MUST be captured on a
/// thread that already has a tokio runtime context (i.e. inside an `async`
/// block on the multi-thread runtime, OR inside `spawn_blocking`). The
/// proxy thread itself is a bare `std::thread` and has no tokio context;
/// the captured handle is what lets it `spawn` work back onto the runtime.
pub struct EventSinkEmitter {
    /// Runtime handle on which every sink `emit` future is spawned.
    pub runtime_handle: tokio::runtime::Handle,
    /// Primary sink; receives every event handed to the emitter.
    pub sink: Arc<dyn cellos_core::ports::EventSink>,
    /// Optional secondary sink; when present it receives its own copy of
    /// every event. NOTE(review): the name suggests a JSON-lines log sink —
    /// confirm against the caller that constructs it.
    pub jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
}
71
72impl EventSinkEmitter {
73 /// Build an emitter capturing the **current** tokio runtime handle.
74 ///
75 /// Call from inside an async block (or `spawn_blocking`) — never from
76 /// a bare std thread. Panics with a clear message if no tokio runtime
77 /// is reachable, which is a programming error: this struct exists
78 /// specifically to bridge from sync threads, but the bridge itself
79 /// must be constructed where a runtime is available.
80 pub fn capture_current(
81 sink: Arc<dyn cellos_core::ports::EventSink>,
82 jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
83 ) -> Self {
84 let handle = tokio::runtime::Handle::try_current()
85 .expect("EventSinkEmitter::capture_current called outside a tokio runtime context");
86 Self {
87 runtime_handle: handle,
88 sink,
89 jsonl_sink,
90 }
91 }
92}
93
94impl DnsQueryEmitter for EventSinkEmitter {
95 fn emit(&self, event: CloudEventV1) {
96 // Fire-and-forget: the proxy recv loop runs synchronously and any
97 // backpressure here turns directly into per-query latency for the
98 // workload. The supervisor's emit() helper logs sink failures —
99 // mirror that here without blocking the proxy.
100 let sink = self.sink.clone();
101 let jsonl = self.jsonl_sink.clone();
102 let event_for_jsonl = event.clone();
103 self.runtime_handle.spawn(async move {
104 if let Err(e) = sink.emit(&event).await {
105 tracing::warn!(
106 target: "cellos.supervisor.dns_proxy",
107 error = %e,
108 "primary sink emit failed for dns_query event"
109 );
110 }
111 });
112 if let Some(j) = jsonl {
113 self.runtime_handle.spawn(async move {
114 if let Err(e) = j.emit(&event_for_jsonl).await {
115 tracing::warn!(
116 target: "cellos.supervisor.dns_proxy",
117 error = %e,
118 "jsonl sink emit failed for dns_query event"
119 );
120 }
121 });
122 }
123 }
124}
125
/// Shutdown coordination handle returned by [`spawn_proxy_in_netns`] (Linux)
/// — held by the supervisor and signalled at cell destroy time.
///
/// On non-Linux platforms the supervisor never constructs one of these; the
/// activation predicate evaluator simply returns `None`.
pub struct DnsProxyHandle {
    /// Shared shutdown flag. Setting it to `true` signals the proxy loop to
    /// exit at its next iteration (within `LISTENER_READ_TIMEOUT` worst case).
    pub shutdown: Arc<AtomicBool>,
    /// The address the proxy's UDP listener reported as bound inside the
    /// cell's netns. Used by [`signal_proxy_shutdown`] to wake the recv loop.
    pub listen_addr: SocketAddr,
    /// Join handle for the proxy OS thread. `Some` until `join()` consumes it.
    #[cfg(target_os = "linux")]
    pub thread: Option<std::thread::JoinHandle<DnsProxyStats>>,
    /// Resolver id copied from the proxy configuration; retained for
    /// diagnostics.
    pub upstream_resolver_id: String,
}
144
145impl DnsProxyHandle {
146 /// Signal the proxy to stop and join the OS thread. Returns the
147 /// cumulative [`DnsProxyStats`] when the thread exited cleanly, or
148 /// `None` if the thread had already been joined or panicked.
149 ///
150 /// Caller must already have set `self.shutdown` to `true` and called
151 /// [`signal_proxy_shutdown`] (or otherwise woken the recv loop).
152 /// Calling this without waking the loop will block for up to
153 /// `LISTENER_READ_TIMEOUT` — bounded but not instant.
154 #[cfg(target_os = "linux")]
155 pub fn join(&mut self) -> Option<DnsProxyStats> {
156 let handle = self.thread.take()?;
157 match handle.join() {
158 Ok(stats) => Some(stats),
159 Err(_panic) => {
160 tracing::warn!(
161 target: "cellos.supervisor.dns_proxy",
162 "DNS proxy thread panicked on join"
163 );
164 None
165 }
166 }
167 }
168
169 /// Stub join on non-Linux platforms — no thread was ever spawned.
170 #[cfg(not(target_os = "linux"))]
171 pub fn join(&mut self) -> Option<DnsProxyStats> {
172 None
173 }
174}
175
176/// Send a 0-byte UDP packet to `listen_addr` to wake a blocked `recv_from`.
177///
178/// The proxy's recv loop checks `shutdown` between iterations; without a
179/// wake-up the loop would sit in `recv_from` for up to `LISTENER_READ_TIMEOUT`
180/// after the supervisor flipped the flag. The wake packet collapses that
181/// window to ~milliseconds.
182///
183/// 0-byte UDP datagrams are valid and parse-rejected by `parse_query` (the
184/// header alone is 12 bytes), producing one `dns_query` event with
185/// `reasonCode: malformed_query`. This is intentional: the wake packet is
186/// indistinguishable from any other malformed input the workload could
187/// have sent and the audit trail records it the same way. **The proxy
188/// thread will NOT emit this event after the loop has already terminated**
189/// — practically, the wake packet arrives, recv returns, the loop iterates
190/// once with the shutdown flag set to true, and exits without re-emitting.
191/// (The loop checks the flag at the top of each iteration; the recv that
192/// produced the wake packet completes, and on the next iteration the flag
193/// breaks the loop.)
194///
195/// Best-effort: if the wake socket cannot be bound or sent (extremely
196/// unlikely on localhost), the supervisor logs and falls back to
197/// timeout-based shutdown.
198pub fn signal_proxy_shutdown(listen_addr: SocketAddr) {
199 // Bind on a system-chosen port on the same family as the listener.
200 let bind_str = if listen_addr.is_ipv6() {
201 "[::1]:0"
202 } else {
203 "127.0.0.1:0"
204 };
205 let waker = match std::net::UdpSocket::bind(bind_str) {
206 Ok(s) => s,
207 Err(e) => {
208 tracing::debug!(
209 target: "cellos.supervisor.dns_proxy",
210 error = %e,
211 "wake-up socket bind failed; falling back to timeout-based shutdown"
212 );
213 return;
214 }
215 };
216 if let Err(e) = waker.send_to(&[], listen_addr) {
217 tracing::debug!(
218 target: "cellos.supervisor.dns_proxy",
219 error = %e,
220 addr = %listen_addr,
221 "wake-up packet send failed; falling back to timeout-based shutdown"
222 );
223 }
224}
225
226/// Linux-only: spawn the proxy on a dedicated OS thread, `setns(2)` into
227/// `/proc/<child_pid>/ns/net`, bind UDP listener + upstream sockets in that
228/// netns, and run [`super::run_one_shot`] until `shutdown` is set.
229///
230/// Returns a [`DnsProxyHandle`] carrying the join handle + the address the
231/// listener is bound to. The caller is responsible for joining the handle
232/// during teardown ([`DnsProxyHandle::join`]) and for waking the recv loop
233/// via [`signal_proxy_shutdown`] before joining.
234///
235/// **Errors:** any failure before the loop starts (open `/proc` fd, `setns`,
236/// bind listener / upstream) is returned to the caller as
237/// `std::io::Error`. The supervisor turns this into a `dns_query` event
238/// with `reasonCode: upstream_failure` so the audit trail surfaces the
239/// spawn failure.
240///
241/// **Activation contract:** `child_pid` MUST be a live process whose
242/// `/proc/<pid>/ns/net` is the cell's network namespace. The supervisor
243/// calls this helper inside `linux_run_cell_command_isolated` immediately
244/// after `cmd.spawn()` returns and before `child.wait()` — the PID is
245/// guaranteed alive at that point (the child is on a `pre_exec` path that
246/// has not yet completed `execve`).
247#[cfg(target_os = "linux")]
248pub fn spawn_proxy_in_netns(
249 child_pid: u32,
250 cfg: DnsProxyConfig,
251 listen_addr: SocketAddr,
252 upstream_addr: SocketAddr,
253 emitter: Arc<dyn DnsQueryEmitter>,
254 shutdown: Arc<AtomicBool>,
255) -> std::io::Result<DnsProxyHandle> {
256 use std::fs::File;
257 use std::os::unix::io::AsRawFd;
258
259 // Open the netns FD on the calling thread. We pass the FD into the proxy
260 // thread; setns(2) will be called there. Opening here means errors like
261 // "child died before we could grab the namespace" surface synchronously
262 // to the supervisor rather than from a thread it would have to join to
263 // discover the failure.
264 let netns_path = format!("/proc/{child_pid}/ns/net");
265 let netns_file = File::open(&netns_path)
266 .map_err(|e| std::io::Error::new(e.kind(), format!("open netns at {netns_path}: {e}")))?;
267
268 let upstream_resolver_id = cfg.upstream_resolver_id.clone();
269 let shutdown_for_thread = shutdown.clone();
270
271 let (ready_tx, ready_rx) = std::sync::mpsc::channel::<std::io::Result<SocketAddr>>();
272
273 let thread = std::thread::Builder::new()
274 .name(format!("cellos-dns-proxy-{child_pid}"))
275 .spawn(move || {
276 // SAFETY: setns is the documented Linux syscall for moving the
277 // calling thread into the namespace referenced by `fd`. We hold
278 // `netns_file` for the lifetime of the thread so the fd remains
279 // valid until the kernel's reference is taken; the thread never
280 // returns to a tokio worker so polluting "the" thread's netns
281 // is intentional and isolated.
282 let setns_rc = unsafe { libc::setns(netns_file.as_raw_fd(), libc::CLONE_NEWNET) };
283 if setns_rc != 0 {
284 let err = std::io::Error::last_os_error();
285 let _ = ready_tx.send(Err(std::io::Error::new(
286 err.kind(),
287 format!("setns(CLONE_NEWNET) for pid={child_pid}: {err}"),
288 )));
289 return DnsProxyStats::default();
290 }
291 // From this point on the thread is in the cell's netns.
292 // Sockets bound here will live in that netns and the workload
293 // can reach them via the IP it sees on its loopback / declared
294 // bridge interface.
295 let listener = match std::net::UdpSocket::bind(listen_addr) {
296 Ok(s) => s,
297 Err(e) => {
298 let _ = ready_tx.send(Err(std::io::Error::new(
299 e.kind(),
300 format!("bind listener at {listen_addr} in cell netns: {e}"),
301 )));
302 return DnsProxyStats::default();
303 }
304 };
305 if let Err(e) = listener.set_read_timeout(Some(LISTENER_READ_TIMEOUT)) {
306 let _ = ready_tx.send(Err(e));
307 return DnsProxyStats::default();
308 }
309 // Upstream socket: ephemeral port in the same netns. The upstream
310 // resolver MUST be reachable from inside this netns; SEC-22's
311 // nft path ensures the declared resolver IP/port is allowed.
312 let upstream_sock = match std::net::UdpSocket::bind(if upstream_addr.is_ipv6() {
313 "[::]:0"
314 } else {
315 "0.0.0.0:0"
316 }) {
317 Ok(s) => s,
318 Err(e) => {
319 let _ = ready_tx.send(Err(std::io::Error::new(
320 e.kind(),
321 format!("bind upstream socket in cell netns: {e}"),
322 )));
323 return DnsProxyStats::default();
324 }
325 };
326 // Report bound address back so the supervisor can confirm
327 // before it lets the cell command run.
328 let actual_listen = match listener.local_addr() {
329 Ok(a) => a,
330 Err(e) => {
331 let _ = ready_tx.send(Err(e));
332 return DnsProxyStats::default();
333 }
334 };
335
336 // Slot **A5** — TCP/53 listener path in the same netns.
337 //
338 // The TCP listener binds on the SAME `listen_addr` as UDP (per
339 // RFC 1035 §4.2.2 a resolver MUST accept both transports on
340 // the declared address). We bind it from the proxy thread,
341 // which is already setns'd into the cell netns; sub-threads
342 // spawned from here inherit the parent thread's net namespace
343 // (CLONE_NEWNET is per-task and Linux clone() without it
344 // copies the nsproxy reference), so the sibling worker
345 // running `run_tcp_one_shot` sees the same netns as the UDP
346 // loop on the main thread.
347 //
348 // Failure to bind TCP is non-fatal for the UDP path — many
349 // workloads only resolve over UDP and degrading silently to
350 // UDP-only is preferable to refusing activation. The error
351 // is logged and we proceed UDP-only.
352 let tcp_listener = match std::net::TcpListener::bind(listen_addr) {
353 Ok(l) => Some(l),
354 Err(e) => {
355 tracing::warn!(
356 target: "cellos.supervisor.dns_proxy",
357 error = %e,
358 addr = %listen_addr,
359 "TCP listener bind FAILED in cell netns — continuing UDP-only"
360 );
361 None
362 }
363 };
364
365 if ready_tx.send(Ok(actual_listen)).is_err() {
366 // Supervisor side dropped the receiver — abandon.
367 return DnsProxyStats::default();
368 }
369
370 // A5 — spawn the TCP loop on a sibling thread before entering
371 // the UDP loop on the current thread. The sibling inherits
372 // the netns we setns'd into above. We split this into a
373 // small helper closure so the multi-step "clone the upstream
374 // socket, build the worker config, spawn the thread" chain
375 // does not nest Options inside the OK arm of the map.
376 let tcp_worker: Option<std::thread::JoinHandle<()>> = match tcp_listener {
377 None => None,
378 Some(listener_tcp) => {
379 match upstream_sock.try_clone() {
380 Ok(upstream_clone) => {
381 let cfg_tcp = cfg.clone();
382 let emitter_tcp: Arc<dyn DnsQueryEmitter> = emitter.clone();
383 let shutdown_tcp = shutdown_for_thread.clone();
384 // TCP per-connection workers each forward
385 // against the upstream socket. UDP is
386 // stateless per query so sharing is safe.
387 let upstream_tcp = Arc::new(upstream_clone);
388 std::thread::Builder::new()
389 .name(format!("cellos-dns-proxy-tcp-{child_pid}"))
390 .spawn(move || {
391 if let Err(e) = super::run_tcp_one_shot(
392 &cfg_tcp,
393 &listener_tcp,
394 upstream_tcp,
395 emitter_tcp,
396 &shutdown_tcp,
397 ) {
398 tracing::warn!(
399 target: "cellos.supervisor.dns_proxy",
400 error = %e,
401 "TCP proxy accept loop returned with I/O error"
402 );
403 }
404 })
405 .ok()
406 }
407 Err(e) => {
408 tracing::warn!(
409 target: "cellos.supervisor.dns_proxy",
410 error = %e,
411 "TCP upstream socket clone FAILED — TCP path will not start"
412 );
413 None
414 }
415 }
416 }
417 };
418
419 let udp_stats = match super::run_one_shot(
420 &cfg,
421 &listener,
422 &upstream_sock,
423 emitter.as_ref(),
424 &shutdown_for_thread,
425 ) {
426 Ok(stats) => stats,
427 Err(e) => {
428 tracing::warn!(
429 target: "cellos.supervisor.dns_proxy",
430 error = %e,
431 "proxy recv loop returned with I/O error"
432 );
433 DnsProxyStats::default()
434 }
435 };
436
437 // A5 — UDP loop has exited (shutdown observed). Join the TCP
438 // sibling so the OS thread is reaped before we return; the
439 // TCP accept loop also polls `shutdown_for_thread` and will
440 // be exiting at this point. Stats from the TCP path are
441 // intentionally NOT merged into the returned UDP stats — the
442 // existing teardown logging consumes UDP stats only, and
443 // changing that aggregation is out of scope for A5.
444 if let Some(h) = tcp_worker {
445 let _ = h.join();
446 }
447 udp_stats
448 })?;
449
450 // Wait for the thread to confirm setns + bind succeeded before we let
451 // the supervisor proceed. Bounded — the thread either reports within
452 // ~tens of ms or something is wrong and we surface the error.
453 let bound_addr = ready_rx
454 .recv_timeout(std::time::Duration::from_secs(2))
455 .map_err(|e| std::io::Error::other(format!("proxy thread ready timeout: {e}")))??;
456
457 Ok(DnsProxyHandle {
458 shutdown,
459 listen_addr: bound_addr,
460 thread: Some(thread),
461 upstream_resolver_id,
462 })
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;
    use std::sync::Mutex;

    /// Minimal in-memory sink for the EventSinkEmitter test: counts how
    /// many events it was asked to emit and always succeeds.
    struct CountingSink {
        count: Mutex<u64>,
    }
    #[async_trait::async_trait]
    impl cellos_core::ports::EventSink for CountingSink {
        async fn emit(&self, _event: &CloudEventV1) -> Result<(), cellos_core::error::CellosError> {
            *self.count.lock().unwrap() += 1;
            Ok(())
        }
    }

    /// An event handed to the sync emitter must reach the primary sink via
    /// a task spawned on the captured runtime handle.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn event_sink_emitter_dispatches_to_runtime() {
        let sink = Arc::new(CountingSink {
            count: Mutex::new(0),
        });
        let emitter = EventSinkEmitter::capture_current(sink.clone(), None);
        let event = CloudEventV1 {
            specversion: "1.0".into(),
            id: "evt-1".into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(serde_json::json!({"k": "v"})),
            time: Some(chrono::Utc::now().to_rfc3339()),
            traceparent: None,
        };
        emitter.emit(event);
        // The emit is fire-and-forget, so poll (bounded at ~500 ms) until
        // the spawned task has run.
        for _ in 0..50 {
            if *sink.count.lock().unwrap() >= 1 {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
        assert_eq!(
            *sink.count.lock().unwrap(),
            1,
            "sink should have received one event via the runtime handle"
        );
    }

    #[test]
    fn signal_proxy_shutdown_does_not_panic_on_unbound_addr() {
        // Sending a wake packet to a port nothing is listening on must not
        // panic — UDP is fire-and-forget.
        let addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
        signal_proxy_shutdown(addr);
    }

    #[test]
    fn signal_proxy_shutdown_delivers_empty_datagram() {
        // The whole point of the wake packet is that a blocked receive on
        // the listener returns: verify a 0-byte datagram actually arrives.
        let listener = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
        listener
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .unwrap();
        signal_proxy_shutdown(listener.local_addr().unwrap());
        let mut buf = [0u8; 16];
        let (len, _from) = listener
            .recv_from(&mut buf)
            .expect("wake packet should arrive on loopback");
        assert_eq!(len, 0, "wake packet must be an empty datagram");
    }

    #[test]
    fn handle_join_returns_none_when_no_thread() {
        let mut h = DnsProxyHandle {
            shutdown: Arc::new(AtomicBool::new(false)),
            listen_addr: "127.0.0.1:0".parse().unwrap(),
            #[cfg(target_os = "linux")]
            thread: None,
            upstream_resolver_id: "test".into(),
        };
        assert!(h.join().is_none());
        // shutdown is still readable/usable post-join.
        h.shutdown.store(true, Ordering::SeqCst);
        assert!(h.shutdown.load(Ordering::SeqCst));
    }
}