Skip to main content

arcly_http/
app.rs

1//! Launch contract.
2//!
3//! Boot phases (executed strictly in this order — order matters):
4//!
5//! 1. Collect inventory-registered providers + routes.
6//! 2. Run each plugin's `on_init` with a *mutable* `ArclyPluginContext` —
7//!    plugins may queue providers (`provide<T>`), routes, global
8//!    interceptors, OpenAPI mutators.
9//! 3. Apply queued provider closures to the `DiContainerBuilder`.
10//! 4. **Freeze.** The container becomes `&'static`, lock-free for reads.
11//! 5. Build the OpenAPI spec, run plugin spec-mutators, leak it as `&'static`.
12//! 6. Mount macro-registered routes, then plugin-registered routes.
13//! 7. Bind the listener.
14//! 8. Run each plugin's `on_start(&container)` — background tasks spawn here.
//! 9. Serve. Ctrl-C / SIGTERM triggers axum's graceful shutdown — accepts
//!    stop, in-flight requests drain. **Only after** that completes do plugin
//!    `on_shutdown(&container)` calls run, each wrapped in a per-plugin
//!    timeout (`LaunchConfig::drain_budget`, default 5 s) so a wedged plugin
//!    can never wedge the process.
19
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse};
25use axum::routing::get;
26
27use crate::core::engine::{
28    DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
29};
30use crate::core::plugins::{
31    build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
32};
33use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
34use crate::realtime::gateway::GatewayDescriptor;
35use crate::realtime::{ws_route, ConnectionRegistry};
36use crate::web::boundary::adapt;
37
/// Tunables for the launch contract. Start from `Default` and adjust with the
/// chainable setters. The struct is `#[non_exhaustive]`, so struct-literal
/// construction (even with `..Default::default()`) is rejected outside this
/// crate:
///
/// ```ignore
/// App::launch_configured::<AppModule>(
///     addr,
///     info,
///     plugins,
///     LaunchConfig::default()
///         .request_timeout(Duration::from_secs(10))
///         .max_in_flight(4_096)
///         .cors(CorsConfig::for_origins(["https://app.example.com"])),
/// )
/// .await
/// ```
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct LaunchConfig {
    /// Per-plugin budget for `on_shutdown` / start-failure rollback (and the
    /// concurrent `on_draining` notification). A plugin exceeding it is
    /// skipped with a logged warning — it can never wedge the process.
    /// Default `5s`.
    pub drain_budget: Duration,
    /// Process-wide request deadline. Routes without `#[Timeout]` are
    /// cancelled (worker freed) and answered `504` past this. `ZERO`
    /// disables. Default `30s`.
    pub request_timeout: Duration,
    /// Hard cap on concurrent in-flight requests — beyond it requests are
    /// shed with `503` + `Retry-After` before any body is read (one atomic
    /// counter, no locks). `0` = unlimited. Default `0`.
    pub max_in_flight: usize,
    /// Request body cap for every entry point. Default 8 MiB.
    pub max_body_bytes: usize,
    /// Ceiling on `#[CacheTTL]` store entries — the key includes the query
    /// string, so an unbounded store is a memory-DoS vector. Default 10 000.
    pub cache_max_entries: usize,
    /// How often the cache sweeper reclaims expired entries. Default `30s`.
    pub cache_sweep_interval: Duration,
    /// After a shutdown signal, WebSocket clients get this long to finish
    /// before the server sends Close frames — otherwise live sockets keep
    /// the HTTP drain (and therefore plugin `on_shutdown`) waiting until
    /// the supervisor SIGKILLs. Default `10s`.
    pub ws_drain_deadline: Duration,
    /// CORS policy. `None` (default) mounts no CORS layer at all.
    pub cors: Option<crate::web::cors::CorsConfig>,
    /// Mount `/docs` (Swagger UI) and `/openapi.json`. Default `true`;
    /// disable in hardened deployments that publish the spec elsewhere.
    pub expose_docs: bool,
    /// Per-socket outbound queue depth — the slow-client memory ceiling; a
    /// client that can't drain it is evicted. Default `256`.
    pub ws_outbound_buffer: usize,
    /// Hard cap on concurrent WebSocket connections across all gateways;
    /// beyond it upgrades get `503` before any socket exists. `0` =
    /// unlimited. Default `0`.
    pub ws_max_connections: usize,
    /// Server→client Ping cadence; pongs feed the idle sweeper. `ZERO`
    /// disables. Default `20s`.
    pub ws_ping_interval: Duration,
    /// Adaptive latency shedding: when the EWMA of request latency exceeds
    /// this target, a pressure-proportional slice of traffic is shed with
    /// `503` (capped at 90% so the EWMA can recover). `ZERO` disables.
    /// Default `ZERO` (opt-in).
    pub adaptive_shed_target: Duration,
    /// Reap sockets with no inbound activity for this long — dead TCP links
    /// (NAT drops) never send Close and would linger forever. `ZERO`
    /// disables. Default `60s`.
    pub ws_idle_timeout: Duration,
}
101
102impl Default for LaunchConfig {
103    fn default() -> Self {
104        Self {
105            drain_budget: Duration::from_secs(5),
106            request_timeout: Duration::from_secs(30),
107            max_in_flight: 0,
108            max_body_bytes: 8 * 1024 * 1024,
109            cache_max_entries: 10_000,
110            cache_sweep_interval: Duration::from_secs(30),
111            ws_drain_deadline: Duration::from_secs(10),
112            adaptive_shed_target: Duration::ZERO,
113            cors: None,
114            expose_docs: true,
115            ws_outbound_buffer: 256,
116            ws_max_connections: 0,
117            ws_ping_interval: Duration::from_secs(20),
118            ws_idle_timeout: Duration::from_secs(60),
119        }
120    }
121}
122
123impl LaunchConfig {
124    pub fn drain_budget(mut self, v: Duration) -> Self {
125        self.drain_budget = v;
126        self
127    }
128    pub fn request_timeout(mut self, v: Duration) -> Self {
129        self.request_timeout = v;
130        self
131    }
132    pub fn max_in_flight(mut self, v: usize) -> Self {
133        self.max_in_flight = v;
134        self
135    }
136    pub fn max_body_bytes(mut self, v: usize) -> Self {
137        self.max_body_bytes = v;
138        self
139    }
140    pub fn cache_max_entries(mut self, v: usize) -> Self {
141        self.cache_max_entries = v;
142        self
143    }
144    pub fn cache_sweep_interval(mut self, v: Duration) -> Self {
145        self.cache_sweep_interval = v;
146        self
147    }
148    pub fn ws_drain_deadline(mut self, v: Duration) -> Self {
149        self.ws_drain_deadline = v;
150        self
151    }
152    pub fn adaptive_shed_target(mut self, v: Duration) -> Self {
153        self.adaptive_shed_target = v;
154        self
155    }
156    pub fn cors(mut self, v: crate::web::cors::CorsConfig) -> Self {
157        self.cors = Some(v);
158        self
159    }
160    pub fn expose_docs(mut self, v: bool) -> Self {
161        self.expose_docs = v;
162        self
163    }
164    pub fn ws_outbound_buffer(mut self, v: usize) -> Self {
165        self.ws_outbound_buffer = v;
166        self
167    }
168    pub fn ws_max_connections(mut self, v: usize) -> Self {
169        self.ws_max_connections = v;
170        self
171    }
172    pub fn ws_ping_interval(mut self, v: Duration) -> Self {
173        self.ws_ping_interval = v;
174        self
175    }
176    pub fn ws_idle_timeout(mut self, v: Duration) -> Self {
177        self.ws_idle_timeout = v;
178        self
179    }
180
181    /// Apply `ARCLY_*` environment overrides on top of the coded values —
182    /// applied automatically by the launch path so operators can retune a
183    /// deployment (incident response, load tests) without a rebuild:
184    ///
185    /// | Variable | Field |
186    /// |---|---|
187    /// | `ARCLY_REQUEST_TIMEOUT_MS` | `request_timeout` (`0` disables) |
188    /// | `ARCLY_MAX_IN_FLIGHT` | `max_in_flight` (`0` = unlimited) |
189    /// | `ARCLY_MAX_BODY_BYTES` | `max_body_bytes` |
190    /// | `ARCLY_CACHE_MAX_ENTRIES` | `cache_max_entries` |
191    /// | `ARCLY_WS_DRAIN_DEADLINE_MS` | `ws_drain_deadline` |
192    /// | `ARCLY_DRAIN_BUDGET_MS` | `drain_budget` |
193    /// | `ARCLY_EXPOSE_DOCS` | `expose_docs` (`true`/`false`/`1`/`0`) |
194    ///
195    /// Unparseable values are ignored with a warning — a typo must never
196    /// change behaviour silently or stop the boot.
197    pub fn with_env_overrides(self) -> Self {
198        self.apply_overrides(|k| std::env::var(k).ok())
199    }
200
201    /// Testable core of [`with_env_overrides`](Self::with_env_overrides).
202    pub(crate) fn apply_overrides(mut self, get: impl Fn(&str) -> Option<String>) -> Self {
203        fn parse<T: std::str::FromStr>(key: &str, raw: String) -> Option<T> {
204            match raw.parse() {
205                Ok(v) => Some(v),
206                Err(_) => {
207                    tracing::warn!(key, value = raw, "ignoring unparseable ARCLY_* override");
208                    None
209                }
210            }
211        }
212        if let Some(v) =
213            get("ARCLY_REQUEST_TIMEOUT_MS").and_then(|r| parse("ARCLY_REQUEST_TIMEOUT_MS", r))
214        {
215            self.request_timeout = Duration::from_millis(v);
216        }
217        if let Some(v) = get("ARCLY_MAX_IN_FLIGHT").and_then(|r| parse("ARCLY_MAX_IN_FLIGHT", r)) {
218            self.max_in_flight = v;
219        }
220        if let Some(v) = get("ARCLY_MAX_BODY_BYTES").and_then(|r| parse("ARCLY_MAX_BODY_BYTES", r))
221        {
222            self.max_body_bytes = v;
223        }
224        if let Some(v) =
225            get("ARCLY_CACHE_MAX_ENTRIES").and_then(|r| parse("ARCLY_CACHE_MAX_ENTRIES", r))
226        {
227            self.cache_max_entries = v;
228        }
229        if let Some(v) =
230            get("ARCLY_WS_DRAIN_DEADLINE_MS").and_then(|r| parse("ARCLY_WS_DRAIN_DEADLINE_MS", r))
231        {
232            self.ws_drain_deadline = Duration::from_millis(v);
233        }
234        if let Some(v) =
235            get("ARCLY_DRAIN_BUDGET_MS").and_then(|r| parse("ARCLY_DRAIN_BUDGET_MS", r))
236        {
237            self.drain_budget = Duration::from_millis(v);
238        }
239        if let Some(v) =
240            get("ARCLY_WS_OUTBOUND_BUFFER").and_then(|r| parse("ARCLY_WS_OUTBOUND_BUFFER", r))
241        {
242            self.ws_outbound_buffer = v;
243        }
244        if let Some(v) =
245            get("ARCLY_WS_MAX_CONNECTIONS").and_then(|r| parse("ARCLY_WS_MAX_CONNECTIONS", r))
246        {
247            self.ws_max_connections = v;
248        }
249        if let Some(v) =
250            get("ARCLY_WS_PING_INTERVAL_MS").and_then(|r| parse("ARCLY_WS_PING_INTERVAL_MS", r))
251        {
252            self.ws_ping_interval = Duration::from_millis(v);
253        }
254        if let Some(v) =
255            get("ARCLY_WS_IDLE_TIMEOUT_MS").and_then(|r| parse("ARCLY_WS_IDLE_TIMEOUT_MS", r))
256        {
257            self.ws_idle_timeout = Duration::from_millis(v);
258        }
259        if let Some(v) = get("ARCLY_ADAPTIVE_SHED_TARGET_MS")
260            .and_then(|r| parse("ARCLY_ADAPTIVE_SHED_TARGET_MS", r))
261        {
262            self.adaptive_shed_target = Duration::from_millis(v);
263        }
264        if let Some(raw) = get("ARCLY_EXPOSE_DOCS") {
265            match raw.as_str() {
266                "true" | "1" => self.expose_docs = true,
267                "false" | "0" => self.expose_docs = false,
268                _ => tracing::warn!(value = raw, "ignoring unparseable ARCLY_EXPOSE_DOCS"),
269            }
270        }
271        self
272    }
273}
274
/// Namespace for the launch entry points (`launch*`). Carries no state; every
/// method is an associated function taking the root module as a type
/// parameter.
pub struct App;
276
277impl App {
278    pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
279        let info = OpenApiInfo::new("arcly-http service", env!("CARGO_PKG_VERSION"));
280        Self::launch_with_info::<RootMod>(addr, info).await
281    }
282
283    pub async fn launch_named<RootMod: Module>(
284        addr: &str,
285        title: &'static str,
286        version: &'static str,
287    ) -> std::io::Result<()> {
288        let info = OpenApiInfo::new(title, version);
289        Self::launch_with_info::<RootMod>(addr, info).await
290    }
291
292    pub async fn launch_with_info<RootMod: Module>(
293        addr: &str,
294        info: OpenApiInfo,
295    ) -> std::io::Result<()> {
296        Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
297    }
298
299    /// Full launch contract with plugins. See the module docstring for the
300    /// strict phase ordering.
301    pub async fn launch_with_plugins<RootMod: Module>(
302        addr: &str,
303        info: OpenApiInfo,
304        plugins: Vec<Box<dyn ArclyPlugin>>,
305    ) -> std::io::Result<()> {
306        Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
307    }
308
309    /// `launch_with_plugins` with explicit [`LaunchConfig`] tunables.
310    pub async fn launch_configured<RootMod: Module>(
311        addr: &str,
312        info: OpenApiInfo,
313        mut plugins: Vec<Box<dyn ArclyPlugin>>,
314        config: LaunchConfig,
315    ) -> std::io::Result<()> {
316        let _root: PhantomData<RootMod> = PhantomData;
317        // ARCLY_* env vars override coded values — retune without a rebuild.
318        let config = config.with_env_overrides();
319        tracing::info!(
320            request_timeout = ?config.request_timeout,
321            max_in_flight = config.max_in_flight,
322            max_body_bytes = config.max_body_bytes,
323            expose_docs = config.expose_docs,
324            "launch config (effective)"
325        );
326
327        // ── 0. Walk the module DAG from RootMod ────────────────────
328        let reachable_modules = collect_reachable_modules(RootMod::descriptor());
329        let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
330            .iter()
331            .flat_map(|m| m.controllers.iter().copied())
332            .collect();
333
334        // ── 1. Providers from reachable modules only ───────────────
335        let mut b = DiContainerBuilder::new();
336        for m in &reachable_modules {
337            for p in m.providers {
338                b.add_provider(p);
339            }
340        }
341
342        // ── 2. Plugin on_init — they may queue providers + routes ──
343        let mut plugin_ctx = ArclyPluginContext::new();
344        for p in plugins.iter_mut() {
345            plugin_ctx.current_plugin = p.name();
346            if let Err(e) = p.on_init(&mut plugin_ctx).await {
347                return Err(plugin_io_err(e));
348            }
349        }
350
351        // ── 3. Apply queued provider closures ──────────────────────
352        for f in plugin_ctx.pending_providers.drain(..) {
353            f(&mut b);
354        }
355
356        // ── 4. Freeze the container — `&'static`, lock-free reads ──
357        // The dynamic route table goes in first so services and plugins can
358        // `Inject<DynamicRouteTable>` and mount `/_plugins/*` routes at runtime.
359        b.register(crate::web::dynamic::DynamicRouteTable::new());
360        // Per-app body cap rides the frozen container (lock-free probe) —
361        // a process-global knob would let concurrent apps clobber each other.
362        b.register(crate::web::boundary::BodyLimit(config.max_body_bytes));
363        let container = b.freeze();
364
365        // ── 5. OpenAPI: build (scoped to the module DAG), then mutate ──
366        let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
367        for mutator in plugin_ctx.openapi_mutators.drain(..) {
368            mutator(&mut spec_value);
369        }
370        // Serialize ONCE — /openapi.json then serves a static byte slice
371        // instead of cloning + re-serializing a multi-MB Value per request.
372        let spec_bytes: &'static [u8] = Box::leak(
373            serde_json::to_vec(&spec_value)
374                .unwrap_or_else(|e| {
375                    // Spec is built from our own serde_json::Value — failure
376                    // here is a framework bug; fail at boot, not per-request.
377                    panic!("Arcly: OpenAPI spec serialization failed: {e}")
378                })
379                .into_boxed_slice(),
380        );
381
382        // ── 6. Mount routes (filtered by reachable controller set) ─
383        // Plugin-registered global interceptors wrap every mounted route;
384        // leak the list so route closures can hold a `&'static` slice.
385        let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
386            Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
387        // Boundary filters run before the body is read, on every entry point.
388        let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
389            Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
390
391        let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
392            axum::Router::new();
393        let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
394            std::collections::HashSet::new();
395        // Framework-reserved routes, mounted below — plugins may not shadow them.
396        mounted.insert(("/openapi.json", HttpMethod::GET));
397        mounted.insert(("/docs", HttpMethod::GET));
398        for rt in inventory::iter::<&'static RouteDescriptor> {
399            // Empty `controller` = free-fn route → always mount.
400            // Non-empty = must belong to a controller in the reachable DAG.
401            if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
402                continue;
403            }
404            mounted.insert((rt.path, rt.method));
405            router = router.route(axum_path_static(rt.path), adapt(rt, globals, filters));
406            // NestJS semantics: `#[Get("/")]` on a `/products` controller
407            // serves BOTH `/products/` and `/products` — previously the
408            // bare form 404'd. Skipped when the bare path is taken.
409            if rt.path.len() > 1 && rt.path.ends_with('/') {
410                let trimmed: &'static str =
411                    Box::leak(rt.path[..rt.path.len() - 1].to_owned().into_boxed_str());
412                if mounted.insert((trimmed, rt.method)) {
413                    router = router.route(axum_path_static(trimmed), adapt(rt, globals, filters));
414                }
415            }
416        }
417        let mut app = router.with_state(container);
418        for r in &plugin_ctx.extra_routes {
419            // Surface collisions as a clean error naming the plugin, instead
420            // of letting axum's `route()` panic deep inside launch.
421            if !mounted.insert((r.path, r.method)) {
422                return Err(plugin_io_err(PluginError::new(
423                    r.plugin,
424                    PluginStage::Init,
425                    format!(
426                        "route `{:?} {}` is already mounted by another route or plugin",
427                        r.method, r.path
428                    ),
429                )));
430            }
431            app = app.route(
432                axum_path_static(r.path),
433                build_plugin_route(container, r, globals, filters),
434            );
435        }
436
437        // Runtime-mutable plugin routes: one catch-all, dispatch via ArcSwap.
438        // Interceptors are installed into the table so mounts compose their
439        // chain once, at mount time.
440        container
441            .get::<crate::web::dynamic::DynamicRouteTable>()
442            .set_globals(globals);
443        app = app.route(
444            "/_plugins/{*rest}",
445            crate::web::dynamic::dynamic_dispatch_route(container, filters),
446        );
447
448        // ── 6b. Mount real-time gateways (filtered by reachable module DAG) ─
449        // One process-wide connection registry, leaked to `&'static` and shared
450        // by every gateway upgrade route — sharded, lock-free on the hot path.
451        let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
452            .iter()
453            .flat_map(|m| m.gateways.iter().copied())
454            .collect();
455        let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
456        let ws_tuning = crate::realtime::ws::WsTuning {
457            outbound_buffer: config.ws_outbound_buffer,
458            max_connections: config.ws_max_connections,
459            ping_interval: config.ws_ping_interval,
460        };
461        // Idle sweeper: dead TCP links (NAT drops) never send Close — reap
462        // sockets with no inbound activity past the timeout. Pings above
463        // keep healthy-but-quiet clients alive via pongs.
464        if !config.ws_idle_timeout.is_zero() {
465            let idle = config.ws_idle_timeout;
466            tokio::spawn(async move {
467                let mut tick = tokio::time::interval(idle / 2);
468                tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
469                loop {
470                    tick.tick().await;
471                    let reaped = registry.sweep_idle(idle.as_secs());
472                    if !reaped.is_empty() {
473                        tracing::info!(count = reaped.len(), "reaped idle WebSocket connections");
474                    }
475                }
476            });
477        }
478        for gd in inventory::iter::<&'static GatewayDescriptor> {
479            if !allowed_gateways.contains(gd.name) {
480                continue;
481            }
482            let runtime = (gd.build)(container);
483            app = app.route(
484                axum_path_static(gd.path),
485                ws_route(runtime, registry, container, ws_tuning),
486            );
487        }
488
489        // Docs surface is opt-out (`expose_docs` / ARCLY_EXPOSE_DOCS) for
490        // hardened deployments. The paths stay reserved either way so a
491        // plugin can't squat them when docs are off.
492        if config.expose_docs {
493            app = app
494                .route(
495                    "/openapi.json",
496                    get(move || async move {
497                        (
498                            [(axum::http::header::CONTENT_TYPE, "application/json")],
499                            spec_bytes,
500                        )
501                            .into_response()
502                    }),
503                )
504                .route(
505                    "/docs",
506                    get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
507                );
508        }
509        let mut app = app.layer(axum::middleware::from_fn(
510            crate::security::apply_security_headers,
511        ));
512
513        // ── 6c. Governance layers (outermost) ──────────────────────
514        // CORS (when configured), then the governor: request-id, global
515        // deadline, in-flight admission control. Layer order: the governor
516        // is added last → wraps everything, including CORS preflights.
517        if let Some(cors_cfg) = config.cors.clone() {
518            let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
519            app = app.layer(axum::middleware::from_fn(
520                move |req: axum::extract::Request, next: axum::middleware::Next| {
521                    crate::web::cors::apply_cors(cors_cfg, req, next)
522                },
523            ));
524        }
525        let gov = crate::web::governor::Governor::new(
526            config.request_timeout,
527            config.max_in_flight,
528            config.adaptive_shed_target,
529        );
530        let app = app.layer(axum::middleware::from_fn(
531            move |req: axum::extract::Request, next: axum::middleware::Next| {
532                crate::web::governor::govern(Arc::clone(&gov), req, next)
533            },
534        ));
535
536        // ── 6d. Resource governance knobs + cache sweeper ──────────
537        crate::web::cache::set_capacity(config.cache_max_entries);
538        crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
539
540        // ── 7. Bind ────────────────────────────────────────────────
541        let listener = tokio::net::TcpListener::bind(addr).await?;
542
543        // ── 8. Plugin on_start — bg tasks spawn here ───────────────
544        //
545        // If a plugin fails, roll back already-started plugins in reverse order
546        // before propagating the error — prevents orphaned background tasks.
547        // Each rollback `on_shutdown` gets the same 5-second per-plugin budget
548        // used by the post-serve drain loop: a wedged plugin must not hang the
549        // process or starve the remaining rollbacks.
550        let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
551        let mut started = 0usize;
552        #[allow(clippy::explicit_counter_loop)] // counter outlives the loop for the error message
553        for p in plugins_arc.iter() {
554            if let Err(e) = p.on_start(container).await {
555                for already in plugins_arc[..started].iter().rev() {
556                    drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
557                        .await;
558                }
559                return Err(plugin_io_err(e));
560            }
561            started += 1;
562        }
563
564        // ── 9. Serve with two-phase graceful shutdown ──────────────
565        //
566        // Phase A: SIGTERM/Ctrl-C → axum stops accepting + drains in-flight.
567        // Phase B: AFTER axum has fully drained, run plugin `on_shutdown`s
568        //          in reverse declaration order, wrapped in a 5s timeout.
569        let plugins_for_draining = Arc::clone(&plugins_arc);
570        let draining_budget = config.drain_budget;
571        let ws_deadline = config.ws_drain_deadline;
572        let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
573            shutdown_signal().await;
574            // First thing: flip readiness so the LB stops routing new
575            // traffic to this pod while in-flight requests drain.
576            crate::observability::health::set_draining(true);
577            tracing::info!("shutdown signal received — HTTP draining");
578            // Live WebSockets would hold the drain open until clients leave
579            // (i.e. forever) — give them `ws_drain_deadline`, then close.
580            tokio::spawn(async move {
581                tokio::time::sleep(ws_deadline).await;
582                let closed = registry.close_all();
583                if closed > 0 {
584                    tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
585                }
586            });
587            // Notify plugins concurrently with the HTTP drain (spawned so the
588            // listener closes immediately): stop consuming MQ/scheduler work
589            // while in-flight HTTP requests finish. Cleanup stays in
590            // `on_shutdown`, which runs only after the drain completes.
591            tokio::spawn(async move {
592                for p in plugins_for_draining.iter() {
593                    match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
594                        Ok(Ok(())) => {}
595                        Ok(Err(e)) => {
596                            tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
597                        }
598                        Err(_) => tracing::warn!(
599                            plugin = p.name(),
600                            budget = ?draining_budget,
601                            "plugin on_draining exceeded budget"
602                        ),
603                    }
604                }
605            });
606        });
607        let serve_res = serve.await;
608
609        // HTTP server has now fully stopped. Safe to drain plugins.
610        tracing::info!(
611            budget = ?config.drain_budget,
612            "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
613        );
614        for p in plugins_arc.iter().rev() {
615            drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
616        }
617        serve_res
618    }
619}
620
/// Translate arcly's NestJS-style path syntax (`/users/:id`, `/files/*rest`)
/// into axum 0.8 matcher syntax (`/users/{id}`, `/files/{*rest}`).
///
/// The path is processed one `/`-separated segment at a time: a segment
/// starting with `:` becomes `{name}`, one starting with `*` becomes
/// `{*name}`, and anything else is copied through untouched. Route
/// descriptors keep the user syntax (OpenAPI + metrics labels read it); only
/// the string handed to the axum router is translated.
fn axum_path(path: &str) -> String {
    let mut translated = String::with_capacity(path.len() + 2);
    for (idx, segment) in path.split('/').enumerate() {
        // Re-insert the separator that `split` consumed.
        if idx > 0 {
            translated.push('/');
        }
        if let Some(param) = segment.strip_prefix(':') {
            translated.push('{');
            translated.push_str(param);
            translated.push('}');
        } else if let Some(wildcard) = segment.strip_prefix('*') {
            translated.push_str("{*");
            translated.push_str(wildcard);
            translated.push('}');
        } else {
            translated.push_str(segment);
        }
    }
    translated
}
643
644fn axum_path_static(path: &str) -> &'static str {
645    Box::leak(axum_path(path).into_boxed_str())
646}
647
648fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
649    use std::collections::HashSet;
650    let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
651    let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
652        std::collections::VecDeque::new();
653    let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
654    queue.push_back(root);
655    while let Some(m) = queue.pop_front() {
656        if !visited.insert(m as *const _) {
657            continue;
658        }
659        order.push(m);
660        for getter in m.imports {
661            queue.push_back(getter());
662        }
663    }
664    order
665}
666
667/// Wait for either SIGINT (Ctrl-C) or SIGTERM (process supervisor / K8s / Docker).
668///
669/// On Windows only SIGINT is available; the cfg guard keeps the unix-specific
670/// import out of non-unix builds.
671#[cfg(unix)]
672async fn shutdown_signal() {
673    use tokio::signal::unix::{signal, SignalKind};
674    match signal(SignalKind::terminate()) {
675        Ok(mut sigterm) => {
676            tokio::select! {
677                _ = tokio::signal::ctrl_c() => {}
678                _ = sigterm.recv() => {}
679            }
680        }
681        Err(e) => {
682            tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
683            let _ = tokio::signal::ctrl_c().await;
684        }
685    }
686}
687
/// Non-unix build: resolve once Ctrl-C is received. An error from the
/// Ctrl-C listener is deliberately discarded.
#[cfg(not(unix))]
async fn shutdown_signal() {
    tokio::signal::ctrl_c().await.ok();
}
692
693/// Run one plugin's `on_shutdown` under a per-plugin timeout, logging (never
694/// propagating) errors and timeouts. Used for both start-failure rollback
695/// and post-serve drain so the two paths can't drift apart.
696async fn drain_plugin(
697    p: &dyn ArclyPlugin,
698    container: &'static crate::core::engine::FrozenDiContainer,
699    phase: &str,
700    budget: Duration,
701) {
702    match tokio::time::timeout(budget, p.on_shutdown(container)).await {
703        Ok(Ok(())) => {}
704        Ok(Err(e)) => {
705            tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
706        }
707        Err(_) => tracing::warn!(
708            plugin = p.name(),
709            phase,
710            budget = ?budget,
711            "plugin shutdown exceeded budget — skipped"
712        ),
713    }
714}
715
716fn plugin_io_err(e: PluginError) -> std::io::Error {
717    let kind = match e.stage {
718        PluginStage::Init => std::io::ErrorKind::InvalidInput,
719        PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
720        PluginStage::Shutdown => std::io::ErrorKind::Other,
721    };
722    std::io::Error::new(kind, e)
723}
724
#[cfg(test)]
mod tests {
    use super::*;

    /// Valid overrides are applied, an unparseable one leaves its field at
    /// the default, and fields without an override are untouched.
    #[test]
    fn env_overrides_apply_and_ignore_garbage() {
        const ENV: &[(&str, &str)] = &[
            ("ARCLY_REQUEST_TIMEOUT_MS", "1500"),
            ("ARCLY_MAX_IN_FLIGHT", "notanumber"), // ignored, keeps default
            ("ARCLY_MAX_BODY_BYTES", "1024"),
            ("ARCLY_EXPOSE_DOCS", "false"),
        ];
        let lookup = |key: &str| {
            ENV.iter()
                .find(|(name, _)| *name == key)
                .map(|(_, value)| (*value).to_string())
        };

        let cfg = LaunchConfig::default().apply_overrides(lookup);

        assert_eq!(cfg.request_timeout, Duration::from_millis(1500));
        assert_eq!(cfg.max_in_flight, 0, "unparseable override is ignored");
        assert_eq!(cfg.max_body_bytes, 1024);
        assert!(!cfg.expose_docs);
        // Untouched fields keep coded defaults.
        assert_eq!(cfg.drain_budget, Duration::from_secs(5));
    }
}