use axum::body::Body;
use axum::extract::{RawPathParams, Request, State};
use axum::http::request::Parts;
use axum::response::Response;
use axum::routing::{on, MethodFilter, MethodRouter};
use smallvec::SmallVec;
use smol_str::SmolStr;
use crate::core::engine::{FrozenDiContainer, RouteDescriptor, RouteSpec};
use crate::observability::lean_telemetry::on_request_start;
use crate::web::context::RequestContext;
pub(crate) struct BodyLimit(pub usize);
const DEFAULT_MAX_BODY: usize = 8 * 1024 * 1024;
pub trait BoundaryFilter: Send + Sync + 'static {
fn before_body(&'static self, parts: &Parts) -> std::ops::ControlFlow<Response>;
}
#[inline]
pub(crate) fn run_boundary_filters(
filters: &'static [&'static dyn BoundaryFilter],
parts: &Parts,
) -> Option<Response> {
for f in filters {
if let std::ops::ControlFlow::Break(resp) = f.before_body(parts) {
return Some(resp);
}
}
None
}
pub(crate) struct InFlightGuard;
impl InFlightGuard {
#[inline]
pub(crate) fn new() -> Self {
metrics::gauge!("http_requests_in_flight").increment(1.0);
Self
}
}
impl Drop for InFlightGuard {
#[inline]
fn drop(&mut self) {
metrics::gauge!("http_requests_in_flight").decrement(1.0);
}
}
pub(crate) async fn assemble_context(
parts: Parts,
body: Body,
params: SmallVec<[(SmolStr, SmolStr); 4]>,
container: &'static FrozenDiContainer,
route_pattern: &'static str,
route_spec: Option<&'static RouteSpec>,
) -> Result<RequestContext, Response> {
let cap = container
.try_get::<BodyLimit>()
.map(|b| b.0)
.unwrap_or(DEFAULT_MAX_BODY);
let bytes = match axum::body::to_bytes(body, cap).await {
Ok(b) => b,
Err(_) => {
metrics::counter!("http_requests_body_too_large_total").increment(1);
return Err(Response::builder()
.status(413)
.body(Body::from("request body exceeds the configured limit"))
.expect("static 413 response"));
}
};
let provenance = crate::pipeline::Provenance::from_headers(&parts.headers, container).await;
Ok(RequestContext::__new(
parts.method,
SmolStr::new(parts.uri.path()),
SmolStr::new(parts.uri.query().unwrap_or("")),
params,
parts.headers,
bytes,
provenance.trace.trace_id,
provenance.trace.span_id,
provenance.trace.parent_span_id,
container,
route_pattern,
route_spec,
)
.__with_claims(provenance.claims)
.__with_session(provenance.session)
.__with_tenant(provenance.tenant))
}
#[allow(clippy::too_many_arguments)] pub(crate) async fn run_entry(
parts: Parts,
body: Body,
params: SmallVec<[(SmolStr, SmolStr); 4]>,
container: &'static FrozenDiContainer,
route_pattern: &'static str,
route_spec: Option<&'static RouteSpec>,
filters: &'static [&'static dyn BoundaryFilter],
chain: &std::sync::Arc<
dyn Fn(RequestContext) -> futures::future::BoxFuture<'static, Response> + Send + Sync,
>,
) -> Response {
if let Some(reject) = run_boundary_filters(filters, &parts) {
return reject;
}
let ctx =
match assemble_context(parts, body, params, container, route_pattern, route_spec).await {
Ok(ctx) => ctx,
Err(reject) => return reject,
};
let _telemetry = on_request_start();
let _in_flight = InFlightGuard::new();
chain(ctx).await
}
pub fn adapt(
rt: &'static RouteDescriptor,
globals: &'static [&'static dyn crate::web::interceptors::Interceptor],
filters: &'static [&'static dyn BoundaryFilter],
) -> MethodRouter<&'static FrozenDiContainer> {
let filter = MethodFilter::try_from(axum::http::Method::from(rt.method))
.expect("Arcly: unsupported HTTP method");
let chain = crate::web::interceptors::compose_chain(
globals,
std::sync::Arc::new(move |ctx| (rt.handler)(ctx)),
);
let handler = move |State(container): State<&'static FrozenDiContainer>,
raw_params: RawPathParams,
req: Request| {
let chain = chain.clone();
async move {
let params: SmallVec<[(SmolStr, SmolStr); 4]> = raw_params
.iter()
.map(|(k, v)| (SmolStr::new(k), SmolStr::new(v)))
.collect();
let (parts, body) = req.into_parts();
let resp: Response = run_entry(
parts,
body,
params,
container,
rt.path,
Some(rt.spec),
filters,
&chain,
)
.await;
let (mut p, b) = resp.into_parts();
if !rt.spec.sunset.is_empty() {
p.headers
.insert("deprecation", axum::http::HeaderValue::from_static("true"));
if let Ok(v) = axum::http::HeaderValue::from_str(rt.spec.sunset) {
p.headers.insert("sunset", v);
}
}
Response::from_parts(p, Body::new(b))
}
};
on(filter, handler)
}