Skip to main content

symforge/server/
serve.rs

//! `symforge serve` async entrypoint and socket binding.
//!
//! [`run`] resolves the API key (`--api-key`, else `--api-key-env`), parses
//! `--listen`, computes loopback, and enforces the startup policies (the
//! inline-key source policy and the secure-default refuse-to-start rule)
//! before any socket is opened. It then loads the project index, builds the
//! [`ServerRuntime`], binds the listener, mounts the `/mcp` and admin routers
//! behind the Origin gate and Bearer-auth layer, and prints the attach URL. It
//! serves until a shutdown signal arrives (Ctrl+C; also SIGTERM on Unix), then
//! drains gracefully.
//!
//! Socket setup lives in [`bind_listener`] (a socket2-based bind mirroring
//! [`crate::sidecar::server::spawn_sidecar`]). [`probe_free_listener`] binds a
//! preferred address or falls back to the first free operator-friendly port.
13
14use std::net::SocketAddr;
15use std::sync::Arc;
16
17use parking_lot::Mutex;
18
19use super::api_keys::ApiKeyStore;
20use super::auth::{AuthConfig, AuthLayerState, OriginLayerState};
21use super::{ServerRuntime, admin, mcp_http};
22use crate::live_index::{LiveIndex, SharedIndex};
23use crate::protocol::SymForgeServer;
24use crate::sidecar::governor::RequestGovernor;
25use crate::stel::ledger_store::StelLedgerStore;
26use crate::watcher::WatcherInfo;
27
/// The default `--listen` bind address (loopback, fixed product port).
pub const DEFAULT_LISTEN: &str = "127.0.0.1:8787";

/// Resolved inputs for the `serve` subcommand.
#[derive(Debug, Clone)]
pub struct ServeArgs {
    /// `HOST:PORT` to bind. `PORT=0` requests an OS-assigned port.
    pub listen: String,
    /// Whether the operator explicitly supplied `--listen` (US1, FR-002/003).
    ///
    /// `false` means `listen` is the historical default ([`DEFAULT_LISTEN`]).
    /// Serve prefers it but, if it cannot be bound, falls back to the first free
    /// operator-friendly port on `127.0.0.1` (the curated spread, then
    /// 8000-8999, then 5000-5999) instead of failing. It never falls back to an
    /// OS-assigned ephemeral port and never leaves a dead second listener.
    /// `true` means the operator chose the address, so it is honored exactly
    /// and an occupied port fails loudly (no silent substitution).
    pub explicit_listen: bool,
    /// Inline API key (`--api-key`). It is visible in process listings, so
    /// `run` refuses it on a non-loopback bind.
    pub api_key: Option<String>,
    /// Name of an env var holding the API key (`--api-key-env`). It is read
    /// only when `api_key` is `None` or empty.
    pub api_key_env: Option<String>,
}

