Skip to main content

arcly_http/
app.rs

1//! Launch contract.
2//!
3//! Boot phases (executed strictly in this order — order matters):
4//!
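//! 1. Walk the module `imports` DAG from the root module and collect the
//!    providers of every reachable module (routes are filtered to the same
//!    DAG when they are mounted in phase 6).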
6//! 2. Run each plugin's `on_init` with a *mutable* `ArclyPluginContext` —
7//!    plugins may queue providers (`provide<T>`), routes, global
8//!    interceptors, OpenAPI mutators.
9//! 3. Apply queued provider closures to the `DiContainerBuilder`.
10//! 4. **Freeze.** The container becomes `&'static`, lock-free for reads.
11//! 5. Build the OpenAPI spec, run plugin spec-mutators, leak it as `&'static`.
12//! 6. Mount macro-registered routes, then plugin-registered routes.
13//! 7. Bind the listener.
14//! 8. Run each plugin's `on_start(&container)` — background tasks spawn here.
//! 9. Serve. Ctrl-C / SIGTERM triggers axum's graceful shutdown — new
//!    connections stop being accepted, in-flight requests drain, and plugins
//!    receive `on_draining` meanwhile. **Only after** that completes do
//!    plugin `on_shutdown(&container)` calls run (reverse declaration order),
//!    each bounded by `LaunchConfig::drain_budget` (default 5 s) so a wedged
//!    plugin can never wedge the process.
19
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse};
25use axum::routing::get;
26
27use crate::core::engine::{
28    DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
29};
30use crate::core::plugins::{
31    build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
32};
33use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
34use crate::realtime::gateway::GatewayDescriptor;
35use crate::realtime::{ws_route, ConnectionRegistry};
36use crate::web::boundary::adapt;
37
38/// Tunables for the launch contract. Start from `Default` and adjust:
39///
40/// ```ignore
41/// App::launch_configured::<AppModule>(addr, info, plugins, LaunchConfig {
42///     request_timeout: Duration::from_secs(10),
43///     max_in_flight: 4_096,
44///     cors: Some(CorsConfig::for_origins(["https://app.example.com"])),
45///     ..Default::default()
46/// }).await
47/// ```
48#[derive(Clone, Debug)]
49pub struct LaunchConfig {
50    /// Per-plugin budget for `on_shutdown` / start-failure rollback (and the
51    /// concurrent `on_draining` notification). A plugin exceeding it is
52    /// skipped with a logged warning — it can never wedge the process.
53    /// Default `5s`.
54    pub drain_budget: Duration,
55    /// Process-wide request deadline. Routes without `#[Timeout]` are
56    /// cancelled (worker freed) and answered `504` past this. `ZERO`
57    /// disables. Default `30s`.
58    pub request_timeout: Duration,
59    /// Hard cap on concurrent in-flight requests — beyond it requests are
60    /// shed with `503` + `Retry-After` before any body is read (one atomic
61    /// counter, no locks). `0` = unlimited. Default `0`.
62    pub max_in_flight: usize,
63    /// Request body cap for every entry point. Default 8 MiB.
64    pub max_body_bytes: usize,
65    /// Ceiling on `#[CacheTTL]` store entries — the key includes the query
66    /// string, so an unbounded store is a memory-DoS vector. Default 10 000.
67    pub cache_max_entries: usize,
68    /// How often the cache sweeper reclaims expired entries. Default `30s`.
69    pub cache_sweep_interval: Duration,
70    /// After a shutdown signal, WebSocket clients get this long to finish
71    /// before the server sends Close frames — otherwise live sockets keep
72    /// the HTTP drain (and therefore plugin `on_shutdown`) waiting until
73    /// the supervisor SIGKILLs. Default `10s`.
74    pub ws_drain_deadline: Duration,
75    /// CORS policy. `None` (default) mounts no CORS layer at all.
76    pub cors: Option<crate::web::cors::CorsConfig>,
77    /// Mount `/docs` (Swagger UI) and `/openapi.json`. Default `true`;
78    /// disable in hardened deployments that publish the spec elsewhere.
79    pub expose_docs: bool,
80    /// Per-socket outbound queue depth — the slow-client memory ceiling; a
81    /// client that can't drain it is evicted. Default `256`.
82    pub ws_outbound_buffer: usize,
83    /// Hard cap on concurrent WebSocket connections across all gateways;
84    /// beyond it upgrades get `503` before any socket exists. `0` =
85    /// unlimited. Default `0`.
86    pub ws_max_connections: usize,
87    /// Server→client Ping cadence; pongs feed the idle sweeper. `ZERO`
88    /// disables. Default `20s`.
89    pub ws_ping_interval: Duration,
90    /// Reap sockets with no inbound activity for this long — dead TCP links
91    /// (NAT drops) never send Close and would linger forever. `ZERO`
92    /// disables. Default `60s`.
93    pub ws_idle_timeout: Duration,
94}
95
96impl Default for LaunchConfig {
97    fn default() -> Self {
98        Self {
99            drain_budget: Duration::from_secs(5),
100            request_timeout: Duration::from_secs(30),
101            max_in_flight: 0,
102            max_body_bytes: 8 * 1024 * 1024,
103            cache_max_entries: 10_000,
104            cache_sweep_interval: Duration::from_secs(30),
105            ws_drain_deadline: Duration::from_secs(10),
106            cors: None,
107            expose_docs: true,
108            ws_outbound_buffer: 256,
109            ws_max_connections: 0,
110            ws_ping_interval: Duration::from_secs(20),
111            ws_idle_timeout: Duration::from_secs(60),
112        }
113    }
114}
115
116impl LaunchConfig {
117    /// Apply `ARCLY_*` environment overrides on top of the coded values —
118    /// applied automatically by the launch path so operators can retune a
119    /// deployment (incident response, load tests) without a rebuild:
120    ///
121    /// | Variable | Field |
122    /// |---|---|
123    /// | `ARCLY_REQUEST_TIMEOUT_MS` | `request_timeout` (`0` disables) |
124    /// | `ARCLY_MAX_IN_FLIGHT` | `max_in_flight` (`0` = unlimited) |
125    /// | `ARCLY_MAX_BODY_BYTES` | `max_body_bytes` |
126    /// | `ARCLY_CACHE_MAX_ENTRIES` | `cache_max_entries` |
127    /// | `ARCLY_WS_DRAIN_DEADLINE_MS` | `ws_drain_deadline` |
128    /// | `ARCLY_DRAIN_BUDGET_MS` | `drain_budget` |
129    /// | `ARCLY_EXPOSE_DOCS` | `expose_docs` (`true`/`false`/`1`/`0`) |
130    ///
131    /// Unparseable values are ignored with a warning — a typo must never
132    /// change behaviour silently or stop the boot.
133    pub fn with_env_overrides(self) -> Self {
134        self.apply_overrides(|k| std::env::var(k).ok())
135    }
136
137    /// Testable core of [`with_env_overrides`](Self::with_env_overrides).
138    pub(crate) fn apply_overrides(mut self, get: impl Fn(&str) -> Option<String>) -> Self {
139        fn parse<T: std::str::FromStr>(key: &str, raw: String) -> Option<T> {
140            match raw.parse() {
141                Ok(v) => Some(v),
142                Err(_) => {
143                    tracing::warn!(key, value = raw, "ignoring unparseable ARCLY_* override");
144                    None
145                }
146            }
147        }
148        if let Some(v) =
149            get("ARCLY_REQUEST_TIMEOUT_MS").and_then(|r| parse("ARCLY_REQUEST_TIMEOUT_MS", r))
150        {
151            self.request_timeout = Duration::from_millis(v);
152        }
153        if let Some(v) = get("ARCLY_MAX_IN_FLIGHT").and_then(|r| parse("ARCLY_MAX_IN_FLIGHT", r)) {
154            self.max_in_flight = v;
155        }
156        if let Some(v) = get("ARCLY_MAX_BODY_BYTES").and_then(|r| parse("ARCLY_MAX_BODY_BYTES", r))
157        {
158            self.max_body_bytes = v;
159        }
160        if let Some(v) =
161            get("ARCLY_CACHE_MAX_ENTRIES").and_then(|r| parse("ARCLY_CACHE_MAX_ENTRIES", r))
162        {
163            self.cache_max_entries = v;
164        }
165        if let Some(v) =
166            get("ARCLY_WS_DRAIN_DEADLINE_MS").and_then(|r| parse("ARCLY_WS_DRAIN_DEADLINE_MS", r))
167        {
168            self.ws_drain_deadline = Duration::from_millis(v);
169        }
170        if let Some(v) =
171            get("ARCLY_DRAIN_BUDGET_MS").and_then(|r| parse("ARCLY_DRAIN_BUDGET_MS", r))
172        {
173            self.drain_budget = Duration::from_millis(v);
174        }
175        if let Some(v) =
176            get("ARCLY_WS_OUTBOUND_BUFFER").and_then(|r| parse("ARCLY_WS_OUTBOUND_BUFFER", r))
177        {
178            self.ws_outbound_buffer = v;
179        }
180        if let Some(v) =
181            get("ARCLY_WS_MAX_CONNECTIONS").and_then(|r| parse("ARCLY_WS_MAX_CONNECTIONS", r))
182        {
183            self.ws_max_connections = v;
184        }
185        if let Some(v) =
186            get("ARCLY_WS_PING_INTERVAL_MS").and_then(|r| parse("ARCLY_WS_PING_INTERVAL_MS", r))
187        {
188            self.ws_ping_interval = Duration::from_millis(v);
189        }
190        if let Some(v) =
191            get("ARCLY_WS_IDLE_TIMEOUT_MS").and_then(|r| parse("ARCLY_WS_IDLE_TIMEOUT_MS", r))
192        {
193            self.ws_idle_timeout = Duration::from_millis(v);
194        }
195        if let Some(raw) = get("ARCLY_EXPOSE_DOCS") {
196            match raw.as_str() {
197                "true" | "1" => self.expose_docs = true,
198                "false" | "0" => self.expose_docs = false,
199                _ => tracing::warn!(value = raw, "ignoring unparseable ARCLY_EXPOSE_DOCS"),
200            }
201        }
202        self
203    }
204}
205
206pub struct App;
207
208impl App {
209    pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
210        let info = OpenApiInfo {
211            title: "arcly-http service",
212            version: env!("CARGO_PKG_VERSION"),
213            ..Default::default()
214        };
215        Self::launch_with_info::<RootMod>(addr, info).await
216    }
217
218    pub async fn launch_named<RootMod: Module>(
219        addr: &str,
220        title: &'static str,
221        version: &'static str,
222    ) -> std::io::Result<()> {
223        let info = OpenApiInfo {
224            title,
225            version,
226            ..Default::default()
227        };
228        Self::launch_with_info::<RootMod>(addr, info).await
229    }
230
231    pub async fn launch_with_info<RootMod: Module>(
232        addr: &str,
233        info: OpenApiInfo,
234    ) -> std::io::Result<()> {
235        Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
236    }
237
238    /// Full launch contract with plugins. See the module docstring for the
239    /// strict phase ordering.
240    pub async fn launch_with_plugins<RootMod: Module>(
241        addr: &str,
242        info: OpenApiInfo,
243        plugins: Vec<Box<dyn ArclyPlugin>>,
244    ) -> std::io::Result<()> {
245        Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
246    }
247
248    /// `launch_with_plugins` with explicit [`LaunchConfig`] tunables.
249    pub async fn launch_configured<RootMod: Module>(
250        addr: &str,
251        info: OpenApiInfo,
252        mut plugins: Vec<Box<dyn ArclyPlugin>>,
253        config: LaunchConfig,
254    ) -> std::io::Result<()> {
255        let _root: PhantomData<RootMod> = PhantomData;
256        // ARCLY_* env vars override coded values — retune without a rebuild.
257        let config = config.with_env_overrides();
258        tracing::info!(
259            request_timeout = ?config.request_timeout,
260            max_in_flight = config.max_in_flight,
261            max_body_bytes = config.max_body_bytes,
262            expose_docs = config.expose_docs,
263            "launch config (effective)"
264        );
265
266        // ── 0. Walk the module DAG from RootMod ────────────────────
267        let reachable_modules = collect_reachable_modules(RootMod::descriptor());
268        let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
269            .iter()
270            .flat_map(|m| m.controllers.iter().copied())
271            .collect();
272
273        // ── 1. Providers from reachable modules only ───────────────
274        let mut b = DiContainerBuilder::new();
275        for m in &reachable_modules {
276            for p in m.providers {
277                b.add_provider(p);
278            }
279        }
280
281        // ── 2. Plugin on_init — they may queue providers + routes ──
282        let mut plugin_ctx = ArclyPluginContext::new();
283        for p in plugins.iter_mut() {
284            plugin_ctx.current_plugin = p.name();
285            if let Err(e) = p.on_init(&mut plugin_ctx).await {
286                return Err(plugin_io_err(e));
287            }
288        }
289
290        // ── 3. Apply queued provider closures ──────────────────────
291        for f in plugin_ctx.pending_providers.drain(..) {
292            f(&mut b);
293        }
294
295        // ── 4. Freeze the container — `&'static`, lock-free reads ──
296        // The dynamic route table goes in first so services and plugins can
297        // `Inject<DynamicRouteTable>` and mount `/_plugins/*` routes at runtime.
298        b.register(crate::web::dynamic::DynamicRouteTable::new());
299        // Per-app body cap rides the frozen container (lock-free probe) —
300        // a process-global knob would let concurrent apps clobber each other.
301        b.register(crate::web::boundary::BodyLimit(config.max_body_bytes));
302        let container = b.freeze();
303
304        // ── 5. OpenAPI: build (scoped to the module DAG), then mutate ──
305        let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
306        for mutator in plugin_ctx.openapi_mutators.drain(..) {
307            mutator(&mut spec_value);
308        }
309        // Serialize ONCE — /openapi.json then serves a static byte slice
310        // instead of cloning + re-serializing a multi-MB Value per request.
311        let spec_bytes: &'static [u8] = Box::leak(
312            serde_json::to_vec(&spec_value)
313                .unwrap_or_else(|e| {
314                    // Spec is built from our own serde_json::Value — failure
315                    // here is a framework bug; fail at boot, not per-request.
316                    panic!("Arcly: OpenAPI spec serialization failed: {e}")
317                })
318                .into_boxed_slice(),
319        );
320
321        // ── 6. Mount routes (filtered by reachable controller set) ─
322        // Plugin-registered global interceptors wrap every mounted route;
323        // leak the list so route closures can hold a `&'static` slice.
324        let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
325            Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
326        // Boundary filters run before the body is read, on every entry point.
327        let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
328            Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
329
330        let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
331            axum::Router::new();
332        let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
333            std::collections::HashSet::new();
334        // Framework-reserved routes, mounted below — plugins may not shadow them.
335        mounted.insert(("/openapi.json", HttpMethod::GET));
336        mounted.insert(("/docs", HttpMethod::GET));
337        for rt in inventory::iter::<&'static RouteDescriptor> {
338            // Empty `controller` = free-fn route → always mount.
339            // Non-empty = must belong to a controller in the reachable DAG.
340            if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
341                continue;
342            }
343            mounted.insert((rt.path, rt.method));
344            router = router.route(rt.path, adapt(rt, globals, filters));
345        }
346        let mut app = router.with_state(container);
347        for r in &plugin_ctx.extra_routes {
348            // Surface collisions as a clean error naming the plugin, instead
349            // of letting axum's `route()` panic deep inside launch.
350            if !mounted.insert((r.path, r.method)) {
351                return Err(plugin_io_err(PluginError::new(
352                    r.plugin,
353                    PluginStage::Init,
354                    format!(
355                        "route `{:?} {}` is already mounted by another route or plugin",
356                        r.method, r.path
357                    ),
358                )));
359            }
360            app = app.route(r.path, build_plugin_route(container, r, globals, filters));
361        }
362
363        // Runtime-mutable plugin routes: one catch-all, dispatch via ArcSwap.
364        app = app.route(
365            "/_plugins/*rest",
366            crate::web::dynamic::dynamic_dispatch_route(container, globals, filters),
367        );
368
369        // ── 6b. Mount real-time gateways (filtered by reachable module DAG) ─
370        // One process-wide connection registry, leaked to `&'static` and shared
371        // by every gateway upgrade route — sharded, lock-free on the hot path.
372        let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
373            .iter()
374            .flat_map(|m| m.gateways.iter().copied())
375            .collect();
376        let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
377        let ws_tuning = crate::realtime::ws::WsTuning {
378            outbound_buffer: config.ws_outbound_buffer,
379            max_connections: config.ws_max_connections,
380            ping_interval: config.ws_ping_interval,
381        };
382        // Idle sweeper: dead TCP links (NAT drops) never send Close — reap
383        // sockets with no inbound activity past the timeout. Pings above
384        // keep healthy-but-quiet clients alive via pongs.
385        if !config.ws_idle_timeout.is_zero() {
386            let idle = config.ws_idle_timeout;
387            tokio::spawn(async move {
388                let mut tick = tokio::time::interval(idle / 2);
389                tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
390                loop {
391                    tick.tick().await;
392                    let reaped = registry.sweep_idle(idle.as_secs());
393                    if !reaped.is_empty() {
394                        tracing::info!(count = reaped.len(), "reaped idle WebSocket connections");
395                    }
396                }
397            });
398        }
399        for gd in inventory::iter::<&'static GatewayDescriptor> {
400            if !allowed_gateways.contains(gd.name) {
401                continue;
402            }
403            let runtime = (gd.build)(container);
404            app = app.route(gd.path, ws_route(runtime, registry, container, ws_tuning));
405        }
406
407        // Docs surface is opt-out (`expose_docs` / ARCLY_EXPOSE_DOCS) for
408        // hardened deployments. The paths stay reserved either way so a
409        // plugin can't squat them when docs are off.
410        if config.expose_docs {
411            app = app
412                .route(
413                    "/openapi.json",
414                    get(move || async move {
415                        (
416                            [(axum::http::header::CONTENT_TYPE, "application/json")],
417                            spec_bytes,
418                        )
419                            .into_response()
420                    }),
421                )
422                .route(
423                    "/docs",
424                    get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
425                );
426        }
427        let mut app = app.layer(axum::middleware::from_fn(
428            crate::security::apply_security_headers,
429        ));
430
431        // ── 6c. Governance layers (outermost) ──────────────────────
432        // CORS (when configured), then the governor: request-id, global
433        // deadline, in-flight admission control. Layer order: the governor
434        // is added last → wraps everything, including CORS preflights.
435        if let Some(cors_cfg) = config.cors.clone() {
436            let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
437            app = app.layer(axum::middleware::from_fn(
438                move |req: axum::extract::Request, next: axum::middleware::Next| {
439                    crate::web::cors::apply_cors(cors_cfg, req, next)
440                },
441            ));
442        }
443        let gov = crate::web::governor::Governor::new(config.request_timeout, config.max_in_flight);
444        let app = app.layer(axum::middleware::from_fn(
445            move |req: axum::extract::Request, next: axum::middleware::Next| {
446                crate::web::governor::govern(Arc::clone(&gov), req, next)
447            },
448        ));
449
450        // ── 6d. Resource governance knobs + cache sweeper ──────────
451        crate::web::cache::set_capacity(config.cache_max_entries);
452        crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
453
454        // ── 7. Bind ────────────────────────────────────────────────
455        let listener = tokio::net::TcpListener::bind(addr).await?;
456
457        // ── 8. Plugin on_start — bg tasks spawn here ───────────────
458        //
459        // If a plugin fails, roll back already-started plugins in reverse order
460        // before propagating the error — prevents orphaned background tasks.
461        // Each rollback `on_shutdown` gets the same 5-second per-plugin budget
462        // used by the post-serve drain loop: a wedged plugin must not hang the
463        // process or starve the remaining rollbacks.
464        let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
465        let mut started = 0usize;
466        #[allow(clippy::explicit_counter_loop)] // counter outlives the loop for the error message
467        for p in plugins_arc.iter() {
468            if let Err(e) = p.on_start(container).await {
469                for already in plugins_arc[..started].iter().rev() {
470                    drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
471                        .await;
472                }
473                return Err(plugin_io_err(e));
474            }
475            started += 1;
476        }
477
478        // ── 9. Serve with two-phase graceful shutdown ──────────────
479        //
480        // Phase A: SIGTERM/Ctrl-C → axum stops accepting + drains in-flight.
481        // Phase B: AFTER axum has fully drained, run plugin `on_shutdown`s
482        //          in reverse declaration order, wrapped in a 5s timeout.
483        let plugins_for_draining = Arc::clone(&plugins_arc);
484        let draining_budget = config.drain_budget;
485        let ws_deadline = config.ws_drain_deadline;
486        let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
487            shutdown_signal().await;
488            // First thing: flip readiness so the LB stops routing new
489            // traffic to this pod while in-flight requests drain.
490            crate::observability::health::set_draining(true);
491            tracing::info!("shutdown signal received — HTTP draining");
492            // Live WebSockets would hold the drain open until clients leave
493            // (i.e. forever) — give them `ws_drain_deadline`, then close.
494            tokio::spawn(async move {
495                tokio::time::sleep(ws_deadline).await;
496                let closed = registry.close_all();
497                if closed > 0 {
498                    tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
499                }
500            });
501            // Notify plugins concurrently with the HTTP drain (spawned so the
502            // listener closes immediately): stop consuming MQ/scheduler work
503            // while in-flight HTTP requests finish. Cleanup stays in
504            // `on_shutdown`, which runs only after the drain completes.
505            tokio::spawn(async move {
506                for p in plugins_for_draining.iter() {
507                    match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
508                        Ok(Ok(())) => {}
509                        Ok(Err(e)) => {
510                            tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
511                        }
512                        Err(_) => tracing::warn!(
513                            plugin = p.name(),
514                            budget = ?draining_budget,
515                            "plugin on_draining exceeded budget"
516                        ),
517                    }
518                }
519            });
520        });
521        let serve_res = serve.await;
522
523        // HTTP server has now fully stopped. Safe to drain plugins.
524        tracing::info!(
525            budget = ?config.drain_budget,
526            "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
527        );
528        for p in plugins_arc.iter().rev() {
529            drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
530        }
531        serve_res
532    }
533}
534
535/// Walk the module `imports` DAG breadth-first from the root, deduplicating
536/// by descriptor pointer identity. Returns descriptors in a stable, root-first
537/// traversal order.
538fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
539    use std::collections::HashSet;
540    let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
541    let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
542        std::collections::VecDeque::new();
543    let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
544    queue.push_back(root);
545    while let Some(m) = queue.pop_front() {
546        if !visited.insert(m as *const _) {
547            continue;
548        }
549        order.push(m);
550        for getter in m.imports {
551            queue.push_back(getter());
552        }
553    }
554    order
555}
556
557/// Wait for either SIGINT (Ctrl-C) or SIGTERM (process supervisor / K8s / Docker).
558///
559/// On Windows only SIGINT is available; the cfg guard keeps the unix-specific
560/// import out of non-unix builds.
561#[cfg(unix)]
562async fn shutdown_signal() {
563    use tokio::signal::unix::{signal, SignalKind};
564    match signal(SignalKind::terminate()) {
565        Ok(mut sigterm) => {
566            tokio::select! {
567                _ = tokio::signal::ctrl_c() => {}
568                _ = sigterm.recv() => {}
569            }
570        }
571        Err(e) => {
572            tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
573            let _ = tokio::signal::ctrl_c().await;
574        }
575    }
576}
577
578#[cfg(not(unix))]
579async fn shutdown_signal() {
580    let _ = tokio::signal::ctrl_c().await;
581}
582
583/// Run one plugin's `on_shutdown` under a per-plugin timeout, logging (never
584/// propagating) errors and timeouts. Used for both start-failure rollback
585/// and post-serve drain so the two paths can't drift apart.
586async fn drain_plugin(
587    p: &dyn ArclyPlugin,
588    container: &'static crate::core::engine::FrozenDiContainer,
589    phase: &str,
590    budget: Duration,
591) {
592    match tokio::time::timeout(budget, p.on_shutdown(container)).await {
593        Ok(Ok(())) => {}
594        Ok(Err(e)) => {
595            tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
596        }
597        Err(_) => tracing::warn!(
598            plugin = p.name(),
599            phase,
600            budget = ?budget,
601            "plugin shutdown exceeded budget — skipped"
602        ),
603    }
604}
605
606fn plugin_io_err(e: PluginError) -> std::io::Error {
607    let kind = match e.stage {
608        PluginStage::Init => std::io::ErrorKind::InvalidInput,
609        PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
610        PluginStage::Shutdown => std::io::ErrorKind::Other,
611    };
612    std::io::Error::new(kind, e)
613}
614
#[cfg(test)]
mod tests {
    use super::*;

    /// Core knobs parse; garbage is ignored and the coded value kept.
    #[test]
    fn env_overrides_apply_and_ignore_garbage() {
        let cfg = LaunchConfig::default().apply_overrides(|k| match k {
            "ARCLY_REQUEST_TIMEOUT_MS" => Some("1500".into()),
            "ARCLY_MAX_IN_FLIGHT" => Some("notanumber".into()), // ignored, keeps default
            "ARCLY_MAX_BODY_BYTES" => Some("1024".into()),
            "ARCLY_EXPOSE_DOCS" => Some("false".into()),
            _ => None,
        });
        assert_eq!(cfg.request_timeout, Duration::from_millis(1500));
        assert_eq!(cfg.max_in_flight, 0, "unparseable override is ignored");
        assert_eq!(cfg.max_body_bytes, 1024);
        assert!(!cfg.expose_docs);
        // Untouched fields keep coded defaults.
        assert_eq!(cfg.drain_budget, Duration::from_secs(5));
    }

    /// WebSocket and drain knobs are overridable too; an unparseable
    /// `ARCLY_EXPOSE_DOCS` and a negative count leave their defaults alone.
    #[test]
    fn env_overrides_cover_websocket_and_drain_knobs() {
        let cfg = LaunchConfig::default().apply_overrides(|k| match k {
            "ARCLY_WS_OUTBOUND_BUFFER" => Some("64".into()),
            "ARCLY_WS_MAX_CONNECTIONS" => Some("1000".into()),
            "ARCLY_WS_PING_INTERVAL_MS" => Some("0".into()), // disables pings
            "ARCLY_WS_IDLE_TIMEOUT_MS" => Some("90000".into()),
            "ARCLY_WS_DRAIN_DEADLINE_MS" => Some("2500".into()),
            "ARCLY_DRAIN_BUDGET_MS" => Some("750".into()),
            "ARCLY_CACHE_MAX_ENTRIES" => Some("-5".into()), // not a usize → ignored
            "ARCLY_EXPOSE_DOCS" => Some("maybe".into()),    // ignored
            _ => None,
        });
        assert_eq!(cfg.ws_outbound_buffer, 64);
        assert_eq!(cfg.ws_max_connections, 1000);
        assert!(cfg.ws_ping_interval.is_zero());
        assert_eq!(cfg.ws_idle_timeout, Duration::from_secs(90));
        assert_eq!(cfg.ws_drain_deadline, Duration::from_millis(2500));
        assert_eq!(cfg.drain_budget, Duration::from_millis(750));
        assert_eq!(cfg.cache_max_entries, 10_000);
        assert!(cfg.expose_docs);
        // Untouched fields keep coded defaults.
        assert_eq!(cfg.request_timeout, Duration::from_secs(30));
    }
}