Skip to main content

cellos_supervisor/dns_proxy/
spawn.rs

//! SEAM-1 Phase 2b — supervisor → cell-netns DNS proxy spawn.
//!
//! This module owns the *placement* of [`super::run_one_shot`] inside the
//! cell's network namespace. The proxy module itself ([`super`]) is
//! platform-neutral and contains no `setns` / `nsenter` calls; the supervisor
//! pre-binds sockets and hands them to `run_one_shot`. Phase 2b's job is to
//! make those sockets exist in `/proc/<child_pid>/ns/net` rather than the host
//! netns.
//!
//! ## Why a dedicated OS thread
//!
//! `setns(2)` changes the *calling thread*'s namespace association. A
//! `tokio::task::spawn_blocking` task does not give us a stable thread —
//! tokio's blocking pool may reuse the worker for other tasks once we
//! return. Even if the proxy ran synchronously inside `spawn_blocking`,
//! polluting that worker's netns with the cell's namespace would corrupt
//! every subsequent task that landed on the same worker. We therefore
//! allocate a fresh `std::thread`, do the `setns` there, run the loop to
//! completion, and let the OS reclaim the thread on exit. The thread never
//! returns to a tokio worker.
//!
//! ## Bridging the sync emitter to async [`EventSink`]
//!
//! The proxy's [`super::DnsQueryEmitter`] is synchronous (it must not block
//! the recv loop). The supervisor's [`cellos_core::ports::EventSink`] is
//! async. [`EventSinkEmitter`] captures a [`tokio::runtime::Handle`] **before**
//! the OS thread starts and uses `Handle::spawn` to fire-and-forget every
//! event onto the runtime. This preserves the per-query event audit trail
//! without blocking the recv loop on emit completion.
//!
//! ## Shutdown coordination
//!
//! `run_one_shot` already checks an `AtomicBool` between iterations and
//! the listener has a short read timeout. At teardown the supervisor sets
//! the flag and then calls [`signal_proxy_shutdown`], which sends a 0-byte
//! UDP packet to the listen address — the recv loop wakes immediately,
//! observes the flag, and returns. Without the wake packet, the loop
//! would sit in `recv_from` for up to the read timeout after teardown
//! requested shutdown.
40
41use std::net::SocketAddr;
42use std::sync::atomic::AtomicBool;
43use std::sync::Arc;
44
45use cellos_core::CloudEventV1;
46
47#[cfg(target_os = "linux")]
48use super::DnsProxyConfig;
49use super::{DnsProxyStats, DnsQueryEmitter};
50
/// How long the proxy's UDP listener may block in a single receive before
/// it returns and the shutdown flag gets re-checked.
///
/// This is the fallback bound on shutdown latency for the case where the
/// wake packet from `signal_proxy_shutdown` never arrives (for example,
/// teardown did not call it). It is deliberately short so the flag is
/// polled at a predictable cadence.
#[cfg(target_os = "linux")]
const LISTENER_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(200);
57
58/// Adapter from the proxy's synchronous [`DnsQueryEmitter`] trait to the
59/// supervisor's async [`cellos_core::ports::EventSink`] pipeline.
60///
61/// **Tokio-context constraint:** `runtime_handle` MUST be captured on a
62/// thread that already has a tokio runtime context (i.e. inside an `async`
63/// block on the multi-thread runtime, OR inside `spawn_blocking`). The
64/// proxy thread itself is a bare `std::thread` and has no tokio context;
65/// the captured handle is what lets it `spawn` work back onto the runtime.
66pub struct EventSinkEmitter {
67    pub runtime_handle: tokio::runtime::Handle,
68    pub sink: Arc<dyn cellos_core::ports::EventSink>,
69    pub jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
70}
71
72impl EventSinkEmitter {
73    /// Build an emitter capturing the **current** tokio runtime handle.
74    ///
75    /// Call from inside an async block (or `spawn_blocking`) — never from
76    /// a bare std thread. Panics with a clear message if no tokio runtime
77    /// is reachable, which is a programming error: this struct exists
78    /// specifically to bridge from sync threads, but the bridge itself
79    /// must be constructed where a runtime is available.
80    pub fn capture_current(
81        sink: Arc<dyn cellos_core::ports::EventSink>,
82        jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
83    ) -> Self {
84        let handle = tokio::runtime::Handle::try_current()
85            .expect("EventSinkEmitter::capture_current called outside a tokio runtime context");
86        Self {
87            runtime_handle: handle,
88            sink,
89            jsonl_sink,
90        }
91    }
92}
93
94impl DnsQueryEmitter for EventSinkEmitter {
95    fn emit(&self, event: CloudEventV1) {
96        // Fire-and-forget: the proxy recv loop runs synchronously and any
97        // backpressure here turns directly into per-query latency for the
98        // workload. The supervisor's emit() helper logs sink failures —
99        // mirror that here without blocking the proxy.
100        let sink = self.sink.clone();
101        let jsonl = self.jsonl_sink.clone();
102        let event_for_jsonl = event.clone();
103        self.runtime_handle.spawn(async move {
104            if let Err(e) = sink.emit(&event).await {
105                tracing::warn!(
106                    target: "cellos.supervisor.dns_proxy",
107                    error = %e,
108                    "primary sink emit failed for dns_query event"
109                );
110            }
111        });
112        if let Some(j) = jsonl {
113            self.runtime_handle.spawn(async move {
114                if let Err(e) = j.emit(&event_for_jsonl).await {
115                    tracing::warn!(
116                        target: "cellos.supervisor.dns_proxy",
117                        error = %e,
118                        "jsonl sink emit failed for dns_query event"
119                    );
120                }
121            });
122        }
123    }
124}
125
126/// Shutdown coordination handle returned by [`spawn_proxy_in_netns`] (Linux)
127/// — held by the supervisor and signalled at cell destroy time.
128///
129/// On non-Linux platforms the supervisor never constructs one of these; the
130/// activation predicate evaluator simply returns `None`.
131pub struct DnsProxyHandle {
132    /// Shared shutdown flag. Setting `true` signals the proxy loop to exit
133    /// at its next iteration (within `LISTENER_READ_TIMEOUT` worst case).
134    pub shutdown: Arc<AtomicBool>,
135    /// The address the proxy listener is bound to inside the cell's netns.
136    /// Used by [`signal_proxy_shutdown`] to wake the recv loop.
137    pub listen_addr: SocketAddr,
138    /// Join handle for the proxy OS thread. `Some` until `join()` consumes it.
139    #[cfg(target_os = "linux")]
140    pub thread: Option<std::thread::JoinHandle<DnsProxyStats>>,
141    /// Resolver id stamped into events; retained for diagnostics.
142    pub upstream_resolver_id: String,
143}
144
145impl DnsProxyHandle {
146    /// Signal the proxy to stop and join the OS thread. Returns the
147    /// cumulative [`DnsProxyStats`] when the thread exited cleanly, or
148    /// `None` if the thread had already been joined or panicked.
149    ///
150    /// Caller must already have set `self.shutdown` to `true` and called
151    /// [`signal_proxy_shutdown`] (or otherwise woken the recv loop).
152    /// Calling this without waking the loop will block for up to
153    /// `LISTENER_READ_TIMEOUT` — bounded but not instant.
154    #[cfg(target_os = "linux")]
155    pub fn join(&mut self) -> Option<DnsProxyStats> {
156        let handle = self.thread.take()?;
157        match handle.join() {
158            Ok(stats) => Some(stats),
159            Err(_panic) => {
160                tracing::warn!(
161                    target: "cellos.supervisor.dns_proxy",
162                    "DNS proxy thread panicked on join"
163                );
164                None
165            }
166        }
167    }
168
169    /// Stub join on non-Linux platforms — no thread was ever spawned.
170    #[cfg(not(target_os = "linux"))]
171    pub fn join(&mut self) -> Option<DnsProxyStats> {
172        None
173    }
174}
175
176/// Send a 0-byte UDP packet to `listen_addr` to wake a blocked `recv_from`.
177///
178/// The proxy's recv loop checks `shutdown` between iterations; without a
179/// wake-up the loop would sit in `recv_from` for up to `LISTENER_READ_TIMEOUT`
180/// after the supervisor flipped the flag. The wake packet collapses that
181/// window to ~milliseconds.
182///
183/// 0-byte UDP datagrams are valid and parse-rejected by `parse_query` (the
184/// header alone is 12 bytes), producing one `dns_query` event with
185/// `reasonCode: malformed_query`. This is intentional: the wake packet is
186/// indistinguishable from any other malformed input the workload could
187/// have sent and the audit trail records it the same way. **The proxy
188/// thread will NOT emit this event after the loop has already terminated**
189/// — practically, the wake packet arrives, recv returns, the loop iterates
190/// once with the shutdown flag set to true, and exits without re-emitting.
191/// (The loop checks the flag at the top of each iteration; the recv that
192/// produced the wake packet completes, and on the next iteration the flag
193/// breaks the loop.)
194///
195/// Best-effort: if the wake socket cannot be bound or sent (extremely
196/// unlikely on localhost), the supervisor logs and falls back to
197/// timeout-based shutdown.
198pub fn signal_proxy_shutdown(listen_addr: SocketAddr) {
199    // Bind on a system-chosen port on the same family as the listener.
200    let bind_str = if listen_addr.is_ipv6() {
201        "[::1]:0"
202    } else {
203        "127.0.0.1:0"
204    };
205    let waker = match std::net::UdpSocket::bind(bind_str) {
206        Ok(s) => s,
207        Err(e) => {
208            tracing::debug!(
209                target: "cellos.supervisor.dns_proxy",
210                error = %e,
211                "wake-up socket bind failed; falling back to timeout-based shutdown"
212            );
213            return;
214        }
215    };
216    if let Err(e) = waker.send_to(&[], listen_addr) {
217        tracing::debug!(
218            target: "cellos.supervisor.dns_proxy",
219            error = %e,
220            addr = %listen_addr,
221            "wake-up packet send failed; falling back to timeout-based shutdown"
222        );
223    }
224}
225
226/// Linux-only: spawn the proxy on a dedicated OS thread, `setns(2)` into
227/// `/proc/<child_pid>/ns/net`, bind UDP listener + upstream sockets in that
228/// netns, and run [`super::run_one_shot`] until `shutdown` is set.
229///
230/// Returns a [`DnsProxyHandle`] carrying the join handle + the address the
231/// listener is bound to. The caller is responsible for joining the handle
232/// during teardown ([`DnsProxyHandle::join`]) and for waking the recv loop
233/// via [`signal_proxy_shutdown`] before joining.
234///
235/// **Errors:** any failure before the loop starts (open `/proc` fd, `setns`,
236/// bind listener / upstream) is returned to the caller as
237/// `std::io::Error`. The supervisor turns this into a `dns_query` event
238/// with `reasonCode: upstream_failure` so the audit trail surfaces the
239/// spawn failure.
240///
241/// **Activation contract:** `child_pid` MUST be a live process whose
242/// `/proc/<pid>/ns/net` is the cell's network namespace. The supervisor
243/// calls this helper inside `linux_run_cell_command_isolated` immediately
244/// after `cmd.spawn()` returns and before `child.wait()` — the PID is
245/// guaranteed alive at that point (the child is on a `pre_exec` path that
246/// has not yet completed `execve`).
247#[cfg(target_os = "linux")]
248pub fn spawn_proxy_in_netns(
249    child_pid: u32,
250    cfg: DnsProxyConfig,
251    listen_addr: SocketAddr,
252    upstream_addr: SocketAddr,
253    emitter: Arc<dyn DnsQueryEmitter>,
254    shutdown: Arc<AtomicBool>,
255) -> std::io::Result<DnsProxyHandle> {
256    use std::fs::File;
257    use std::os::unix::io::AsRawFd;
258
259    // Open the netns FD on the calling thread. We pass the FD into the proxy
260    // thread; setns(2) will be called there. Opening here means errors like
261    // "child died before we could grab the namespace" surface synchronously
262    // to the supervisor rather than from a thread it would have to join to
263    // discover the failure.
264    let netns_path = format!("/proc/{child_pid}/ns/net");
265    let netns_file = File::open(&netns_path)
266        .map_err(|e| std::io::Error::new(e.kind(), format!("open netns at {netns_path}: {e}")))?;
267
268    let upstream_resolver_id = cfg.upstream_resolver_id.clone();
269    let shutdown_for_thread = shutdown.clone();
270
271    let (ready_tx, ready_rx) = std::sync::mpsc::channel::<std::io::Result<SocketAddr>>();
272
273    let thread = std::thread::Builder::new()
274        .name(format!("cellos-dns-proxy-{child_pid}"))
275        .spawn(move || {
276            // SAFETY: setns is the documented Linux syscall for moving the
277            // calling thread into the namespace referenced by `fd`. We hold
278            // `netns_file` for the lifetime of the thread so the fd remains
279            // valid until the kernel's reference is taken; the thread never
280            // returns to a tokio worker so polluting "the" thread's netns
281            // is intentional and isolated.
282            let setns_rc = unsafe { libc::setns(netns_file.as_raw_fd(), libc::CLONE_NEWNET) };
283            if setns_rc != 0 {
284                let err = std::io::Error::last_os_error();
285                let _ = ready_tx.send(Err(std::io::Error::new(
286                    err.kind(),
287                    format!("setns(CLONE_NEWNET) for pid={child_pid}: {err}"),
288                )));
289                return DnsProxyStats::default();
290            }
291            // From this point on the thread is in the cell's netns.
292            // Sockets bound here will live in that netns and the workload
293            // can reach them via the IP it sees on its loopback / declared
294            // bridge interface.
295            let listener = match std::net::UdpSocket::bind(listen_addr) {
296                Ok(s) => s,
297                Err(e) => {
298                    let _ = ready_tx.send(Err(std::io::Error::new(
299                        e.kind(),
300                        format!("bind listener at {listen_addr} in cell netns: {e}"),
301                    )));
302                    return DnsProxyStats::default();
303                }
304            };
305            if let Err(e) = listener.set_read_timeout(Some(LISTENER_READ_TIMEOUT)) {
306                let _ = ready_tx.send(Err(e));
307                return DnsProxyStats::default();
308            }
309            // Upstream socket: ephemeral port in the same netns. The upstream
310            // resolver MUST be reachable from inside this netns; SEC-22's
311            // nft path ensures the declared resolver IP/port is allowed.
312            let upstream_sock = match std::net::UdpSocket::bind(if upstream_addr.is_ipv6() {
313                "[::]:0"
314            } else {
315                "0.0.0.0:0"
316            }) {
317                Ok(s) => s,
318                Err(e) => {
319                    let _ = ready_tx.send(Err(std::io::Error::new(
320                        e.kind(),
321                        format!("bind upstream socket in cell netns: {e}"),
322                    )));
323                    return DnsProxyStats::default();
324                }
325            };
326            // Report bound address back so the supervisor can confirm
327            // before it lets the cell command run.
328            let actual_listen = match listener.local_addr() {
329                Ok(a) => a,
330                Err(e) => {
331                    let _ = ready_tx.send(Err(e));
332                    return DnsProxyStats::default();
333                }
334            };
335
336            // Slot **A5** — TCP/53 listener path in the same netns.
337            //
338            // The TCP listener binds on the SAME `listen_addr` as UDP (per
339            // RFC 1035 §4.2.2 a resolver MUST accept both transports on
340            // the declared address). We bind it from the proxy thread,
341            // which is already setns'd into the cell netns; sub-threads
342            // spawned from here inherit the parent thread's net namespace
343            // (CLONE_NEWNET is per-task and Linux clone() without it
344            // copies the nsproxy reference), so the sibling worker
345            // running `run_tcp_one_shot` sees the same netns as the UDP
346            // loop on the main thread.
347            //
348            // Failure to bind TCP is non-fatal for the UDP path — many
349            // workloads only resolve over UDP and degrading silently to
350            // UDP-only is preferable to refusing activation. The error
351            // is logged and we proceed UDP-only.
352            let tcp_listener = match std::net::TcpListener::bind(listen_addr) {
353                Ok(l) => Some(l),
354                Err(e) => {
355                    tracing::warn!(
356                        target: "cellos.supervisor.dns_proxy",
357                        error = %e,
358                        addr = %listen_addr,
359                        "TCP listener bind FAILED in cell netns — continuing UDP-only"
360                    );
361                    None
362                }
363            };
364
365            if ready_tx.send(Ok(actual_listen)).is_err() {
366                // Supervisor side dropped the receiver — abandon.
367                return DnsProxyStats::default();
368            }
369
370            // A5 — spawn the TCP loop on a sibling thread before entering
371            // the UDP loop on the current thread. The sibling inherits
372            // the netns we setns'd into above. We split this into a
373            // small helper closure so the multi-step "clone the upstream
374            // socket, build the worker config, spawn the thread" chain
375            // does not nest Options inside the OK arm of the map.
376            let tcp_worker: Option<std::thread::JoinHandle<()>> = match tcp_listener {
377                None => None,
378                Some(listener_tcp) => {
379                    match upstream_sock.try_clone() {
380                        Ok(upstream_clone) => {
381                            let cfg_tcp = cfg.clone();
382                            let emitter_tcp: Arc<dyn DnsQueryEmitter> = emitter.clone();
383                            let shutdown_tcp = shutdown_for_thread.clone();
384                            // TCP per-connection workers each forward
385                            // against the upstream socket. UDP is
386                            // stateless per query so sharing is safe.
387                            let upstream_tcp = Arc::new(upstream_clone);
388                            std::thread::Builder::new()
389                                .name(format!("cellos-dns-proxy-tcp-{child_pid}"))
390                                .spawn(move || {
391                                    if let Err(e) = super::run_tcp_one_shot(
392                                        &cfg_tcp,
393                                        &listener_tcp,
394                                        upstream_tcp,
395                                        emitter_tcp,
396                                        &shutdown_tcp,
397                                    ) {
398                                        tracing::warn!(
399                                            target: "cellos.supervisor.dns_proxy",
400                                            error = %e,
401                                            "TCP proxy accept loop returned with I/O error"
402                                        );
403                                    }
404                                })
405                                .ok()
406                        }
407                        Err(e) => {
408                            tracing::warn!(
409                                target: "cellos.supervisor.dns_proxy",
410                                error = %e,
411                                "TCP upstream socket clone FAILED — TCP path will not start"
412                            );
413                            None
414                        }
415                    }
416                }
417            };
418
419            let udp_stats = match super::run_one_shot(
420                &cfg,
421                &listener,
422                &upstream_sock,
423                emitter.as_ref(),
424                &shutdown_for_thread,
425            ) {
426                Ok(stats) => stats,
427                Err(e) => {
428                    tracing::warn!(
429                        target: "cellos.supervisor.dns_proxy",
430                        error = %e,
431                        "proxy recv loop returned with I/O error"
432                    );
433                    DnsProxyStats::default()
434                }
435            };
436
437            // A5 — UDP loop has exited (shutdown observed). Join the TCP
438            // sibling so the OS thread is reaped before we return; the
439            // TCP accept loop also polls `shutdown_for_thread` and will
440            // be exiting at this point. Stats from the TCP path are
441            // intentionally NOT merged into the returned UDP stats — the
442            // existing teardown logging consumes UDP stats only, and
443            // changing that aggregation is out of scope for A5.
444            if let Some(h) = tcp_worker {
445                let _ = h.join();
446            }
447            udp_stats
448        })?;
449
450    // Wait for the thread to confirm setns + bind succeeded before we let
451    // the supervisor proceed. Bounded — the thread either reports within
452    // ~tens of ms or something is wrong and we surface the error.
453    let bound_addr = ready_rx
454        .recv_timeout(std::time::Duration::from_secs(2))
455        .map_err(|e| std::io::Error::other(format!("proxy thread ready timeout: {e}")))??;
456
457    Ok(DnsProxyHandle {
458        shutdown,
459        listen_addr: bound_addr,
460        thread: Some(thread),
461        upstream_resolver_id,
462    })
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;
    use std::sync::Mutex;

    /// Test double: an `EventSink` that does nothing but tally how many
    /// events were emitted to it.
    struct CountingSink {
        count: Mutex<u64>,
    }

    impl CountingSink {
        /// Snapshot of the number of events received so far.
        fn seen(&self) -> u64 {
            *self.count.lock().unwrap()
        }
    }

    #[async_trait::async_trait]
    impl cellos_core::ports::EventSink for CountingSink {
        async fn emit(&self, _event: &CloudEventV1) -> Result<(), cellos_core::error::CellosError> {
            *self.count.lock().unwrap() += 1;
            Ok(())
        }
    }

    /// An event handed to the sync `emit` must reach the sink through the
    /// captured runtime handle.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn event_sink_emitter_dispatches_to_runtime() {
        let sink = Arc::new(CountingSink {
            count: Mutex::new(0),
        });
        let emitter = EventSinkEmitter::capture_current(sink.clone(), None);

        emitter.emit(CloudEventV1 {
            specversion: "1.0".into(),
            id: "evt-1".into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(serde_json::json!({"k": "v"})),
            time: Some(chrono::Utc::now().to_rfc3339()),
            traceparent: None,
        });

        // emit() is fire-and-forget, so poll (up to 50 × 10 ms) until the
        // spawned task has delivered the event.
        let mut attempts = 0;
        while sink.seen() < 1 && attempts < 50 {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            attempts += 1;
        }

        assert_eq!(
            sink.seen(),
            1,
            "sink should have received one event via the runtime handle"
        );
    }

    /// A wake packet aimed at a port with no listener must not panic —
    /// UDP is fire-and-forget.
    #[test]
    fn signal_proxy_shutdown_does_not_panic_on_unbound_addr() {
        let nobody_home: SocketAddr = "127.0.0.1:1".parse().unwrap();
        signal_proxy_shutdown(nobody_home);
    }

    /// Joining a handle that holds no thread yields `None` and leaves the
    /// shutdown flag usable.
    #[test]
    fn handle_join_returns_none_when_no_thread() {
        let flag = Arc::new(AtomicBool::new(false));
        let mut handle = DnsProxyHandle {
            shutdown: flag,
            listen_addr: "127.0.0.1:0".parse().unwrap(),
            #[cfg(target_os = "linux")]
            thread: None,
            upstream_resolver_id: "test".into(),
        };

        assert!(handle.join().is_none());

        // The flag is still readable/writable after the join.
        handle.shutdown.store(true, Ordering::SeqCst);
        assert!(handle.shutdown.load(Ordering::SeqCst));
    }
}