Skip to main content

arcly_http_core/web/
boundary.rs

1//! Boundary adapter: converts an axum `Request` into the opaque
2//! `RequestContext` and dispatches to the macro-generated thunk.
3//!
4//! This is the *only* place in the framework where axum extraction primitives
5//! are touched outside of the launch path. `assemble_context` is the single
6//! request→context pipeline — plugin routes (`web::plugin_routes`) reuse it,
7//! so body limits, trace propagation, and credential extraction can never
8//! drift between entry points.
9
10use axum::body::Body;
11use axum::extract::{RawPathParams, Request, State};
12use axum::http::request::Parts;
13use axum::response::Response;
14use axum::routing::{on, MethodFilter, MethodRouter};
15use smallvec::SmallVec;
16use smol_str::SmolStr;
17
18use crate::core::engine::{FrozenDiContainer, RouteDescriptor, RouteSpec};
19use crate::observability::lean_telemetry::on_request_start;
20use crate::web::context::RequestContext;
21
/// Per-application cap on the buffered request-body size, guarding against
/// memory exhaustion.
///
/// Set from `LaunchConfig::max_body_bytes` and kept in the frozen DI
/// container. Reading it per request is one lock-free probe of the frozen
/// map. Because the value lives per container, concurrently running apps
/// (tests!) never overwrite each other's limit. When no `BodyLimit` is
/// registered, `DEFAULT_MAX_BODY` applies.
#[doc(hidden)]
pub struct BodyLimit(pub usize);

