Skip to main content

arcly_http/
app.rs

//! Launch contract.
//!
//! Boot phases (executed strictly in this order — order matters):
//!
//! 1. Walk the module DAG from the root module and collect the providers of
//!    every reachable module (routes are filtered by the same reachable set
//!    when they are mounted in phase 6).
//! 2. Run each plugin's `on_init` with a *mutable* `ArclyPluginContext` —
//!    plugins may queue providers (`provide<T>`), routes, global
//!    interceptors, OpenAPI mutators.
//! 3. Apply queued provider closures to the `DiContainerBuilder`.
//! 4. **Freeze.** The container becomes `&'static`, lock-free for reads.
//! 5. Build the OpenAPI spec, run plugin spec-mutators, leak it as `&'static`.
//! 6. Mount macro-registered routes, then plugin-registered routes.
//! 7. Bind the listener.
//! 8. Run each plugin's `on_start(&container)` — background tasks spawn here.
//! 9. Serve. Ctrl-C / SIGTERM triggers axum's graceful shutdown — accepts
//!    stop, in-flight requests drain. **Only after** that completes do plugin
//!    `on_shutdown(&container)` calls run, each wrapped in a per-plugin
//!    timeout (`LaunchConfig::drain_budget`, 5 seconds by default) so a
//!    wedged plugin can never wedge the process.
19
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse};
25use axum::routing::get;
26
27use crate::core::engine::{
28    DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
29};
30use crate::core::plugins::{
31    build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
32};
33use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
34use crate::realtime::gateway::GatewayDescriptor;
35use crate::realtime::{ws_route, ConnectionRegistry};
36use crate::web::boundary::adapt;
37
38/// Tunables for the launch contract. Start from `Default` and adjust:
39///
40/// ```ignore
41/// App::launch_configured::<AppModule>(addr, info, plugins, LaunchConfig {
42///     request_timeout: Duration::from_secs(10),
43///     max_in_flight: 4_096,
44///     cors: Some(CorsConfig::for_origins(["https://app.example.com"])),
45///     ..Default::default()
46/// }).await
47/// ```
48#[derive(Clone, Debug)]
49pub struct LaunchConfig {
50    /// Per-plugin budget for `on_shutdown` / start-failure rollback (and the
51    /// concurrent `on_draining` notification). A plugin exceeding it is
52    /// skipped with a logged warning — it can never wedge the process.
53    /// Default `5s`.
54    pub drain_budget: Duration,
55    /// Process-wide request deadline. Routes without `#[Timeout]` are
56    /// cancelled (worker freed) and answered `504` past this. `ZERO`
57    /// disables. Default `30s`.
58    pub request_timeout: Duration,
59    /// Hard cap on concurrent in-flight requests — beyond it requests are
60    /// shed with `503` + `Retry-After` before any body is read (one atomic
61    /// counter, no locks). `0` = unlimited. Default `0`.
62    pub max_in_flight: usize,
63    /// Request body cap for every entry point. Default 8 MiB.
64    pub max_body_bytes: usize,
65    /// Ceiling on `#[CacheTTL]` store entries — the key includes the query
66    /// string, so an unbounded store is a memory-DoS vector. Default 10 000.
67    pub cache_max_entries: usize,
68    /// How often the cache sweeper reclaims expired entries. Default `30s`.
69    pub cache_sweep_interval: Duration,
70    /// After a shutdown signal, WebSocket clients get this long to finish
71    /// before the server sends Close frames — otherwise live sockets keep
72    /// the HTTP drain (and therefore plugin `on_shutdown`) waiting until
73    /// the supervisor SIGKILLs. Default `10s`.
74    pub ws_drain_deadline: Duration,
75    /// CORS policy. `None` (default) mounts no CORS layer at all.
76    pub cors: Option<crate::web::cors::CorsConfig>,
77}
78
79impl Default for LaunchConfig {
80    fn default() -> Self {
81        Self {
82            drain_budget: Duration::from_secs(5),
83            request_timeout: Duration::from_secs(30),
84            max_in_flight: 0,
85            max_body_bytes: 8 * 1024 * 1024,
86            cache_max_entries: 10_000,
87            cache_sweep_interval: Duration::from_secs(30),
88            ws_drain_deadline: Duration::from_secs(10),
89            cors: None,
90        }
91    }
92}
93
94pub struct App;
95
96impl App {
97    pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
98        let info = OpenApiInfo {
99            title: "arcly-http service",
100            version: env!("CARGO_PKG_VERSION"),
101            ..Default::default()
102        };
103        Self::launch_with_info::<RootMod>(addr, info).await
104    }
105
106    pub async fn launch_named<RootMod: Module>(
107        addr: &str,
108        title: &'static str,
109        version: &'static str,
110    ) -> std::io::Result<()> {
111        let info = OpenApiInfo {
112            title,
113            version,
114            ..Default::default()
115        };
116        Self::launch_with_info::<RootMod>(addr, info).await
117    }
118
119    pub async fn launch_with_info<RootMod: Module>(
120        addr: &str,
121        info: OpenApiInfo,
122    ) -> std::io::Result<()> {
123        Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
124    }
125
126    /// Full launch contract with plugins. See the module docstring for the
127    /// strict phase ordering.
128    pub async fn launch_with_plugins<RootMod: Module>(
129        addr: &str,
130        info: OpenApiInfo,
131        plugins: Vec<Box<dyn ArclyPlugin>>,
132    ) -> std::io::Result<()> {
133        Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
134    }
135
136    /// `launch_with_plugins` with explicit [`LaunchConfig`] tunables.
137    pub async fn launch_configured<RootMod: Module>(
138        addr: &str,
139        info: OpenApiInfo,
140        mut plugins: Vec<Box<dyn ArclyPlugin>>,
141        config: LaunchConfig,
142    ) -> std::io::Result<()> {
143        let _root: PhantomData<RootMod> = PhantomData;
144
145        // ── 0. Walk the module DAG from RootMod ────────────────────
146        let reachable_modules = collect_reachable_modules(RootMod::descriptor());
147        let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
148            .iter()
149            .flat_map(|m| m.controllers.iter().copied())
150            .collect();
151
152        // ── 1. Providers from reachable modules only ───────────────
153        let mut b = DiContainerBuilder::new();
154        for m in &reachable_modules {
155            for p in m.providers {
156                b.add_provider(p);
157            }
158        }
159
160        // ── 2. Plugin on_init — they may queue providers + routes ──
161        let mut plugin_ctx = ArclyPluginContext::new();
162        for p in plugins.iter_mut() {
163            plugin_ctx.current_plugin = p.name();
164            if let Err(e) = p.on_init(&mut plugin_ctx).await {
165                return Err(plugin_io_err(e));
166            }
167        }
168
169        // ── 3. Apply queued provider closures ──────────────────────
170        for f in plugin_ctx.pending_providers.drain(..) {
171            f(&mut b);
172        }
173
174        // ── 4. Freeze the container — `&'static`, lock-free reads ──
175        // The dynamic route table goes in first so services and plugins can
176        // `Inject<DynamicRouteTable>` and mount `/_plugins/*` routes at runtime.
177        b.register(crate::web::dynamic::DynamicRouteTable::new());
178        let container = b.freeze();
179
180        // ── 5. OpenAPI: build (scoped to the module DAG), then mutate ──
181        let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
182        for mutator in plugin_ctx.openapi_mutators.drain(..) {
183            mutator(&mut spec_value);
184        }
185        // Serialize ONCE — /openapi.json then serves a static byte slice
186        // instead of cloning + re-serializing a multi-MB Value per request.
187        let spec_bytes: &'static [u8] = Box::leak(
188            serde_json::to_vec(&spec_value)
189                .unwrap_or_else(|e| {
190                    // Spec is built from our own serde_json::Value — failure
191                    // here is a framework bug; fail at boot, not per-request.
192                    panic!("Arcly: OpenAPI spec serialization failed: {e}")
193                })
194                .into_boxed_slice(),
195        );
196
197        // ── 6. Mount routes (filtered by reachable controller set) ─
198        // Plugin-registered global interceptors wrap every mounted route;
199        // leak the list so route closures can hold a `&'static` slice.
200        let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
201            Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
202        // Boundary filters run before the body is read, on every entry point.
203        let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
204            Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
205
206        let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
207            axum::Router::new();
208        let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
209            std::collections::HashSet::new();
210        // Framework-reserved routes, mounted below — plugins may not shadow them.
211        mounted.insert(("/openapi.json", HttpMethod::GET));
212        mounted.insert(("/docs", HttpMethod::GET));
213        for rt in inventory::iter::<&'static RouteDescriptor> {
214            // Empty `controller` = free-fn route → always mount.
215            // Non-empty = must belong to a controller in the reachable DAG.
216            if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
217                continue;
218            }
219            mounted.insert((rt.path, rt.method));
220            router = router.route(rt.path, adapt(rt, globals, filters));
221        }
222        let mut app = router.with_state(container);
223        for r in &plugin_ctx.extra_routes {
224            // Surface collisions as a clean error naming the plugin, instead
225            // of letting axum's `route()` panic deep inside launch.
226            if !mounted.insert((r.path, r.method)) {
227                return Err(plugin_io_err(PluginError::new(
228                    r.plugin,
229                    PluginStage::Init,
230                    format!(
231                        "route `{:?} {}` is already mounted by another route or plugin",
232                        r.method, r.path
233                    ),
234                )));
235            }
236            app = app.route(r.path, build_plugin_route(container, r, globals, filters));
237        }
238
239        // Runtime-mutable plugin routes: one catch-all, dispatch via ArcSwap.
240        app = app.route(
241            "/_plugins/*rest",
242            crate::web::dynamic::dynamic_dispatch_route(container, globals, filters),
243        );
244
245        // ── 6b. Mount real-time gateways (filtered by reachable module DAG) ─
246        // One process-wide connection registry, leaked to `&'static` and shared
247        // by every gateway upgrade route — sharded, lock-free on the hot path.
248        let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
249            .iter()
250            .flat_map(|m| m.gateways.iter().copied())
251            .collect();
252        let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
253        for gd in inventory::iter::<&'static GatewayDescriptor> {
254            if !allowed_gateways.contains(gd.name) {
255                continue;
256            }
257            let runtime = (gd.build)(container);
258            app = app.route(gd.path, ws_route(runtime, registry, container));
259        }
260
261        let mut app = app
262            .route(
263                "/openapi.json",
264                get(move || async move {
265                    (
266                        [(axum::http::header::CONTENT_TYPE, "application/json")],
267                        spec_bytes,
268                    )
269                        .into_response()
270                }),
271            )
272            .route(
273                "/docs",
274                get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
275            )
276            .layer(axum::middleware::from_fn(
277                crate::security::apply_security_headers,
278            ));
279
280        // ── 6c. Governance layers (outermost) ──────────────────────
281        // CORS (when configured), then the governor: request-id, global
282        // deadline, in-flight admission control. Layer order: the governor
283        // is added last → wraps everything, including CORS preflights.
284        if let Some(cors_cfg) = config.cors.clone() {
285            let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
286            app = app.layer(axum::middleware::from_fn(
287                move |req: axum::extract::Request, next: axum::middleware::Next| {
288                    crate::web::cors::apply_cors(cors_cfg, req, next)
289                },
290            ));
291        }
292        let gov = crate::web::governor::Governor::new(config.request_timeout, config.max_in_flight);
293        let app = app.layer(axum::middleware::from_fn(
294            move |req: axum::extract::Request, next: axum::middleware::Next| {
295                crate::web::governor::govern(Arc::clone(&gov), req, next)
296            },
297        ));
298
299        // ── 6d. Resource governance knobs + cache sweeper ──────────
300        crate::web::boundary::set_max_body(config.max_body_bytes);
301        crate::web::cache::set_capacity(config.cache_max_entries);
302        crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
303
304        // ── 7. Bind ────────────────────────────────────────────────
305        let listener = tokio::net::TcpListener::bind(addr).await?;
306
307        // ── 8. Plugin on_start — bg tasks spawn here ───────────────
308        //
309        // If a plugin fails, roll back already-started plugins in reverse order
310        // before propagating the error — prevents orphaned background tasks.
311        // Each rollback `on_shutdown` gets the same 5-second per-plugin budget
312        // used by the post-serve drain loop: a wedged plugin must not hang the
313        // process or starve the remaining rollbacks.
314        let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
315        let mut started = 0usize;
316        #[allow(clippy::explicit_counter_loop)] // counter outlives the loop for the error message
317        for p in plugins_arc.iter() {
318            if let Err(e) = p.on_start(container).await {
319                for already in plugins_arc[..started].iter().rev() {
320                    drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
321                        .await;
322                }
323                return Err(plugin_io_err(e));
324            }
325            started += 1;
326        }
327
328        // ── 9. Serve with two-phase graceful shutdown ──────────────
329        //
330        // Phase A: SIGTERM/Ctrl-C → axum stops accepting + drains in-flight.
331        // Phase B: AFTER axum has fully drained, run plugin `on_shutdown`s
332        //          in reverse declaration order, wrapped in a 5s timeout.
333        let plugins_for_draining = Arc::clone(&plugins_arc);
334        let draining_budget = config.drain_budget;
335        let ws_deadline = config.ws_drain_deadline;
336        let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
337            shutdown_signal().await;
338            tracing::info!("shutdown signal received — HTTP draining");
339            // Live WebSockets would hold the drain open until clients leave
340            // (i.e. forever) — give them `ws_drain_deadline`, then close.
341            tokio::spawn(async move {
342                tokio::time::sleep(ws_deadline).await;
343                let closed = registry.close_all();
344                if closed > 0 {
345                    tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
346                }
347            });
348            // Notify plugins concurrently with the HTTP drain (spawned so the
349            // listener closes immediately): stop consuming MQ/scheduler work
350            // while in-flight HTTP requests finish. Cleanup stays in
351            // `on_shutdown`, which runs only after the drain completes.
352            tokio::spawn(async move {
353                for p in plugins_for_draining.iter() {
354                    match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
355                        Ok(Ok(())) => {}
356                        Ok(Err(e)) => {
357                            tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
358                        }
359                        Err(_) => tracing::warn!(
360                            plugin = p.name(),
361                            budget = ?draining_budget,
362                            "plugin on_draining exceeded budget"
363                        ),
364                    }
365                }
366            });
367        });
368        let serve_res = serve.await;
369
370        // HTTP server has now fully stopped. Safe to drain plugins.
371        tracing::info!(
372            budget = ?config.drain_budget,
373            "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
374        );
375        for p in plugins_arc.iter().rev() {
376            drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
377        }
378        serve_res
379    }
380}
381
382/// Walk the module `imports` DAG breadth-first from the root, deduplicating
383/// by descriptor pointer identity. Returns descriptors in a stable, root-first
384/// traversal order.
385fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
386    use std::collections::HashSet;
387    let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
388    let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
389        std::collections::VecDeque::new();
390    let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
391    queue.push_back(root);
392    while let Some(m) = queue.pop_front() {
393        if !visited.insert(m as *const _) {
394            continue;
395        }
396        order.push(m);
397        for getter in m.imports {
398            queue.push_back(getter());
399        }
400    }
401    order
402}
403
404/// Wait for either SIGINT (Ctrl-C) or SIGTERM (process supervisor / K8s / Docker).
405///
406/// On Windows only SIGINT is available; the cfg guard keeps the unix-specific
407/// import out of non-unix builds.
408#[cfg(unix)]
409async fn shutdown_signal() {
410    use tokio::signal::unix::{signal, SignalKind};
411    match signal(SignalKind::terminate()) {
412        Ok(mut sigterm) => {
413            tokio::select! {
414                _ = tokio::signal::ctrl_c() => {}
415                _ = sigterm.recv() => {}
416            }
417        }
418        Err(e) => {
419            tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
420            let _ = tokio::signal::ctrl_c().await;
421        }
422    }
423}
424
/// Non-unix variant: resolve as soon as the Ctrl-C future completes. Its
/// result is deliberately not inspected — success and failure both end the
/// wait.
#[cfg(not(unix))]
async fn shutdown_signal() {
    match tokio::signal::ctrl_c().await {
        Ok(()) | Err(_) => {}
    }
}
429
430/// Run one plugin's `on_shutdown` under a per-plugin timeout, logging (never
431/// propagating) errors and timeouts. Used for both start-failure rollback
432/// and post-serve drain so the two paths can't drift apart.
433async fn drain_plugin(
434    p: &dyn ArclyPlugin,
435    container: &'static crate::core::engine::FrozenDiContainer,
436    phase: &str,
437    budget: Duration,
438) {
439    match tokio::time::timeout(budget, p.on_shutdown(container)).await {
440        Ok(Ok(())) => {}
441        Ok(Err(e)) => {
442            tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
443        }
444        Err(_) => tracing::warn!(
445            plugin = p.name(),
446            phase,
447            budget = ?budget,
448            "plugin shutdown exceeded budget — skipped"
449        ),
450    }
451}
452
453fn plugin_io_err(e: PluginError) -> std::io::Error {
454    let kind = match e.stage {
455        PluginStage::Init => std::io::ErrorKind::InvalidInput,
456        PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
457        PluginStage::Shutdown => std::io::ErrorKind::Other,
458    };
459    std::io::Error::new(kind, e)
460}