arcly-http 0.1.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Launch contract.
//!
//! Boot phases (executed strictly in this order — order matters):
//!
//! 1. Walk the module DAG from the root module and register the providers of
//!    every reachable module.
//! 2. Run each plugin's `on_init` with a *mutable* `ArclyPluginContext` —
//!    plugins may queue providers (`provide<T>`), routes, global
//!    interceptors, OpenAPI mutators.
//! 3. Apply queued provider closures to the `DiContainerBuilder`.
//! 4. **Freeze.** The container becomes `&'static`, lock-free for reads.
//! 5. Build the OpenAPI spec, run plugin spec-mutators, leak it as `&'static`.
//! 6. Mount macro-registered routes, then plugin-registered routes.
//! 7. Bind the listener.
//! 8. Run each plugin's `on_start(&container)` — background tasks spawn here.
//! 9. Serve. Ctrl-C / SIGTERM triggers axum's graceful shutdown: the server
//!    stops accepting connections and drains in-flight requests. **Only
//!    after** that completes do plugin `on_shutdown(&container)` calls run,
//!    wrapped in a 5-second timeout so a wedged plugin can never wedge the
//!    process.

use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use axum::response::{Html, IntoResponse, Json};
use axum::routing::get;

use crate::core::engine::{DiContainerBuilder, Module, ModuleDescriptor, RouteDescriptor};
use crate::core::plugins::{
    build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
};
use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
use crate::realtime::gateway::GatewayDescriptor;
use crate::realtime::{ws_route, ConnectionRegistry};
use crate::web::boundary::adapt;

/// Entry-point type for launching an application.
///
/// A field-less unit struct: it carries no state, and everything it offers is
/// exposed through its associated `launch*` functions, called as
/// `App::launch::<RootMod>(..)`.
pub struct App;

