arcly_http_core/web/
boundary.rs1use axum::body::Body;
11use axum::extract::{RawPathParams, Request, State};
12use axum::http::request::Parts;
13use axum::response::Response;
14use axum::routing::{on, MethodFilter, MethodRouter};
15use smallvec::SmallVec;
16use smol_str::SmolStr;
17
18use crate::core::engine::{FrozenDiContainer, RouteDescriptor, RouteSpec};
19use crate::observability::lean_telemetry::on_request_start;
20use crate::web::context::RequestContext;
21
22#[doc(hidden)]
27pub struct BodyLimit(pub usize);
28
29const DEFAULT_MAX_BODY: usize = 8 * 1024 * 1024;
30
31pub trait BoundaryFilter: Send + Sync + 'static {
40 fn before_body(
43 &'static self,
44 parts: &crate::http::RequestParts,
45 ) -> std::ops::ControlFlow<Response>;
46}
47
48#[inline]
50pub(crate) fn run_boundary_filters(
51 filters: &'static [&'static dyn BoundaryFilter],
52 parts: &Parts,
53) -> Option<Response> {
54 for f in filters {
55 if let std::ops::ControlFlow::Break(resp) = f.before_body(parts) {
56 return Some(resp);
57 }
58 }
59 None
60}
61
62pub(crate) struct InFlightGuard;
66impl InFlightGuard {
67 #[inline]
68 pub(crate) fn new() -> Self {
69 metrics::gauge!("http_requests_in_flight").increment(1.0);
70 Self
71 }
72}
73impl Drop for InFlightGuard {
74 #[inline]
75 fn drop(&mut self) {
76 metrics::gauge!("http_requests_in_flight").decrement(1.0);
77 }
78}
79
80#[doc(hidden)]
91pub async fn assemble_context(
92 parts: Parts,
93 body: Body,
94 params: SmallVec<[(SmolStr, SmolStr); 4]>,
95 container: std::sync::Arc<FrozenDiContainer>,
96 route_pattern: &'static str,
97 route_spec: Option<&'static RouteSpec>,
98) -> Result<RequestContext, Response> {
99 let cap = container
100 .try_get::<BodyLimit>()
101 .map(|b| b.0)
102 .unwrap_or(DEFAULT_MAX_BODY);
103 let bytes = match axum::body::to_bytes(body, cap).await {
104 Ok(b) => b,
105 Err(_) => {
106 metrics::counter!("http_requests_body_too_large_total").increment(1);
107 return Err(Response::builder()
108 .status(413)
109 .body(Body::from("request body exceeds the configured limit"))
110 .expect("static 413 response"));
111 }
112 };
113
114 let provenance = crate::pipeline::Provenance::from_headers(&parts.headers, &container).await;
117
118 let client_ip = parts
122 .headers
123 .get("x-forwarded-for")
124 .and_then(|v| v.to_str().ok())
125 .and_then(|s| s.split(',').next())
126 .and_then(|s| s.trim().parse::<std::net::IpAddr>().ok())
127 .or_else(|| {
128 parts
129 .extensions
130 .get::<axum::extract::ConnectInfo<std::net::SocketAddr>>()
131 .map(|ci| ci.0.ip())
132 });
133
134 Ok(RequestContext::__new(
135 parts.method,
136 SmolStr::new(parts.uri.path()),
137 SmolStr::new(parts.uri.query().unwrap_or("")),
138 params,
139 parts.headers,
140 bytes,
141 provenance.trace.trace_id,
142 provenance.trace.span_id,
143 provenance.trace.parent_span_id,
144 container,
145 route_pattern,
146 route_spec,
147 )
148 .__with_claims(provenance.claims)
149 .__with_session(provenance.session)
150 .__with_tenant(provenance.tenant)
151 .__with_client_ip(client_ip))
152}
153
154#[allow(clippy::too_many_arguments)] pub(crate) async fn run_entry(
163 parts: Parts,
164 body: Body,
165 params: SmallVec<[(SmolStr, SmolStr); 4]>,
166 container: std::sync::Arc<FrozenDiContainer>,
167 route_pattern: &'static str,
168 route_spec: Option<&'static RouteSpec>,
169 filters: &'static [&'static dyn BoundaryFilter],
170 chain: &std::sync::Arc<
171 dyn Fn(RequestContext) -> futures::future::BoxFuture<'static, Response> + Send + Sync,
172 >,
173) -> Response {
174 if let Some(reject) = run_boundary_filters(filters, &parts) {
175 return reject;
176 }
177 let ctx =
178 match assemble_context(parts, body, params, container, route_pattern, route_spec).await {
179 Ok(ctx) => ctx,
180 Err(reject) => return reject,
181 };
182
183 let _telemetry = on_request_start();
184 let _in_flight = InFlightGuard::new();
185 chain(ctx).await
186}
187
188pub fn adapt(
194 rt: &'static RouteDescriptor,
195 globals: &'static [&'static dyn crate::web::interceptors::Interceptor],
196 filters: &'static [&'static dyn BoundaryFilter],
197) -> MethodRouter<std::sync::Arc<FrozenDiContainer>> {
198 let filter = MethodFilter::try_from(axum::http::Method::from(rt.method))
199 .expect("Arcly: unsupported HTTP method");
200
201 let chain = crate::web::interceptors::compose_chain(
202 globals,
203 std::sync::Arc::new(move |ctx| (rt.handler)(ctx)),
204 );
205
206 let handler = move |State(container): State<std::sync::Arc<FrozenDiContainer>>,
207 raw_params: RawPathParams,
208 req: Request| {
209 let chain = chain.clone();
210 async move {
211 let params: SmallVec<[(SmolStr, SmolStr); 4]> = raw_params
212 .iter()
213 .map(|(k, v)| (SmolStr::new(k), SmolStr::new(v)))
214 .collect();
215
216 let (parts, body) = req.into_parts();
217 let resp: Response = run_entry(
218 parts,
219 body,
220 params,
221 container,
222 rt.path,
223 Some(rt.spec),
224 filters,
225 &chain,
226 )
227 .await;
228
229 let (mut p, b) = resp.into_parts();
230 if !rt.spec.sunset.is_empty() {
233 p.headers
234 .insert("deprecation", axum::http::HeaderValue::from_static("true"));
235 if let Ok(v) = axum::http::HeaderValue::from_str(rt.spec.sunset) {
236 p.headers.insert("sunset", v);
237 }
238 }
239 Response::from_parts(p, Body::new(b))
240 }
241 };
242
243 on(filter, handler)
244}