arcly_http/web/
boundary.rs1use axum::body::Body;
11use axum::extract::{RawPathParams, Request, State};
12use axum::http::request::Parts;
13use axum::response::Response;
14use axum::routing::{on, MethodFilter, MethodRouter};
15use smallvec::SmallVec;
16use smol_str::SmolStr;
17
18use crate::core::engine::{FrozenDiContainer, RouteDescriptor, RouteSpec};
19use crate::observability::lean_telemetry::on_request_start;
20use crate::web::context::RequestContext;
21
/// Maximum number of request-body bytes to buffer.
///
/// `assemble_context` looks this type up in the DI container and uses the
/// wrapped value as the read limit; when it is not registered,
/// [`DEFAULT_MAX_BODY`] applies instead.
pub(crate) struct BodyLimit(pub usize);

/// Fallback body limit (8 MiB) used when no [`BodyLimit`] is registered.
const DEFAULT_MAX_BODY: usize = 8 << 20;
29
30pub trait BoundaryFilter: Send + Sync + 'static {
39 fn before_body(&'static self, parts: &Parts) -> std::ops::ControlFlow<Response>;
40}
41
42#[inline]
44pub(crate) fn run_boundary_filters(
45 filters: &'static [&'static dyn BoundaryFilter],
46 parts: &Parts,
47) -> Option<Response> {
48 for f in filters {
49 if let std::ops::ControlFlow::Break(resp) = f.before_body(parts) {
50 return Some(resp);
51 }
52 }
53 None
54}
55
56pub(crate) struct InFlightGuard;
60impl InFlightGuard {
61 #[inline]
62 pub(crate) fn new() -> Self {
63 metrics::gauge!("http_requests_in_flight").increment(1.0);
64 Self
65 }
66}
67impl Drop for InFlightGuard {
68 #[inline]
69 fn drop(&mut self) {
70 metrics::gauge!("http_requests_in_flight").decrement(1.0);
71 }
72}
73
74pub(crate) async fn assemble_context(
85 parts: Parts,
86 body: Body,
87 params: SmallVec<[(SmolStr, SmolStr); 4]>,
88 container: &'static FrozenDiContainer,
89 route_pattern: &'static str,
90 route_spec: Option<&'static RouteSpec>,
91) -> Result<RequestContext, Response> {
92 let cap = container
93 .try_get::<BodyLimit>()
94 .map(|b| b.0)
95 .unwrap_or(DEFAULT_MAX_BODY);
96 let bytes = match axum::body::to_bytes(body, cap).await {
97 Ok(b) => b,
98 Err(_) => {
99 metrics::counter!("http_requests_body_too_large_total").increment(1);
100 return Err(Response::builder()
101 .status(413)
102 .body(Body::from("request body exceeds the configured limit"))
103 .expect("static 413 response"));
104 }
105 };
106
107 let provenance = crate::pipeline::Provenance::from_headers(&parts.headers, container).await;
110
111 Ok(RequestContext::__new(
112 parts.method,
113 SmolStr::new(parts.uri.path()),
114 SmolStr::new(parts.uri.query().unwrap_or("")),
115 params,
116 parts.headers,
117 bytes,
118 provenance.trace.trace_id,
119 provenance.trace.span_id,
120 provenance.trace.parent_span_id,
121 container,
122 route_pattern,
123 route_spec,
124 )
125 .__with_claims(provenance.claims)
126 .__with_session(provenance.session)
127 .__with_tenant(provenance.tenant))
128}
129
130#[allow(clippy::too_many_arguments)] pub(crate) async fn run_entry(
139 parts: Parts,
140 body: Body,
141 params: SmallVec<[(SmolStr, SmolStr); 4]>,
142 container: &'static FrozenDiContainer,
143 route_pattern: &'static str,
144 route_spec: Option<&'static RouteSpec>,
145 filters: &'static [&'static dyn BoundaryFilter],
146 chain: &std::sync::Arc<
147 dyn Fn(RequestContext) -> futures::future::BoxFuture<'static, Response> + Send + Sync,
148 >,
149) -> Response {
150 if let Some(reject) = run_boundary_filters(filters, &parts) {
151 return reject;
152 }
153 let ctx =
154 match assemble_context(parts, body, params, container, route_pattern, route_spec).await {
155 Ok(ctx) => ctx,
156 Err(reject) => return reject,
157 };
158
159 let _telemetry = on_request_start();
160 let _in_flight = InFlightGuard::new();
161 chain(ctx).await
162}
163
164pub fn adapt(
170 rt: &'static RouteDescriptor,
171 globals: &'static [&'static dyn crate::web::interceptors::Interceptor],
172 filters: &'static [&'static dyn BoundaryFilter],
173) -> MethodRouter<&'static FrozenDiContainer> {
174 let filter = MethodFilter::try_from(axum::http::Method::from(rt.method))
175 .expect("Arcly: unsupported HTTP method");
176
177 let chain = crate::web::interceptors::compose_chain(
178 globals,
179 std::sync::Arc::new(move |ctx| (rt.handler)(ctx)),
180 );
181
182 let handler = move |State(container): State<&'static FrozenDiContainer>,
183 raw_params: RawPathParams,
184 req: Request| {
185 let chain = chain.clone();
186 async move {
187 let params: SmallVec<[(SmolStr, SmolStr); 4]> = raw_params
188 .iter()
189 .map(|(k, v)| (SmolStr::new(k), SmolStr::new(v)))
190 .collect();
191
192 let (parts, body) = req.into_parts();
193 let resp: Response = run_entry(
194 parts,
195 body,
196 params,
197 container,
198 rt.path,
199 Some(rt.spec),
200 filters,
201 &chain,
202 )
203 .await;
204
205 let (mut p, b) = resp.into_parts();
206 if !rt.spec.sunset.is_empty() {
209 p.headers
210 .insert("deprecation", axum::http::HeaderValue::from_static("true"));
211 if let Ok(v) = axum::http::HeaderValue::from_str(rt.spec.sunset) {
212 p.headers.insert("sunset", v);
213 }
214 }
215 Response::from_parts(p, Body::new(b))
216 }
217 };
218
219 on(filter, handler)
220}