arcly-http 0.1.1

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
//! Launch contract.
//!
//! Boot phases (executed strictly in this order — order matters):
//!
//! 1. Walk the module `imports` DAG from the root module and register the
//!    providers of every reachable module with the `DiContainerBuilder`.
//! 2. Run each plugin's `on_init` with a *mutable* `ArclyPluginContext` —
//!    plugins may queue providers (`provide<T>`), routes, global
//!    interceptors, OpenAPI mutators.
//! 3. Apply queued provider closures to the `DiContainerBuilder`.
//! 4. **Freeze.** The container becomes `&'static`, lock-free for reads.
//! 5. Build the OpenAPI spec, run plugin spec-mutators, leak it as `&'static`.
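//! 6. Mount macro-registered routes (only controllers reachable from the root
//!    module), then plugin-registered routes, the `/_plugins/*` dynamic
//!    catch-all, real-time gateways, `/openapi.json` + `/docs`, and finally
//!    the outer governance layers (CORS, request governor).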
//! 7. Bind the listener.
//! 8. Run each plugin's `on_start(&container)` — background tasks spawn here.
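//! 9. Serve. Ctrl-C / SIGTERM triggers axum's graceful shutdown — accepts
//!    stop and in-flight requests drain, while plugins are notified via
//!    `on_draining` concurrently. **Only after** the HTTP drain completes do
//!    plugin `on_shutdown(&container)` calls run (in reverse order), each
//!    wrapped in a per-plugin timeout (`LaunchConfig::drain_budget`, default
//!    5s) so a wedged plugin can never wedge the process.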

use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use axum::response::{Html, IntoResponse};
use axum::routing::get;

use crate::core::engine::{
    DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
};
use crate::core::plugins::{
    build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
};
use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
use crate::realtime::gateway::GatewayDescriptor;
use crate::realtime::{ws_route, ConnectionRegistry};
use crate::web::boundary::adapt;

/// Knobs governing how [`App::launch_configured`] boots and runs the server.
///
/// Begin from [`LaunchConfig::default`] and override only what you need:
///
/// ```ignore
/// App::launch_configured::<AppModule>(addr, info, plugins, LaunchConfig {
///     request_timeout: Duration::from_secs(10),
///     max_in_flight: 4_096,
///     cors: Some(CorsConfig::for_origins(["https://app.example.com"])),
///     ..Default::default()
/// }).await
/// ```
#[derive(Clone, Debug)]
pub struct LaunchConfig {
    /// Time allowed per plugin for `on_shutdown` — also applied when rolling
    /// back after a failed `on_start`, and to the `on_draining` notification
    /// sent alongside the HTTP drain. A plugin that overruns is skipped with a
    /// logged warning instead of blocking the process. Default: 5 seconds.
    pub drain_budget: Duration,
    /// Global per-request deadline for routes lacking their own `#[Timeout]`;
    /// expired requests are cancelled (freeing the worker) and answered with
    /// `504`. `ZERO` turns it off. Default: 30 seconds.
    pub request_timeout: Duration,
    /// Upper bound on simultaneously in-flight requests. Excess requests are
    /// rejected with `503` + `Retry-After` before any body is read, using one
    /// atomic counter and no locks. `0` means no limit (the default).
    pub max_in_flight: usize,
    /// Maximum accepted request body size, enforced on every entry point.
    /// Default: 8 MiB.
    pub max_body_bytes: usize,
    /// Maximum number of `#[CacheTTL]` store entries. Cache keys include the
    /// query string, so an unbounded store would let clients exhaust memory.
    /// Default: 10 000.
    pub cache_max_entries: usize,
    /// Interval at which the cache sweeper reclaims expired entries.
    /// Default: 30 seconds.
    pub cache_sweep_interval: Duration,
    /// Grace period WebSocket clients get after a shutdown signal before the
    /// server sends Close frames. Without it, live sockets would hold the HTTP
    /// drain — and therefore plugin `on_shutdown` — open until the supervisor
    /// SIGKILLs the process. Default: 10 seconds.
    pub ws_drain_deadline: Duration,
    /// Optional CORS policy; `None` (the default) mounts no CORS layer at all.
    pub cors: Option<crate::web::cors::CorsConfig>,
}

impl Default for LaunchConfig {
    fn default() -> Self {
        const MIB: usize = 1 << 20;
        // Request deadline and sweep cadence share the same default.
        let half_minute = Duration::from_secs(30);
        LaunchConfig {
            cors: None,
            max_in_flight: 0,
            max_body_bytes: 8 * MIB,
            cache_max_entries: 10_000,
            request_timeout: half_minute,
            cache_sweep_interval: half_minute,
            drain_budget: Duration::from_secs(5),
            ws_drain_deadline: Duration::from_secs(10),
        }
    }
}

pub struct App;

impl App {
    pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
        let info = OpenApiInfo {
            title: "arcly-http service",
            version: env!("CARGO_PKG_VERSION"),
            ..Default::default()
        };
        Self::launch_with_info::<RootMod>(addr, info).await
    }

    pub async fn launch_named<RootMod: Module>(
        addr: &str,
        title: &'static str,
        version: &'static str,
    ) -> std::io::Result<()> {
        let info = OpenApiInfo {
            title,
            version,
            ..Default::default()
        };
        Self::launch_with_info::<RootMod>(addr, info).await
    }

    pub async fn launch_with_info<RootMod: Module>(
        addr: &str,
        info: OpenApiInfo,
    ) -> std::io::Result<()> {
        Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
    }

    /// Full launch contract with plugins. See the module docstring for the
    /// strict phase ordering.
    pub async fn launch_with_plugins<RootMod: Module>(
        addr: &str,
        info: OpenApiInfo,
        plugins: Vec<Box<dyn ArclyPlugin>>,
    ) -> std::io::Result<()> {
        Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
    }

    /// `launch_with_plugins` with explicit [`LaunchConfig`] tunables.
    pub async fn launch_configured<RootMod: Module>(
        addr: &str,
        info: OpenApiInfo,
        mut plugins: Vec<Box<dyn ArclyPlugin>>,
        config: LaunchConfig,
    ) -> std::io::Result<()> {
        let _root: PhantomData<RootMod> = PhantomData;

        // ── 0. Walk the module DAG from RootMod ────────────────────
        let reachable_modules = collect_reachable_modules(RootMod::descriptor());
        let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
            .iter()
            .flat_map(|m| m.controllers.iter().copied())
            .collect();

        // ── 1. Providers from reachable modules only ───────────────
        let mut b = DiContainerBuilder::new();
        for m in &reachable_modules {
            for p in m.providers {
                b.add_provider(p);
            }
        }

        // ── 2. Plugin on_init — they may queue providers + routes ──
        let mut plugin_ctx = ArclyPluginContext::new();
        for p in plugins.iter_mut() {
            plugin_ctx.current_plugin = p.name();
            if let Err(e) = p.on_init(&mut plugin_ctx).await {
                return Err(plugin_io_err(e));
            }
        }

        // ── 3. Apply queued provider closures ──────────────────────
        for f in plugin_ctx.pending_providers.drain(..) {
            f(&mut b);
        }

        // ── 4. Freeze the container — `&'static`, lock-free reads ──
        // The dynamic route table goes in first so services and plugins can
        // `Inject<DynamicRouteTable>` and mount `/_plugins/*` routes at runtime.
        b.register(crate::web::dynamic::DynamicRouteTable::new());
        let container = b.freeze();

        // ── 5. OpenAPI: build (scoped to the module DAG), then mutate ──
        let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
        for mutator in plugin_ctx.openapi_mutators.drain(..) {
            mutator(&mut spec_value);
        }
        // Serialize ONCE — /openapi.json then serves a static byte slice
        // instead of cloning + re-serializing a multi-MB Value per request.
        let spec_bytes: &'static [u8] = Box::leak(
            serde_json::to_vec(&spec_value)
                .unwrap_or_else(|e| {
                    // Spec is built from our own serde_json::Value — failure
                    // here is a framework bug; fail at boot, not per-request.
                    panic!("Arcly: OpenAPI spec serialization failed: {e}")
                })
                .into_boxed_slice(),
        );

        // ── 6. Mount routes (filtered by reachable controller set) ─
        // Plugin-registered global interceptors wrap every mounted route;
        // leak the list so route closures can hold a `&'static` slice.
        let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
            Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
        // Boundary filters run before the body is read, on every entry point.
        let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
            Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());

        let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
            axum::Router::new();
        let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
            std::collections::HashSet::new();
        // Framework-reserved routes, mounted below — plugins may not shadow them.
        mounted.insert(("/openapi.json", HttpMethod::GET));
        mounted.insert(("/docs", HttpMethod::GET));
        for rt in inventory::iter::<&'static RouteDescriptor> {
            // Empty `controller` = free-fn route → always mount.
            // Non-empty = must belong to a controller in the reachable DAG.
            if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
                continue;
            }
            mounted.insert((rt.path, rt.method));
            router = router.route(rt.path, adapt(rt, globals, filters));
        }
        let mut app = router.with_state(container);
        for r in &plugin_ctx.extra_routes {
            // Surface collisions as a clean error naming the plugin, instead
            // of letting axum's `route()` panic deep inside launch.
            if !mounted.insert((r.path, r.method)) {
                return Err(plugin_io_err(PluginError::new(
                    r.plugin,
                    PluginStage::Init,
                    format!(
                        "route `{:?} {}` is already mounted by another route or plugin",
                        r.method, r.path
                    ),
                )));
            }
            app = app.route(r.path, build_plugin_route(container, r, globals, filters));
        }

        // Runtime-mutable plugin routes: one catch-all, dispatch via ArcSwap.
        app = app.route(
            "/_plugins/*rest",
            crate::web::dynamic::dynamic_dispatch_route(container, globals, filters),
        );

        // ── 6b. Mount real-time gateways (filtered by reachable module DAG) ─
        // One process-wide connection registry, leaked to `&'static` and shared
        // by every gateway upgrade route — sharded, lock-free on the hot path.
        let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
            .iter()
            .flat_map(|m| m.gateways.iter().copied())
            .collect();
        let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
        for gd in inventory::iter::<&'static GatewayDescriptor> {
            if !allowed_gateways.contains(gd.name) {
                continue;
            }
            let runtime = (gd.build)(container);
            app = app.route(gd.path, ws_route(runtime, registry, container));
        }

        let mut app = app
            .route(
                "/openapi.json",
                get(move || async move {
                    (
                        [(axum::http::header::CONTENT_TYPE, "application/json")],
                        spec_bytes,
                    )
                        .into_response()
                }),
            )
            .route(
                "/docs",
                get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
            )
            .layer(axum::middleware::from_fn(
                crate::security::apply_security_headers,
            ));

        // ── 6c. Governance layers (outermost) ──────────────────────
        // CORS (when configured), then the governor: request-id, global
        // deadline, in-flight admission control. Layer order: the governor
        // is added last → wraps everything, including CORS preflights.
        if let Some(cors_cfg) = config.cors.clone() {
            let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
            app = app.layer(axum::middleware::from_fn(
                move |req: axum::extract::Request, next: axum::middleware::Next| {
                    crate::web::cors::apply_cors(cors_cfg, req, next)
                },
            ));
        }
        let gov = crate::web::governor::Governor::new(config.request_timeout, config.max_in_flight);
        let app = app.layer(axum::middleware::from_fn(
            move |req: axum::extract::Request, next: axum::middleware::Next| {
                crate::web::governor::govern(Arc::clone(&gov), req, next)
            },
        ));

        // ── 6d. Resource governance knobs + cache sweeper ──────────
        crate::web::boundary::set_max_body(config.max_body_bytes);
        crate::web::cache::set_capacity(config.cache_max_entries);
        crate::web::cache::spawn_sweeper(config.cache_sweep_interval);

        // ── 7. Bind ────────────────────────────────────────────────
        let listener = tokio::net::TcpListener::bind(addr).await?;

        // ── 8. Plugin on_start — bg tasks spawn here ───────────────
        //
        // If a plugin fails, roll back already-started plugins in reverse order
        // before propagating the error — prevents orphaned background tasks.
        // Each rollback `on_shutdown` gets the same 5-second per-plugin budget
        // used by the post-serve drain loop: a wedged plugin must not hang the
        // process or starve the remaining rollbacks.
        let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
        let mut started = 0usize;
        #[allow(clippy::explicit_counter_loop)] // counter outlives the loop for the error message
        for p in plugins_arc.iter() {
            if let Err(e) = p.on_start(container).await {
                for already in plugins_arc[..started].iter().rev() {
                    drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
                        .await;
                }
                return Err(plugin_io_err(e));
            }
            started += 1;
        }

        // ── 9. Serve with two-phase graceful shutdown ──────────────
        //
        // Phase A: SIGTERM/Ctrl-C → axum stops accepting + drains in-flight.
        // Phase B: AFTER axum has fully drained, run plugin `on_shutdown`s
        //          in reverse declaration order, wrapped in a 5s timeout.
        let plugins_for_draining = Arc::clone(&plugins_arc);
        let draining_budget = config.drain_budget;
        let ws_deadline = config.ws_drain_deadline;
        let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
            shutdown_signal().await;
            tracing::info!("shutdown signal received — HTTP draining");
            // Live WebSockets would hold the drain open until clients leave
            // (i.e. forever) — give them `ws_drain_deadline`, then close.
            tokio::spawn(async move {
                tokio::time::sleep(ws_deadline).await;
                let closed = registry.close_all();
                if closed > 0 {
                    tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
                }
            });
            // Notify plugins concurrently with the HTTP drain (spawned so the
            // listener closes immediately): stop consuming MQ/scheduler work
            // while in-flight HTTP requests finish. Cleanup stays in
            // `on_shutdown`, which runs only after the drain completes.
            tokio::spawn(async move {
                for p in plugins_for_draining.iter() {
                    match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
                        Ok(Ok(())) => {}
                        Ok(Err(e)) => {
                            tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
                        }
                        Err(_) => tracing::warn!(
                            plugin = p.name(),
                            budget = ?draining_budget,
                            "plugin on_draining exceeded budget"
                        ),
                    }
                }
            });
        });
        let serve_res = serve.await;

        // HTTP server has now fully stopped. Safe to drain plugins.
        tracing::info!(
            budget = ?config.drain_budget,
            "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
        );
        for p in plugins_arc.iter().rev() {
            drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
        }
        serve_res
    }
}

/// Breadth-first traversal of the module `imports` graph starting at `root`.
///
/// Each descriptor is reported once — identity is the descriptor's address,
/// not its contents — in the order it is first discovered, so `root` is always
/// the first element.
fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
    use std::collections::{HashSet, VecDeque};

    let mut discovered: HashSet<*const ModuleDescriptor> = HashSet::new();
    discovered.insert(root as *const ModuleDescriptor);
    let mut pending: VecDeque<&'static ModuleDescriptor> = VecDeque::new();
    pending.push_back(root);

    let mut reachable: Vec<&'static ModuleDescriptor> = Vec::new();
    while let Some(module) = pending.pop_front() {
        reachable.push(module);
        // Mark on enqueue: each descriptor enters the FIFO queue at most once,
        // so the output order matches a dedupe-on-dequeue walk exactly.
        for resolve in module.imports {
            let dependency = resolve();
            if discovered.insert(dependency as *const ModuleDescriptor) {
                pending.push_back(dependency);
            }
        }
    }
    reachable
}

/// Wait for either SIGINT (Ctrl-C) or SIGTERM (process supervisor / K8s / Docker).
///
/// On Windows only SIGINT is available; the cfg guard keeps the unix-specific
/// import out of non-unix builds.
#[cfg(unix)]
async fn shutdown_signal() {
    use tokio::signal::unix::{signal, SignalKind};
    match signal(SignalKind::terminate()) {
        Ok(mut sigterm) => {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {}
                _ = sigterm.recv() => {}
            }
        }
        Err(e) => {
            tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
            let _ = tokio::signal::ctrl_c().await;
        }
    }
}

/// Non-unix variant: resolves once Ctrl-C is received.
///
/// If the Ctrl-C handler cannot be installed, the failure is logged and this
/// future never resolves. The server therefore keeps running instead of
/// beginning its graceful shutdown immediately on an `Err`.
#[cfg(not(unix))]
async fn shutdown_signal() {
    if let Err(e) = tokio::signal::ctrl_c().await {
        tracing::warn!(error = %e, "Ctrl-C handler unavailable — graceful shutdown signal disabled");
        std::future::pending::<()>().await;
    }
}

/// Invoke `p.on_shutdown`, bounded by `budget`, and report failures through
/// `tracing` without returning them. `phase` labels the log lines (the call
/// sites pass `"rollback"` and `"shutdown"`). Sharing this helper keeps the
/// start-failure rollback and the post-serve drain behaving identically.
async fn drain_plugin(
    p: &dyn ArclyPlugin,
    container: &'static crate::core::engine::FrozenDiContainer,
    phase: &str,
    budget: Duration,
) {
    let outcome = tokio::time::timeout(budget, p.on_shutdown(container)).await;
    match outcome {
        // Timed out: the plugin's shutdown future is dropped and we move on.
        Err(_elapsed) => tracing::warn!(
            plugin = p.name(),
            phase,
            budget = ?budget,
            "plugin shutdown exceeded budget — skipped"
        ),
        Ok(Err(e)) => {
            tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
        }
        Ok(Ok(())) => {}
    }
}

/// Convert a [`PluginError`] into a `std::io::Error` whose kind reflects the
/// lifecycle stage that failed: init → `InvalidInput`, start →
/// `ConnectionRefused`, shutdown → `Other`. The plugin error becomes the io
/// error's inner payload (retrievable via `get_ref` / `into_inner`).
fn plugin_io_err(e: PluginError) -> std::io::Error {
    use std::io::{Error, ErrorKind};

    Error::new(
        match e.stage {
            PluginStage::Init => ErrorKind::InvalidInput,
            PluginStage::Start => ErrorKind::ConnectionRefused,
            PluginStage::Shutdown => ErrorKind::Other,
        },
        e,
    )
}