impl App {
    /// Launch `RootMod` on `addr` with the default OpenAPI metadata.
    ///
    /// The title is `"arcly-http service"` and the version is the
    /// `CARGO_PKG_VERSION` of the crate this file is compiled in; every other
    /// `OpenApiInfo` field keeps its `Default` value.
    pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
        const DEFAULT_TITLE: &str = "arcly-http service";
        const DEFAULT_VERSION: &str = env!("CARGO_PKG_VERSION");
        Self::launch_named::<RootMod>(addr, DEFAULT_TITLE, DEFAULT_VERSION).await
    }

    /// Launch `RootMod` on `addr`, using `title` and `version` for the OpenAPI
    /// metadata; every other `OpenApiInfo` field keeps its `Default` value.
    pub async fn launch_named<RootMod: Module>(
        addr: &str,
        title: &'static str,
        version: &'static str,
    ) -> std::io::Result<()> {
        Self::launch_with_info::<RootMod>(
            addr,
            OpenApiInfo {
                title,
                version,
                ..Default::default()
            },
        )
        .await
    }

    pub async fn launch_with_info<RootMod: Module>(
        addr: &str,
        info: OpenApiInfo,
    ) -> std::io::Result<()> {
        Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
    }

    /// Full launch contract with plugins. See the module docstring for the
    /// strict phase ordering.
    ///
    /// * `addr` — address passed to `tokio::net::TcpListener::bind`.
    /// * `info` — OpenAPI metadata used to build the spec served at
    ///   `/openapi.json`.
    /// * `plugins` — driven through `on_init` and `on_start` in vector order,
    ///   and through `on_shutdown` in reverse vector order.
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` when a plugin's `on_init` or `on_start`
    /// fails (converted by `plugin_io_err`), when the listener cannot be
    /// bound, or when `axum::serve` finishes with an error. Errors returned by
    /// `on_shutdown` are only logged to stderr.
    pub async fn launch_with_plugins<RootMod: Module>(
        addr: &str,
        info: OpenApiInfo,
        mut plugins: Vec<Box<dyn ArclyPlugin>>,
    ) -> std::io::Result<()> {
        let _root: PhantomData<RootMod> = PhantomData;

        // One budget shared by the start-failure rollback and the post-serve
        // drain: a wedged `on_shutdown` must never hang the process.
        let shutdown_budget = Duration::from_secs(5);

        // ── 0. Walk the module DAG from RootMod ────────────────────
        let reachable_modules = collect_reachable_modules(RootMod::descriptor());
        let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
            .iter()
            .flat_map(|m| m.controllers.iter().copied())
            .collect();

        // ── 1. Providers from reachable modules only ───────────────
        let mut builder = DiContainerBuilder::new();
        for m in &reachable_modules {
            for p in m.providers {
                builder.add_provider(p);
            }
        }

        // ── 2. Plugin on_init — they may queue providers + routes ──
        let mut plugin_ctx = ArclyPluginContext::new();
        for p in plugins.iter_mut() {
            if let Err(e) = p.on_init(&mut plugin_ctx).await {
                return Err(plugin_io_err(e));
            }
        }

        // ── 3. Apply queued provider closures ──────────────────────
        for f in plugin_ctx.pending_providers.drain(..) {
            f(&mut builder);
        }

        // ── 4. Freeze the container — `&'static`, lock-free reads ──
        let container = builder.freeze();

        // ── 5. OpenAPI: build (scoped to the module DAG), then mutate ──
        let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
        for mutator in plugin_ctx.openapi_mutators.drain(..) {
            mutator(&mut spec_value);
        }
        let spec: &'static serde_json::Value = Box::leak(Box::new(spec_value));

        // ── 6. Mount routes (filtered by reachable controller set) ─
        let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
            axum::Router::new();
        for rt in inventory::iter::<&'static RouteDescriptor> {
            // Empty `controller` = free-fn route → always mount.
            // Non-empty = must belong to a controller in the reachable DAG.
            if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
                continue;
            }
            router = router.route(rt.path, adapt(rt));
        }
        let mut app = router.with_state(container);
        for r in &plugin_ctx.extra_routes {
            app = app.route(r.path, build_plugin_route(container, r));
        }

        // ── 6b. Mount real-time gateways (filtered by reachable module DAG) ─
        // One process-wide connection registry, leaked to `&'static` and shared
        // by every gateway upgrade route — sharded, lock-free on the hot path.
        let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
            .iter()
            .flat_map(|m| m.gateways.iter().copied())
            .collect();
        let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
        for gd in inventory::iter::<&'static GatewayDescriptor> {
            if !allowed_gateways.contains(gd.name) {
                continue;
            }
            let runtime = (gd.build)(container);
            app = app.route(gd.path, ws_route(runtime, registry, container));
        }

        let app = app
            .route(
                "/openapi.json",
                // `spec` is a leaked `&'static Value`, and a reference to a
                // `Serialize` type is itself `Serialize`, so the document is
                // serialized in place instead of being deep-cloned per request.
                get(move || async move { Json(spec) }),
            )
            .route(
                "/docs",
                get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
            )
            .layer(axum::middleware::from_fn(
                crate::security::apply_security_headers,
            ));

        // ── 7. Bind ────────────────────────────────────────────────
        let listener = tokio::net::TcpListener::bind(addr).await?;

        // ── 8. Plugin on_start — bg tasks spawn here ───────────────
        //
        // If a plugin fails, roll back already-started plugins in reverse order
        // before propagating the error — prevents orphaned background tasks.
        // The rollback is wrapped in the same 5-second budget used by the
        // post-serve drain loop: a wedged on_shutdown must not hang the process.
        let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
        for (started, p) in plugins_arc.iter().enumerate() {
            // `started` is the number of plugins that came up before `p`, i.e.
            // exactly the prefix that needs rolling back if `p` fails.
            if let Err(e) = p.on_start(container).await {
                let already_started = Arc::clone(&plugins_arc);
                let rolled = tokio::time::timeout(shutdown_budget, async move {
                    for already in already_started[..started].iter().rev() {
                        if let Err(se) = already.on_shutdown(container).await {
                            eprintln!("[arcly] plugin `{}` cleanup error: {}", already.name(), se);
                        }
                    }
                })
                .await;
                if rolled.is_err() {
                    eprintln!("[arcly] plugin rollback exceeded {shutdown_budget:?} — forced");
                }
                return Err(plugin_io_err(e));
            }
        }

        // ── 9. Serve with two-phase graceful shutdown ──────────────
        //
        // Phase A: SIGTERM/Ctrl-C → axum stops accepting + drains in-flight.
        // Phase B: AFTER axum has fully drained, run plugin `on_shutdown`s
        //          in reverse declaration order, wrapped in a 5s timeout.
        let serve = axum::serve(listener, app).with_graceful_shutdown(async {
            shutdown_signal().await;
            eprintln!("[arcly] shutdown signal received — HTTP draining");
        });
        let serve_res = serve.await;

        // HTTP server has now fully stopped. Safe to drain plugins. This runs
        // even when `serve` returned an error, so started plugins always get
        // their `on_shutdown` call.
        eprintln!("[arcly] HTTP fully drained — running plugin on_shutdown (5s budget)");
        let plugins_for_drain = Arc::clone(&plugins_arc);
        let drained = tokio::time::timeout(shutdown_budget, async move {
            for p in plugins_for_drain.iter().rev() {
                if let Err(e) = p.on_shutdown(container).await {
                    eprintln!("[arcly] plugin `{}` shutdown error: {}", p.name(), e);
                }
            }
        })
        .await;
        if drained.is_err() {
            eprintln!("[arcly] plugin drain exceeded {shutdown_budget:?} — forced exit");
        }
        serve_res
    }
}

