Skip to main content

ts_runtime/
serve.rs

1//! Stored Serve config + accept-loop runtime (`tsnet`'s `Get/SetServeConfig` + serving runtime).
2//!
3//! Go `tsnet` stores an `ipn.ServeConfig` on the node and runs one accept loop per configured
4//! tailnet port, dispatching each accepted connection per its handler (proxy / text / raw TCP
5//! forward / hand-back). This module is the faithful equivalent on the **application** netstack: a
6//! [`ServeManager`](crate::serve::ServeManager) owns the current [`ServeState`](ts_control::ServeState), one accept-loop task
7//! per bound port, and tears every loop down on drop / on the next `set`.
8//!
9//! ## Storage + reconcile (full-replace)
10//!
11//! The manager holds the current [`ServeState`](ts_control::ServeState) plus one [`tokio::task::AbortHandle`] per bound
12//! port behind a single `Arc<Mutex<Inner>>` (mirroring [`crate::fallback_tcp::FallbackTcpManager`]).
13//! [`ServeManager::set`](crate::serve::ServeManager::set) uses **full-replace** semantics: it aborts *every* existing accept loop and
14//! respawns from the new config. Go reconciles incrementally (leaving unchanged ports running); we
15//! do full-replace because it is simpler and correct, and a `SetServeConfig` is a rare control-plane
16//! operation, not a hot path. The passed [`ServeState`](ts_control::ServeState) becomes the whole config (REPLACE, matching
17//! Go). `pure_reconcile` computes the add/remove port deltas for testing and documentation, even
18//! though the live path replaces wholesale.
19//!
20//! ## TLS termination
21//!
22//! TLS-terminating ports (`ServeTarget::terminates_tls`) need a `TlsAcceptor`; the caller
23//! (`Device::set_serve_config`) obtains it **once** via the cert path and hands it in per port. The
24//! manager never builds an acceptor and never touches the cert/ACME machinery — that keeps
25//! `ts_runtime` off the cert path and lets the device fail the whole `set` closed if a cert cannot
26//! be issued (no plaintext downgrade).
27//!
28//! ## Anti-leak
29//!
30//! Every accept loop binds the **overlay** netstack only (via `Channel::tcp_listen` on the
31//! device's own tailnet IPv4) — never a host socket. The `ServeTarget::Proxy` /
32//! `ServeTarget::TcpForward` backend dial is a **local host socket** to the embedder's own backend
33//! (exactly like Go's reverse-proxy to `127.0.0.1` and like [`crate::Runtime`]'s loopback proxy) —
34//! it is intentionally NOT routed through the `ts_forwarder` exit-egress path, so the exit-node
35//! anti-leak chokepoint is untouched. A backend dial failure drops the connection (fail-closed,
36//! logged); it never falls back to anything.
37
38use std::{
39    collections::{BTreeMap, BTreeSet},
40    net::{Ipv4Addr, SocketAddr},
41    sync::{Arc, Mutex},
42};
43
44use netstack::{CreateSocket, netcore::Channel, netsock::TcpStream as OverlayStream};
45use tokio::{
46    io::{AsyncRead, AsyncWrite, AsyncWriteExt},
47    sync::{Semaphore, mpsc},
48};
49use ts_control::{ServeState, ServeTarget, tls::TlsAcceptor};
50
/// Max concurrent in-flight connections served per bound port. Bounds the per-port spawn fan-out so
/// a flood of accepts on one serve port cannot grow tasks (and overlay sockets) without limit;
/// saturated => the accept loop back-pressures (stops accepting) until an in-flight conn finishes.
/// Mirrors the loopback proxy's `MAX_CONCURRENT_CONNS` rationale (each accepted conn pins an overlay
/// TCP socket, ~512 KiB of rx+tx buffers — see `tcp_buffer_size` in AGENTS.md).
///
/// The value is used in two places in this module: as the permit count of each port's in-flight
/// semaphore (in `run_port`), and as the capacity of the bounded `ServeTarget::Accept` hand-back
/// channel created by `ServeManager::set`. That channel is shared by every port of one `set`, so
/// its capacity does NOT scale with the number of ports.
const MAX_SERVE_CONNS_PER_PORT: usize = 256;
57
/// A connection handed back to the embedder for a [`ServeTarget::Accept`] port (the in-process
/// stand-in for Go `tsnet`'s `ListenTLS`-returned `net.Listener`).
///
/// `stream` is already TLS-terminated (the overlay stream wrapped in `tokio_rustls`'s server
/// `TlsStream`), boxed so the channel is target-agnostic. `port` is the serve port it arrived on so
/// an embedder serving `Accept` on several ports can demultiplex.
///
/// Values are produced by the per-connection dispatch after a successful TLS handshake and sent
/// over the bounded hand-back channel; if the receiving side is gone the send fails and the
/// connection is closed by dropping this value.
pub struct ServeAccepted {
    /// The tailnet (overlay) port this connection was accepted on.
    pub port: u16,
    /// The accepted, TLS-terminated stream, ready to read/write. Dropping it closes the connection.
    pub stream: Box<dyn AsyncReadWrite>,
}
70
/// Object-safe alias for the boxed accepted stream: an `AsyncRead + AsyncWrite` the embedder drives.
///
/// The trait has no methods of its own; it only bundles the supertraits (plus `Send + Unpin`) so a
/// single `Box<dyn AsyncReadWrite>` can carry any concrete stream type.
pub trait AsyncReadWrite: AsyncRead + AsyncWrite + Send + Unpin {}
// Blanket impl: every type that satisfies the bounds is an `AsyncReadWrite` automatically, so no
// stream type ever needs to opt in by hand.
impl<T: AsyncRead + AsyncWrite + Send + Unpin> AsyncReadWrite for T {}
74
/// Receiver side of the [`ServeTarget::Accept`] hand-back channel (mirrors a `net.Listener`'s accept
/// queue). [`ServeManager::set`] returns one; await [`recv`](mpsc::Receiver::recv) to take the next
/// accepted, TLS-terminated connection.
///
/// Every `set` creates a brand-new bounded channel and returns its receiver, so a receiver obtained
/// from an earlier `set` is never fed by the accept loops of a later one; callers should replace
/// (drop) the old receiver when they call `set` again.
pub type ServeAcceptedReceiver = mpsc::Receiver<ServeAccepted>;
79
/// A fully-resolved per-port serve plan: the target plus, for TLS-terminating targets, the acceptor
/// the device built up-front from the cert path. The caller guarantees `acceptor.is_some()` exactly
/// when `target.terminates_tls()`.
///
/// The manager does not verify this at bind time: the listener is bound regardless. The check
/// happens per connection in the dispatch path — a non-`TcpForward` target with no acceptor has
/// every accepted connection dropped (fail-closed, logged), so plaintext is never served on a port
/// that was meant to terminate TLS. For a `TcpForward` target the acceptor is never consulted.
pub struct ResolvedPort {
    /// What to serve on this port.
    pub target: ServeTarget,
    /// The TLS acceptor for this port, present iff `target.terminates_tls()`.
    pub acceptor: Option<TlsAcceptor>,
}
89
90/// Shared manager state behind a single lock.
91struct Inner {
92    /// The currently-stored config (what [`get`](ServeManager::get) returns). Empty default until
93    /// the first `set`.
94    state: ServeState,
95    /// One accept-loop abort handle per currently-bound port. Aborting a handle stops that port's
96    /// accept loop (and, transitively, drops its listener so the overlay port is released).
97    ports: BTreeMap<u16, tokio::task::AbortHandle>,
98}
99
100impl Drop for Inner {
101    fn drop(&mut self) {
102        for h in self.ports.values() {
103            h.abort();
104        }
105    }
106}
107
108/// Owns the stored Serve config and the live per-port accept loops (`tsnet` serving runtime).
109///
110/// Built once from the application netstack [`Channel`] and the device's overlay IPv4, held by the
111/// [`crate::Runtime`]. [`set`](Self::set) replaces the whole config (full-replace reconcile);
112/// dropping the manager (with the runtime / device) aborts every accept loop.
113pub struct ServeManager {
114    inner: Arc<Mutex<Inner>>,
115    channel: Channel,
116    self_ipv4: Ipv4Addr,
117}
118
119impl ServeManager {
120    /// Build a manager bound to the application netstack `channel` and the device's own tailnet
121    /// `self_ipv4` (the overlay address every serve listener binds on). No accept loop runs until the
122    /// first [`set`](Self::set).
123    pub fn new(channel: Channel, self_ipv4: Ipv4Addr) -> Self {
124        Self {
125            inner: Arc::new(Mutex::new(Inner {
126                state: ServeState::default(),
127                ports: BTreeMap::new(),
128            })),
129            channel,
130            self_ipv4,
131        }
132    }
133
134    /// The currently-stored config (Go `GetServeConfig`); empty default if none was ever set.
135    pub fn get(&self) -> ServeState {
136        self.inner
137            .lock()
138            .unwrap_or_else(|e| e.into_inner())
139            .state
140            .clone()
141    }
142
143    /// Replace the whole Serve config (Go `SetServeConfig`, REPLACE semantics), full-replace
144    /// reconcile.
145    ///
146    /// `state` is the new config; `resolved` carries the per-port target + (for TLS ports) the
147    /// pre-built acceptor, keyed identically to `state.ports`. Aborts every existing accept loop and
148    /// spawns one per port in `resolved`. Returns a fresh [`ServeAcceptedReceiver`] delivering
149    /// connections for every [`ServeTarget::Accept`] port (empty if there are none).
150    ///
151    /// The caller is responsible for `state.validate()` and for obtaining the acceptors (failing the
152    /// whole call closed if a cert can't be issued) before calling this; the manager only binds and
153    /// dispatches.
154    pub fn set(
155        &self,
156        state: ServeState,
157        resolved: BTreeMap<u16, ResolvedPort>,
158    ) -> ServeAcceptedReceiver {
159        // A bounded channel back-pressures a slow embedder rather than buffering unboundedly.
160        let (accept_tx, accept_rx) = mpsc::channel::<ServeAccepted>(MAX_SERVE_CONNS_PER_PORT);
161
162        let mut new_ports: BTreeMap<u16, tokio::task::AbortHandle> = BTreeMap::new();
163        for (port, rp) in resolved {
164            let channel = self.channel.clone();
165            let self_ipv4 = self.self_ipv4;
166            let accept_tx = accept_tx.clone();
167            let handle = tokio::spawn(async move {
168                if let Err(e) = run_port(channel, self_ipv4, port, rp, accept_tx).await {
169                    tracing::warn!(%port, error = %e, "serve listener exited");
170                }
171            })
172            .abort_handle();
173            new_ports.insert(port, handle);
174        }
175
176        // Swap in the new state + handles under the lock; aborting the OLD handles happens when the
177        // replaced map is dropped at end of scope (after the lock is released).
178        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
179        inner.state = state;
180        let old = std::mem::replace(&mut inner.ports, new_ports);
181        drop(inner);
182        for h in old.values() {
183            h.abort();
184        }
185
186        accept_rx
187    }
188}
189
190/// Compute which ports must be added and removed to go from `current` to `next` (pure; the diff Go
191/// reconciles incrementally). The live [`ServeManager::set`] uses full-replace, but this captures
192/// the delta for tests/documentation: a port is *changed* iff its target differs, which counts as
193/// both a remove and an add.
194#[cfg_attr(not(test), allow(dead_code))]
195fn pure_reconcile(
196    current: &BTreeMap<u16, ServeTarget>,
197    next: &BTreeMap<u16, ServeTarget>,
198) -> (BTreeSet<u16>, BTreeSet<u16>) {
199    let mut to_add = BTreeSet::new();
200    let mut to_remove = BTreeSet::new();
201    for (port, target) in next {
202        match current.get(port) {
203            Some(cur) if cur == target => {}
204            _ => {
205                to_add.insert(*port);
206            }
207        }
208    }
209    for port in current.keys() {
210        match next.get(port) {
211            Some(target) if current.get(port) == Some(target) => {}
212            _ => {
213                to_remove.insert(*port);
214            }
215        }
216    }
217    (to_add, to_remove)
218}
219
220/// Accept loop for one serve port: bind the overlay listener on `(self_ipv4, port)` and dispatch
221/// each accepted connection per `rp.target`, capped at [`MAX_SERVE_CONNS_PER_PORT`] in flight.
222async fn run_port(
223    channel: Channel,
224    self_ipv4: Ipv4Addr,
225    port: u16,
226    rp: ResolvedPort,
227    accept_tx: mpsc::Sender<ServeAccepted>,
228) -> Result<(), netstack::netcore::Error> {
229    // Anti-leak: bind the OVERLAY netstack on this node's own tailnet IPv4, never a host socket.
230    let listen_addr = SocketAddr::new(self_ipv4.into(), port);
231    let listener = channel.tcp_listen(listen_addr).await?;
232    tracing::debug!(%port, "serve listener accepting");
233
234    let rp = Arc::new(rp);
235    let inflight = Arc::new(Semaphore::new(MAX_SERVE_CONNS_PER_PORT));
236
237    loop {
238        // Acquire a permit BEFORE accepting so the loop back-pressures at the cap.
239        let Ok(permit) = inflight.clone().acquire_owned().await else {
240            return Ok(());
241        };
242        let overlay = listener.accept().await?;
243
244        let rp = rp.clone();
245        let accept_tx = accept_tx.clone();
246        tokio::spawn(async move {
247            let _permit = permit; // released when this connection finishes
248            dispatch_conn(port, overlay, rp, accept_tx).await;
249        });
250    }
251}
252
253/// Dispatch one accepted overlay connection per the port's target. TLS is terminated here (once per
254/// connection) for TLS-terminating targets; failures drop the connection (fail-closed, logged).
255async fn dispatch_conn(
256    port: u16,
257    overlay: OverlayStream,
258    rp: Arc<ResolvedPort>,
259    accept_tx: mpsc::Sender<ServeAccepted>,
260) {
261    match &rp.target {
262        // Raw passthrough: NO TLS. Splice the raw overlay stream to the local backend.
263        ServeTarget::TcpForward { to } => {
264            forward_to_backend(port, overlay, to).await;
265        }
266        // TLS-terminating targets: terminate TLS once, then act on the decrypted stream.
267        _ => {
268            let Some(acceptor) = rp.acceptor.as_ref() else {
269                // The caller's contract guarantees a TLS acceptor for every TLS-terminating port;
270                // a missing one means we must never serve plaintext — drop, fail-closed.
271                tracing::warn!(%port, "serve: missing TLS acceptor for TLS port; dropping conn");
272                return;
273            };
274            let tls = match acceptor.accept(overlay).await {
275                Ok(s) => s,
276                Err(e) => {
277                    tracing::debug!(%port, error = %e, "serve: TLS handshake failed; dropping conn");
278                    return;
279                }
280            };
281            match &rp.target {
282                ServeTarget::Accept => {
283                    // Hand the TLS-terminated stream back to the embedder over the channel.
284                    let accepted = ServeAccepted {
285                        port,
286                        stream: Box::new(tls),
287                    };
288                    if accept_tx.send(accepted).await.is_err() {
289                        tracing::debug!(%port, "serve: accept receiver dropped; closing conn");
290                    }
291                }
292                // Reached DIRECTLY (no request head consumed off `tls`): a plain splice with no
293                // prefix replay — the backend sees the client's bytes verbatim.
294                ServeTarget::Proxy { to } => {
295                    proxy_to_backend(port, tls, to).await;
296                }
297                ServeTarget::Text { body } => {
298                    write_text(port, tls, body).await;
299                }
300                ServeTarget::Redirect { to, status } => {
301                    serve_redirect(port, tls, to, *status).await;
302                }
303                ServeTarget::Path { handlers } => {
304                    serve_path(port, tls, handlers).await;
305                }
306                // `TcpForward` is handled in the non-TLS arm above; nothing else terminates TLS.
307                // The wildcard covers `#[non_exhaustive]` future raw (non-TLS) variants: if one is
308                // added it must NOT silently terminate TLS here — drop it fail-closed until this
309                // dispatch is taught how to serve it.
310                other => {
311                    debug_assert!(
312                        !other.terminates_tls(),
313                        "TLS-terminating ServeTarget reached fall-through arm"
314                    );
315                    tracing::warn!(%port, "serve: unhandled ServeTarget on TLS port; dropping conn");
316                }
317            }
318        }
319    }
320}
321
322/// Reverse-proxy a TLS-terminated stream to a local host backend (Go `Proxy` handler). The backend
323/// dial is a LOCAL host socket to the embedder's own backend — never the forwarder egress path.
324///
325/// Reached DIRECTLY from [`dispatch_conn`] (no request head has been consumed off `tls`), so no
326/// prefix replay is needed — the backend sees the client's bytes verbatim via the bidirectional
327/// splice. The `Path`-nested case (where a head WAS consumed) uses [`proxy_to_backend_with_prefix`]
328/// instead.
329async fn proxy_to_backend<S>(port: u16, tls: S, to: &str)
330where
331    S: AsyncRead + AsyncWrite + Unpin,
332{
333    proxy_to_backend_with_prefix(port, tls, to, &[]).await;
334}
335
336/// Reverse-proxy a TLS-terminated stream to a local host backend, writing `prefix` to the backend
337/// FIRST (before the bidirectional splice). This replays an HTTP request head already consumed off
338/// `tls` (e.g. by [`serve_path`]'s [`read_http_head`]) so the backend sees the complete request: the
339/// consumed request line + headers, then the rest of the body/stream via the splice. An empty
340/// `prefix` is equivalent to a plain splice ([`proxy_to_backend`]). The backend dial is a LOCAL host
341/// socket — never the forwarder egress path; any failure (dial or prefix write) drops the conn
342/// fail-closed.
343async fn proxy_to_backend_with_prefix<S>(port: u16, mut tls: S, to: &str, prefix: &[u8])
344where
345    S: AsyncRead + AsyncWrite + Unpin,
346{
347    let mut backend = match tokio::net::TcpStream::connect(to).await {
348        Ok(b) => b,
349        Err(e) => {
350            tracing::debug!(%port, %to, error = %e, "serve proxy: backend dial failed; dropping conn");
351            return;
352        }
353    };
354    if !prefix.is_empty()
355        && let Err(e) = backend.write_all(prefix).await
356    {
357        tracing::debug!(%port, %to, error = %e, "serve proxy: prefix replay failed; dropping conn");
358        return;
359    }
360    if let Err(e) = tokio::io::copy_bidirectional(&mut tls, &mut backend).await {
361        tracing::debug!(%port, %to, error = %e, "serve proxy: splice ended");
362    }
363}
364
365/// Forward a RAW (non-TLS) overlay stream to a local host backend (Go `TCPForward` handler). The
366/// backend dial is a LOCAL host socket — never the forwarder egress path.
367async fn forward_to_backend(port: u16, mut overlay: OverlayStream, to: &str) {
368    let mut backend = match tokio::net::TcpStream::connect(to).await {
369        Ok(b) => b,
370        Err(e) => {
371            tracing::debug!(%port, %to, error = %e, "serve forward: backend dial failed; dropping conn");
372            return;
373        }
374    };
375    if let Err(e) = tokio::io::copy_bidirectional(&mut overlay, &mut backend).await {
376        tracing::debug!(%port, %to, error = %e, "serve forward: splice ended");
377    }
378}
379
380/// Write a fixed body to the TLS-terminated stream, flush, and close (Go `Text` handler).
381async fn write_text<S>(port: u16, mut tls: S, body: &str)
382where
383    S: AsyncRead + AsyncWrite + Unpin,
384{
385    if let Err(e) = tls.write_all(body.as_bytes()).await {
386        tracing::debug!(%port, error = %e, "serve text: write failed");
387        return;
388    }
389    if let Err(e) = tls.flush().await {
390        tracing::debug!(%port, error = %e, "serve text: flush failed");
391    }
392    drop(tls.shutdown().await);
393}
394
395/// Max bytes of an HTTP request head (request line + headers) we will buffer before giving up. A
396/// peer that never sends `\r\n\r\n` within this exact bound is dropped fail-closed (no unbounded
397/// read); the buffer is bound-checked AFTER each read, so it never exceeds this cap.
398const MAX_HTTP_HEAD: usize = 8 * 1024;
399
400/// Read the HTTP request head (up to and including `\r\n\r\n`) from a TLS-terminated stream into a
401/// buffer. Returns `(buf, header_end)` where `header_end` is the offset just past the terminator, or
402/// `None` if the peer closed early or the head exceeded [`MAX_HTTP_HEAD`]. Hand-rolled (no
403/// axum/hyper); mirrors the peerAPI router's head-read style.
404async fn read_http_head<S>(stream: &mut S) -> Option<(Vec<u8>, usize)>
405where
406    S: AsyncRead + AsyncWrite + Unpin,
407{
408    use tokio::io::AsyncReadExt;
409
410    let mut buf = Vec::with_capacity(1024);
411    let mut tmp = [0u8; 1024];
412    loop {
413        if let Some(end) = crate::peerapi_doh::find_header_end(&buf) {
414            return Some((buf, end));
415        }
416        match stream.read(&mut tmp).await {
417            Ok(0) => return None,
418            Ok(n) => {
419                buf.extend_from_slice(&tmp[..n]);
420                // Bound-check AFTER extending so the buffer never exceeds MAX_HTTP_HEAD. The
421                // terminator is re-checked at the top of the loop, so a head whose terminator lands
422                // exactly at the bound still succeeds; only a head with no terminator within
423                // MAX_HTTP_HEAD is dropped fail-closed.
424                if crate::peerapi_doh::find_header_end(&buf).is_none() && buf.len() >= MAX_HTTP_HEAD
425                {
426                    return None;
427                }
428            }
429            Err(_) => return None,
430        }
431    }
432}
433
434/// Parse the request-line path from an HTTP head. Returns the path component (without the query
435/// string), or `None` if the head is malformed. Hand-rolled; no HTTP library framing assumptions
436/// beyond the request line.
437fn request_path(buf: &[u8]) -> Option<String> {
438    let mut headers = [httparse::EMPTY_HEADER; 32];
439    let mut req = httparse::Request::new(&mut headers);
440    match req.parse(buf) {
441        Ok(_) => {}
442        Err(_) => return None,
443    }
444    let path = req.path?;
445    let raw = path.split_once('?').map(|(p, _)| p).unwrap_or(path);
446    Some(raw.to_string())
447}
448
/// Reason phrase to put on the status line of a redirect response.
///
/// Knows 301, 302, 303, 307 and 308; every other status gets the generic phrase "Redirect".
fn redirect_reason(status: u16) -> &'static str {
    /// Status code → standard reason phrase, for the redirect statuses this module names.
    const KNOWN: [(u16, &str); 5] = [
        (301, "Moved Permanently"),
        (302, "Found"),
        (303, "See Other"),
        (307, "Temporary Redirect"),
        (308, "Permanent Redirect"),
    ];

    KNOWN
        .iter()
        .find(|(code, _)| *code == status)
        .map_or("Redirect", |(_, phrase)| *phrase)
}
460
461/// Write a bodyless HTTP redirect (Go `HTTPHandler` redirect) on a TLS-terminated stream, then close.
462/// Fail-closed: any write error drops the conn. No request parsing is needed — every request on a
463/// `Redirect` target gets the same response.
464async fn serve_redirect<S>(port: u16, mut tls: S, to: &str, status: u16)
465where
466    S: AsyncRead + AsyncWrite + Unpin,
467{
468    let head = format!(
469        "HTTP/1.1 {status} {reason}\r\nLocation: {to}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
470        reason = redirect_reason(status),
471    );
472    if let Err(e) = tls.write_all(head.as_bytes()).await {
473        tracing::debug!(%port, error = %e, "serve redirect: write failed");
474        return;
475    }
476    if let Err(e) = tls.flush().await {
477        tracing::debug!(%port, error = %e, "serve redirect: flush failed");
478    }
479    drop(tls.shutdown().await);
480}
481
482/// Write a bodyless HTTP status response (e.g. `404 Not Found`) on a TLS-terminated stream, then
483/// close. Local mirror of `peerapi_doh::write_status` (which takes the concrete peerAPI stream type).
484async fn write_http_status<S>(port: u16, mut tls: S, status: &str)
485where
486    S: AsyncRead + AsyncWrite + Unpin,
487{
488    let head = format!("HTTP/1.1 {status}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
489    if let Err(e) = tls.write_all(head.as_bytes()).await {
490        tracing::debug!(%port, error = %e, "serve path: status write failed");
491        return;
492    }
493    drop(tls.flush().await);
494    drop(tls.shutdown().await);
495}
496
497/// Serve a [`ServeTarget::Path`] mux on a TLS-terminated stream: read the request head, pick the
498/// longest-matching path prefix in `handlers`, and dispatch the matched nested target on the
499/// already-decrypted stream. Fail-closed: a malformed head, no matching prefix, or an
500/// un-dispatchable nested target ⇒ 404/drop. For a matched nested `Proxy`, the request head consumed
501/// here is replayed to the backend first (via [`proxy_to_backend_with_prefix`]) so the backend sees
502/// the complete request. Backend dial failures inside a nested `Proxy` drop the conn. Nested `Path`
503/// is rejected by `ServeState::validate`, so it is not expected here; it is dropped fail-closed if it
504/// ever reaches dispatch.
505async fn serve_path<S>(port: u16, mut tls: S, handlers: &BTreeMap<String, ServeTarget>)
506where
507    S: AsyncRead + AsyncWrite + Unpin,
508{
509    let Some((buf, _end)) = read_http_head(&mut tls).await else {
510        tracing::debug!(%port, "serve path: incomplete/oversized request head; dropping conn");
511        return;
512    };
513    let Some(path) = request_path(&buf) else {
514        write_http_status(port, tls, "400 Bad Request").await;
515        return;
516    };
517
518    // Longest-matching prefix wins.
519    let matched = handlers
520        .iter()
521        .filter(|(prefix, _)| path.starts_with(prefix.as_str()))
522        .max_by_key(|(prefix, _)| prefix.len())
523        .map(|(_, target)| target);
524
525    let Some(target) = matched else {
526        write_http_status(port, tls, "404 Not Found").await;
527        return;
528    };
529
530    match target {
531        // The request head was already consumed off `tls` by `read_http_head`; replay it (`buf`) to
532        // the backend FIRST so the backend sees the complete request (head + remaining body/stream),
533        // not a request with its first request-line+headers missing.
534        ServeTarget::Proxy { to } => proxy_to_backend_with_prefix(port, tls, to, &buf).await,
535        ServeTarget::Text { body } => write_text(port, tls, body).await,
536        ServeTarget::Redirect { to, status } => serve_redirect(port, tls, to, *status).await,
537        // Accept (no hand-back channel here), TcpForward (raw, not on a TLS path), nested Path
538        // (rejected by validate), and any future `#[non_exhaustive]` variant are not servable as a
539        // Path leaf: drop fail-closed rather than guess.
540        _ => {
541            tracing::warn!(%port, "serve path: unsupported nested target; dropping conn");
542            write_http_status(port, tls, "404 Not Found").await;
543        }
544    }
545}
546
547#[cfg(test)]
548mod tests {
549    use super::*;
550
551    fn proxy(to: &str) -> ServeTarget {
552        ServeTarget::Proxy { to: to.into() }
553    }
554
555    #[test]
556    fn cap_is_bounded() {
557        assert_eq!(MAX_SERVE_CONNS_PER_PORT, 256);
558    }
559
560    #[test]
561    fn reconcile_adds_new_ports() {
562        let current = BTreeMap::new();
563        let mut next = BTreeMap::new();
564        next.insert(443u16, ServeTarget::Accept);
565        next.insert(8443u16, proxy("127.0.0.1:8080"));
566        let (add, remove) = pure_reconcile(&current, &next);
567        assert_eq!(add, BTreeSet::from([443, 8443]));
568        assert!(remove.is_empty());
569    }
570
571    #[test]
572    fn reconcile_removes_dropped_ports() {
573        let mut current = BTreeMap::new();
574        current.insert(443u16, ServeTarget::Accept);
575        current.insert(8443u16, proxy("127.0.0.1:8080"));
576        let mut next = BTreeMap::new();
577        next.insert(443u16, ServeTarget::Accept);
578        let (add, remove) = pure_reconcile(&current, &next);
579        assert!(add.is_empty());
580        assert_eq!(remove, BTreeSet::from([8443]));
581    }
582
583    #[test]
584    fn reconcile_changed_port_is_remove_and_add() {
585        // Same port, different target => counts as both (full-replace would respawn it anyway).
586        let mut current = BTreeMap::new();
587        current.insert(443u16, proxy("127.0.0.1:8080"));
588        let mut next = BTreeMap::new();
589        next.insert(443u16, proxy("127.0.0.1:9090"));
590        let (add, remove) = pure_reconcile(&current, &next);
591        assert_eq!(add, BTreeSet::from([443]));
592        assert_eq!(remove, BTreeSet::from([443]));
593    }
594
595    #[test]
596    fn reconcile_unchanged_port_is_noop() {
597        let mut current = BTreeMap::new();
598        current.insert(443u16, ServeTarget::Accept);
599        let next = current.clone();
600        let (add, remove) = pure_reconcile(&current, &next);
601        assert!(add.is_empty());
602        assert!(remove.is_empty());
603    }
604
605    #[test]
606    fn terminates_tls_matches_dispatch_arm() {
607        // The dispatch decision (TLS vs raw) must agree with the type's own `terminates_tls`: only
608        // TcpForward is raw; Accept/Proxy/Text/Path/Redirect all terminate TLS.
609        assert!(ServeTarget::Accept.terminates_tls());
610        assert!(proxy("127.0.0.1:8080").terminates_tls());
611        assert!(ServeTarget::Text { body: "ok".into() }.terminates_tls());
612        assert!(
613            ServeTarget::Redirect {
614                to: "/elsewhere".into(),
615                status: 302,
616            }
617            .terminates_tls()
618        );
619        let mut handlers = BTreeMap::new();
620        handlers.insert("/".to_string(), proxy("127.0.0.1:8080"));
621        assert!(ServeTarget::Path { handlers }.terminates_tls());
622        assert!(
623            !ServeTarget::TcpForward {
624                to: "127.0.0.1:5000".into()
625            }
626            .terminates_tls()
627        );
628    }
629
630    #[test]
631    fn find_header_end_shared_with_peerapi_doh() {
632        // The local mirror was removed; serve dispatch now uses the shared peerAPI helper. Keep one
633        // assertion that the shared fn behaves as serve dispatch relies on (peerapi_doh owns the
634        // exhaustive coverage).
635        assert_eq!(
636            crate::peerapi_doh::find_header_end(b"GET / HTTP/1.1\r\n\r\n"),
637            Some(18)
638        );
639        assert_eq!(
640            crate::peerapi_doh::find_header_end(b"GET / HTTP/1.1\r\n"),
641            None
642        );
643    }
644
645    #[test]
646    fn request_path_strips_query() {
647        assert_eq!(
648            request_path(b"GET /api/v1?x=1 HTTP/1.1\r\nHost: h\r\n\r\n").as_deref(),
649            Some("/api/v1")
650        );
651        assert_eq!(
652            request_path(b"GET / HTTP/1.1\r\n\r\n").as_deref(),
653            Some("/")
654        );
655        assert_eq!(request_path(b"not a request").as_deref(), None);
656    }
657
658    #[test]
659    fn request_path_none_on_malformed_request_line() {
660        // No method/version framing at all => httparse rejects => None.
661        assert_eq!(request_path(b"GARBAGE\r\n\r\n").as_deref(), None);
662        // Empty buffer => incomplete => None.
663        assert_eq!(request_path(b"").as_deref(), None);
664    }
665
666    #[test]
667    fn longest_prefix_wins() {
668        // Mirror the selection serve_path performs: longest matching prefix wins.
669        let mut handlers: BTreeMap<String, ServeTarget> = BTreeMap::new();
670        handlers.insert("/".to_string(), proxy("127.0.0.1:1"));
671        handlers.insert("/api".to_string(), proxy("127.0.0.1:2"));
672        handlers.insert("/api/v2".to_string(), proxy("127.0.0.1:3"));
673
674        let pick = |path: &str| -> Option<&ServeTarget> {
675            handlers
676                .iter()
677                .filter(|(prefix, _)| path.starts_with(prefix.as_str()))
678                .max_by_key(|(prefix, _)| prefix.len())
679                .map(|(_, target)| target)
680        };
681
682        assert_eq!(pick("/api/v2/x"), Some(&proxy("127.0.0.1:3")));
683        assert_eq!(pick("/api/v1"), Some(&proxy("127.0.0.1:2")));
684        assert_eq!(pick("/other"), Some(&proxy("127.0.0.1:1")));
685    }
686
687    #[test]
688    fn redirect_reason_known_statuses() {
689        assert_eq!(redirect_reason(301), "Moved Permanently");
690        assert_eq!(redirect_reason(308), "Permanent Redirect");
691        assert_eq!(redirect_reason(399), "Redirect");
692    }
693
694    use tokio::io::{AsyncReadExt, AsyncWriteExt};
695
696    /// Read everything the server side wrote to the `client` half of a duplex until the server task
697    /// closes its end (drop/shutdown), returning it as a `String`.
698    async fn drain_to_string(mut client: tokio::io::DuplexStream) -> String {
699        let mut out = Vec::new();
700        drop(client.read_to_end(&mut out).await);
701        String::from_utf8(out).expect("server emitted valid utf8")
702    }
703
704    #[tokio::test]
705    async fn serve_redirect_emits_exact_response() {
706        let (client, server) = tokio::io::duplex(4096);
707        let t = tokio::spawn(async move {
708            serve_redirect(443, server, "/elsewhere", 302).await;
709        });
710        let got = drain_to_string(client).await;
711        t.await.unwrap();
712        assert_eq!(
713            got,
714            "HTTP/1.1 302 Found\r\nLocation: /elsewhere\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
715        );
716    }
717
718    #[tokio::test]
719    async fn write_http_status_emits_status_line() {
720        let (client, server) = tokio::io::duplex(4096);
721        let t = tokio::spawn(async move {
722            write_http_status(443, server, "404 Not Found").await;
723        });
724        let got = drain_to_string(client).await;
725        t.await.unwrap();
726        assert_eq!(
727            got,
728            "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
729        );
730
731        let (client, server) = tokio::io::duplex(4096);
732        let t = tokio::spawn(async move {
733            write_http_status(443, server, "400 Bad Request").await;
734        });
735        let got = drain_to_string(client).await;
736        t.await.unwrap();
737        assert_eq!(
738            got,
739            "HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
740        );
741    }
742
743    #[tokio::test]
744    async fn read_http_head_reads_terminated_head() {
745        let (mut client, mut server) = tokio::io::duplex(4096);
746        client
747            .write_all(b"GET /api HTTP/1.1\r\nHost: h\r\n\r\nBODY")
748            .await
749            .unwrap();
750        drop(client);
751        let (buf, end) = read_http_head(&mut server).await.expect("complete head");
752        // `end` points just past the terminator; the head + trailing body are both buffered.
753        assert_eq!(&buf[..end], b"GET /api HTTP/1.1\r\nHost: h\r\n\r\n");
754        assert_eq!(&buf[end..], b"BODY");
755    }
756
757    #[tokio::test]
758    async fn read_http_head_none_on_early_eof() {
759        let (mut client, mut server) = tokio::io::duplex(4096);
760        client.write_all(b"GET / HTTP/1.1\r\n").await.unwrap();
761        drop(client); // EOF before the terminator
762        assert!(read_http_head(&mut server).await.is_none());
763    }
764
765    #[tokio::test]
766    async fn read_http_head_none_on_oversized_head() {
767        let (mut client, mut server) = tokio::io::duplex(64 * 1024);
768        // A head that never terminates and exceeds MAX_HTTP_HEAD must be dropped fail-closed.
769        let oversized = vec![b'a'; MAX_HTTP_HEAD + 1024];
770        client.write_all(&oversized).await.unwrap();
771        drop(client);
772        assert!(read_http_head(&mut server).await.is_none());
773    }
774
775    #[tokio::test]
776    async fn read_http_head_never_exceeds_max_head() {
777        // A terminator landing exactly at the bound still succeeds (the buffer never overshoots).
778        let (mut client, mut server) = tokio::io::duplex(MAX_HTTP_HEAD + 16);
779        let mut head = vec![b'a'; MAX_HTTP_HEAD - 4];
780        head.extend_from_slice(b"\r\n\r\n");
781        assert_eq!(head.len(), MAX_HTTP_HEAD);
782        client.write_all(&head).await.unwrap();
783        drop(client);
784        let (buf, end) = read_http_head(&mut server).await.expect("head at bound");
785        assert_eq!(end, MAX_HTTP_HEAD);
786        assert!(buf.len() <= MAX_HTTP_HEAD);
787    }
788
789    #[tokio::test]
790    async fn proxy_with_prefix_writes_prefix_before_bidi_copy() {
791        // Fix 1 regression guard: the consumed request head MUST hit the backend FIRST, before the
792        // bidirectional splice forwards the rest of the client stream. The backend is a real
793        // loopback TcpListener (the helper dials `to` via tokio TcpStream).
794        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
795        let backend_addr = listener.local_addr().unwrap();
796
797        let prefix = b"GET /api HTTP/1.1\r\nHost: h\r\n\r\n";
798        let body = b"trailing-body-bytes";
799        let backend = tokio::spawn(async move {
800            let (mut sock, _) = listener.accept().await.unwrap();
801            let mut head = vec![0u8; prefix.len()];
802            sock.read_exact(&mut head).await.unwrap();
803            let mut rest = vec![0u8; body.len()];
804            sock.read_exact(&mut rest).await.unwrap();
805            (head, rest)
806        });
807
808        // Client side of the duplex stands in for the TLS-terminated stream the helper splices.
809        let (mut client, server) = tokio::io::duplex(4096);
810        let to = backend_addr.to_string();
811        let proxy_task = tokio::spawn(async move {
812            proxy_to_backend_with_prefix(443, server, &to, prefix).await;
813        });
814
815        // Feed the rest of the request body through the splice, then close.
816        client.write_all(body).await.unwrap();
817        drop(client);
818
819        let (head, rest) = backend.await.unwrap();
820        proxy_task.await.unwrap();
821        assert_eq!(
822            head, prefix,
823            "prefix (consumed head) replayed to backend first"
824        );
825        assert_eq!(rest, body, "remaining stream spliced after the prefix");
826    }
827
828    #[tokio::test]
829    async fn serve_path_proxy_replays_consumed_head_to_backend() {
830        // End-to-end longest-prefix selection routing to a nested Proxy: the head consumed by
831        // `read_http_head` must reach the backend, proving the request is not dropped (the bug).
832        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
833        let backend_addr = listener.local_addr().unwrap();
834        let request = b"GET /api/v2/x HTTP/1.1\r\nHost: h\r\n\r\n";
835        let backend = tokio::spawn(async move {
836            let (mut sock, _) = listener.accept().await.unwrap();
837            let mut head = vec![0u8; request.len()];
838            sock.read_exact(&mut head).await.unwrap();
839            head
840        });
841
842        let mut handlers: BTreeMap<String, ServeTarget> = BTreeMap::new();
843        handlers.insert("/".to_string(), proxy("127.0.0.1:1")); // shorter prefix (not selected)
844        handlers.insert("/api/v2".to_string(), proxy(&backend_addr.to_string())); // longest match
845
846        let (mut client, server) = tokio::io::duplex(4096);
847        let path_task = tokio::spawn(async move {
848            serve_path(443, server, &handlers).await;
849        });
850        client.write_all(request).await.unwrap();
851        drop(client);
852
853        let head = backend.await.unwrap();
854        path_task.await.unwrap();
855        assert_eq!(
856            head, request,
857            "serve_path routed to the longest-prefix Proxy and replayed the consumed head"
858        );
859    }
860
861    #[tokio::test]
862    async fn serve_path_text_target_emits_body() {
863        // Longest-prefix selection routing to a nested Text target: the body is emitted verbatim.
864        let mut handlers: BTreeMap<String, ServeTarget> = BTreeMap::new();
865        handlers.insert(
866            "/".to_string(),
867            ServeTarget::Text {
868                body: "root".into(),
869            },
870        );
871        handlers.insert(
872            "/hello".to_string(),
873            ServeTarget::Text {
874                body: "hello-body".into(),
875            },
876        );
877
878        let (mut client, server) = tokio::io::duplex(4096);
879        let t = tokio::spawn(async move {
880            serve_path(443, server, &handlers).await;
881        });
882        client
883            .write_all(b"GET /hello/world HTTP/1.1\r\nHost: h\r\n\r\n")
884            .await
885            .unwrap();
886        // Keep the client half open: `read_http_head` already saw the full head, and the Text target
887        // neither reads further nor needs EOF. Drain the body the server writes + shuts down.
888        let got = drain_to_string(client).await;
889        t.await.unwrap();
890        assert_eq!(got, "hello-body");
891    }
892
893    // NOTE: a live bind+accept test needs a running netstack channel + overlay; the existing
894    // netstack-backed managers (fallback_tcp) likewise unit-test only the pure pieces (port diff,
895    // dispatch decision) and leave the bind/accept path to integration coverage. The byte-emission
896    // helpers above are exercised directly over `tokio::io::duplex` + loopback `TcpStream` backends;
897    // the bind/accept/splice path is exercised via `Device::set_serve_config` against a real device.
898}