impl Default for ServeArgs {
    /// Default loopback listen address, not operator-chosen, no key sources.
    fn default() -> Self {
        Self {
            listen: DEFAULT_LISTEN.to_string(),
            explicit_listen: false,
            api_key: None,
            api_key_env: None,
        }
    }
}
61
/// Resolve the effective API key.
///
/// Precedence:
/// 1. A non-empty inline `--api-key`.
/// 2. The value of the env var named by `--api-key-env`, if that var is set,
///    valid Unicode, and non-empty.
/// 3. Otherwise `None`.
///
/// An empty value from either source counts as "not provided".
pub fn resolve_api_key(api_key: Option<&str>, api_key_env: Option<&str>) -> Option<String> {
    let inline_key = api_key.filter(|key| !key.is_empty()).map(str::to_string);
    inline_key.or_else(|| {
        // Only consult the environment when no usable inline key was given.
        api_key_env
            .and_then(|var_name| std::env::var(var_name).ok())
            .filter(|value| !value.is_empty())
    })
}
78
79/// Enforce the inline-`--api-key` source policy (P2-E).
80///
81/// An inline `--api-key` is visible in process listings (`ps` / Windows Task
82/// Manager / `/proc/<pid>/cmdline`), so it is a secret-leak vector on a routable
83/// bind. This applies two rules, mirroring the secure-default refuse-to-start
84/// rule but for the *key source* rather than the key's presence:
85///
86/// 1. **Warn** whenever a non-empty inline `--api-key` is used (any bind),
87///    recommending `--api-key-env <VAR>` which keeps the secret out of argv.
88/// 2. **Refuse** an inline `--api-key` on a **non-loopback** (network) bind:
89///    a routable bind must source the key from the environment. Loopback binds
90///    may still accept an inline key for local convenience.
91///
92/// `is_loopback` is computed by the caller from the parsed bind address. The
93/// warning is emitted via `tracing::warn!` AND to stderr so an operator running
94/// without a tracing subscriber still sees it. Returns
95/// [`AuthStartupError::InlineKeyOnNonLoopback`] on a refused config so `run`
96/// exits before binding.
97pub fn enforce_api_key_source_policy(
98    api_key: Option<&str>,
99    is_loopback: bool,
100) -> Result<(), super::auth::AuthStartupError> {
101    let inline_present = matches!(api_key, Some(key) if !key.is_empty());
102    if !inline_present {
103        return Ok(());
104    }
105
106    if !is_loopback {
107        // Routable bind + inline key: refuse before binding. The operator must
108        // pass --api-key-env so the secret never lands in argv.
109        return Err(super::auth::AuthStartupError::InlineKeyOnNonLoopback);
110    }
111
112    // Loopback + inline key: allowed, but warn — inline keys are visible in
113    // process listings even locally; --api-key-env is the recommended path.
114    let msg = "WARNING: --api-key was passed inline; it is visible in process listings \
115        (ps / Task Manager). Prefer --api-key-env <VAR> so the secret stays out of argv.";
116    tracing::warn!("{msg}");
117    eprintln!("{msg}");
118    Ok(())
119}
120
/// Whether a parsed bind address is loopback (`127.0.0.0/8` or `::1`).
///
/// P3-D (resolved): an IPv4-mapped IPv6 address (`[::ffff:a.b.c.d]`) is first
/// converted to its IPv4 form. So `[::ffff:127.0.0.1]` counts as loopback,
/// matching operator intent, while `[::ffff:192.168.1.10]` stays routable.
/// All other addresses are checked unchanged.
pub fn is_loopback_addr(addr: &SocketAddr) -> bool {
    // `to_canonical` rewrites only IPv4-mapped IPv6 addresses (`::ffff:0:0/96`)
    // to `IpAddr::V4`. Every other address is returned as-is.
    addr.ip().to_canonical().is_loopback()
}
141
142/// Bind a [`tokio::net::TcpListener`] on `addr` with `SO_REUSEADDR`.
143///
144/// Mirrors the socket setup in [`crate::sidecar::server::spawn_sidecar`]:
145/// create a `socket2::Socket`, set `SO_REUSEADDR` (so a TIME_WAIT socket on the
146/// chosen port does not block the bind under rapid restarts / parallel test
147/// fan-out), set non-blocking, bind, listen with backlog 1024, then hand the
148/// std socket to tokio. Unlike the sidecar (which always binds `:0`), this
149/// honors the operator-chosen port from `--listen` (and `:0` for tests).
150pub fn bind_listener(addr: SocketAddr) -> std::io::Result<tokio::net::TcpListener> {
151    let domain = if addr.is_ipv4() {
152        socket2::Domain::IPV4
153    } else {
154        socket2::Domain::IPV6
155    };
156    let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
157    socket.set_reuse_address(true)?;
158    socket.set_nonblocking(true)?;
159    socket.bind(&addr.into())?;
160    socket.listen(1024)?;
161    let std_listener: std::net::TcpListener = socket.into();
162    tokio::net::TcpListener::from_std(std_listener)
163}
164
/// Operator-friendly listen ports, in selection order.
///
/// Corporate networks routinely permit the 8000/5000 ranges but block
/// ephemeral high ports. A serve/admin start without an explicit port MUST
/// therefore resolve here and never drift to an OS-assigned `:0` port (the
/// 61850 problem). A short curated spread comes first (memorable,
/// non-adjacent). The rest of 8000-8999 and all of 5000-5999 follow, which
/// guarantees a free port if one exists.
///
/// Every port is yielded exactly once. The spread ports already lie inside
/// 8000-8999, so the full-range walk skips them instead of probing them a
/// second time. That gives exactly 2000 candidates; widen the ranges only if
/// both ever fill.
pub(crate) fn operator_port_candidates() -> impl Iterator<Item = u16> {
    const SPREAD: [u16; 6] = [8080, 8088, 8181, 8585, 8686, 8989];
    SPREAD
        .into_iter()
        .chain((8000u16..=8999).filter(|port| !SPREAD.contains(port)))
        .chain(5000u16..=5999)
}
179
180/// Bind a verified-free listener, preferring `preferred` (US1, D1).
181///
182/// The race-free free-port primitive: when `preferred` is `Some` and non-zero,
183/// attempt to bind it via [`bind_listener`]; on success return that live
184/// listener. On a bind failure (the port is occupied), or when `preferred` is
185/// `None`, bind the first free [`operator_port_candidates`] port (8000-8999 then
186/// 5000-5999) — never an OS-assigned ephemeral port, which corporate networks
187/// routinely block (the 61850 problem). An explicit `:0` `preferred` is honored
188/// verbatim (callers/tests that truly want an OS port). Each candidate bind is
189/// atomic, so the returned listener is guaranteed-free with no check-then-bind
190/// TOCTOU gap; the caller serves directly on it, so reported URL == bound URL
191/// (FR-020).
192///
193/// This is the production path: it never drops and rebinds, so there is no
194/// window for another process to steal the chosen port. The thin
195/// [`probe_free_port`] wrapper exists for the decision-logic unit tests and
196/// callers that only need the resolved address.
197pub fn probe_free_listener(
198    preferred: Option<SocketAddr>,
199) -> std::io::Result<tokio::net::TcpListener> {
200    if let Some(addr) = preferred {
201        // Explicit ephemeral (`:0`) is honored verbatim.
202        if addr.port() == 0 {
203            return bind_listener(addr);
204        }
205        if let Ok(listener) = bind_listener(addr) {
206            return Ok(listener);
207        }
208    }
209    // No preference, or the preferred port was occupied: bind the first free
210    // operator port in the corporate-friendly ranges. NEVER an OS ephemeral port.
211    for port in operator_port_candidates() {
212        let addr = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), port);
213        if let Ok(listener) = bind_listener(addr) {
214            return Ok(listener);
215        }
216    }
217    Err(std::io::Error::new(
218        std::io::ErrorKind::AddrInUse,
219        "no free operator port available in 8000-8999 or 5000-5999",
220    ))
221}
222
223/// Resolve a verified-free [`SocketAddr`], preferring `preferred` (US1, D1).
224///
225/// Wraps [`probe_free_listener`]: tries `preferred` via a real bind, else binds
226/// `127.0.0.1:0` for an OS-assigned port, and returns the listener's
227/// `local_addr()`. The probe listener is dropped before returning, leaving a
228/// small rebind window — a second process could, in principle, grab the freed
229/// port between this call and the caller's rebind. Production serve uses
230/// [`probe_free_listener`] (which threads the live listener through, eliminating
231/// the window); this address-returning form is for callers that only need the
232/// decision (e.g. a "suggested free port" in the setup wizard) and the unit
233/// tests asserting the selection logic without holding a listener.
234///
235/// Signature per `contracts/free-port.md` / T005.
236pub fn probe_free_port(preferred: Option<SocketAddr>) -> std::io::Result<SocketAddr> {
237    let listener = probe_free_listener(preferred)?;
238    listener.local_addr()
239}
240
241/// Error returned by [`run`] before/while binding the operator server.
242#[derive(Debug, thiserror::Error)]
243pub enum ServeError {
244    /// `--listen` could not be parsed as a `HOST:PORT` socket address.
245    #[error("invalid --listen address {addr:?}: {source}")]
246    InvalidListen {
247        addr: String,
248        #[source]
249        source: std::net::AddrParseError,
250    },
251    /// Secure-default policy refused the requested bind (non-loopback, no key).
252    #[error(transparent)]
253    Startup(#[from] super::auth::AuthStartupError),
254    /// The socket could not be bound (e.g. address already in use).
255    #[error("failed to bind {addr}: {source}")]
256    Bind {
257        addr: SocketAddr,
258        #[source]
259        source: std::io::Error,
260    },
261    /// The project index could not be loaded for serving.
262    #[error("failed to load project index for serving: {source}")]
263    IndexLoad {
264        #[source]
265        source: anyhow::Error,
266    },
267    /// The axum server returned an error while serving.
268    #[error("operator server error: {source}")]
269    Serve {
270        #[source]
271        source: std::io::Error,
272    },
273}
274
275/// Build the in-process [`SharedIndex`] for serving.
276///
277/// Resolves the project root (`discovery::find_project_root`) and loads it
278/// synchronously (the same `LiveIndex::load` the stdio local path uses). When no
279/// safe root is found, serves over an empty index — `tools/list` still responds,
280/// and the operator can `index_folder` after attaching. Returns the index and
281/// the resolved root (for the STEL ledger store location and the project name).
282fn load_serve_index() -> Result<(SharedIndex, Option<std::path::PathBuf>), ServeError> {
283    match crate::discovery::find_project_root() {
284        Some(root) => {
285            let index =
286                LiveIndex::load(&root).map_err(|source| ServeError::IndexLoad { source })?;
287            Ok((index, Some(root)))
288        }
289        None => Ok((LiveIndex::empty(), None)),
290    }
291}
292
293/// Construct the [`ServerRuntime`] from a resolved index + auth config.
294///
295/// Builds the **same** [`SymForgeServer`] the stdio path constructs (one shared
296/// dispatcher, no logic fork), a default [`RequestGovernor`], and opens the
297/// durable STEL [`StelLedgerStore`] under the project's symforge data dir. If the
298/// store cannot open it degrades to [`StelLedgerStore::Disabled`] (serving
299/// continues — FR-011). When there is no project root the ledger store is left
300/// `None` (no data dir to anchor it).
301///
302/// US3/T028+T029: the durable store is opened **before** the protocol
303/// dispatcher so the same `Arc<StelLedgerStore>` is shared into both the
304/// dispatcher (write-through on each `symforge`/`symforge_edit` invocation via
305/// `finalize_symforge_with_ledger`) and the [`ServerRuntime`] (read path for
306/// the `status` summary). One physical store = one durable ledger path, so no
307/// economics row is counted twice across the in-memory and durable sinks.
308fn build_serve_runtime(
309    index: SharedIndex,
310    repo_root: Option<std::path::PathBuf>,
311    auth: AuthConfig,
312) -> ServerRuntime {
313    let project_name = repo_root
314        .as_ref()
315        .and_then(|root| root.file_name())
316        .and_then(|name| name.to_str())
317        .unwrap_or("project")
318        .to_string();
319
320    // US3: open the durable economics ledger under the project data dir FIRST,
321    // so the opened handle can be shared with both the dispatcher and the
322    // runtime. A failure here degrades to Disabled inside `StelLedgerStore::open`,
323    // so the server still starts (FR-011).
324    let ledger_store: Option<Arc<StelLedgerStore>> =
325        repo_root
326            .as_ref()
327            .and_then(|root| match crate::paths::ensure_symforge_dir(root) {
328                Ok(dir) => Some(Arc::new(StelLedgerStore::open(
329                    &dir,
330                    format!("serve-{}", std::process::id()),
331                ))),
332                Err(error) => {
333                    tracing::warn!(
334                        root = %root.display(),
335                        %error,
336                        "could not ensure symforge data dir; STEL ledger will not persist"
337                    );
338                    None
339                }
340            });
341
342    let watcher_info = Arc::new(Mutex::new(WatcherInfo::default()));
343    let mut protocol = SymForgeServer::new(
344        Arc::clone(&index),
345        project_name,
346        watcher_info,
347        repo_root.clone(),
348        None,
349    );
350    // Share the SAME store allocation with the dispatcher so durable
351    // write-through (T028) and the runtime summary read (T029) use one path.
352    if let Some(store) = ledger_store.as_ref() {
353        protocol = protocol.with_stel_ledger_store(Arc::clone(store));
354    }
355    let protocol = Arc::new(protocol);
356    // P2-F (resolved): the governor is now consulted on the `/mcp` HTTP path.
357    // `mcp_http::build_mcp_router` wraps the route with `apply_governor`, which
358    // acquires one concurrency permit per request from this shared governor and
359    // releases it on completion — bounding concurrent operator clients to
360    // `max_concurrency` (queued/shed with 503 beyond that). No longer dead.
361    let governor = Arc::new(RequestGovernor::new());
362
363    // The runtime holds a clone of the same underlying store (the `Sqlite`
364    // variant shares its `Arc<Mutex<Connection>>`), so `status`'s `summary()`
365    // observes exactly the rows the dispatcher wrote — surviving restart.
366    let runtime_store = ledger_store.map(|store| (*store).clone());
367
368    // 006 G-039: open the hashed product API-key store under the same data dir.
369    // On failure it degrades to `Disabled` (bootstrap --api-key still works).
370    // Shared by Arc into both the auth layer (minted keys authenticate at /mcp)
371    // and the admin /api/v1/keys handlers.
372    let key_store: Option<Arc<ApiKeyStore>> =
373        repo_root
374            .as_ref()
375            .and_then(|root| match crate::paths::ensure_symforge_dir(root) {
376                Ok(dir) => Some(Arc::new(ApiKeyStore::open(&dir))),
377                Err(error) => {
378                    tracing::warn!(
379                        root = %root.display(),
380                        %error,
381                        "could not ensure symforge data dir; API-key store unavailable"
382                    );
383                    None
384                }
385            });
386
387    let mut runtime = ServerRuntime::build_runtime(index, protocol, governor, auth, runtime_store);
388    if let Some(store) = key_store {
389        runtime = runtime.with_key_store(store);
390    }
391    runtime
392}
393
394/// `symforge serve` entrypoint (US1 — `/mcp` over Streamable HTTP).
395///
396/// Resolves the key, parses `--listen`, computes loopback, and enforces the
397/// refuse-to-start rule **before** opening any socket. On a permitted config it
398/// loads the project index, builds the [`ServerRuntime`] (the same shared
399/// `SymForgeServer` stdio uses — no logic fork), mounts the `/mcp` Streamable
400/// HTTP router with the Bearer auth layer in front, prints the attach URL to
401/// stdout, and runs one long-lived server until a shutdown signal arrives with
402/// graceful drain: SIGINT (Ctrl+C) on all platforms, plus SIGTERM on Unix so the
403/// server drains under Docker/K8s/systemd (P2-B).
404pub async fn run(args: ServeArgs) -> Result<(), ServeError> {
405    let api_key = resolve_api_key(args.api_key.as_deref(), args.api_key_env.as_deref());
406    let auth = AuthConfig::new(api_key);
407
408    let addr: SocketAddr = args
409        .listen
410        .parse()
411        .map_err(|source| ServeError::InvalidListen {
412            addr: args.listen.clone(),
413            source,
414        })?;
415    let is_loopback = is_loopback_addr(&addr);
416
417    // P2-E: enforce the inline-key source policy before binding — warn on any
418    // inline --api-key (recommend --api-key-env), and refuse an inline key on a
419    // non-loopback bind (the secret would be visible in process listings).
420    enforce_api_key_source_policy(args.api_key.as_deref(), is_loopback)?;
421
422    // Secure default (G-033): refuse a routable bind with no key before binding.
423    auth.refuse_to_start(is_loopback)?;
424
425    // Load the shared index, then bind. Load before bind so an index failure does
426    // not leave a half-open listener.
427    let (index, repo_root) = load_serve_index()?;
428    // Keep a copy for the onboarding state path (the original is moved into the
429    // runtime builder below).
430    let onboarding_root = repo_root.clone();
431    let runtime = build_serve_runtime(index, repo_root, auth.clone());
432
433    // US1 (FR-001/002/003): an explicit operator-chosen `--listen` is honored
434    // exactly — an occupied port fails loudly (no substitution). The default
435    // address (no `--listen`) prefers `DEFAULT_LISTEN` but, if occupied, falls
436    // back to an OS-assigned free port via `probe_free_listener` (which threads
437    // the live listener through — no rebind window, no dead second listener).
438    let listener = if args.explicit_listen {
439        bind_listener(addr).map_err(|source| ServeError::Bind { addr, source })?
440    } else {
441        probe_free_listener(Some(addr)).map_err(|source| ServeError::Bind { addr, source })?
442    };
443    let local_addr = listener
444        .local_addr()
445        .map_err(|source| ServeError::Bind { addr, source })?;
446
447    // Build the /mcp router plus the /admin + /api/v1 router (006), merge them,
448    // and layer Bearer auth + Origin gating in front (secure-default rule on
449    // AuthConfig/AuthLayerState; P1-B Origin on OriginLayerState). Bearer auth
450    // skips read-only admin static assets so the GUI loads when a key is set
451    // (P2-1); `/api/v1/*` and `/mcp` remain gated. Layer order: Bearer outermost,
452    // then Origin, then the handler.
453    let mcp_router = mcp_http::build_mcp_router(&runtime, local_addr);
454    let admin_router = admin::build_admin_router(&runtime);
455    let merged = mcp_router.merge(admin_router);
456
457    // Origin gate (P1-B): reject arbitrary cross-origin browser fetches against
458    // the browser-facing surface; non-browser API clients send no Origin and are
459    // unaffected. Allowed origins are the server's own bound address + loopback
460    // aliases.
461    let origin_state = OriginLayerState::from_bind_addr(local_addr);
462    let gated = super::apply_origin_gate(merged, origin_state);
463
464    // Bearer auth: the bootstrap --api-key OR any active minted key (G-039).
465    let mut auth_state = AuthLayerState::new(auth, is_loopback);
466    if let Some(store) = runtime.key_store() {
467        auth_state = auth_state.with_key_store(Arc::clone(store));
468    }
469    let app = super::apply_bearer_auth(gated, auth_state);
470
471    // Attach URL to stdout (the operator copies this into a second client).
472    let attach_url = format!(
473        "http://{host}:{port}{path}",
474        host = local_addr.ip(),
475        port = local_addr.port(),
476        path = mcp_http::MCP_PATH,
477    );
478    println!("{attach_url}");
479
480    // First-run / post-update onboarding banner (FR-009). Best-effort: a state
481    // read/write failure never affects serve. Shows once per build version, and
482    // only when anchored to a project data dir (no root => skip silently).
483    //
484    // When a sibling AAP checkout is detected (008 US3 / FR-006), the banner also
485    // surfaces the operator `/admin` panel URL and the AAP embed path dependency
486    // (the AAP-native integration route). Detection is read-only.
487    if let Some(root) = onboarding_root.as_ref() {
488        let state_path = crate::cli::onboarding::state_path(root);
489        let mut sink = crate::cli::onboarding::StderrSink;
490        let detection = crate::server::aap::AapDetection::resolve();
491        let aap_banner = detection.detected.then(|| {
492            let admin_url = format!(
493                "http://{host}:{port}{path}",
494                host = local_addr.ip(),
495                port = local_addr.port(),
496                path = crate::server::admin::ADMIN_PATH,
497            );
498            crate::cli::onboarding::AapBanner {
499                admin_url,
500                embed_path_dep: crate::server::aap::embed_cargo_snippet(),
501            }
502        });
503        crate::cli::onboarding::maybe_show_banner_with_aap(
504            &state_path,
505            env!("CARGO_PKG_VERSION"),
506            &attach_url,
507            aap_banner.as_ref(),
508            &mut sink,
509        );
510    }
511
512    tracing::info!(
513        addr = %local_addr,
514        auth = if runtime.auth().requires_auth(is_loopback) { "required" } else { "loopback-open" },
515        "operator server listening; MCP surface mounted at {}",
516        mcp_http::MCP_PATH
517    );
518
519    // Graceful shutdown (P2-B). On Unix, drain on either SIGINT (Ctrl+C) or
520    // SIGTERM (the signal Docker/K8s/systemd send to stop a container/unit) so
521    // the server actually drains under orchestration instead of being killed.
522    // On Windows, only Ctrl+C is available.
523    let shutdown = async {
524        #[cfg(unix)]
525        {
526            use tokio::signal::unix::{SignalKind, signal};
527            // If SIGTERM cannot be registered, fall back to Ctrl+C only rather
528            // than failing the serve loop.
529            let mut sigterm = signal(SignalKind::terminate()).ok();
530            match sigterm.as_mut() {
531                Some(term) => {
532                    tokio::select! {
533                        _ = tokio::signal::ctrl_c() => {}
534                        _ = term.recv() => {}
535                    }
536                }
537                None => {
538                    let _ = tokio::signal::ctrl_c().await;
539                }
540            }
541        }
542        #[cfg(not(unix))]
543        {
544            let _ = tokio::signal::ctrl_c().await;
545        }
546        tracing::info!("shutdown signal received, stopping operator server");
547    };
548
549    axum::serve(listener, app)
550        .with_graceful_shutdown(shutdown)
551        .await
552        .map_err(|source| ServeError::Serve { source })?;
553
554    // P2-3: after the HTTP server drains, wait (bounded) for any durable ledger
555    // writes scheduled via `spawn_blocking` just before shutdown to finish, so
556    // the economics ledger does not lose events accepted at the very end. A
557    // stuck DB cannot hang shutdown — the drain times out and logs the residual.
558    runtime
559        .protocol()
560        .ledger_write_tracker()
561        .drain(std::time::Duration::from_secs(5))
562        .await;
563
564    tracing::info!("operator server shut down cleanly");
565    Ok(())
566}
567
568#[cfg(test)]
569mod tests {
570    use super::*;
571    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
572    use std::sync::Mutex;
573
574    /// Serializes process-env mutation across the env-dependent tests in this
575    /// module (in addition to the suite-wide `--test-threads=1`).
576    static ENV_LOCK: Mutex<()> = Mutex::new(());
577
578    #[test]
579    fn resolve_api_key_prefers_inline_over_env() {
580        let _guard = ENV_LOCK.lock().unwrap();
581        let var = "SYMFORGE_TEST_SERVE_KEY_PREFER";
582        #[allow(unsafe_code)] // test-only env mutation under ENV_LOCK + --test-threads=1.
583        // SAFETY: serialized by ENV_LOCK; suite runs single-threaded.
584        unsafe {
585            std::env::set_var(var, "from_env")
586        };
587        let resolved = resolve_api_key(Some("from_inline"), Some(var));
588        assert_eq!(resolved.as_deref(), Some("from_inline"));
589        #[allow(unsafe_code)] // test-only env restore under ENV_LOCK + --test-threads=1.
590        unsafe {
591            std::env::remove_var(var)
592        };
593    }
594
595    #[test]
596    fn resolve_api_key_falls_back_to_env() {
597        let _guard = ENV_LOCK.lock().unwrap();
598        let var = "SYMFORGE_TEST_SERVE_KEY_FALLBACK";
599        #[allow(unsafe_code)] // test-only env mutation under ENV_LOCK + --test-threads=1.
600        unsafe {
601            std::env::set_var(var, "from_env")
602        };
603        let resolved = resolve_api_key(None, Some(var));
604        assert_eq!(resolved.as_deref(), Some("from_env"));
605        #[allow(unsafe_code)] // test-only env restore under ENV_LOCK + --test-threads=1.
606        unsafe {
607            std::env::remove_var(var)
608        };
609    }
610
611    #[test]
612    fn resolve_api_key_none_when_unset() {
613        let _guard = ENV_LOCK.lock().unwrap();
614        let var = "SYMFORGE_TEST_SERVE_KEY_UNSET";
615        #[allow(unsafe_code)] // test-only env restore under ENV_LOCK + --test-threads=1.
616        unsafe {
617            std::env::remove_var(var)
618        };
619        assert_eq!(resolve_api_key(None, Some(var)), None);
620        assert_eq!(resolve_api_key(None, None), None);
621        // Empty inline key is treated as unset.
622        assert_eq!(resolve_api_key(Some(""), None), None);
623    }
624
625    #[test]
626    fn inline_key_on_loopback_is_allowed_with_warning() {
627        // P2-E: an inline key on loopback is permitted (warns, does not refuse).
628        assert!(enforce_api_key_source_policy(Some("k"), true).is_ok());
629    }
630
631    #[test]
632    fn inline_key_on_non_loopback_is_refused() {
633        // P2-E: an inline key on a routable bind is refused (argv leak vector).
634        let err = enforce_api_key_source_policy(Some("k"), false)
635            .expect_err("inline key on non-loopback must refuse");
636        assert_eq!(
637            err,
638            super::super::auth::AuthStartupError::InlineKeyOnNonLoopback
639        );
640    }
641
642    #[test]
643    fn no_inline_key_passes_policy_on_any_bind() {
644        // No inline key: policy is a no-op regardless of bind (env/none handled
645        // by the secure-default refuse-to-start rule, not this source policy).
646        assert!(enforce_api_key_source_policy(None, true).is_ok());
647        assert!(enforce_api_key_source_policy(None, false).is_ok());
648        // Empty inline key is treated as "not provided".
649        assert!(enforce_api_key_source_policy(Some(""), false).is_ok());
650    }
651
652    #[tokio::test]
653    async fn run_refuses_inline_key_on_non_loopback() {
654        // P2-E end-to-end: a routable bind WITH an inline key still refuses to
655        // start (before binding) because the key would leak via argv.
656        let args = ServeArgs {
657            listen: "0.0.0.0:8787".to_string(),
658            explicit_listen: true,
659            api_key: Some("inline-secret".to_string()),
660            api_key_env: None,
661        };
662        let err = run(args)
663            .await
664            .expect_err("non-loopback + inline key must refuse");
665        assert!(matches!(
666            err,
667            ServeError::Startup(super::super::auth::AuthStartupError::InlineKeyOnNonLoopback)
668        ));
669    }
670
671    #[test]
672    fn is_loopback_addr_classifies_v4_and_v6() {
673        assert!(is_loopback_addr(&SocketAddr::new(
674            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
675            8787
676        )));
677        assert!(is_loopback_addr(&SocketAddr::new(
678            IpAddr::V6(Ipv6Addr::LOCALHOST),
679            8787
680        )));
681        // P3-D: an IPv4-mapped IPv6 loopback is normalized and treated as loopback.
682        assert!(is_loopback_addr(
683            &"[::ffff:127.0.0.1]:8787"
684                .parse()
685                .expect("v4-mapped loopback parses")
686        ));
687        assert!(!is_loopback_addr(&SocketAddr::new(
688            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
689            8787
690        )));
691        assert!(!is_loopback_addr(&SocketAddr::new(
692            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10)),
693            8787
694        )));
695    }
696
697    #[tokio::test]
698    async fn run_refuses_non_loopback_without_key() {
699        let args = ServeArgs {
700            listen: "0.0.0.0:8787".to_string(),
701            explicit_listen: true,
702            api_key: None,
703            api_key_env: None,
704        };
705        let err = run(args)
706            .await
707            .expect_err("non-loopback + no key must refuse");
708        assert!(matches!(err, ServeError::Startup(_)));
709    }
710
711    #[tokio::test]
712    async fn run_rejects_unparseable_listen() {
713        let args = ServeArgs {
714            listen: "not-an-address".to_string(),
715            explicit_listen: true,
716            api_key: Some("k".to_string()),
717            api_key_env: None,
718        };
719        let err = run(args).await.expect_err("bad --listen must error");
720        assert!(matches!(err, ServeError::InvalidListen { .. }));
721    }
722
723    #[tokio::test]
724    async fn bind_listener_binds_ephemeral_loopback_port() {
725        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
726        let listener = bind_listener(addr).expect("bind ephemeral loopback");
727        let local = listener.local_addr().expect("local_addr");
728        assert!(local.ip().is_loopback());
729        assert_ne!(local.port(), 0, "OS must assign a concrete port");
730    }
731
732    #[tokio::test]
733    async fn probe_free_port_uses_preferred_when_free() {
734        // Reserve an OS-assigned port, then free it, so we have a known-free
735        // address to prefer. probe_free_port must return exactly that port.
736        let scratch = bind_listener("127.0.0.1:0".parse().unwrap()).expect("scratch bind");
737        let preferred = scratch.local_addr().expect("local_addr");
738        drop(scratch);
739
740        let chosen = probe_free_port(Some(preferred)).expect("probe a free preferred port");
741        assert_eq!(
742            chosen, preferred,
743            "a free preferred port is honored exactly"
744        );
745    }
746
747    /// Occupy a loopback port with an **exclusive** listener (plain `std` bind,
748    /// no `SO_REUSEADDR`) — the honest reproduction of a real squatter
749    /// (`wslrelay`/another service). A `bind_listener` (which sets `SO_REUSEADDR`)
750    /// on this same port then fails: on Windows two sockets only share a port if
751    /// BOTH set `SO_REUSEADDR`, and on Linux `SO_REUSEADDR` does not let a second
752    /// socket bind an actively listening port. Using a `bind_listener` occupier
753    /// here would (wrongly) let the probe *share* the port and never fall back.
754    fn occupy_exclusive() -> (std::net::TcpListener, SocketAddr) {
755        let listener =
756            std::net::TcpListener::bind("127.0.0.1:0").expect("exclusive occupy a loopback port");
757        let addr = listener.local_addr().expect("local_addr");
758        (listener, addr)
759    }
760
761    /// True when `port` is in the operator-friendly ranges (8000-8999 /
762    /// 5000-5999) that `operator_port_candidates` scans — never an OS ephemeral
763    /// high port that a corporate network would block (the 61850 problem).
764    fn in_operator_range(port: u16) -> bool {
765        (8000..=8999).contains(&port) || (5000..=5999).contains(&port)
766    }
767
768    #[tokio::test]
769    async fn probe_free_port_falls_back_to_operator_range_when_preferred_occupied() {
770        // Occupy a port exclusively, then prefer it: probe must fall back to a
771        // DIFFERENT free loopback port IN THE OPERATOR RANGES (never the occupied
772        // port, never an OS ephemeral high port).
773        let (occupied_listener, occupied) = occupy_exclusive();
774
775        let chosen = probe_free_port(Some(occupied)).expect("probe falls back to a free port");
776        assert_ne!(
777            chosen.port(),
778            occupied.port(),
779            "must not pick the occupied port"
780        );
781        assert!(chosen.ip().is_loopback(), "fallback stays on loopback");
782        assert!(
783            in_operator_range(chosen.port()),
784            "fallback must be an operator-range port (8000-8999/5000-5999), not ephemeral: {}",
785            chosen.port()
786        );
787
788        // The fallback address is genuinely bindable (verified-free).
789        let rebind = bind_listener(chosen).expect("the chosen fallback port is free");
790        drop(rebind);
791        drop(occupied_listener);
792    }
793
794    #[tokio::test]
795    async fn probe_free_listener_threads_a_live_listener() {
796        // The production primitive returns a live, serving-capable listener with
797        // no rebind window. Occupy the preferred port exclusively and confirm the
798        // returned listener is on a different, concrete port.
799        let (occupied_listener, occupied) = occupy_exclusive();
800
801        let listener = probe_free_listener(Some(occupied)).expect("probe a live fallback listener");
802        let local = listener.local_addr().expect("local_addr");
803        assert_ne!(
804            local.port(),
805            occupied.port(),
806            "fell back off the occupied port"
807        );
808        assert!(
809            in_operator_range(local.port()),
810            "fallback must be an operator-range port, not ephemeral: {}",
811            local.port()
812        );
813        drop(listener);
814        drop(occupied_listener);
815    }
816
817    #[tokio::test]
818    async fn probe_free_port_none_preference_uses_operator_range() {
819        // No preference: bind the first free OPERATOR-RANGE loopback port
820        // (8000-8999/5000-5999), never an OS ephemeral high port that corporate
821        // networks block (the 61850 problem).
822        let chosen = probe_free_port(None).expect("probe with no preference");
823        assert!(chosen.ip().is_loopback());
824        assert!(
825            in_operator_range(chosen.port()),
826            "no-preference bind must be in the operator ranges, not ephemeral: {}",
827            chosen.port()
828        );
829    }
830}