arcly-http 0.2.2

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Boundary adapter: converts an axum `Request` into the opaque
//! `RequestContext` and dispatches to the macro-generated thunk.
//!
//! This is the *only* place in the framework where axum extraction primitives
//! are touched outside of the launch path. `assemble_context` is the single
//! request→context pipeline — plugin routes (`web::plugin_routes`) reuse it,
//! so body limits, trace propagation, and credential extraction can never
//! drift between entry points.

use axum::body::Body;
use axum::extract::{RawPathParams, Request, State};
use axum::http::request::Parts;
use axum::response::Response;
use axum::routing::{on, MethodFilter, MethodRouter};
use smallvec::SmallVec;
use smol_str::SmolStr;

use crate::core::engine::{FrozenDiContainer, RouteDescriptor, RouteSpec};
use crate::observability::lean_telemetry::on_request_start;
use crate::web::context::RequestContext;

/// Maximum request body size — protects against memory exhaustion.
/// Default 8 MiB; configured via `LaunchConfig::max_body_bytes` and stored
/// in the frozen DI container, so the per-request cost is one lock-free
/// frozen-map probe and concurrent apps (tests!) can't clobber each other.
pub(crate) struct BodyLimit(pub usize);

const DEFAULT_MAX_BODY: usize = 8 * 1024 * 1024;

/// Pre-body request filter, registered by plugins via
/// `ArclyPluginContext::register_boundary_filter`.
///
/// Runs on every request *before the body is buffered* — the cheap
/// early-reject point. `Break(resp)` short-circuits the request without
/// paying for body read, auth extraction, or context assembly. Filters must
/// be synchronous and cheap (header checks, atomic counters); anything
/// needing async or the `RequestContext` belongs in an `Interceptor`.
pub trait BoundaryFilter: Send + Sync + 'static {
    fn before_body(&'static self, parts: &Parts) -> std::ops::ControlFlow<Response>;
}

/// Run registered boundary filters in order; first `Break` wins.
#[inline]
pub(crate) fn run_boundary_filters(
    filters: &'static [&'static dyn BoundaryFilter],
    parts: &Parts,
) -> Option<Response> {
    for f in filters {
        if let std::ops::ControlFlow::Break(resp) = f.before_body(parts) {
            return Some(resp);
        }
    }
    None
}

/// RAII guard that tracks the in-flight request count in the Prometheus gauge.
/// Pairs with `lean_telemetry::RequestGuard` which tracks the same count in the
/// raw atomic used by health endpoints.
pub(crate) struct InFlightGuard;
impl InFlightGuard {
    #[inline]
    pub(crate) fn new() -> Self {
        metrics::gauge!("http_requests_in_flight").increment(1.0);
        Self
    }
}
impl Drop for InFlightGuard {
    #[inline]
    fn drop(&mut self) {
        metrics::gauge!("http_requests_in_flight").decrement(1.0);
    }
}

/// The single request→`RequestContext` pipeline:
/// body read (capped) → W3C trace propagation → credential extraction
/// (Bearer / cookie / session) → context construction.
///
/// Every HTTP entry point — macro routes here, plugin routes in
/// `web::plugin_routes` — goes through this function.
///
/// Returns `Err(413)` when the body exceeds the cap. Truncating silently
/// (the previous behaviour) handed handlers an *empty* body for oversized
/// requests — a correctness hazard, not a defence.
pub(crate) async fn assemble_context(
    parts: Parts,
    body: Body,
    params: SmallVec<[(SmolStr, SmolStr); 4]>,
    container: &'static FrozenDiContainer,
    route_pattern: &'static str,
    route_spec: Option<&'static RouteSpec>,
) -> Result<RequestContext, Response> {
    let cap = container
        .try_get::<BodyLimit>()
        .map(|b| b.0)
        .unwrap_or(DEFAULT_MAX_BODY);
    let bytes = match axum::body::to_bytes(body, cap).await {
        Ok(b) => b,
        Err(_) => {
            metrics::counter!("http_requests_body_too_large_total").increment(1);
            return Err(Response::builder()
                .status(413)
                .body(Body::from("request body exceeds the configured limit"))
                .expect("static 413 response"));
        }
    };

    // One shared extraction for trace + tenant + credentials — identical to
    // what WebSocket handshakes and the consumer mesh run (`pipeline`).
    let provenance = crate::pipeline::Provenance::from_headers(&parts.headers, container).await;

    Ok(RequestContext::__new(
        parts.method,
        SmolStr::new(parts.uri.path()),
        SmolStr::new(parts.uri.query().unwrap_or("")),
        params,
        parts.headers,
        bytes,
        provenance.trace.trace_id,
        provenance.trace.span_id,
        provenance.trace.parent_span_id,
        container,
        route_pattern,
        route_spec,
    )
    .__with_claims(provenance.claims)
    .__with_session(provenance.session)
    .__with_tenant(provenance.tenant))
}

