arcly-http 0.2.2

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
//! Launch contract.
//!
//! Boot phases (executed strictly in this order — order matters):
//!
//! 1. Collect inventory-registered providers + routes.
//! 2. Run each plugin's `on_init` with a *mutable* `ArclyPluginContext` —
//!    plugins may queue providers (`provide<T>`), routes, global
//!    interceptors, OpenAPI mutators.
//! 3. Apply queued provider closures to the `DiContainerBuilder`.
//! 4. **Freeze.** The container becomes `&'static`, lock-free for reads.
//! 5. Build the OpenAPI spec, run plugin spec-mutators, leak it as `&'static`.
//! 6. Mount macro-registered routes, then plugin-registered routes.
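//! 7. Bind the listener. (In practice it is already bound by this point:
//!    `launch_configured` binds before phase 1, and `launch_on_listener`
//!    receives a pre-bound listener from the caller.)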
//! 8. Run each plugin's `on_start(&container)` — background tasks spawn here.
//! 9. Serve. Ctrl-C / SIGTERM triggers axum's graceful shutdown — accepts
//!    stop, in-flight drain. **Only after** that completes do plugin
//!    `on_shutdown(&container)` calls run, each wrapped in the per-plugin
//!    `LaunchConfig::drain_budget` timeout (5 seconds by default) so a
//!    wedged plugin can never wedge the process.

use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use axum::response::{Html, IntoResponse};
use axum::routing::get;

use crate::core::engine::{
    DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
};
use crate::core::plugins::{
    build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
};
use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
use crate::realtime::gateway::GatewayDescriptor;
use crate::realtime::{ws_route, ConnectionRegistry};
use crate::web::boundary::adapt;

/// Tunables for the launch contract. Start from `Default` and adjust with
/// the chainable setters. The struct is `#[non_exhaustive]`, so outside this
/// crate it cannot be built with a struct literal — not even with
/// `..Default::default()`:
///
/// ```ignore
/// let config = LaunchConfig::default()
///     .request_timeout(Duration::from_secs(10))
///     .max_in_flight(4_096)
///     .cors(CorsConfig::for_origins(["https://app.example.com"]));
/// App::launch_configured::<AppModule>(addr, info, plugins, config).await
/// ```
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct LaunchConfig {
    /// Per-plugin budget for `on_shutdown` / start-failure rollback (and the
    /// concurrent `on_draining` notification). A plugin exceeding it is
    /// skipped with a logged warning — it can never wedge the process.
    /// Default `5s`.
    pub drain_budget: Duration,
    /// Process-wide request deadline. Routes without `#[Timeout]` are
    /// cancelled (worker freed) and answered `504` past this. `ZERO`
    /// disables. Default `30s`.
    pub request_timeout: Duration,
    /// Hard cap on concurrent in-flight requests — beyond it requests are
    /// shed with `503` + `Retry-After` before any body is read (one atomic
    /// counter, no locks). `0` = unlimited. Default `0`.
    pub max_in_flight: usize,
    /// Request body cap for every entry point. Default 8 MiB.
    pub max_body_bytes: usize,
    /// Ceiling on `#[CacheTTL]` store entries — the key includes the query
    /// string, so an unbounded store is a memory-DoS vector. Default 10 000.
    pub cache_max_entries: usize,
    /// How often the cache sweeper reclaims expired entries. Default `30s`.
    pub cache_sweep_interval: Duration,
    /// After a shutdown signal, WebSocket clients get this long to finish
    /// before the server sends Close frames — otherwise live sockets keep
    /// the HTTP drain (and therefore plugin `on_shutdown`) waiting until
    /// the supervisor SIGKILLs. Default `10s`.
    pub ws_drain_deadline: Duration,
    /// CORS policy. `None` (default) mounts no CORS layer at all.
    pub cors: Option<crate::web::cors::CorsConfig>,
    /// Programmatic shutdown trigger — notifying it behaves exactly like
    /// receiving SIGTERM (drain flag, WS deadline, plugin hooks). Wired by
    /// `testing::TestServer::shutdown`; production should keep `None` and
    /// use real signals.
    pub shutdown_trigger: Option<std::sync::Arc<tokio::sync::Notify>>,
    /// Mount `/docs` (Swagger UI) and `/openapi.json`. Default `true`;
    /// disable in hardened deployments that publish the spec elsewhere.
    pub expose_docs: bool,
    /// Per-socket outbound queue depth — the slow-client memory ceiling; a
    /// client that can't drain it is evicted. Default `256`.
    pub ws_outbound_buffer: usize,
    /// Hard cap on concurrent WebSocket connections across all gateways;
    /// beyond it upgrades get `503` before any socket exists. `0` =
    /// unlimited. Default `0`.
    pub ws_max_connections: usize,
    /// Server→client Ping cadence; pongs feed the idle sweeper. `ZERO`
    /// disables. Default `20s`.
    pub ws_ping_interval: Duration,
    /// Adaptive latency shedding: when the EWMA of request latency exceeds
    /// this target, a pressure-proportional slice of traffic is shed with
    /// `503` (capped at 90% so the EWMA can recover). `ZERO` disables.
    /// Default `ZERO` (opt-in).
    pub adaptive_shed_target: Duration,
    /// Reap sockets with no inbound activity for this long — dead TCP links
    /// (NAT drops) never send Close and would linger forever. `ZERO`
    /// disables. Default `60s`.
    pub ws_idle_timeout: Duration,
}

impl Default for LaunchConfig {
    /// Baseline tunables. Every value here is the default quoted in the
    /// documentation of the corresponding `LaunchConfig` field.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        let secs = Duration::from_secs;

        Self {
            // Shutdown budgets.
            drain_budget: secs(5),
            ws_drain_deadline: secs(10),
            shutdown_trigger: None,

            // HTTP admission control.
            request_timeout: secs(30),
            max_in_flight: 0,
            max_body_bytes: 8 * MIB,
            adaptive_shed_target: Duration::ZERO,

            // Response cache.
            cache_max_entries: 10_000,
            cache_sweep_interval: secs(30),

            // Exposed surface.
            cors: None,
            expose_docs: true,

            // WebSocket limits.
            ws_outbound_buffer: 256,
            ws_max_connections: 0,
            ws_ping_interval: secs(20),
            ws_idle_timeout: secs(60),
        }
    }
}

impl LaunchConfig {
    /// Sets `drain_budget` (per-plugin shutdown / rollback / draining budget).
    pub fn drain_budget(mut self, v: Duration) -> Self {
        self.drain_budget = v;
        self
    }
    /// Sets `request_timeout` (`ZERO` disables the global deadline).
    pub fn request_timeout(mut self, v: Duration) -> Self {
        self.request_timeout = v;
        self
    }
    /// Sets `max_in_flight` (`0` = unlimited).
    pub fn max_in_flight(mut self, v: usize) -> Self {
        self.max_in_flight = v;
        self
    }
    /// Sets `max_body_bytes`.
    pub fn max_body_bytes(mut self, v: usize) -> Self {
        self.max_body_bytes = v;
        self
    }
    /// Sets `cache_max_entries`.
    pub fn cache_max_entries(mut self, v: usize) -> Self {
        self.cache_max_entries = v;
        self
    }
    /// Sets `cache_sweep_interval`.
    pub fn cache_sweep_interval(mut self, v: Duration) -> Self {
        self.cache_sweep_interval = v;
        self
    }
    /// Sets `ws_drain_deadline`.
    pub fn ws_drain_deadline(mut self, v: Duration) -> Self {
        self.ws_drain_deadline = v;
        self
    }
    /// Sets `adaptive_shed_target` (`ZERO` disables adaptive shedding).
    pub fn adaptive_shed_target(mut self, v: Duration) -> Self {
        self.adaptive_shed_target = v;
        self
    }
    /// Enables CORS with the given policy (stores `Some(v)`).
    pub fn cors(mut self, v: crate::web::cors::CorsConfig) -> Self {
        self.cors = Some(v);
        self
    }
    /// Installs a programmatic shutdown trigger (stores `Some(v)`).
    pub fn shutdown_trigger(mut self, v: std::sync::Arc<tokio::sync::Notify>) -> Self {
        self.shutdown_trigger = Some(v);
        self
    }
    /// Sets `expose_docs` (mount `/docs` and `/openapi.json`).
    pub fn expose_docs(mut self, v: bool) -> Self {
        self.expose_docs = v;
        self
    }
    /// Sets `ws_outbound_buffer`.
    pub fn ws_outbound_buffer(mut self, v: usize) -> Self {
        self.ws_outbound_buffer = v;
        self
    }
    /// Sets `ws_max_connections` (`0` = unlimited).
    pub fn ws_max_connections(mut self, v: usize) -> Self {
        self.ws_max_connections = v;
        self
    }
    /// Sets `ws_ping_interval` (`ZERO` disables pings).
    pub fn ws_ping_interval(mut self, v: Duration) -> Self {
        self.ws_ping_interval = v;
        self
    }
    /// Sets `ws_idle_timeout` (`ZERO` disables the idle sweeper).
    pub fn ws_idle_timeout(mut self, v: Duration) -> Self {
        self.ws_idle_timeout = v;
        self
    }

    /// Apply `ARCLY_*` environment overrides on top of the coded values —
    /// applied automatically by the launch path so operators can retune a
    /// deployment (incident response, load tests) without a rebuild:
    ///
    /// | Variable | Field |
    /// |---|---|
    /// | `ARCLY_REQUEST_TIMEOUT_MS` | `request_timeout` (`0` disables) |
    /// | `ARCLY_MAX_IN_FLIGHT` | `max_in_flight` (`0` = unlimited) |
    /// | `ARCLY_MAX_BODY_BYTES` | `max_body_bytes` |
    /// | `ARCLY_CACHE_MAX_ENTRIES` | `cache_max_entries` |
    /// | `ARCLY_WS_DRAIN_DEADLINE_MS` | `ws_drain_deadline` |
    /// | `ARCLY_DRAIN_BUDGET_MS` | `drain_budget` |
    /// | `ARCLY_WS_OUTBOUND_BUFFER` | `ws_outbound_buffer` |
    /// | `ARCLY_WS_MAX_CONNECTIONS` | `ws_max_connections` (`0` = unlimited) |
    /// | `ARCLY_WS_PING_INTERVAL_MS` | `ws_ping_interval` (`0` disables) |
    /// | `ARCLY_WS_IDLE_TIMEOUT_MS` | `ws_idle_timeout` (`0` disables) |
    /// | `ARCLY_ADAPTIVE_SHED_TARGET_MS` | `adaptive_shed_target` (`0` disables) |
    /// | `ARCLY_EXPOSE_DOCS` | `expose_docs` (`true`/`false`/`1`/`0`) |
    ///
    /// `cors`, `shutdown_trigger` and `cache_sweep_interval` have no
    /// environment override.
    ///
    /// Surrounding whitespace in a value is ignored. Unparseable values are
    /// ignored with a warning — a typo must never change behaviour silently
    /// or stop the boot.
    pub fn with_env_overrides(self) -> Self {
        self.apply_overrides(|k| std::env::var(k).ok())
    }

    /// Testable core of [`with_env_overrides`](Self::with_env_overrides).
    ///
    /// `get` maps a variable name to its raw value (`None` = unset). Fields
    /// whose variable is unset or unparseable keep their current value.
    pub(crate) fn apply_overrides(mut self, get: impl Fn(&str) -> Option<String>) -> Self {
        // Fetch and parse one override. The key is spelled exactly once per
        // call, so the looked-up name and the name in the warning can't drift.
        fn lookup<T, G>(get: &G, key: &str) -> Option<T>
        where
            T: std::str::FromStr,
            G: Fn(&str) -> Option<String>,
        {
            let raw = get(key)?;
            // Integer `FromStr` rejects surrounding whitespace, and values
            // sourced from files often carry a trailing newline — trim first.
            match raw.trim().parse() {
                Ok(v) => Some(v),
                Err(_) => {
                    tracing::warn!(key, value = raw, "ignoring unparseable ARCLY_* override");
                    None
                }
            }
        }

        // Millisecond-valued override, converted to a `Duration`.
        fn lookup_ms<G>(get: &G, key: &str) -> Option<Duration>
        where
            G: Fn(&str) -> Option<String>,
        {
            lookup::<u64, G>(get, key).map(Duration::from_millis)
        }

        if let Some(v) = lookup_ms(&get, "ARCLY_REQUEST_TIMEOUT_MS") {
            self.request_timeout = v;
        }
        if let Some(v) = lookup(&get, "ARCLY_MAX_IN_FLIGHT") {
            self.max_in_flight = v;
        }
        if let Some(v) = lookup(&get, "ARCLY_MAX_BODY_BYTES") {
            self.max_body_bytes = v;
        }
        if let Some(v) = lookup(&get, "ARCLY_CACHE_MAX_ENTRIES") {
            self.cache_max_entries = v;
        }
        if let Some(v) = lookup_ms(&get, "ARCLY_WS_DRAIN_DEADLINE_MS") {
            self.ws_drain_deadline = v;
        }
        if let Some(v) = lookup_ms(&get, "ARCLY_DRAIN_BUDGET_MS") {
            self.drain_budget = v;
        }
        if let Some(v) = lookup(&get, "ARCLY_WS_OUTBOUND_BUFFER") {
            self.ws_outbound_buffer = v;
        }
        if let Some(v) = lookup(&get, "ARCLY_WS_MAX_CONNECTIONS") {
            self.ws_max_connections = v;
        }
        if let Some(v) = lookup_ms(&get, "ARCLY_WS_PING_INTERVAL_MS") {
            self.ws_ping_interval = v;
        }
        if let Some(v) = lookup_ms(&get, "ARCLY_WS_IDLE_TIMEOUT_MS") {
            self.ws_idle_timeout = v;
        }
        if let Some(v) = lookup_ms(&get, "ARCLY_ADAPTIVE_SHED_TARGET_MS") {
            self.adaptive_shed_target = v;
        }
        // Booleans accept the documented spellings only (`bool::from_str`
        // would reject `1`/`0`), so this one is matched by hand.
        if let Some(raw) = get("ARCLY_EXPOSE_DOCS") {
            match raw.trim() {
                "true" | "1" => self.expose_docs = true,
                "false" | "0" => self.expose_docs = false,
                _ => tracing::warn!(value = raw, "ignoring unparseable ARCLY_EXPOSE_DOCS"),
            }
        }
        self
    }
}

/// Stateless namespace for the `launch*` entry points — every function in
/// its `impl` block is an associated function, so no `App` value is needed.
pub struct App;

impl App {
    /// Launch `RootMod` on `addr` with a generic OpenAPI title
    /// (`"arcly-http service"`), this crate's package version, no plugins
    /// and the default [`LaunchConfig`].
    pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
        let info = OpenApiInfo::new("arcly-http service", env!("CARGO_PKG_VERSION"));
        Self::launch_with_info::<RootMod>(addr, info).await
    }

    /// Like [`launch`](Self::launch), but with a caller-chosen OpenAPI
    /// `title` and `version`.
    pub async fn launch_named<RootMod: Module>(
        addr: &str,
        title: &'static str,
        version: &'static str,
    ) -> std::io::Result<()> {
        let info = OpenApiInfo::new(title, version);
        Self::launch_with_info::<RootMod>(addr, info).await
    }

    /// Launch with a fully-specified [`OpenApiInfo`] and no plugins.
    pub async fn launch_with_info<RootMod: Module>(
        addr: &str,
        info: OpenApiInfo,
    ) -> std::io::Result<()> {
        Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
    }

    /// Full launch contract with plugins. See the module docstring for the
    /// strict phase ordering. Uses `LaunchConfig::default()`.
    pub async fn launch_with_plugins<RootMod: Module>(
        addr: &str,
        info: OpenApiInfo,
        plugins: Vec<Box<dyn ArclyPlugin>>,
    ) -> std::io::Result<()> {
        Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
    }

    /// `launch_with_plugins` with explicit [`LaunchConfig`] tunables.
    ///
    /// Binds `addr` first — a bind failure is returned before any plugin
    /// hook runs — then delegates to
    /// [`launch_on_listener`](Self::launch_on_listener).
    pub async fn launch_configured<RootMod: Module>(
        addr: &str,
        info: OpenApiInfo,
        plugins: Vec<Box<dyn ArclyPlugin>>,
        config: LaunchConfig,
    ) -> std::io::Result<()> {
        let listener = tokio::net::TcpListener::bind(addr).await?;
        Self::launch_on_listener::<RootMod>(listener, info, plugins, config).await
    }

    /// `launch_configured` over a pre-bound listener. This is how test
    /// harnesses dodge the bind-the-port-twice race: the port is yours from
    /// the moment you bind it, with no release-and-rebind window for a
    /// parallel launch to steal it. Production code normally passes an
    /// address string instead.
    ///
    /// `config` is passed through `LaunchConfig::with_env_overrides` before
    /// use, so `ARCLY_*` variables win over the coded values.
    ///
    /// # Errors
    ///
    /// * a plugin's `on_init` or `on_start` fails (already-started plugins
    ///   are rolled back in reverse order first);
    /// * a plugin route collides with an already-mounted `(path, method)`;
    /// * the server itself returns an I/O error — plugin `on_shutdown`
    ///   hooks still run before that error is returned.
    ///
    /// # Panics
    ///
    /// Panics at boot if the OpenAPI spec cannot be serialized.
    pub async fn launch_on_listener<RootMod: Module>(
        listener: tokio::net::TcpListener,
        info: OpenApiInfo,
        mut plugins: Vec<Box<dyn ArclyPlugin>>,
        config: LaunchConfig,
    ) -> std::io::Result<()> {
        let _root: PhantomData<RootMod> = PhantomData;
        // ARCLY_* env vars override coded values — retune without a rebuild.
        let config = config.with_env_overrides();
        tracing::info!(
            request_timeout = ?config.request_timeout,
            max_in_flight = config.max_in_flight,
            max_body_bytes = config.max_body_bytes,
            expose_docs = config.expose_docs,
            "launch config (effective)"
        );

        // ── 0. Walk the module DAG from RootMod ────────────────────
        let reachable_modules = collect_reachable_modules(RootMod::descriptor());
        let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
            .iter()
            .flat_map(|m| m.controllers.iter().copied())
            .collect();

        // ── 1. Providers from reachable modules only ───────────────
        let mut b = DiContainerBuilder::new();
        for m in &reachable_modules {
            for p in m.providers {
                b.add_provider(p);
            }
        }

        // ── 2. Plugin on_init — they may queue providers + routes ──
        let mut plugin_ctx = ArclyPluginContext::new();
        for p in plugins.iter_mut() {
            plugin_ctx.current_plugin = p.name();
            if let Err(e) = p.on_init(&mut plugin_ctx).await {
                return Err(plugin_io_err(e));
            }
        }

        // ── 3. Apply queued provider closures ──────────────────────
        for f in plugin_ctx.pending_providers.drain(..) {
            f(&mut b);
        }

        // ── 4. Freeze the container — `&'static`, lock-free reads ──
        // The dynamic route table goes in first so services and plugins can
        // `Inject<DynamicRouteTable>` and mount `/_plugins/*` routes at runtime.
        b.register(crate::web::dynamic::DynamicRouteTable::new());
        // Per-app body cap rides the frozen container (lock-free probe) —
        // a process-global knob would let concurrent apps clobber each other.
        b.register(crate::web::boundary::BodyLimit(config.max_body_bytes));
        let container = b.freeze();

        // ── 5. OpenAPI: build (scoped to the module DAG), then mutate ──
        let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
        for mutator in plugin_ctx.openapi_mutators.drain(..) {
            mutator(&mut spec_value);
        }
        // Serialize ONCE — /openapi.json then serves a static byte slice
        // instead of cloning + re-serializing a multi-MB Value per request.
        let spec_bytes: &'static [u8] = Box::leak(
            serde_json::to_vec(&spec_value)
                .unwrap_or_else(|e| {
                    // Spec is built from our own serde_json::Value — failure
                    // here is a framework bug; fail at boot, not per-request.
                    panic!("Arcly: OpenAPI spec serialization failed: {e}")
                })
                .into_boxed_slice(),
        );

        // ── 6. Mount routes (filtered by reachable controller set) ─
        // Plugin-registered global interceptors wrap every mounted route;
        // leak the list so route closures can hold a `&'static` slice.
        let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
            Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
        // Boundary filters run before the body is read, on every entry point.
        let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
            Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());

        let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
            axum::Router::new();
        let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
            std::collections::HashSet::new();
        // Framework-reserved routes, mounted below — plugins may not shadow them.
        mounted.insert(("/openapi.json", HttpMethod::GET));
        mounted.insert(("/docs", HttpMethod::GET));

        // Pass 1 — mount every reachable route under its declared path.
        for rt in inventory::iter::<&'static RouteDescriptor> {
            // Empty `controller` = free-fn route → always mount.
            // Non-empty = must belong to a controller in the reachable DAG.
            if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
                continue;
            }
            mounted.insert((rt.path, rt.method));
            router = router.route(axum_path_static(rt.path), adapt(rt, globals, filters));
        }
        // Pass 2 — trailing-slash aliases. NestJS semantics: `#[Get("/")]` on
        // a `/products` controller serves BOTH `/products/` and `/products`.
        // The alias is skipped when the bare path is taken. This runs as a
        // separate pass so that "taken" covers every explicit route no matter
        // where it sits in the inventory order; in a single pass, an alias
        // visited before the explicit bare route would claim the path and the
        // explicit route would then be mounted on top of it.
        for rt in inventory::iter::<&'static RouteDescriptor> {
            if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
                continue;
            }
            if rt.path.len() > 1 && rt.path.ends_with('/') {
                let trimmed: &'static str =
                    Box::leak(rt.path[..rt.path.len() - 1].to_owned().into_boxed_str());
                if mounted.insert((trimmed, rt.method)) {
                    router = router.route(axum_path_static(trimmed), adapt(rt, globals, filters));
                }
            }
        }
        let mut app = router.with_state(container);
        for r in &plugin_ctx.extra_routes {
            // Surface collisions as a clean error naming the plugin, instead
            // of letting axum's `route()` panic deep inside launch.
            if !mounted.insert((r.path, r.method)) {
                return Err(plugin_io_err(PluginError::new(
                    r.plugin,
                    PluginStage::Init,
                    format!(
                        "route `{:?} {}` is already mounted by another route or plugin",
                        r.method, r.path
                    ),
                )));
            }
            app = app.route(
                axum_path_static(r.path),
                build_plugin_route(container, r, globals, filters),
            );
        }

        // Runtime-mutable plugin routes: one catch-all, dispatch via ArcSwap.
        // Interceptors are installed into the table so mounts compose their
        // chain once, at mount time.
        container
            .get::<crate::web::dynamic::DynamicRouteTable>()
            .set_globals(globals);
        app = app.route(
            "/_plugins/{*rest}",
            crate::web::dynamic::dynamic_dispatch_route(container, filters),
        );

        // ── 6b. Mount real-time gateways (filtered by reachable module DAG) ─
        // One process-wide connection registry, leaked to `&'static` and shared
        // by every gateway upgrade route — sharded, lock-free on the hot path.
        let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
            .iter()
            .flat_map(|m| m.gateways.iter().copied())
            .collect();
        let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
        let ws_tuning = crate::realtime::ws::WsTuning {
            outbound_buffer: config.ws_outbound_buffer,
            max_connections: config.ws_max_connections,
            ping_interval: config.ws_ping_interval,
        };
        // Idle sweeper: dead TCP links (NAT drops) never send Close — reap
        // sockets with no inbound activity past the timeout. Pings above
        // keep healthy-but-quiet clients alive via pongs.
        if !config.ws_idle_timeout.is_zero() {
            let idle = config.ws_idle_timeout;
            // `sweep_idle` is handed whole seconds. Round a fractional
            // timeout UP: a sub-second value (expressible through
            // ARCLY_WS_IDLE_TIMEOUT_MS) would otherwise truncate to a
            // threshold of 0 — presumably "everything is idle".
            let idle_secs = idle
                .as_secs()
                .saturating_add(u64::from(idle.subsec_nanos() > 0));
            // `tokio::time::interval` panics on a zero period, which
            // `idle / 2` can produce for a 1 ns timeout.
            let period = (idle / 2).max(Duration::from_millis(1));
            tokio::spawn(async move {
                let mut tick = tokio::time::interval(period);
                tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
                loop {
                    tick.tick().await;
                    let reaped = registry.sweep_idle(idle_secs);
                    if !reaped.is_empty() {
                        tracing::info!(count = reaped.len(), "reaped idle WebSocket connections");
                    }
                }
            });
        }
        for gd in inventory::iter::<&'static GatewayDescriptor> {
            if !allowed_gateways.contains(gd.name) {
                continue;
            }
            let runtime = (gd.build)(container);
            app = app.route(
                axum_path_static(gd.path),
                ws_route(runtime, registry, container, ws_tuning),
            );
        }

        // Docs surface is opt-out (`expose_docs` / ARCLY_EXPOSE_DOCS) for
        // hardened deployments. The paths stay reserved either way so a
        // plugin can't squat them when docs are off.
        if config.expose_docs {
            app = app
                .route(
                    "/openapi.json",
                    get(move || async move {
                        (
                            [(axum::http::header::CONTENT_TYPE, "application/json")],
                            spec_bytes,
                        )
                            .into_response()
                    }),
                )
                .route(
                    "/docs",
                    get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
                );
        }
        let mut app = app.layer(axum::middleware::from_fn(
            crate::security::apply_security_headers,
        ));

        // ── 6c. Governance layers (outermost) ──────────────────────
        // CORS (when configured), then the governor: request-id, global
        // deadline, in-flight admission control. Layer order: the governor
        // is added last → wraps everything, including CORS preflights.
        if let Some(cors_cfg) = config.cors.clone() {
            let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
            app = app.layer(axum::middleware::from_fn(
                move |req: axum::extract::Request, next: axum::middleware::Next| {
                    crate::web::cors::apply_cors(cors_cfg, req, next)
                },
            ));
        }
        let gov = crate::web::governor::Governor::new(
            config.request_timeout,
            config.max_in_flight,
            config.adaptive_shed_target,
        );
        let app = app.layer(axum::middleware::from_fn(
            move |req: axum::extract::Request, next: axum::middleware::Next| {
                crate::web::governor::govern(Arc::clone(&gov), req, next)
            },
        ));

        // ── 6d. Resource governance knobs + cache sweeper ──────────
        crate::web::cache::set_capacity(config.cache_max_entries);
        crate::web::cache::spawn_sweeper(config.cache_sweep_interval);

        // ── 7. Bind ────────────────────────────────────────────────
        // (already bound — handed in by the caller)

        // ── 8. Plugin on_start — bg tasks spawn here ───────────────
        //
        // If a plugin fails, roll back already-started plugins in reverse order
        // before propagating the error — prevents orphaned background tasks.
        // Each rollback `on_shutdown` gets the same per-plugin `drain_budget`
        // used by the post-serve drain loop: a wedged plugin must not hang the
        // process or starve the remaining rollbacks.
        let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
        for (idx, p) in plugins_arc.iter().enumerate() {
            if let Err(e) = p.on_start(container).await {
                // `..idx` = exactly the plugins whose `on_start` succeeded.
                for already in plugins_arc[..idx].iter().rev() {
                    drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
                        .await;
                }
                return Err(plugin_io_err(e));
            }
        }

        // ── 9. Serve with two-phase graceful shutdown ──────────────
        //
        // Phase A: SIGTERM/Ctrl-C → axum stops accepting + drains in-flight.
        // Phase B: AFTER axum has fully drained, run plugin `on_shutdown`s
        //          in reverse declaration order, each wrapped in the
        //          per-plugin `drain_budget` timeout.
        let plugins_for_draining = Arc::clone(&plugins_arc);
        let draining_budget = config.drain_budget;
        let ws_deadline = config.ws_drain_deadline;
        let trigger = config.shutdown_trigger.clone();
        let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
            match trigger {
                Some(n) => {
                    tokio::select! {
                        _ = shutdown_signal() => {}
                        _ = n.notified() => {}
                    }
                }
                None => shutdown_signal().await,
            }
            // First thing: flip readiness so the LB stops routing new
            // traffic to this pod while in-flight requests drain.
            crate::observability::health::set_draining(true);
            tracing::info!("shutdown signal received — HTTP draining");
            // Live WebSockets would hold the drain open until clients leave
            // (i.e. forever) — give them `ws_drain_deadline`, then close.
            tokio::spawn(async move {
                tokio::time::sleep(ws_deadline).await;
                let closed = registry.close_all();
                if closed > 0 {
                    tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
                }
            });
            // Notify plugins concurrently with the HTTP drain (spawned so the
            // listener closes immediately): stop consuming MQ/scheduler work
            // while in-flight HTTP requests finish. Cleanup stays in
            // `on_shutdown`, which runs only after the drain completes.
            tokio::spawn(async move {
                for p in plugins_for_draining.iter() {
                    match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
                        Ok(Ok(())) => {}
                        Ok(Err(e)) => {
                            tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
                        }
                        Err(_) => tracing::warn!(
                            plugin = p.name(),
                            budget = ?draining_budget,
                            "plugin on_draining exceeded budget"
                        ),
                    }
                }
            });
        });
        let serve_res = serve.await;

        // HTTP server has now fully stopped. Safe to drain plugins.
        tracing::info!(
            budget = ?config.drain_budget,
            "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
        );
        for p in plugins_arc.iter().rev() {
            drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
        }
        serve_res
    }
}

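// Describes `collect_reachable_modules` (defined further down in this file),
// not the function directly below: walk the module `imports` DAG
// breadth-first from the root, deduplicating by descriptor pointer identity.
// Returns descriptors in a stable, root-first traversal order.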
/// Rewrite an arcly route pattern into the form the axum 0.8 router expects.
///
/// arcly uses NestJS-style markers: a segment starting with `:` is a named
/// parameter (`/users/:id` → `/users/{id}`) and a segment starting with `*`
/// is a wildcard (`/files/*rest` → `/files/{*rest}`). Every other segment is
/// copied through unchanged, as are the `/` separators.
///
/// Route descriptors keep the user-facing syntax (OpenAPI and metrics labels
/// read it); only the string handed to the router goes through here.
fn axum_path(path: &str) -> String {
    let mut translated = String::with_capacity(path.len() + 8);
    for (idx, segment) in path.split('/').enumerate() {
        // Re-insert the separator that `split` consumed.
        if idx > 0 {
            translated.push('/');
        }
        // `:` is checked before `*`, mirroring the marker precedence.
        match (segment.strip_prefix(':'), segment.strip_prefix('*')) {
            (Some(param), _) => {
                translated.push('{');
                translated.push_str(param);
                translated.push('}');
            }
            (None, Some(wildcard)) => {
                translated.push_str("{*");
                translated.push_str(wildcard);
                translated.push('}');
            }
            (None, None) => translated.push_str(segment),
        }
    }
    translated
}

/// Translate `path` with [`axum_path`] and leak the result to obtain a
/// `&'static str`.
///
/// The allocation is intentionally never freed. Meant to be called once per
/// route at boot, so the leaked memory is bounded by the number of routes.
fn axum_path_static(path: &str) -> &'static str {
    let translated: Box<str> = axum_path(path).into_boxed_str();
    Box::leak(translated)
}

/// Walk the module `imports` DAG breadth-first from `root`, deduplicating
/// by descriptor pointer identity.
///
/// Returns every reachable descriptor exactly once, in a stable, root-first
/// traversal order. Import getters are only invoked for modules that have
/// not been visited yet, so a cycle in the graph cannot loop forever.
fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
    use std::collections::{HashSet, VecDeque};

    let mut seen = HashSet::<*const ModuleDescriptor>::new();
    let mut pending = VecDeque::from([root]);
    let mut reachable = Vec::new();

    while let Some(module) = pending.pop_front() {
        // Identity is the descriptor's address, not its contents.
        let identity: *const ModuleDescriptor = module;
        if seen.insert(identity) {
            reachable.push(module);
            // Duplicates may be queued here; they are filtered on dequeue.
            for getter in module.imports {
                pending.push_back(getter());
            }
        }
    }

    reachable
}

/// Resolve once the process receives SIGINT (Ctrl-C) or SIGTERM (process
/// supervisor / K8s / Docker), whichever arrives first.
///
/// If the SIGTERM listener cannot be installed, a warning is logged and only
/// Ctrl-C is awaited. This variant is compiled for unix targets only; the cfg
/// guard keeps the unix-specific import out of non-unix builds, where only
/// SIGINT is available.
#[cfg(unix)]
async fn shutdown_signal() {
    use tokio::signal::unix::{signal, SignalKind};

    let mut sigterm = match signal(SignalKind::terminate()) {
        Ok(stream) => stream,
        Err(e) => {
            // Degrade gracefully: Ctrl-C alone still allows a clean shutdown.
            tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
            let _ = tokio::signal::ctrl_c().await;
            return;
        }
    };

    // Either signal ends the wait; the result of the winning branch is unused.
    tokio::select! {
        _ = tokio::signal::ctrl_c() => {}
        _ = sigterm.recv() => {}
    }
}

/// Resolve once the process receives Ctrl-C — the only shutdown signal this
/// non-unix variant waits for. The outcome of the wait is discarded.
#[cfg(not(unix))]
async fn shutdown_signal() {
    tokio::signal::ctrl_c().await.ok();
}

/// Run one plugin's `on_shutdown` under a per-plugin timeout, logging (never
/// propagating) errors and timeouts. Used for both start-failure rollback
/// and post-serve drain so the two paths can't drift apart.
async fn drain_plugin(
    p: &dyn ArclyPlugin,
    container: &'static crate::core::engine::FrozenDiContainer,
    phase: &str,
    budget: Duration,
) {
    match tokio::time::timeout(budget, p.on_shutdown(container)).await {
        Ok(Ok(())) => {}
        Ok(Err(e)) => {
            tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
        }
        Err(_) => tracing::warn!(
            plugin = p.name(),
            phase,
            budget = ?budget,
            "plugin shutdown exceeded budget — skipped"
        ),
    }
}

/// Wrap a [`PluginError`] in a `std::io::Error`, choosing the error kind from
/// the lifecycle stage that failed:
///
/// * `Init` → `InvalidInput`
/// * `Start` → `ConnectionRefused`
/// * `Shutdown` → `Other`
///
/// The plugin error itself is carried as the `io::Error`'s inner error.
fn plugin_io_err(e: PluginError) -> std::io::Error {
    use std::io::{Error, ErrorKind};

    // Exhaustive on purpose: a new stage must pick its kind explicitly.
    let io_kind = match e.stage {
        PluginStage::Shutdown => ErrorKind::Other,
        PluginStage::Start => ErrorKind::ConnectionRefused,
        PluginStage::Init => ErrorKind::InvalidInput,
    };
    Error::new(io_kind, e)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `LaunchConfig::apply_overrides` applies well-formed values returned by
    /// the lookup closure, ignores values it cannot parse, and leaves every
    /// key the closure does not answer at its default.
    #[test]
    fn env_overrides_apply_and_ignore_garbage() {
        // The closure stands in for an environment lookup: it maps a
        // variable name to an optional raw string value.
        let cfg = LaunchConfig::default().apply_overrides(|k| match k {
            "ARCLY_REQUEST_TIMEOUT_MS" => Some("1500".into()),
            "ARCLY_MAX_IN_FLIGHT" => Some("notanumber".into()), // ignored, keeps default
            "ARCLY_MAX_BODY_BYTES" => Some("1024".into()),
            "ARCLY_EXPOSE_DOCS" => Some("false".into()),
            _ => None,
        });
        // Valid overrides are parsed into their typed fields.
        assert_eq!(cfg.request_timeout, Duration::from_millis(1500));
        // The garbage value must not change the field (expected to stay 0).
        assert_eq!(cfg.max_in_flight, 0, "unparseable override is ignored");
        assert_eq!(cfg.max_body_bytes, 1024);
        assert!(!cfg.expose_docs);
        // Untouched fields keep coded defaults.
        assert_eq!(cfg.drain_budget, Duration::from_secs(5));
    }
}