Skip to main content

arcly_http/
app.rs

//! Launch contract.
//!
//! Boot phases (executed strictly in this order — order matters):
//!
//! 0. Walk the module `imports` DAG from the root module; only controllers,
//!    providers and gateways of reachable modules take part in the boot.
//! 1. Register the providers of the reachable modules.
//! 2. Run each plugin's `on_init` with a *mutable* `ArclyPluginContext` —
//!    plugins may queue providers (`provide<T>`), routes, global
//!    interceptors, OpenAPI mutators.
//! 3. Apply queued provider closures to the `DiContainerBuilder`.
//! 4. **Freeze.** The container becomes `&'static`, lock-free for reads.
//! 5. Build the OpenAPI spec, run plugin spec-mutators, and leak the
//!    serialized bytes as `&'static`.
//! 6. Mount macro-registered routes, then plugin-registered routes, the
//!    dynamic `/_plugins/*` catch-all, WebSocket gateways and (unless
//!    disabled) the docs endpoints; wrap everything in the security, CORS
//!    and governor layers.
//! 7. Bind the listener.
//! 8. Run each plugin's `on_start(&container)` — background tasks spawn here.
//! 9. Serve. Ctrl-C / SIGTERM (or `LaunchConfig::shutdown_trigger`) triggers
//!    axum's graceful shutdown — accepts stop, in-flight requests drain.
//!    **Only after** that completes do plugin `on_shutdown(&container)` calls
//!    run, each wrapped in a per-plugin timeout (`LaunchConfig::drain_budget`,
//!    default 5 s) so a wedged plugin can never wedge the process.
19
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse};
25use axum::routing::get;
26
27use crate::core::engine::{
28    DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
29};
30use crate::core::plugins::{
31    build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
32};
33use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
34use crate::realtime::gateway::GatewayDescriptor;
35use crate::realtime::{ws_route, ConnectionRegistry};
36use crate::web::boundary::adapt;
37
38/// Tunables for the launch contract. Start from `Default` and adjust:
39///
40/// ```ignore
41/// App::launch_configured::<AppModule>(addr, info, plugins, LaunchConfig {
42///     request_timeout: Duration::from_secs(10),
43///     max_in_flight: 4_096,
44///     cors: Some(CorsConfig::for_origins(["https://app.example.com"])),
45///     ..Default::default()
46/// }).await
47/// ```
48#[derive(Clone, Debug)]
49#[non_exhaustive]
50pub struct LaunchConfig {
51    /// Per-plugin budget for `on_shutdown` / start-failure rollback (and the
52    /// concurrent `on_draining` notification). A plugin exceeding it is
53    /// skipped with a logged warning — it can never wedge the process.
54    /// Default `5s`.
55    pub drain_budget: Duration,
56    /// Process-wide request deadline. Routes without `#[Timeout]` are
57    /// cancelled (worker freed) and answered `504` past this. `ZERO`
58    /// disables. Default `30s`.
59    pub request_timeout: Duration,
60    /// Hard cap on concurrent in-flight requests — beyond it requests are
61    /// shed with `503` + `Retry-After` before any body is read (one atomic
62    /// counter, no locks). `0` = unlimited. Default `0`.
63    pub max_in_flight: usize,
64    /// Request body cap for every entry point. Default 8 MiB.
65    pub max_body_bytes: usize,
66    /// Ceiling on `#[CacheTTL]` store entries — the key includes the query
67    /// string, so an unbounded store is a memory-DoS vector. Default 10 000.
68    pub cache_max_entries: usize,
69    /// How often the cache sweeper reclaims expired entries. Default `30s`.
70    pub cache_sweep_interval: Duration,
71    /// After a shutdown signal, WebSocket clients get this long to finish
72    /// before the server sends Close frames — otherwise live sockets keep
73    /// the HTTP drain (and therefore plugin `on_shutdown`) waiting until
74    /// the supervisor SIGKILLs. Default `10s`.
75    pub ws_drain_deadline: Duration,
76    /// CORS policy. `None` (default) mounts no CORS layer at all.
77    pub cors: Option<crate::web::cors::CorsConfig>,
78    /// Programmatic shutdown trigger — notifying it behaves exactly like
79    /// receiving SIGTERM (drain flag, WS deadline, plugin hooks). Wired by
80    /// `testing::TestServer::shutdown`; production should keep `None` and
81    /// use real signals.
82    pub shutdown_trigger: Option<std::sync::Arc<tokio::sync::Notify>>,
83    /// Mount `/docs` (Swagger UI) and `/openapi.json`. Default `true`;
84    /// disable in hardened deployments that publish the spec elsewhere.
85    pub expose_docs: bool,
86    /// Per-socket outbound queue depth — the slow-client memory ceiling; a
87    /// client that can't drain it is evicted. Default `256`.
88    pub ws_outbound_buffer: usize,
89    /// Hard cap on concurrent WebSocket connections across all gateways;
90    /// beyond it upgrades get `503` before any socket exists. `0` =
91    /// unlimited. Default `0`.
92    pub ws_max_connections: usize,
93    /// Server→client Ping cadence; pongs feed the idle sweeper. `ZERO`
94    /// disables. Default `20s`.
95    pub ws_ping_interval: Duration,
96    /// Adaptive latency shedding: when the EWMA of request latency exceeds
97    /// this target, a pressure-proportional slice of traffic is shed with
98    /// `503` (capped at 90% so the EWMA can recover). `ZERO` disables.
99    /// Default `ZERO` (opt-in).
100    pub adaptive_shed_target: Duration,
101    /// Reap sockets with no inbound activity for this long — dead TCP links
102    /// (NAT drops) never send Close and would linger forever. `ZERO`
103    /// disables. Default `60s`.
104    pub ws_idle_timeout: Duration,
105}
106
107impl Default for LaunchConfig {
108    fn default() -> Self {
109        Self {
110            drain_budget: Duration::from_secs(5),
111            request_timeout: Duration::from_secs(30),
112            max_in_flight: 0,
113            max_body_bytes: 8 * 1024 * 1024,
114            cache_max_entries: 10_000,
115            cache_sweep_interval: Duration::from_secs(30),
116            ws_drain_deadline: Duration::from_secs(10),
117            adaptive_shed_target: Duration::ZERO,
118            cors: None,
119            shutdown_trigger: None,
120            expose_docs: true,
121            ws_outbound_buffer: 256,
122            ws_max_connections: 0,
123            ws_ping_interval: Duration::from_secs(20),
124            ws_idle_timeout: Duration::from_secs(60),
125        }
126    }
127}
128
129impl LaunchConfig {
130    pub fn drain_budget(mut self, v: Duration) -> Self {
131        self.drain_budget = v;
132        self
133    }
134    pub fn request_timeout(mut self, v: Duration) -> Self {
135        self.request_timeout = v;
136        self
137    }
138    pub fn max_in_flight(mut self, v: usize) -> Self {
139        self.max_in_flight = v;
140        self
141    }
142    pub fn max_body_bytes(mut self, v: usize) -> Self {
143        self.max_body_bytes = v;
144        self
145    }
146    pub fn cache_max_entries(mut self, v: usize) -> Self {
147        self.cache_max_entries = v;
148        self
149    }
150    pub fn cache_sweep_interval(mut self, v: Duration) -> Self {
151        self.cache_sweep_interval = v;
152        self
153    }
154    pub fn ws_drain_deadline(mut self, v: Duration) -> Self {
155        self.ws_drain_deadline = v;
156        self
157    }
158    pub fn adaptive_shed_target(mut self, v: Duration) -> Self {
159        self.adaptive_shed_target = v;
160        self
161    }
162    pub fn cors(mut self, v: crate::web::cors::CorsConfig) -> Self {
163        self.cors = Some(v);
164        self
165    }
166    pub fn shutdown_trigger(mut self, v: std::sync::Arc<tokio::sync::Notify>) -> Self {
167        self.shutdown_trigger = Some(v);
168        self
169    }
170    pub fn expose_docs(mut self, v: bool) -> Self {
171        self.expose_docs = v;
172        self
173    }
174    pub fn ws_outbound_buffer(mut self, v: usize) -> Self {
175        self.ws_outbound_buffer = v;
176        self
177    }
178    pub fn ws_max_connections(mut self, v: usize) -> Self {
179        self.ws_max_connections = v;
180        self
181    }
182    pub fn ws_ping_interval(mut self, v: Duration) -> Self {
183        self.ws_ping_interval = v;
184        self
185    }
186    pub fn ws_idle_timeout(mut self, v: Duration) -> Self {
187        self.ws_idle_timeout = v;
188        self
189    }
190
191    /// Apply `ARCLY_*` environment overrides on top of the coded values —
192    /// applied automatically by the launch path so operators can retune a
193    /// deployment (incident response, load tests) without a rebuild:
194    ///
195    /// | Variable | Field |
196    /// |---|---|
197    /// | `ARCLY_REQUEST_TIMEOUT_MS` | `request_timeout` (`0` disables) |
198    /// | `ARCLY_MAX_IN_FLIGHT` | `max_in_flight` (`0` = unlimited) |
199    /// | `ARCLY_MAX_BODY_BYTES` | `max_body_bytes` |
200    /// | `ARCLY_CACHE_MAX_ENTRIES` | `cache_max_entries` |
201    /// | `ARCLY_WS_DRAIN_DEADLINE_MS` | `ws_drain_deadline` |
202    /// | `ARCLY_DRAIN_BUDGET_MS` | `drain_budget` |
203    /// | `ARCLY_EXPOSE_DOCS` | `expose_docs` (`true`/`false`/`1`/`0`) |
204    ///
205    /// Unparseable values are ignored with a warning — a typo must never
206    /// change behaviour silently or stop the boot.
207    pub fn with_env_overrides(self) -> Self {
208        self.apply_overrides(|k| std::env::var(k).ok())
209    }
210
211    /// Testable core of [`with_env_overrides`](Self::with_env_overrides).
212    pub(crate) fn apply_overrides(mut self, get: impl Fn(&str) -> Option<String>) -> Self {
213        fn parse<T: std::str::FromStr>(key: &str, raw: String) -> Option<T> {
214            match raw.parse() {
215                Ok(v) => Some(v),
216                Err(_) => {
217                    tracing::warn!(key, value = raw, "ignoring unparseable ARCLY_* override");
218                    None
219                }
220            }
221        }
222        if let Some(v) =
223            get("ARCLY_REQUEST_TIMEOUT_MS").and_then(|r| parse("ARCLY_REQUEST_TIMEOUT_MS", r))
224        {
225            self.request_timeout = Duration::from_millis(v);
226        }
227        if let Some(v) = get("ARCLY_MAX_IN_FLIGHT").and_then(|r| parse("ARCLY_MAX_IN_FLIGHT", r)) {
228            self.max_in_flight = v;
229        }
230        if let Some(v) = get("ARCLY_MAX_BODY_BYTES").and_then(|r| parse("ARCLY_MAX_BODY_BYTES", r))
231        {
232            self.max_body_bytes = v;
233        }
234        if let Some(v) =
235            get("ARCLY_CACHE_MAX_ENTRIES").and_then(|r| parse("ARCLY_CACHE_MAX_ENTRIES", r))
236        {
237            self.cache_max_entries = v;
238        }
239        if let Some(v) =
240            get("ARCLY_WS_DRAIN_DEADLINE_MS").and_then(|r| parse("ARCLY_WS_DRAIN_DEADLINE_MS", r))
241        {
242            self.ws_drain_deadline = Duration::from_millis(v);
243        }
244        if let Some(v) =
245            get("ARCLY_DRAIN_BUDGET_MS").and_then(|r| parse("ARCLY_DRAIN_BUDGET_MS", r))
246        {
247            self.drain_budget = Duration::from_millis(v);
248        }
249        if let Some(v) =
250            get("ARCLY_WS_OUTBOUND_BUFFER").and_then(|r| parse("ARCLY_WS_OUTBOUND_BUFFER", r))
251        {
252            self.ws_outbound_buffer = v;
253        }
254        if let Some(v) =
255            get("ARCLY_WS_MAX_CONNECTIONS").and_then(|r| parse("ARCLY_WS_MAX_CONNECTIONS", r))
256        {
257            self.ws_max_connections = v;
258        }
259        if let Some(v) =
260            get("ARCLY_WS_PING_INTERVAL_MS").and_then(|r| parse("ARCLY_WS_PING_INTERVAL_MS", r))
261        {
262            self.ws_ping_interval = Duration::from_millis(v);
263        }
264        if let Some(v) =
265            get("ARCLY_WS_IDLE_TIMEOUT_MS").and_then(|r| parse("ARCLY_WS_IDLE_TIMEOUT_MS", r))
266        {
267            self.ws_idle_timeout = Duration::from_millis(v);
268        }
269        if let Some(v) = get("ARCLY_ADAPTIVE_SHED_TARGET_MS")
270            .and_then(|r| parse("ARCLY_ADAPTIVE_SHED_TARGET_MS", r))
271        {
272            self.adaptive_shed_target = Duration::from_millis(v);
273        }
274        if let Some(raw) = get("ARCLY_EXPOSE_DOCS") {
275            match raw.as_str() {
276                "true" | "1" => self.expose_docs = true,
277                "false" | "0" => self.expose_docs = false,
278                _ => tracing::warn!(value = raw, "ignoring unparseable ARCLY_EXPOSE_DOCS"),
279            }
280        }
281        self
282    }
283}
284
285pub struct App;
286
287impl App {
288    pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
289        let info = OpenApiInfo::new("arcly-http service", env!("CARGO_PKG_VERSION"));
290        Self::launch_with_info::<RootMod>(addr, info).await
291    }
292
293    pub async fn launch_named<RootMod: Module>(
294        addr: &str,
295        title: &'static str,
296        version: &'static str,
297    ) -> std::io::Result<()> {
298        let info = OpenApiInfo::new(title, version);
299        Self::launch_with_info::<RootMod>(addr, info).await
300    }
301
302    pub async fn launch_with_info<RootMod: Module>(
303        addr: &str,
304        info: OpenApiInfo,
305    ) -> std::io::Result<()> {
306        Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
307    }
308
309    /// Full launch contract with plugins. See the module docstring for the
310    /// strict phase ordering.
311    pub async fn launch_with_plugins<RootMod: Module>(
312        addr: &str,
313        info: OpenApiInfo,
314        plugins: Vec<Box<dyn ArclyPlugin>>,
315    ) -> std::io::Result<()> {
316        Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
317    }
318
319    /// `launch_with_plugins` with explicit [`LaunchConfig`] tunables.
320    pub async fn launch_configured<RootMod: Module>(
321        addr: &str,
322        info: OpenApiInfo,
323        mut plugins: Vec<Box<dyn ArclyPlugin>>,
324        config: LaunchConfig,
325    ) -> std::io::Result<()> {
326        let _root: PhantomData<RootMod> = PhantomData;
327        // ARCLY_* env vars override coded values — retune without a rebuild.
328        let config = config.with_env_overrides();
329        tracing::info!(
330            request_timeout = ?config.request_timeout,
331            max_in_flight = config.max_in_flight,
332            max_body_bytes = config.max_body_bytes,
333            expose_docs = config.expose_docs,
334            "launch config (effective)"
335        );
336
337        // ── 0. Walk the module DAG from RootMod ────────────────────
338        let reachable_modules = collect_reachable_modules(RootMod::descriptor());
339        let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
340            .iter()
341            .flat_map(|m| m.controllers.iter().copied())
342            .collect();
343
344        // ── 1. Providers from reachable modules only ───────────────
345        let mut b = DiContainerBuilder::new();
346        for m in &reachable_modules {
347            for p in m.providers {
348                b.add_provider(p);
349            }
350        }
351
352        // ── 2. Plugin on_init — they may queue providers + routes ──
353        let mut plugin_ctx = ArclyPluginContext::new();
354        for p in plugins.iter_mut() {
355            plugin_ctx.current_plugin = p.name();
356            if let Err(e) = p.on_init(&mut plugin_ctx).await {
357                return Err(plugin_io_err(e));
358            }
359        }
360
361        // ── 3. Apply queued provider closures ──────────────────────
362        for f in plugin_ctx.pending_providers.drain(..) {
363            f(&mut b);
364        }
365
366        // ── 4. Freeze the container — `&'static`, lock-free reads ──
367        // The dynamic route table goes in first so services and plugins can
368        // `Inject<DynamicRouteTable>` and mount `/_plugins/*` routes at runtime.
369        b.register(crate::web::dynamic::DynamicRouteTable::new());
370        // Per-app body cap rides the frozen container (lock-free probe) —
371        // a process-global knob would let concurrent apps clobber each other.
372        b.register(crate::web::boundary::BodyLimit(config.max_body_bytes));
373        let container = b.freeze();
374
375        // ── 5. OpenAPI: build (scoped to the module DAG), then mutate ──
376        let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
377        for mutator in plugin_ctx.openapi_mutators.drain(..) {
378            mutator(&mut spec_value);
379        }
380        // Serialize ONCE — /openapi.json then serves a static byte slice
381        // instead of cloning + re-serializing a multi-MB Value per request.
382        let spec_bytes: &'static [u8] = Box::leak(
383            serde_json::to_vec(&spec_value)
384                .unwrap_or_else(|e| {
385                    // Spec is built from our own serde_json::Value — failure
386                    // here is a framework bug; fail at boot, not per-request.
387                    panic!("Arcly: OpenAPI spec serialization failed: {e}")
388                })
389                .into_boxed_slice(),
390        );
391
392        // ── 6. Mount routes (filtered by reachable controller set) ─
393        // Plugin-registered global interceptors wrap every mounted route;
394        // leak the list so route closures can hold a `&'static` slice.
395        let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
396            Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
397        // Boundary filters run before the body is read, on every entry point.
398        let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
399            Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
400
401        let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
402            axum::Router::new();
403        let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
404            std::collections::HashSet::new();
405        // Framework-reserved routes, mounted below — plugins may not shadow them.
406        mounted.insert(("/openapi.json", HttpMethod::GET));
407        mounted.insert(("/docs", HttpMethod::GET));
408        for rt in inventory::iter::<&'static RouteDescriptor> {
409            // Empty `controller` = free-fn route → always mount.
410            // Non-empty = must belong to a controller in the reachable DAG.
411            if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
412                continue;
413            }
414            mounted.insert((rt.path, rt.method));
415            router = router.route(axum_path_static(rt.path), adapt(rt, globals, filters));
416            // NestJS semantics: `#[Get("/")]` on a `/products` controller
417            // serves BOTH `/products/` and `/products` — previously the
418            // bare form 404'd. Skipped when the bare path is taken.
419            if rt.path.len() > 1 && rt.path.ends_with('/') {
420                let trimmed: &'static str =
421                    Box::leak(rt.path[..rt.path.len() - 1].to_owned().into_boxed_str());
422                if mounted.insert((trimmed, rt.method)) {
423                    router = router.route(axum_path_static(trimmed), adapt(rt, globals, filters));
424                }
425            }
426        }
427        let mut app = router.with_state(container);
428        for r in &plugin_ctx.extra_routes {
429            // Surface collisions as a clean error naming the plugin, instead
430            // of letting axum's `route()` panic deep inside launch.
431            if !mounted.insert((r.path, r.method)) {
432                return Err(plugin_io_err(PluginError::new(
433                    r.plugin,
434                    PluginStage::Init,
435                    format!(
436                        "route `{:?} {}` is already mounted by another route or plugin",
437                        r.method, r.path
438                    ),
439                )));
440            }
441            app = app.route(
442                axum_path_static(r.path),
443                build_plugin_route(container, r, globals, filters),
444            );
445        }
446
447        // Runtime-mutable plugin routes: one catch-all, dispatch via ArcSwap.
448        // Interceptors are installed into the table so mounts compose their
449        // chain once, at mount time.
450        container
451            .get::<crate::web::dynamic::DynamicRouteTable>()
452            .set_globals(globals);
453        app = app.route(
454            "/_plugins/{*rest}",
455            crate::web::dynamic::dynamic_dispatch_route(container, filters),
456        );
457
458        // ── 6b. Mount real-time gateways (filtered by reachable module DAG) ─
459        // One process-wide connection registry, leaked to `&'static` and shared
460        // by every gateway upgrade route — sharded, lock-free on the hot path.
461        let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
462            .iter()
463            .flat_map(|m| m.gateways.iter().copied())
464            .collect();
465        let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
466        let ws_tuning = crate::realtime::ws::WsTuning {
467            outbound_buffer: config.ws_outbound_buffer,
468            max_connections: config.ws_max_connections,
469            ping_interval: config.ws_ping_interval,
470        };
471        // Idle sweeper: dead TCP links (NAT drops) never send Close — reap
472        // sockets with no inbound activity past the timeout. Pings above
473        // keep healthy-but-quiet clients alive via pongs.
474        if !config.ws_idle_timeout.is_zero() {
475            let idle = config.ws_idle_timeout;
476            tokio::spawn(async move {
477                let mut tick = tokio::time::interval(idle / 2);
478                tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
479                loop {
480                    tick.tick().await;
481                    let reaped = registry.sweep_idle(idle.as_secs());
482                    if !reaped.is_empty() {
483                        tracing::info!(count = reaped.len(), "reaped idle WebSocket connections");
484                    }
485                }
486            });
487        }
488        for gd in inventory::iter::<&'static GatewayDescriptor> {
489            if !allowed_gateways.contains(gd.name) {
490                continue;
491            }
492            let runtime = (gd.build)(container);
493            app = app.route(
494                axum_path_static(gd.path),
495                ws_route(runtime, registry, container, ws_tuning),
496            );
497        }
498
499        // Docs surface is opt-out (`expose_docs` / ARCLY_EXPOSE_DOCS) for
500        // hardened deployments. The paths stay reserved either way so a
501        // plugin can't squat them when docs are off.
502        if config.expose_docs {
503            app = app
504                .route(
505                    "/openapi.json",
506                    get(move || async move {
507                        (
508                            [(axum::http::header::CONTENT_TYPE, "application/json")],
509                            spec_bytes,
510                        )
511                            .into_response()
512                    }),
513                )
514                .route(
515                    "/docs",
516                    get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
517                );
518        }
519        let mut app = app.layer(axum::middleware::from_fn(
520            crate::security::apply_security_headers,
521        ));
522
523        // ── 6c. Governance layers (outermost) ──────────────────────
524        // CORS (when configured), then the governor: request-id, global
525        // deadline, in-flight admission control. Layer order: the governor
526        // is added last → wraps everything, including CORS preflights.
527        if let Some(cors_cfg) = config.cors.clone() {
528            let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
529            app = app.layer(axum::middleware::from_fn(
530                move |req: axum::extract::Request, next: axum::middleware::Next| {
531                    crate::web::cors::apply_cors(cors_cfg, req, next)
532                },
533            ));
534        }
535        let gov = crate::web::governor::Governor::new(
536            config.request_timeout,
537            config.max_in_flight,
538            config.adaptive_shed_target,
539        );
540        let app = app.layer(axum::middleware::from_fn(
541            move |req: axum::extract::Request, next: axum::middleware::Next| {
542                crate::web::governor::govern(Arc::clone(&gov), req, next)
543            },
544        ));
545
546        // ── 6d. Resource governance knobs + cache sweeper ──────────
547        crate::web::cache::set_capacity(config.cache_max_entries);
548        crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
549
550        // ── 7. Bind ────────────────────────────────────────────────
551        let listener = tokio::net::TcpListener::bind(addr).await?;
552
553        // ── 8. Plugin on_start — bg tasks spawn here ───────────────
554        //
555        // If a plugin fails, roll back already-started plugins in reverse order
556        // before propagating the error — prevents orphaned background tasks.
557        // Each rollback `on_shutdown` gets the same 5-second per-plugin budget
558        // used by the post-serve drain loop: a wedged plugin must not hang the
559        // process or starve the remaining rollbacks.
560        let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
561        let mut started = 0usize;
562        #[allow(clippy::explicit_counter_loop)] // counter outlives the loop for the error message
563        for p in plugins_arc.iter() {
564            if let Err(e) = p.on_start(container).await {
565                for already in plugins_arc[..started].iter().rev() {
566                    drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
567                        .await;
568                }
569                return Err(plugin_io_err(e));
570            }
571            started += 1;
572        }
573
574        // ── 9. Serve with two-phase graceful shutdown ──────────────
575        //
576        // Phase A: SIGTERM/Ctrl-C → axum stops accepting + drains in-flight.
577        // Phase B: AFTER axum has fully drained, run plugin `on_shutdown`s
578        //          in reverse declaration order, wrapped in a 5s timeout.
579        let plugins_for_draining = Arc::clone(&plugins_arc);
580        let draining_budget = config.drain_budget;
581        let ws_deadline = config.ws_drain_deadline;
582        let trigger = config.shutdown_trigger.clone();
583        let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
584            match trigger {
585                Some(n) => {
586                    tokio::select! {
587                        _ = shutdown_signal() => {}
588                        _ = n.notified() => {}
589                    }
590                }
591                None => shutdown_signal().await,
592            }
593            // First thing: flip readiness so the LB stops routing new
594            // traffic to this pod while in-flight requests drain.
595            crate::observability::health::set_draining(true);
596            tracing::info!("shutdown signal received — HTTP draining");
597            // Live WebSockets would hold the drain open until clients leave
598            // (i.e. forever) — give them `ws_drain_deadline`, then close.
599            tokio::spawn(async move {
600                tokio::time::sleep(ws_deadline).await;
601                let closed = registry.close_all();
602                if closed > 0 {
603                    tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
604                }
605            });
606            // Notify plugins concurrently with the HTTP drain (spawned so the
607            // listener closes immediately): stop consuming MQ/scheduler work
608            // while in-flight HTTP requests finish. Cleanup stays in
609            // `on_shutdown`, which runs only after the drain completes.
610            tokio::spawn(async move {
611                for p in plugins_for_draining.iter() {
612                    match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
613                        Ok(Ok(())) => {}
614                        Ok(Err(e)) => {
615                            tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
616                        }
617                        Err(_) => tracing::warn!(
618                            plugin = p.name(),
619                            budget = ?draining_budget,
620                            "plugin on_draining exceeded budget"
621                        ),
622                    }
623                }
624            });
625        });
626        let serve_res = serve.await;
627
628        // HTTP server has now fully stopped. Safe to drain plugins.
629        tracing::info!(
630            budget = ?config.drain_budget,
631            "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
632        );
633        for p in plugins_arc.iter().rev() {
634            drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
635        }
636        serve_res
637    }
638}
639
/// Translate arcly's NestJS-style path syntax (`/users/:id`, `/files/*rest`)
/// into axum 0.8 matcher syntax (`/users/{id}`, `/files/{*rest}`).
///
/// Only whole segments starting with `:` or `*` are rewritten; every other
/// segment (including one already in `{name}` form) is copied verbatim.
/// Route descriptors keep the user syntax (OpenAPI + metrics labels read
/// it); only the string handed to the axum router is translated.
fn axum_path(path: &str) -> String {
    // Each rewritten segment grows by at most two bytes (`{` + `}`).
    let mut out = String::with_capacity(path.len() + 8);
    for (i, seg) in path.split('/').enumerate() {
        if i > 0 {
            out.push('/');
        }
        if let Some(name) = seg.strip_prefix(':') {
            out.push('{');
            out.push_str(name);
            out.push('}');
        } else if let Some(name) = seg.strip_prefix('*') {
            out.push_str("{*");
            out.push_str(name);
            out.push('}');
        } else {
            out.push_str(seg);
        }
    }
    out
}