/// The ONE entry sequence every HTTP mount point runs — macro routes,
/// plugin routes, and dynamic `/_plugins` routes all call this instead of
/// keeping their own copy (three copies previously meant fixes like the
/// 413 body-cap landed three times):
///
/// boundary filters → context assembly (body cap → provenance) →
/// telemetry guards → interceptor chain.
#[allow(clippy::too_many_arguments)] // the full entry contract, one call site per mount kind
pub(crate) async fn run_entry(
    parts: Parts,
    body: Body,
    params: SmallVec<[(SmolStr, SmolStr); 4]>,
    container: &'static FrozenDiContainer,
    route_pattern: &'static str,
    route_spec: Option<&'static RouteSpec>,
    filters: &'static [&'static dyn BoundaryFilter],
    chain: &std::sync::Arc<
        dyn Fn(RequestContext) -> futures::future::BoxFuture<'static, Response> + Send + Sync,
    >,
) -> Response {
    if let Some(reject) = run_boundary_filters(filters, &parts) {
        return reject;
    }
    let ctx =
        match assemble_context(parts, body, params, container, route_pattern, route_spec).await {
            Ok(ctx) => ctx,
            Err(reject) => return reject,
        };

    let _telemetry = on_request_start();
    let _in_flight = InFlightGuard::new();
    chain(ctx).await
}

/// Wrap a macro-generated route descriptor in an axum `MethodRouter`.
/// The HTTP method filter is baked in so the caller can drop the result
/// straight into `Router::route`.
/// Plugin-registered global interceptors compose as the outermost layers
/// around the handler (which already carries any `#[UseInterceptors]` chain).
pub fn adapt(
    rt: &'static RouteDescriptor,
    globals: &'static [&'static dyn crate::web::interceptors::Interceptor],
    filters: &'static [&'static dyn BoundaryFilter],
) -> MethodRouter<&'static FrozenDiContainer> {
    let filter = MethodFilter::try_from(axum::http::Method::from(rt.method))
        .expect("Arcly: unsupported HTTP method");

    let chain = crate::web::interceptors::compose_chain(
        globals,
        std::sync::Arc::new(move |ctx| (rt.handler)(ctx)),
    );

    let handler = move |State(container): State<&'static FrozenDiContainer>,
                        raw_params: RawPathParams,
                        req: Request| {
        let chain = chain.clone();
        async move {
            let params: SmallVec<[(SmolStr, SmolStr); 4]> = raw_params
                .iter()
                .map(|(k, v)| (SmolStr::new(k), SmolStr::new(v)))
                .collect();

            let (parts, body) = req.into_parts();
            let resp: Response = run_entry(
                parts,
                body,
                params,
                container,
                rt.path,
                Some(rt.spec),
                filters,
                &chain,
            )
            .await;

            let (mut p, b) = resp.into_parts();
            // RFC 8594: announce deprecation + sunset date on versioned routes
            // marked #[Deprecated(sunset = "…")].
            if !rt.spec.sunset.is_empty() {
                p.headers
                    .insert("deprecation", axum::http::HeaderValue::from_static("true"));
                if let Ok(v) = axum::http::HeaderValue::from_str(rt.spec.sunset) {
                    p.headers.insert("sunset", v);
                }
            }
            Response::from_parts(p, Body::new(b))
        }
    };

    on(filter, handler)
}