/// Collect every module reachable from `root` through `imports`.
///
/// The traversal is breadth-first. A descriptor is identified by its address,
/// so one that is imported along several paths (or that takes part in an
/// import cycle) appears exactly once, at the position of its first visit.
/// `root` is always the first element of the returned vector.
fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
    use std::collections::{HashSet, VecDeque};

    let mut seen: HashSet<*const ModuleDescriptor> = HashSet::new();
    let mut reachable: Vec<&'static ModuleDescriptor> = Vec::new();
    let mut pending: VecDeque<&'static ModuleDescriptor> = VecDeque::from([root]);

    while let Some(module) = pending.pop_front() {
        let identity: *const ModuleDescriptor = module;
        // `insert` returns false for an address that was already recorded.
        if seen.insert(identity) {
            reachable.push(module);
            for import in module.imports {
                pending.push_back(import());
            }
        }
    }
    reachable
}

/// Resolve once the process is asked to stop: SIGINT (Ctrl-C) or SIGTERM
/// (process supervisor / K8s / Docker), whichever arrives first.
///
/// If the SIGTERM handler cannot be registered, the failure is logged to
/// stderr and only Ctrl-C is awaited. This variant is compiled on unix
/// targets only, which keeps the unix-specific import out of other builds.
#[cfg(unix)]
async fn shutdown_signal() {
    use tokio::signal::unix::{signal, SignalKind};

    let mut sigterm = match signal(SignalKind::terminate()) {
        Ok(stream) => stream,
        Err(e) => {
            eprintln!("[arcly] SIGTERM handler unavailable ({e}), falling back to SIGINT only");
            let _ = tokio::signal::ctrl_c().await;
            return;
        }
    };

    tokio::select! {
        _ = tokio::signal::ctrl_c() => {}
        _ = sigterm.recv() => {}
    }
}

/// Resolve once Ctrl-C is received. Compiled on non-unix targets only.
///
/// The result of `ctrl_c()` is discarded, so this also resolves if waiting
/// for the signal reports an error.
#[cfg(not(unix))]
async fn shutdown_signal() {
    tokio::signal::ctrl_c().await.ok();
}

/// Convert a `PluginError` into an `std::io::Error`.
///
/// The `ErrorKind` is chosen from the stage the plugin failed in
/// (`Init` → `InvalidInput`, `Start` → `ConnectionRefused`,
/// `Shutdown` → `Other`); the plugin error itself becomes the payload.
fn plugin_io_err(e: PluginError) -> std::io::Error {
    use std::io::{Error, ErrorKind};

    let mapped_kind = match e.stage {
        PluginStage::Shutdown => ErrorKind::Other,
        PluginStage::Start => ErrorKind::ConnectionRefused,
        PluginStage::Init => ErrorKind::InvalidInput,
    };
    Error::new(mapped_kind, e)
}