/// [`axum_path`], leaked to `&'static str` because axum route registration at
/// boot wants a static path. Called once per mounted route, so the leak is
/// bounded by the route count.
fn axum_path_static(path: &str) -> &'static str {
    Box::leak(axum_path(path).into_boxed_str())
}
666
667fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
668    use std::collections::HashSet;
669    let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
670    let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
671        std::collections::VecDeque::new();
672    let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
673    queue.push_back(root);
674    while let Some(m) = queue.pop_front() {
675        if !visited.insert(m as *const _) {
676            continue;
677        }
678        order.push(m);
679        for getter in m.imports {
680            queue.push_back(getter());
681        }
682    }
683    order
684}
685
686/// Wait for either SIGINT (Ctrl-C) or SIGTERM (process supervisor / K8s / Docker).
687///
688/// On Windows only SIGINT is available; the cfg guard keeps the unix-specific
689/// import out of non-unix builds.
690#[cfg(unix)]
691async fn shutdown_signal() {
692    use tokio::signal::unix::{signal, SignalKind};
693    match signal(SignalKind::terminate()) {
694        Ok(mut sigterm) => {
695            tokio::select! {
696                _ = tokio::signal::ctrl_c() => {}
697                _ = sigterm.recv() => {}
698            }
699        }
700        Err(e) => {
701            tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
702            let _ = tokio::signal::ctrl_c().await;
703        }
704    }
705}
706
707#[cfg(not(unix))]
708async fn shutdown_signal() {
709    let _ = tokio::signal::ctrl_c().await;
710}
711
712/// Run one plugin's `on_shutdown` under a per-plugin timeout, logging (never
713/// propagating) errors and timeouts. Used for both start-failure rollback
714/// and post-serve drain so the two paths can't drift apart.
715async fn drain_plugin(
716    p: &dyn ArclyPlugin,
717    container: &'static crate::core::engine::FrozenDiContainer,
718    phase: &str,
719    budget: Duration,
720) {
721    match tokio::time::timeout(budget, p.on_shutdown(container)).await {
722        Ok(Ok(())) => {}
723        Ok(Err(e)) => {
724            tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
725        }
726        Err(_) => tracing::warn!(
727            plugin = p.name(),
728            phase,
729            budget = ?budget,
730            "plugin shutdown exceeded budget — skipped"
731        ),
732    }
733}
734
735fn plugin_io_err(e: PluginError) -> std::io::Error {
736    let kind = match e.stage {
737        PluginStage::Init => std::io::ErrorKind::InvalidInput,
738        PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
739        PluginStage::Shutdown => std::io::ErrorKind::Other,
740    };
741    std::io::Error::new(kind, e)
742}
743
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn env_overrides_apply_and_ignore_garbage() {
        let cfg = LaunchConfig::default().apply_overrides(|k| match k {
            "ARCLY_REQUEST_TIMEOUT_MS" => Some("1500".into()),
            "ARCLY_MAX_IN_FLIGHT" => Some("notanumber".into()), // ignored, keeps default
            "ARCLY_MAX_BODY_BYTES" => Some("1024".into()),
            "ARCLY_EXPOSE_DOCS" => Some("false".into()),
            _ => None,
        });
        assert_eq!(cfg.request_timeout, Duration::from_millis(1500));
        assert_eq!(cfg.max_in_flight, 0, "unparseable override is ignored");
        assert_eq!(cfg.max_body_bytes, 1024);
        assert!(!cfg.expose_docs);
        // Untouched fields keep coded defaults.
        assert_eq!(cfg.drain_budget, Duration::from_secs(5));
    }

    #[test]
    fn millisecond_overrides_map_to_durations() {
        let cfg = LaunchConfig::default().apply_overrides(|k| match k {
            "ARCLY_WS_DRAIN_DEADLINE_MS" => Some("250".into()),
            "ARCLY_DRAIN_BUDGET_MS" => Some("1000".into()),
            "ARCLY_WS_IDLE_TIMEOUT_MS" => Some("0".into()),
            "ARCLY_ADAPTIVE_SHED_TARGET_MS" => Some("75".into()),
            _ => None,
        });
        assert_eq!(cfg.ws_drain_deadline, Duration::from_millis(250));
        assert_eq!(cfg.drain_budget, Duration::from_secs(1));
        assert!(cfg.ws_idle_timeout.is_zero(), "`0` disables the idle sweeper");
        assert_eq!(cfg.adaptive_shed_target, Duration::from_millis(75));
        // Untouched duration keeps its coded default.
        assert_eq!(cfg.ws_ping_interval, Duration::from_secs(20));
    }

    #[test]
    fn expose_docs_accepts_only_exact_tokens() {
        let docs_with = |raw: &'static str| {
            LaunchConfig::default()
                .apply_overrides(move |k| (k == "ARCLY_EXPOSE_DOCS").then(|| raw.to_owned()))
                .expose_docs
        };
        assert!(!docs_with("0"));
        assert!(!docs_with("false"));
        assert!(docs_with("1"));
        assert!(docs_with("true"));
        // Case-sensitive: an unrecognised token keeps the default (`true`).
        assert!(docs_with("FALSE"));
    }

    #[test]
    fn negative_counts_are_rejected() {
        let cfg = LaunchConfig::default().apply_overrides(|k| match k {
            "ARCLY_WS_MAX_CONNECTIONS" => Some("-1".into()), // not a usize → ignored
            "ARCLY_WS_OUTBOUND_BUFFER" => Some("64".into()),
            _ => None,
        });
        assert_eq!(cfg.ws_max_connections, 0);
        assert_eq!(cfg.ws_outbound_buffer, 64);
    }
}