Skip to main content

arcly_http/
app.rs

1//! Launch contract.
2//!
3//! Boot phases (executed strictly in this order — order matters):
4//!
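//! 1. Walk the module `imports` DAG from the root module and collect the
//!    providers of every reachable module (routes and gateways are later
//!    filtered to that same reachable set).
//! 2. Run each plugin's `on_init` with a *mutable* `ArclyPluginContext` —
//!    plugins may queue providers (`provide<T>`), routes, global
//!    interceptors, boundary filters, and OpenAPI mutators.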
9//! 3. Apply queued provider closures to the `DiContainerBuilder`.
10//! 4. **Freeze.** The container becomes `&'static`, lock-free for reads.
11//! 5. Build the OpenAPI spec, run plugin spec-mutators, leak it as `&'static`.
12//! 6. Mount macro-registered routes, then plugin-registered routes.
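//! 7. Bind the listener. (In practice the socket is bound before phase 1 —
//!    by `App::launch_configured`, or by whoever calls `App::launch_on_listener`.)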
14//! 8. Run each plugin's `on_start(&container)` — background tasks spawn here.
15//! 9. Serve. Ctrl-C / SIGTERM triggers axum's graceful shutdown — accepts
16//!    stop, in-flight drain. **Only after** that completes do plugin
//!    `on_shutdown(&container)` calls run in reverse declaration order, each
//!    wrapped in a per-plugin timeout (`LaunchConfig::drain_budget`, default
//!    5 s) so a wedged plugin can never wedge the process.
19
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse};
25use axum::routing::get;
26
27use crate::core::engine::{
28    DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
29};
30use crate::core::plugins::{
31    build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
32};
33use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
34use crate::realtime::gateway::GatewayDescriptor;
35use crate::realtime::{ws_route, ConnectionRegistry};
36use crate::web::boundary::adapt;
37
38/// Tunables for the launch contract. Start from `Default` and adjust:
39///
40/// ```ignore
41/// App::launch_configured::<AppModule>(addr, info, plugins, LaunchConfig {
42///     request_timeout: Duration::from_secs(10),
43///     max_in_flight: 4_096,
44///     cors: Some(CorsConfig::for_origins(["https://app.example.com"])),
45///     ..Default::default()
46/// }).await
47/// ```
48#[derive(Clone, Debug)]
49#[non_exhaustive]
50pub struct LaunchConfig {
51    /// Per-plugin budget for `on_shutdown` / start-failure rollback (and the
52    /// concurrent `on_draining` notification). A plugin exceeding it is
53    /// skipped with a logged warning — it can never wedge the process.
54    /// Default `5s`.
55    pub drain_budget: Duration,
56    /// Process-wide request deadline. Routes without `#[Timeout]` are
57    /// cancelled (worker freed) and answered `504` past this. `ZERO`
58    /// disables. Default `30s`.
59    pub request_timeout: Duration,
60    /// Hard cap on concurrent in-flight requests — beyond it requests are
61    /// shed with `503` + `Retry-After` before any body is read (one atomic
62    /// counter, no locks). `0` = unlimited. Default `0`.
63    pub max_in_flight: usize,
64    /// Request body cap for every entry point. Default 8 MiB.
65    pub max_body_bytes: usize,
66    /// Ceiling on `#[CacheTTL]` store entries — the key includes the query
67    /// string, so an unbounded store is a memory-DoS vector. Default 10 000.
68    pub cache_max_entries: usize,
69    /// How often the cache sweeper reclaims expired entries. Default `30s`.
70    pub cache_sweep_interval: Duration,
71    /// After a shutdown signal, WebSocket clients get this long to finish
72    /// before the server sends Close frames — otherwise live sockets keep
73    /// the HTTP drain (and therefore plugin `on_shutdown`) waiting until
74    /// the supervisor SIGKILLs. Default `10s`.
75    pub ws_drain_deadline: Duration,
76    /// CORS policy. `None` (default) mounts no CORS layer at all.
77    pub cors: Option<crate::web::cors::CorsConfig>,
78    /// Programmatic shutdown trigger — notifying it behaves exactly like
79    /// receiving SIGTERM (drain flag, WS deadline, plugin hooks). Wired by
80    /// `testing::TestServer::shutdown`; production should keep `None` and
81    /// use real signals.
82    pub shutdown_trigger: Option<std::sync::Arc<tokio::sync::Notify>>,
83    /// Mount `/docs` (Swagger UI) and `/openapi.json`. Default `true`;
84    /// disable in hardened deployments that publish the spec elsewhere.
85    pub expose_docs: bool,
86    /// Per-socket outbound queue depth — the slow-client memory ceiling; a
87    /// client that can't drain it is evicted. Default `256`.
88    pub ws_outbound_buffer: usize,
89    /// Hard cap on concurrent WebSocket connections across all gateways;
90    /// beyond it upgrades get `503` before any socket exists. `0` =
91    /// unlimited. Default `0`.
92    pub ws_max_connections: usize,
93    /// Server→client Ping cadence; pongs feed the idle sweeper. `ZERO`
94    /// disables. Default `20s`.
95    pub ws_ping_interval: Duration,
96    /// Adaptive latency shedding: when the EWMA of request latency exceeds
97    /// this target, a pressure-proportional slice of traffic is shed with
98    /// `503` (capped at 90% so the EWMA can recover). `ZERO` disables.
99    /// Default `ZERO` (opt-in).
100    pub adaptive_shed_target: Duration,
101    /// Reap sockets with no inbound activity for this long — dead TCP links
102    /// (NAT drops) never send Close and would linger forever. `ZERO`
103    /// disables. Default `60s`.
104    pub ws_idle_timeout: Duration,
105}
106
107impl Default for LaunchConfig {
108    fn default() -> Self {
109        Self {
110            drain_budget: Duration::from_secs(5),
111            request_timeout: Duration::from_secs(30),
112            max_in_flight: 0,
113            max_body_bytes: 8 * 1024 * 1024,
114            cache_max_entries: 10_000,
115            cache_sweep_interval: Duration::from_secs(30),
116            ws_drain_deadline: Duration::from_secs(10),
117            adaptive_shed_target: Duration::ZERO,
118            cors: None,
119            shutdown_trigger: None,
120            expose_docs: true,
121            ws_outbound_buffer: 256,
122            ws_max_connections: 0,
123            ws_ping_interval: Duration::from_secs(20),
124            ws_idle_timeout: Duration::from_secs(60),
125        }
126    }
127}
128
129impl LaunchConfig {
130    pub fn drain_budget(mut self, v: Duration) -> Self {
131        self.drain_budget = v;
132        self
133    }
134    pub fn request_timeout(mut self, v: Duration) -> Self {
135        self.request_timeout = v;
136        self
137    }
138    pub fn max_in_flight(mut self, v: usize) -> Self {
139        self.max_in_flight = v;
140        self
141    }
142    pub fn max_body_bytes(mut self, v: usize) -> Self {
143        self.max_body_bytes = v;
144        self
145    }
146    pub fn cache_max_entries(mut self, v: usize) -> Self {
147        self.cache_max_entries = v;
148        self
149    }
150    pub fn cache_sweep_interval(mut self, v: Duration) -> Self {
151        self.cache_sweep_interval = v;
152        self
153    }
154    pub fn ws_drain_deadline(mut self, v: Duration) -> Self {
155        self.ws_drain_deadline = v;
156        self
157    }
158    pub fn adaptive_shed_target(mut self, v: Duration) -> Self {
159        self.adaptive_shed_target = v;
160        self
161    }
162    pub fn cors(mut self, v: crate::web::cors::CorsConfig) -> Self {
163        self.cors = Some(v);
164        self
165    }
166    pub fn shutdown_trigger(mut self, v: std::sync::Arc<tokio::sync::Notify>) -> Self {
167        self.shutdown_trigger = Some(v);
168        self
169    }
170    pub fn expose_docs(mut self, v: bool) -> Self {
171        self.expose_docs = v;
172        self
173    }
174    pub fn ws_outbound_buffer(mut self, v: usize) -> Self {
175        self.ws_outbound_buffer = v;
176        self
177    }
178    pub fn ws_max_connections(mut self, v: usize) -> Self {
179        self.ws_max_connections = v;
180        self
181    }
182    pub fn ws_ping_interval(mut self, v: Duration) -> Self {
183        self.ws_ping_interval = v;
184        self
185    }
186    pub fn ws_idle_timeout(mut self, v: Duration) -> Self {
187        self.ws_idle_timeout = v;
188        self
189    }
190
191    /// Apply `ARCLY_*` environment overrides on top of the coded values —
192    /// applied automatically by the launch path so operators can retune a
193    /// deployment (incident response, load tests) without a rebuild:
194    ///
195    /// | Variable | Field |
196    /// |---|---|
197    /// | `ARCLY_REQUEST_TIMEOUT_MS` | `request_timeout` (`0` disables) |
198    /// | `ARCLY_MAX_IN_FLIGHT` | `max_in_flight` (`0` = unlimited) |
199    /// | `ARCLY_MAX_BODY_BYTES` | `max_body_bytes` |
200    /// | `ARCLY_CACHE_MAX_ENTRIES` | `cache_max_entries` |
201    /// | `ARCLY_WS_DRAIN_DEADLINE_MS` | `ws_drain_deadline` |
202    /// | `ARCLY_DRAIN_BUDGET_MS` | `drain_budget` |
203    /// | `ARCLY_EXPOSE_DOCS` | `expose_docs` (`true`/`false`/`1`/`0`) |
204    ///
205    /// Unparseable values are ignored with a warning — a typo must never
206    /// change behaviour silently or stop the boot.
207    pub fn with_env_overrides(self) -> Self {
208        self.apply_overrides(|k| std::env::var(k).ok())
209    }
210
211    /// Testable core of [`with_env_overrides`](Self::with_env_overrides).
212    pub(crate) fn apply_overrides(mut self, get: impl Fn(&str) -> Option<String>) -> Self {
213        fn parse<T: std::str::FromStr>(key: &str, raw: String) -> Option<T> {
214            match raw.parse() {
215                Ok(v) => Some(v),
216                Err(_) => {
217                    tracing::warn!(key, value = raw, "ignoring unparseable ARCLY_* override");
218                    None
219                }
220            }
221        }
222        if let Some(v) =
223            get("ARCLY_REQUEST_TIMEOUT_MS").and_then(|r| parse("ARCLY_REQUEST_TIMEOUT_MS", r))
224        {
225            self.request_timeout = Duration::from_millis(v);
226        }
227        if let Some(v) = get("ARCLY_MAX_IN_FLIGHT").and_then(|r| parse("ARCLY_MAX_IN_FLIGHT", r)) {
228            self.max_in_flight = v;
229        }
230        if let Some(v) = get("ARCLY_MAX_BODY_BYTES").and_then(|r| parse("ARCLY_MAX_BODY_BYTES", r))
231        {
232            self.max_body_bytes = v;
233        }
234        if let Some(v) =
235            get("ARCLY_CACHE_MAX_ENTRIES").and_then(|r| parse("ARCLY_CACHE_MAX_ENTRIES", r))
236        {
237            self.cache_max_entries = v;
238        }
239        if let Some(v) =
240            get("ARCLY_WS_DRAIN_DEADLINE_MS").and_then(|r| parse("ARCLY_WS_DRAIN_DEADLINE_MS", r))
241        {
242            self.ws_drain_deadline = Duration::from_millis(v);
243        }
244        if let Some(v) =
245            get("ARCLY_DRAIN_BUDGET_MS").and_then(|r| parse("ARCLY_DRAIN_BUDGET_MS", r))
246        {
247            self.drain_budget = Duration::from_millis(v);
248        }
249        if let Some(v) =
250            get("ARCLY_WS_OUTBOUND_BUFFER").and_then(|r| parse("ARCLY_WS_OUTBOUND_BUFFER", r))
251        {
252            self.ws_outbound_buffer = v;
253        }
254        if let Some(v) =
255            get("ARCLY_WS_MAX_CONNECTIONS").and_then(|r| parse("ARCLY_WS_MAX_CONNECTIONS", r))
256        {
257            self.ws_max_connections = v;
258        }
259        if let Some(v) =
260            get("ARCLY_WS_PING_INTERVAL_MS").and_then(|r| parse("ARCLY_WS_PING_INTERVAL_MS", r))
261        {
262            self.ws_ping_interval = Duration::from_millis(v);
263        }
264        if let Some(v) =
265            get("ARCLY_WS_IDLE_TIMEOUT_MS").and_then(|r| parse("ARCLY_WS_IDLE_TIMEOUT_MS", r))
266        {
267            self.ws_idle_timeout = Duration::from_millis(v);
268        }
269        if let Some(v) = get("ARCLY_ADAPTIVE_SHED_TARGET_MS")
270            .and_then(|r| parse("ARCLY_ADAPTIVE_SHED_TARGET_MS", r))
271        {
272            self.adaptive_shed_target = Duration::from_millis(v);
273        }
274        if let Some(raw) = get("ARCLY_EXPOSE_DOCS") {
275            match raw.as_str() {
276                "true" | "1" => self.expose_docs = true,
277                "false" | "0" => self.expose_docs = false,
278                _ => tracing::warn!(value = raw, "ignoring unparseable ARCLY_EXPOSE_DOCS"),
279            }
280        }
281        self
282    }
283}
284
285pub struct App;
286
287impl App {
288    pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
289        let info = OpenApiInfo::new("arcly-http service", env!("CARGO_PKG_VERSION"));
290        Self::launch_with_info::<RootMod>(addr, info).await
291    }
292
293    pub async fn launch_named<RootMod: Module>(
294        addr: &str,
295        title: &'static str,
296        version: &'static str,
297    ) -> std::io::Result<()> {
298        let info = OpenApiInfo::new(title, version);
299        Self::launch_with_info::<RootMod>(addr, info).await
300    }
301
302    pub async fn launch_with_info<RootMod: Module>(
303        addr: &str,
304        info: OpenApiInfo,
305    ) -> std::io::Result<()> {
306        Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
307    }
308
309    /// Full launch contract with plugins. See the module docstring for the
310    /// strict phase ordering.
311    pub async fn launch_with_plugins<RootMod: Module>(
312        addr: &str,
313        info: OpenApiInfo,
314        plugins: Vec<Box<dyn ArclyPlugin>>,
315    ) -> std::io::Result<()> {
316        Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
317    }
318
319    /// `launch_with_plugins` with explicit [`LaunchConfig`] tunables.
320    pub async fn launch_configured<RootMod: Module>(
321        addr: &str,
322        info: OpenApiInfo,
323        plugins: Vec<Box<dyn ArclyPlugin>>,
324        config: LaunchConfig,
325    ) -> std::io::Result<()> {
326        let listener = tokio::net::TcpListener::bind(addr).await?;
327        Self::launch_on_listener::<RootMod>(listener, info, plugins, config).await
328    }
329
330    /// `launch_configured` over a pre-bound listener. This is how test
331    /// harnesses dodge the bind-the-port-twice race: the port is yours from
332    /// the moment you bind it, with no release-and-rebind window for a
333    /// parallel launch to steal it. Production code normally passes an
334    /// address string instead.
335    pub async fn launch_on_listener<RootMod: Module>(
336        listener: tokio::net::TcpListener,
337        info: OpenApiInfo,
338        mut plugins: Vec<Box<dyn ArclyPlugin>>,
339        config: LaunchConfig,
340    ) -> std::io::Result<()> {
341        let _root: PhantomData<RootMod> = PhantomData;
342        // ARCLY_* env vars override coded values — retune without a rebuild.
343        let config = config.with_env_overrides();
344        tracing::info!(
345            request_timeout = ?config.request_timeout,
346            max_in_flight = config.max_in_flight,
347            max_body_bytes = config.max_body_bytes,
348            expose_docs = config.expose_docs,
349            "launch config (effective)"
350        );
351
352        // ── 0. Walk the module DAG from RootMod ────────────────────
353        let reachable_modules = collect_reachable_modules(RootMod::descriptor());
354        let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
355            .iter()
356            .flat_map(|m| m.controllers.iter().copied())
357            .collect();
358
359        // ── 1. Providers from reachable modules only ───────────────
360        let mut b = DiContainerBuilder::new();
361        for m in &reachable_modules {
362            for p in m.providers {
363                b.add_provider(p);
364            }
365        }
366
367        // ── 2. Plugin on_init — they may queue providers + routes ──
368        let mut plugin_ctx = ArclyPluginContext::new();
369        for p in plugins.iter_mut() {
370            plugin_ctx.current_plugin = p.name();
371            if let Err(e) = p.on_init(&mut plugin_ctx).await {
372                return Err(plugin_io_err(e));
373            }
374        }
375
376        // ── 3. Apply queued provider closures ──────────────────────
377        for f in plugin_ctx.pending_providers.drain(..) {
378            f(&mut b);
379        }
380
381        // ── 4. Freeze the container — `&'static`, lock-free reads ──
382        // The dynamic route table goes in first so services and plugins can
383        // `Inject<DynamicRouteTable>` and mount `/_plugins/*` routes at runtime.
384        b.register(crate::web::dynamic::DynamicRouteTable::new());
385        // Per-app body cap rides the frozen container (lock-free probe) —
386        // a process-global knob would let concurrent apps clobber each other.
387        b.register(crate::web::boundary::BodyLimit(config.max_body_bytes));
388        let container = b.freeze();
389
390        // ── 5. OpenAPI: build (scoped to the module DAG), then mutate ──
391        let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
392        for mutator in plugin_ctx.openapi_mutators.drain(..) {
393            mutator(&mut spec_value);
394        }
395        // Serialize ONCE — /openapi.json then serves a static byte slice
396        // instead of cloning + re-serializing a multi-MB Value per request.
397        let spec_bytes: &'static [u8] = Box::leak(
398            serde_json::to_vec(&spec_value)
399                .unwrap_or_else(|e| {
400                    // Spec is built from our own serde_json::Value — failure
401                    // here is a framework bug; fail at boot, not per-request.
402                    panic!("Arcly: OpenAPI spec serialization failed: {e}")
403                })
404                .into_boxed_slice(),
405        );
406
407        // ── 6. Mount routes (filtered by reachable controller set) ─
408        // Plugin-registered global interceptors wrap every mounted route;
409        // leak the list so route closures can hold a `&'static` slice.
410        let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
411            Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
412        // Boundary filters run before the body is read, on every entry point.
413        let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
414            Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
415
416        let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
417            axum::Router::new();
418        let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
419            std::collections::HashSet::new();
420        // Framework-reserved routes, mounted below — plugins may not shadow them.
421        mounted.insert(("/openapi.json", HttpMethod::GET));
422        mounted.insert(("/docs", HttpMethod::GET));
423        for rt in inventory::iter::<&'static RouteDescriptor> {
424            // Empty `controller` = free-fn route → always mount.
425            // Non-empty = must belong to a controller in the reachable DAG.
426            if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
427                continue;
428            }
429            mounted.insert((rt.path, rt.method));
430            router = router.route(axum_path_static(rt.path), adapt(rt, globals, filters));
431            // NestJS semantics: `#[Get("/")]` on a `/products` controller
432            // serves BOTH `/products/` and `/products` — previously the
433            // bare form 404'd. Skipped when the bare path is taken.
434            if rt.path.len() > 1 && rt.path.ends_with('/') {
435                let trimmed: &'static str =
436                    Box::leak(rt.path[..rt.path.len() - 1].to_owned().into_boxed_str());
437                if mounted.insert((trimmed, rt.method)) {
438                    router = router.route(axum_path_static(trimmed), adapt(rt, globals, filters));
439                }
440            }
441        }
442        let mut app = router.with_state(container);
443        for r in &plugin_ctx.extra_routes {
444            // Surface collisions as a clean error naming the plugin, instead
445            // of letting axum's `route()` panic deep inside launch.
446            if !mounted.insert((r.path, r.method)) {
447                return Err(plugin_io_err(PluginError::new(
448                    r.plugin,
449                    PluginStage::Init,
450                    format!(
451                        "route `{:?} {}` is already mounted by another route or plugin",
452                        r.method, r.path
453                    ),
454                )));
455            }
456            app = app.route(
457                axum_path_static(r.path),
458                build_plugin_route(container, r, globals, filters),
459            );
460        }
461
462        // Runtime-mutable plugin routes: one catch-all, dispatch via ArcSwap.
463        // Interceptors are installed into the table so mounts compose their
464        // chain once, at mount time.
465        container
466            .get::<crate::web::dynamic::DynamicRouteTable>()
467            .set_globals(globals);
468        app = app.route(
469            "/_plugins/{*rest}",
470            crate::web::dynamic::dynamic_dispatch_route(container, filters),
471        );
472
473        // ── 6b. Mount real-time gateways (filtered by reachable module DAG) ─
474        // One process-wide connection registry, leaked to `&'static` and shared
475        // by every gateway upgrade route — sharded, lock-free on the hot path.
476        let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
477            .iter()
478            .flat_map(|m| m.gateways.iter().copied())
479            .collect();
480        let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
481        let ws_tuning = crate::realtime::ws::WsTuning {
482            outbound_buffer: config.ws_outbound_buffer,
483            max_connections: config.ws_max_connections,
484            ping_interval: config.ws_ping_interval,
485        };
486        // Idle sweeper: dead TCP links (NAT drops) never send Close — reap
487        // sockets with no inbound activity past the timeout. Pings above
488        // keep healthy-but-quiet clients alive via pongs.
489        if !config.ws_idle_timeout.is_zero() {
490            let idle = config.ws_idle_timeout;
491            tokio::spawn(async move {
492                let mut tick = tokio::time::interval(idle / 2);
493                tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
494                loop {
495                    tick.tick().await;
496                    let reaped = registry.sweep_idle(idle.as_secs());
497                    if !reaped.is_empty() {
498                        tracing::info!(count = reaped.len(), "reaped idle WebSocket connections");
499                    }
500                }
501            });
502        }
503        for gd in inventory::iter::<&'static GatewayDescriptor> {
504            if !allowed_gateways.contains(gd.name) {
505                continue;
506            }
507            let runtime = (gd.build)(container);
508            app = app.route(
509                axum_path_static(gd.path),
510                ws_route(runtime, registry, container, ws_tuning),
511            );
512        }
513
514        // Docs surface is opt-out (`expose_docs` / ARCLY_EXPOSE_DOCS) for
515        // hardened deployments. The paths stay reserved either way so a
516        // plugin can't squat them when docs are off.
517        if config.expose_docs {
518            app = app
519                .route(
520                    "/openapi.json",
521                    get(move || async move {
522                        (
523                            [(axum::http::header::CONTENT_TYPE, "application/json")],
524                            spec_bytes,
525                        )
526                            .into_response()
527                    }),
528                )
529                .route(
530                    "/docs",
531                    get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
532                );
533        }
534        let mut app = app.layer(axum::middleware::from_fn(
535            crate::security::apply_security_headers,
536        ));
537
538        // ── 6c. Governance layers (outermost) ──────────────────────
539        // CORS (when configured), then the governor: request-id, global
540        // deadline, in-flight admission control. Layer order: the governor
541        // is added last → wraps everything, including CORS preflights.
542        if let Some(cors_cfg) = config.cors.clone() {
543            let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
544            app = app.layer(axum::middleware::from_fn(
545                move |req: axum::extract::Request, next: axum::middleware::Next| {
546                    crate::web::cors::apply_cors(cors_cfg, req, next)
547                },
548            ));
549        }
550        let gov = crate::web::governor::Governor::new(
551            config.request_timeout,
552            config.max_in_flight,
553            config.adaptive_shed_target,
554        );
555        let app = app.layer(axum::middleware::from_fn(
556            move |req: axum::extract::Request, next: axum::middleware::Next| {
557                crate::web::governor::govern(Arc::clone(&gov), req, next)
558            },
559        ));
560
561        // ── 6d. Resource governance knobs + cache sweeper ──────────
562        crate::web::cache::set_capacity(config.cache_max_entries);
563        crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
564
565        // ── 7. Bind ────────────────────────────────────────────────
566        // (already bound — handed in by the caller)
567
568        // ── 8. Plugin on_start — bg tasks spawn here ───────────────
569        //
570        // If a plugin fails, roll back already-started plugins in reverse order
571        // before propagating the error — prevents orphaned background tasks.
572        // Each rollback `on_shutdown` gets the same 5-second per-plugin budget
573        // used by the post-serve drain loop: a wedged plugin must not hang the
574        // process or starve the remaining rollbacks.
575        let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
576        let mut started = 0usize;
577        #[allow(clippy::explicit_counter_loop)] // counter outlives the loop for the error message
578        for p in plugins_arc.iter() {
579            if let Err(e) = p.on_start(container).await {
580                for already in plugins_arc[..started].iter().rev() {
581                    drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
582                        .await;
583                }
584                return Err(plugin_io_err(e));
585            }
586            started += 1;
587        }
588
589        // ── 9. Serve with two-phase graceful shutdown ──────────────
590        //
591        // Phase A: SIGTERM/Ctrl-C → axum stops accepting + drains in-flight.
592        // Phase B: AFTER axum has fully drained, run plugin `on_shutdown`s
593        //          in reverse declaration order, wrapped in a 5s timeout.
594        let plugins_for_draining = Arc::clone(&plugins_arc);
595        let draining_budget = config.drain_budget;
596        let ws_deadline = config.ws_drain_deadline;
597        let trigger = config.shutdown_trigger.clone();
598        let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
599            match trigger {
600                Some(n) => {
601                    tokio::select! {
602                        _ = shutdown_signal() => {}
603                        _ = n.notified() => {}
604                    }
605                }
606                None => shutdown_signal().await,
607            }
608            // First thing: flip readiness so the LB stops routing new
609            // traffic to this pod while in-flight requests drain.
610            crate::observability::health::set_draining(true);
611            tracing::info!("shutdown signal received — HTTP draining");
612            // Live WebSockets would hold the drain open until clients leave
613            // (i.e. forever) — give them `ws_drain_deadline`, then close.
614            tokio::spawn(async move {
615                tokio::time::sleep(ws_deadline).await;
616                let closed = registry.close_all();
617                if closed > 0 {
618                    tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
619                }
620            });
621            // Notify plugins concurrently with the HTTP drain (spawned so the
622            // listener closes immediately): stop consuming MQ/scheduler work
623            // while in-flight HTTP requests finish. Cleanup stays in
624            // `on_shutdown`, which runs only after the drain completes.
625            tokio::spawn(async move {
626                for p in plugins_for_draining.iter() {
627                    match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
628                        Ok(Ok(())) => {}
629                        Ok(Err(e)) => {
630                            tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
631                        }
632                        Err(_) => tracing::warn!(
633                            plugin = p.name(),
634                            budget = ?draining_budget,
635                            "plugin on_draining exceeded budget"
636                        ),
637                    }
638                }
639            });
640        });
641        let serve_res = serve.await;
642
643        // HTTP server has now fully stopped. Safe to drain plugins.
644        tracing::info!(
645            budget = ?config.drain_budget,
646            "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
647        );
648        for p in plugins_arc.iter().rev() {
649            drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
650        }
651        serve_res
652    }
653}
654
/// Translate arcly's NestJS-style path syntax (`/users/:id`, `/files/*rest`)
/// into axum 0.8 matcher syntax (`/users/{id}`, `/files/{*rest}`).
///
/// Only a `/`-separated segment that *starts* with `:` or `*` is rewritten.
/// Every other segment is copied verbatim, including the empty ones produced
/// by a leading or trailing `/`. Route descriptors keep the user syntax
/// (OpenAPI + metrics labels read it); only the string handed to the axum
/// router is translated.
fn axum_path(path: &str) -> String {
    let mut out = String::with_capacity(path.len() + 8);
    for (i, seg) in path.split('/').enumerate() {
        if i > 0 {
            out.push('/');
        }
        if let Some(name) = seg.strip_prefix(':') {
            out.push('{');
            out.push_str(name);
            out.push('}');
        } else if let Some(name) = seg.strip_prefix('*') {
            out.push_str("{*");
            out.push_str(name);
            out.push('}');
        } else {
            out.push_str(seg);
        }
    }
    out
}

/// [`axum_path`], leaked to `&'static str` for the axum router. Meant to run
/// once per route at boot — every call leaks a fresh allocation.
fn axum_path_static(path: &str) -> &'static str {
    Box::leak(axum_path(path).into_boxed_str())
}
681
682fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
683    use std::collections::HashSet;
684    let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
685    let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
686        std::collections::VecDeque::new();
687    let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
688    queue.push_back(root);
689    while let Some(m) = queue.pop_front() {
690        if !visited.insert(m as *const _) {
691            continue;
692        }
693        order.push(m);
694        for getter in m.imports {
695            queue.push_back(getter());
696        }
697    }
698    order
699}
700
701/// Wait for either SIGINT (Ctrl-C) or SIGTERM (process supervisor / K8s / Docker).
702///
703/// On Windows only SIGINT is available; the cfg guard keeps the unix-specific
704/// import out of non-unix builds.
705#[cfg(unix)]
706async fn shutdown_signal() {
707    use tokio::signal::unix::{signal, SignalKind};
708    match signal(SignalKind::terminate()) {
709        Ok(mut sigterm) => {
710            tokio::select! {
711                _ = tokio::signal::ctrl_c() => {}
712                _ = sigterm.recv() => {}
713            }
714        }
715        Err(e) => {
716            tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
717            let _ = tokio::signal::ctrl_c().await;
718        }
719    }
720}
721
722#[cfg(not(unix))]
723async fn shutdown_signal() {
724    let _ = tokio::signal::ctrl_c().await;
725}
726
727/// Run one plugin's `on_shutdown` under a per-plugin timeout, logging (never
728/// propagating) errors and timeouts. Used for both start-failure rollback
729/// and post-serve drain so the two paths can't drift apart.
730async fn drain_plugin(
731    p: &dyn ArclyPlugin,
732    container: &'static crate::core::engine::FrozenDiContainer,
733    phase: &str,
734    budget: Duration,
735) {
736    match tokio::time::timeout(budget, p.on_shutdown(container)).await {
737        Ok(Ok(())) => {}
738        Ok(Err(e)) => {
739            tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
740        }
741        Err(_) => tracing::warn!(
742            plugin = p.name(),
743            phase,
744            budget = ?budget,
745            "plugin shutdown exceeded budget — skipped"
746        ),
747    }
748}
749
750fn plugin_io_err(e: PluginError) -> std::io::Error {
751    let kind = match e.stage {
752        PluginStage::Init => std::io::ErrorKind::InvalidInput,
753        PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
754        PluginStage::Shutdown => std::io::ErrorKind::Other,
755    };
756    std::io::Error::new(kind, e)
757}
758
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn env_overrides_apply_and_ignore_garbage() {
        let cfg = LaunchConfig::default().apply_overrides(|k| match k {
            "ARCLY_REQUEST_TIMEOUT_MS" => Some("1500".into()),
            "ARCLY_MAX_IN_FLIGHT" => Some("notanumber".into()), // ignored, keeps default
            "ARCLY_MAX_BODY_BYTES" => Some("1024".into()),
            "ARCLY_EXPOSE_DOCS" => Some("false".into()),
            _ => None,
        });
        assert_eq!(cfg.request_timeout, Duration::from_millis(1500));
        assert_eq!(cfg.max_in_flight, 0, "unparseable override is ignored");
        assert_eq!(cfg.max_body_bytes, 1024);
        assert!(!cfg.expose_docs);
        // Untouched fields keep coded defaults.
        assert_eq!(cfg.drain_budget, Duration::from_secs(5));
    }

    /// Every numeric knob honoured by `apply_overrides`, not just the first few.
    #[test]
    fn env_overrides_cover_every_numeric_knob() {
        let cfg = LaunchConfig::default().apply_overrides(|k| {
            let v = match k {
                "ARCLY_CACHE_MAX_ENTRIES" => "42",
                "ARCLY_WS_DRAIN_DEADLINE_MS" => "3000",
                "ARCLY_DRAIN_BUDGET_MS" => "2000",
                "ARCLY_WS_OUTBOUND_BUFFER" => "64",
                "ARCLY_WS_MAX_CONNECTIONS" => "1000",
                "ARCLY_WS_PING_INTERVAL_MS" => "0",
                "ARCLY_WS_IDLE_TIMEOUT_MS" => "90000",
                "ARCLY_ADAPTIVE_SHED_TARGET_MS" => "250",
                _ => return None,
            };
            Some(v.to_owned())
        });
        assert_eq!(cfg.cache_max_entries, 42);
        assert_eq!(cfg.ws_drain_deadline, Duration::from_secs(3));
        assert_eq!(cfg.drain_budget, Duration::from_secs(2));
        assert_eq!(cfg.ws_outbound_buffer, 64);
        assert_eq!(cfg.ws_max_connections, 1000);
        assert_eq!(cfg.ws_ping_interval, Duration::ZERO);
        assert_eq!(cfg.ws_idle_timeout, Duration::from_secs(90));
        assert_eq!(cfg.adaptive_shed_target, Duration::from_millis(250));
        // Keys the lookup doesn't provide keep their coded defaults.
        assert_eq!(cfg.request_timeout, Duration::from_secs(30));
        assert!(cfg.expose_docs);
    }

    #[test]
    fn expose_docs_override_parses_flags_and_ignores_garbage() {
        let with_docs = |raw: &'static str| {
            LaunchConfig::default()
                .apply_overrides(move |k| (k == "ARCLY_EXPOSE_DOCS").then(|| raw.to_owned()))
                .expose_docs
        };
        assert!(with_docs("1"));
        assert!(!with_docs("0"));
        assert!(!with_docs("false"));
        assert!(with_docs("maybe"), "garbage keeps the coded default (true)");
    }

    #[test]
    fn axum_path_translates_nest_style_params() {
        assert_eq!(axum_path("/users/:id"), "/users/{id}");
        assert_eq!(axum_path("/files/*rest"), "/files/{*rest}");
        assert_eq!(axum_path("/health"), "/health");
    }
}