/// Fallback body cap used when the container holds no `BodyLimit`: 8 MiB.
const DEFAULT_MAX_BODY: usize = 8 << 20;
30
31/// Pre-body request filter, registered by plugins via
32/// `ArclyPluginContext::register_boundary_filter`.
33///
34/// Runs on every request *before the body is buffered* — the cheap
35/// early-reject point. `Break(resp)` short-circuits the request without
36/// paying for body read, auth extraction, or context assembly. Filters must
37/// be synchronous and cheap (header checks, atomic counters); anything
38/// needing async or the `RequestContext` belongs in an `Interceptor`.
39pub trait BoundaryFilter: Send + Sync + 'static {
40    /// `parts` is [`crate::http::RequestParts`] — implementors never need to
41    /// name `axum`.
42    fn before_body(
43        &'static self,
44        parts: &crate::http::RequestParts,
45    ) -> std::ops::ControlFlow<Response>;
46}
47
48/// Run registered boundary filters in order; first `Break` wins.
49#[inline]
50pub(crate) fn run_boundary_filters(
51    filters: &'static [&'static dyn BoundaryFilter],
52    parts: &Parts,
53) -> Option<Response> {
54    for f in filters {
55        if let std::ops::ControlFlow::Break(resp) = f.before_body(parts) {
56            return Some(resp);
57        }
58    }
59    None
60}
61
62/// RAII guard that tracks the in-flight request count in the Prometheus gauge.
63/// Pairs with `lean_telemetry::RequestGuard` which tracks the same count in the
64/// raw atomic used by health endpoints.
65pub(crate) struct InFlightGuard;
66impl InFlightGuard {
67    #[inline]
68    pub(crate) fn new() -> Self {
69        metrics::gauge!("http_requests_in_flight").increment(1.0);
70        Self
71    }
72}
73impl Drop for InFlightGuard {
74    #[inline]
75    fn drop(&mut self) {
76        metrics::gauge!("http_requests_in_flight").decrement(1.0);
77    }
78}
79
80/// The single request→`RequestContext` pipeline:
81/// body read (capped) → W3C trace propagation → credential extraction
82/// (Bearer / cookie / session) → context construction.
83///
84/// Every HTTP entry point — macro routes here, plugin routes in
85/// `web::plugin_routes` — goes through this function.
86///
87/// Returns `Err(413)` when the body exceeds the cap. Truncating silently
88/// (the previous behaviour) handed handlers an *empty* body for oversized
89/// requests — a correctness hazard, not a defence.
90#[doc(hidden)]
91pub async fn assemble_context(
92    parts: Parts,
93    body: Body,
94    params: SmallVec<[(SmolStr, SmolStr); 4]>,
95    container: std::sync::Arc<FrozenDiContainer>,
96    route_pattern: &'static str,
97    route_spec: Option<&'static RouteSpec>,
98) -> Result<RequestContext, Response> {
99    let cap = container
100        .try_get::<BodyLimit>()
101        .map(|b| b.0)
102        .unwrap_or(DEFAULT_MAX_BODY);
103    let bytes = match axum::body::to_bytes(body, cap).await {
104        Ok(b) => b,
105        Err(_) => {
106            metrics::counter!("http_requests_body_too_large_total").increment(1);
107            return Err(Response::builder()
108                .status(413)
109                .body(Body::from("request body exceeds the configured limit"))
110                .expect("static 413 response"));
111        }
112    };
113
114    // One shared extraction for trace + tenant + credentials — identical to
115    // what WebSocket handshakes and the consumer mesh run (`pipeline`).
116    let provenance = crate::pipeline::Provenance::from_headers(&parts.headers, &container).await;
117
118    // Peer IP: prefer a trusted `x-forwarded-for` (first hop) for proxied
119    // deployments, else the connection's peer address (present when the server
120    // was started with connection info via `into_make_service_with_connect_info`).
121    let client_ip = parts
122        .headers
123        .get("x-forwarded-for")
124        .and_then(|v| v.to_str().ok())
125        .and_then(|s| s.split(',').next())
126        .and_then(|s| s.trim().parse::<std::net::IpAddr>().ok())
127        .or_else(|| {
128            parts
129                .extensions
130                .get::<axum::extract::ConnectInfo<std::net::SocketAddr>>()
131                .map(|ci| ci.0.ip())
132        });
133
134    Ok(RequestContext::__new(
135        parts.method,
136        SmolStr::new(parts.uri.path()),
137        SmolStr::new(parts.uri.query().unwrap_or("")),
138        params,
139        parts.headers,
140        bytes,
141        provenance.trace.trace_id,
142        provenance.trace.span_id,
143        provenance.trace.parent_span_id,
144        container,
145        route_pattern,
146        route_spec,
147    )
148    .__with_claims(provenance.claims)
149    .__with_session(provenance.session)
150    .__with_tenant(provenance.tenant)
151    .__with_client_ip(client_ip))
152}
153
154/// The ONE entry sequence every HTTP mount point runs — macro routes,
155/// plugin routes, and dynamic `/_plugins` routes all call this instead of
156/// keeping their own copy (three copies previously meant fixes like the
157/// 413 body-cap landed three times):
158///
159/// boundary filters → context assembly (body cap → provenance) →
160/// telemetry guards → interceptor chain.
161#[allow(clippy::too_many_arguments)] // the full entry contract, one call site per mount kind
162pub(crate) async fn run_entry(
163    parts: Parts,
164    body: Body,
165    params: SmallVec<[(SmolStr, SmolStr); 4]>,
166    container: std::sync::Arc<FrozenDiContainer>,
167    route_pattern: &'static str,
168    route_spec: Option<&'static RouteSpec>,
169    filters: &'static [&'static dyn BoundaryFilter],
170    chain: &std::sync::Arc<
171        dyn Fn(RequestContext) -> futures::future::BoxFuture<'static, Response> + Send + Sync,
172    >,
173) -> Response {
174    if let Some(reject) = run_boundary_filters(filters, &parts) {
175        return reject;
176    }
177    let ctx =
178        match assemble_context(parts, body, params, container, route_pattern, route_spec).await {
179            Ok(ctx) => ctx,
180            Err(reject) => return reject,
181        };
182
183    let _telemetry = on_request_start();
184    let _in_flight = InFlightGuard::new();
185    chain(ctx).await
186}
187
188/// Wrap a macro-generated route descriptor in an axum `MethodRouter`.
189/// The HTTP method filter is baked in so the caller can drop the result
190/// straight into `Router::route`.
191/// Plugin-registered global interceptors compose as the outermost layers
192/// around the handler (which already carries any `#[UseInterceptors]` chain).
193pub fn adapt(
194    rt: &'static RouteDescriptor,
195    globals: &'static [&'static dyn crate::web::interceptors::Interceptor],
196    filters: &'static [&'static dyn BoundaryFilter],
197) -> MethodRouter<std::sync::Arc<FrozenDiContainer>> {
198    let filter = MethodFilter::try_from(axum::http::Method::from(rt.method))
199        .expect("Arcly: unsupported HTTP method");
200
201    let chain = crate::web::interceptors::compose_chain(
202        globals,
203        std::sync::Arc::new(move |ctx| (rt.handler)(ctx)),
204    );
205
206    let handler = move |State(container): State<std::sync::Arc<FrozenDiContainer>>,
207                        raw_params: RawPathParams,
208                        req: Request| {
209        let chain = chain.clone();
210        async move {
211            let params: SmallVec<[(SmolStr, SmolStr); 4]> = raw_params
212                .iter()
213                .map(|(k, v)| (SmolStr::new(k), SmolStr::new(v)))
214                .collect();
215
216            let (parts, body) = req.into_parts();
217            let resp: Response = run_entry(
218                parts,
219                body,
220                params,
221                container,
222                rt.path,
223                Some(rt.spec),
224                filters,
225                &chain,
226            )
227            .await;
228
229            let (mut p, b) = resp.into_parts();
230            // RFC 8594: announce deprecation + sunset date on versioned routes
231            // marked #[Deprecated(sunset = "…")].
232            if !rt.spec.sunset.is_empty() {
233                p.headers
234                    .insert("deprecation", axum::http::HeaderValue::from_static("true"));
235                if let Ok(v) = axum::http::HeaderValue::from_str(rt.spec.sunset) {
236                    p.headers.insert("sunset", v);
237                }
238            }
239            Response::from_parts(p, Body::new(b))
240        }
241    };
242
243    on(filter, handler)
244}