Skip to main content

arcly_http/web/
boundary.rs

1//! Boundary adapter: converts an axum `Request` into the opaque
2//! `RequestContext` and dispatches to the macro-generated thunk.
3//!
4//! This is the *only* place in the framework where axum extraction primitives
5//! are touched outside of the launch path. `assemble_context` is the single
6//! request→context pipeline — plugin routes (`web::plugin_routes`) reuse it,
7//! so body limits, trace propagation, and credential extraction can never
8//! drift between entry points.
9
10use axum::body::Body;
11use axum::extract::{RawPathParams, Request, State};
12use axum::http::request::Parts;
13use axum::response::Response;
14use axum::routing::{on, MethodFilter, MethodRouter};
15use smallvec::SmallVec;
16use smol_str::SmolStr;
17
18use crate::core::engine::{FrozenDiContainer, RouteDescriptor, RouteSpec};
19use crate::observability::lean_telemetry::on_request_start;
20use crate::web::context::RequestContext;
21
22/// Maximum request body size — protects against memory exhaustion.
23/// Default 8 MiB; configured via `LaunchConfig::max_body_bytes` and stored
24/// in the frozen DI container, so the per-request cost is one lock-free
25/// frozen-map probe and concurrent apps (tests!) can't clobber each other.
26pub(crate) struct BodyLimit(pub usize);
27
28const DEFAULT_MAX_BODY: usize = 8 * 1024 * 1024;
29
30/// Pre-body request filter, registered by plugins via
31/// `ArclyPluginContext::register_boundary_filter`.
32///
33/// Runs on every request *before the body is buffered* — the cheap
34/// early-reject point. `Break(resp)` short-circuits the request without
35/// paying for body read, auth extraction, or context assembly. Filters must
36/// be synchronous and cheap (header checks, atomic counters); anything
37/// needing async or the `RequestContext` belongs in an `Interceptor`.
38pub trait BoundaryFilter: Send + Sync + 'static {
39    fn before_body(&'static self, parts: &Parts) -> std::ops::ControlFlow<Response>;
40}
41
42/// Run registered boundary filters in order; first `Break` wins.
43#[inline]
44pub(crate) fn run_boundary_filters(
45    filters: &'static [&'static dyn BoundaryFilter],
46    parts: &Parts,
47) -> Option<Response> {
48    for f in filters {
49        if let std::ops::ControlFlow::Break(resp) = f.before_body(parts) {
50            return Some(resp);
51        }
52    }
53    None
54}
55
56/// RAII guard that tracks the in-flight request count in the Prometheus gauge.
57/// Pairs with `lean_telemetry::RequestGuard` which tracks the same count in the
58/// raw atomic used by health endpoints.
59pub(crate) struct InFlightGuard;
60impl InFlightGuard {
61    #[inline]
62    pub(crate) fn new() -> Self {
63        metrics::gauge!("http_requests_in_flight").increment(1.0);
64        Self
65    }
66}
67impl Drop for InFlightGuard {
68    #[inline]
69    fn drop(&mut self) {
70        metrics::gauge!("http_requests_in_flight").decrement(1.0);
71    }
72}
73
74/// The single request→`RequestContext` pipeline:
75/// body read (capped) → W3C trace propagation → credential extraction
76/// (Bearer / cookie / session) → context construction.
77///
78/// Every HTTP entry point — macro routes here, plugin routes in
79/// `web::plugin_routes` — goes through this function.
80///
81/// Returns `Err(413)` when the body exceeds the cap. Truncating silently
82/// (the previous behaviour) handed handlers an *empty* body for oversized
83/// requests — a correctness hazard, not a defence.
84pub(crate) async fn assemble_context(
85    parts: Parts,
86    body: Body,
87    params: SmallVec<[(SmolStr, SmolStr); 4]>,
88    container: &'static FrozenDiContainer,
89    route_pattern: &'static str,
90    route_spec: Option<&'static RouteSpec>,
91) -> Result<RequestContext, Response> {
92    let cap = container
93        .try_get::<BodyLimit>()
94        .map(|b| b.0)
95        .unwrap_or(DEFAULT_MAX_BODY);
96    let bytes = match axum::body::to_bytes(body, cap).await {
97        Ok(b) => b,
98        Err(_) => {
99            metrics::counter!("http_requests_body_too_large_total").increment(1);
100            return Err(Response::builder()
101                .status(413)
102                .body(Body::from("request body exceeds the configured limit"))
103                .expect("static 413 response"));
104        }
105    };
106
107    // One shared extraction for trace + tenant + credentials — identical to
108    // what WebSocket handshakes and the consumer mesh run (`pipeline`).
109    let provenance = crate::pipeline::Provenance::from_headers(&parts.headers, container).await;
110
111    Ok(RequestContext::__new(
112        parts.method,
113        SmolStr::new(parts.uri.path()),
114        SmolStr::new(parts.uri.query().unwrap_or("")),
115        params,
116        parts.headers,
117        bytes,
118        provenance.trace.trace_id,
119        provenance.trace.span_id,
120        provenance.trace.parent_span_id,
121        container,
122        route_pattern,
123        route_spec,
124    )
125    .__with_claims(provenance.claims)
126    .__with_session(provenance.session)
127    .__with_tenant(provenance.tenant))
128}
129
130/// The ONE entry sequence every HTTP mount point runs — macro routes,
131/// plugin routes, and dynamic `/_plugins` routes all call this instead of
132/// keeping their own copy (three copies previously meant fixes like the
133/// 413 body-cap landed three times):
134///
135/// boundary filters → context assembly (body cap → provenance) →
136/// telemetry guards → interceptor chain.
137#[allow(clippy::too_many_arguments)] // the full entry contract, one call site per mount kind
138pub(crate) async fn run_entry(
139    parts: Parts,
140    body: Body,
141    params: SmallVec<[(SmolStr, SmolStr); 4]>,
142    container: &'static FrozenDiContainer,
143    route_pattern: &'static str,
144    route_spec: Option<&'static RouteSpec>,
145    filters: &'static [&'static dyn BoundaryFilter],
146    chain: &std::sync::Arc<
147        dyn Fn(RequestContext) -> futures::future::BoxFuture<'static, Response> + Send + Sync,
148    >,
149) -> Response {
150    if let Some(reject) = run_boundary_filters(filters, &parts) {
151        return reject;
152    }
153    let ctx =
154        match assemble_context(parts, body, params, container, route_pattern, route_spec).await {
155            Ok(ctx) => ctx,
156            Err(reject) => return reject,
157        };
158
159    let _telemetry = on_request_start();
160    let _in_flight = InFlightGuard::new();
161    chain(ctx).await
162}
163
164/// Wrap a macro-generated route descriptor in an axum `MethodRouter`.
165/// The HTTP method filter is baked in so the caller can drop the result
166/// straight into `Router::route`.
167/// Plugin-registered global interceptors compose as the outermost layers
168/// around the handler (which already carries any `#[UseInterceptors]` chain).
169pub fn adapt(
170    rt: &'static RouteDescriptor,
171    globals: &'static [&'static dyn crate::web::interceptors::Interceptor],
172    filters: &'static [&'static dyn BoundaryFilter],
173) -> MethodRouter<&'static FrozenDiContainer> {
174    let filter = MethodFilter::try_from(axum::http::Method::from(rt.method))
175        .expect("Arcly: unsupported HTTP method");
176
177    let chain = crate::web::interceptors::compose_chain(
178        globals,
179        std::sync::Arc::new(move |ctx| (rt.handler)(ctx)),
180    );
181
182    let handler = move |State(container): State<&'static FrozenDiContainer>,
183                        raw_params: RawPathParams,
184                        req: Request| {
185        let chain = chain.clone();
186        async move {
187            let params: SmallVec<[(SmolStr, SmolStr); 4]> = raw_params
188                .iter()
189                .map(|(k, v)| (SmolStr::new(k), SmolStr::new(v)))
190                .collect();
191
192            let (parts, body) = req.into_parts();
193            let resp: Response = run_entry(
194                parts,
195                body,
196                params,
197                container,
198                rt.path,
199                Some(rt.spec),
200                filters,
201                &chain,
202            )
203            .await;
204
205            let (mut p, b) = resp.into_parts();
206            // RFC 8594: announce deprecation + sunset date on versioned routes
207            // marked #[Deprecated(sunset = "…")].
208            if !rt.spec.sunset.is_empty() {
209                p.headers
210                    .insert("deprecation", axum::http::HeaderValue::from_static("true"));
211                if let Ok(v) = axum::http::HeaderValue::from_str(rt.spec.sunset) {
212                    p.headers.insert("sunset", v);
213                }
214            }
215            Response::from_parts(p, Body::new(b))
216        }
217    };
218
219    on(filter, handler)
220}