Skip to main content

arcly_http/
app.rs

1//! Launch contract.
2//!
3//! Boot phases (executed strictly in this order — order matters):
4//!
5//! 1. Collect inventory-registered providers + routes.
6//! 2. Run each plugin's `on_init` with a *mutable* `ArclyPluginContext` —
7//!    plugins may queue providers (`provide<T>`), routes, global
8//!    interceptors, OpenAPI mutators.
9//! 3. Apply queued provider closures to the `DiContainerBuilder`.
10//! 4. **Freeze.** The container becomes `&'static`, lock-free for reads.
11//! 5. Build the OpenAPI spec, run plugin spec-mutators, leak it as `&'static`.
12//! 6. Mount macro-registered routes, then plugin-registered routes.
13//! 7. Bind the listener.
14//! 8. Run each plugin's `on_start(&container)` — background tasks spawn here.
15//! 9. Serve. Ctrl-C / SIGTERM triggers axum's graceful shutdown — accepts
16//!    stop, in-flight drain. **Only after** that completes do plugin
17//!    `on_shutdown(&container)` calls run, wrapped in a 5-second timeout so
18//!    a wedged plugin can never wedge the process.
19
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse, Json};
25use axum::routing::get;
26
27use crate::core::engine::{DiContainerBuilder, Module, ModuleDescriptor, RouteDescriptor};
28use crate::core::plugins::{
29    build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
30};
31use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
32use crate::realtime::gateway::GatewayDescriptor;
33use crate::realtime::{ws_route, ConnectionRegistry};
34use crate::web::boundary::adapt;
35
36pub struct App;
37
38impl App {
39    pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
40        let info = OpenApiInfo {
41            title: "arcly-http service",
42            version: env!("CARGO_PKG_VERSION"),
43            ..Default::default()
44        };
45        Self::launch_with_info::<RootMod>(addr, info).await
46    }
47
48    pub async fn launch_named<RootMod: Module>(
49        addr: &str,
50        title: &'static str,
51        version: &'static str,
52    ) -> std::io::Result<()> {
53        let info = OpenApiInfo {
54            title,
55            version,
56            ..Default::default()
57        };
58        Self::launch_with_info::<RootMod>(addr, info).await
59    }
60
61    pub async fn launch_with_info<RootMod: Module>(
62        addr: &str,
63        info: OpenApiInfo,
64    ) -> std::io::Result<()> {
65        Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
66    }
67
68    /// Full launch contract with plugins. See the module docstring for the
69    /// strict phase ordering.
70    pub async fn launch_with_plugins<RootMod: Module>(
71        addr: &str,
72        info: OpenApiInfo,
73        mut plugins: Vec<Box<dyn ArclyPlugin>>,
74    ) -> std::io::Result<()> {
75        let _root: PhantomData<RootMod> = PhantomData;
76
77        // ── 0. Walk the module DAG from RootMod ────────────────────
78        let reachable_modules = collect_reachable_modules(RootMod::descriptor());
79        let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
80            .iter()
81            .flat_map(|m| m.controllers.iter().copied())
82            .collect();
83
84        // ── 1. Providers from reachable modules only ───────────────
85        let mut b = DiContainerBuilder::new();
86        for m in &reachable_modules {
87            for p in m.providers {
88                b.add_provider(p);
89            }
90        }
91
92        // ── 2. Plugin on_init — they may queue providers + routes ──
93        let mut plugin_ctx = ArclyPluginContext::new();
94        for p in plugins.iter_mut() {
95            if let Err(e) = p.on_init(&mut plugin_ctx).await {
96                return Err(plugin_io_err(e));
97            }
98        }
99
100        // ── 3. Apply queued provider closures ──────────────────────
101        for f in plugin_ctx.pending_providers.drain(..) {
102            f(&mut b);
103        }
104
105        // ── 4. Freeze the container — `&'static`, lock-free reads ──
106        let container = b.freeze();
107
108        // ── 5. OpenAPI: build (scoped to the module DAG), then mutate ──
109        let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
110        for mutator in plugin_ctx.openapi_mutators.drain(..) {
111            mutator(&mut spec_value);
112        }
113        let spec: &'static serde_json::Value = Box::leak(Box::new(spec_value));
114
115        // ── 6. Mount routes (filtered by reachable controller set) ─
116        let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
117            axum::Router::new();
118        for rt in inventory::iter::<&'static RouteDescriptor> {
119            // Empty `controller` = free-fn route → always mount.
120            // Non-empty = must belong to a controller in the reachable DAG.
121            if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
122                continue;
123            }
124            router = router.route(rt.path, adapt(rt));
125        }
126        let mut app = router.with_state(container);
127        for r in &plugin_ctx.extra_routes {
128            app = app.route(r.path, build_plugin_route(container, r));
129        }
130
131        // ── 6b. Mount real-time gateways (filtered by reachable module DAG) ─
132        // One process-wide connection registry, leaked to `&'static` and shared
133        // by every gateway upgrade route — sharded, lock-free on the hot path.
134        let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
135            .iter()
136            .flat_map(|m| m.gateways.iter().copied())
137            .collect();
138        let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
139        for gd in inventory::iter::<&'static GatewayDescriptor> {
140            if !allowed_gateways.contains(gd.name) {
141                continue;
142            }
143            let runtime = (gd.build)(container);
144            app = app.route(gd.path, ws_route(runtime, registry, container));
145        }
146
147        let app = app
148            .route(
149                "/openapi.json",
150                get(move || async move { Json(spec.clone()) }),
151            )
152            .route(
153                "/docs",
154                get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
155            )
156            .layer(axum::middleware::from_fn(
157                crate::security::apply_security_headers,
158            ));
159
160        // ── 7. Bind ────────────────────────────────────────────────
161        let listener = tokio::net::TcpListener::bind(addr).await?;
162
163        // ── 8. Plugin on_start — bg tasks spawn here ───────────────
164        //
165        // If a plugin fails, roll back already-started plugins in reverse order
166        // before propagating the error — prevents orphaned background tasks.
167        // The rollback is wrapped in the same 5-second budget used by the
168        // post-serve drain loop: a wedged on_shutdown must not hang the process.
169        let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
170        let rollback_budget = Duration::from_secs(5);
171        let mut started = 0usize;
172        #[allow(clippy::explicit_counter_loop)] // counter outlives the loop for the error message
173        for p in plugins_arc.iter() {
174            if let Err(e) = p.on_start(container).await {
175                let already_started = Arc::clone(&plugins_arc);
176                let rolled = tokio::time::timeout(rollback_budget, async move {
177                    for already in already_started[..started].iter().rev() {
178                        if let Err(se) = already.on_shutdown(container).await {
179                            eprintln!("[arcly] plugin `{}` cleanup error: {}", already.name(), se);
180                        }
181                    }
182                })
183                .await;
184                if rolled.is_err() {
185                    eprintln!("[arcly] plugin rollback exceeded {rollback_budget:?} — forced");
186                }
187                return Err(plugin_io_err(e));
188            }
189            started += 1;
190        }
191
192        // ── 9. Serve with two-phase graceful shutdown ──────────────
193        //
194        // Phase A: SIGTERM/Ctrl-C → axum stops accepting + drains in-flight.
195        // Phase B: AFTER axum has fully drained, run plugin `on_shutdown`s
196        //          in reverse declaration order, wrapped in a 5s timeout.
197        let serve = axum::serve(listener, app).with_graceful_shutdown(async {
198            shutdown_signal().await;
199            eprintln!("[arcly] shutdown signal received — HTTP draining");
200        });
201        let serve_res = serve.await;
202
203        // HTTP server has now fully stopped. Safe to drain plugins.
204        eprintln!("[arcly] HTTP fully drained — running plugin on_shutdown (5s budget)");
205        let drain_budget = Duration::from_secs(5);
206        let plugins_for_drain = Arc::clone(&plugins_arc);
207        let drained = tokio::time::timeout(drain_budget, async move {
208            for p in plugins_for_drain.iter().rev() {
209                if let Err(e) = p.on_shutdown(container).await {
210                    eprintln!("[arcly] plugin `{}` shutdown error: {}", p.name(), e);
211                }
212            }
213        })
214        .await;
215        if drained.is_err() {
216            eprintln!("[arcly] plugin drain exceeded {drain_budget:?} — forced exit");
217        }
218        serve_res
219    }
220}
221
222/// Walk the module `imports` DAG breadth-first from the root, deduplicating
223/// by descriptor pointer identity. Returns descriptors in a stable, root-first
224/// traversal order.
225fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
226    use std::collections::HashSet;
227    let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
228    let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
229        std::collections::VecDeque::new();
230    let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
231    queue.push_back(root);
232    while let Some(m) = queue.pop_front() {
233        if !visited.insert(m as *const _) {
234            continue;
235        }
236        order.push(m);
237        for getter in m.imports {
238            queue.push_back(getter());
239        }
240    }
241    order
242}
243
244/// Wait for either SIGINT (Ctrl-C) or SIGTERM (process supervisor / K8s / Docker).
245///
246/// On Windows only SIGINT is available; the cfg guard keeps the unix-specific
247/// import out of non-unix builds.
248#[cfg(unix)]
249async fn shutdown_signal() {
250    use tokio::signal::unix::{signal, SignalKind};
251    match signal(SignalKind::terminate()) {
252        Ok(mut sigterm) => {
253            tokio::select! {
254                _ = tokio::signal::ctrl_c() => {}
255                _ = sigterm.recv() => {}
256            }
257        }
258        Err(e) => {
259            eprintln!("[arcly] SIGTERM handler unavailable ({e}), falling back to SIGINT only");
260            let _ = tokio::signal::ctrl_c().await;
261        }
262    }
263}
264
265#[cfg(not(unix))]
266async fn shutdown_signal() {
267    let _ = tokio::signal::ctrl_c().await;
268}
269
270fn plugin_io_err(e: PluginError) -> std::io::Error {
271    let kind = match e.stage {
272        PluginStage::Init => std::io::ErrorKind::InvalidInput,
273        PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
274        PluginStage::Shutdown => std::io::ErrorKind::Other,
275    };
276    std::io::Error::new(kind, e)
277}