actus_server/request.rs
1//! [`Request`] — the incoming HTTP request as seen by middleware, and the
2//! body-limiting / parameter-extraction that turns it into a handler's
3//! [`Params`].
4
5use actus_controller::{Params, Verb};
6use actus_reply::WebError;
7use bytes::Bytes;
8use http::HeaderMap;
9use http_body_util::{BodyExt, LengthLimitError, Limited};
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Semaphore;
14
15/// An incoming HTTP request, as seen by middleware and used to build the
16/// typed [`Params`] handed to a handler.
17#[derive(Clone, Debug)]
18pub struct Request {
19 /// The HTTP method (`GET`, `POST`, …).
20 pub method: http::Method,
21 /// The request path split on `/` into non-empty segments (no leading
22 /// empty segment); e.g. `/api/users` → `["api", "users"]`.
23 pub path_parts: Vec<String>,
24 /// Query parameters as a multimap (each name → all its values, in order).
25 /// `application/x-www-form-urlencoded` body fields are appended into the
26 /// same map by [`Request::to_params`].
27 pub query_params: HashMap<String, Vec<String>>,
28 /// The raw request body. Empty for bodyless requests.
29 pub body: Bytes,
30 /// The request headers.
31 pub headers: HeaderMap,
32 /// The rate-limit *class* of the controller this request matched, as
33 /// declared by `#[controller(rate_limit = "…")]` (see
34 /// [`actus_controller::Controller::actus_rate_limit`]). `None` until the
35 /// server has matched a controller (it's set right after routing, before
36 /// the middleware `before` chain runs) and for controllers that declared
37 /// no class.
38 ///
39 /// This is the routing-derived projection a rate-limit [`crate::Middleware`]
40 /// reads to apply per-class policy — the framework supplies the label and
41 /// the `429` plumbing ([`WebError::TooManyRequests`]); the limiter and its
42 /// store stay application code. Unlike the wire-derived fields above, it's
43 /// populated by the framework rather than parsed from the request.
44 pub rate_limit_class: Option<&'static str>,
45}
46
/// Group `(name, value)` pairs by name into a multimap.
///
/// Values for a repeated name keep the order in which they appeared in
/// `pairs`, so `a=1&a=2` yields `{"a": ["1", "2"]}`. An empty input yields an
/// empty map.
fn collect_pairs(pairs: Vec<(String, String)>) -> HashMap<String, Vec<String>> {
    pairs.into_iter().fold(
        HashMap::<String, Vec<String>>::new(),
        |mut grouped, (key, val)| {
            grouped.entry(key).or_default().push(val);
            grouped
        },
    )
}
56
57/// Buffer a request body, refusing to hold more than `max_bytes` in memory:
58/// an over-limit body becomes `WebError::PayloadTooLarge` (→ 413) instead of
59/// growing unbounded. Any other read failure (a truncated/aborted body) is a
60/// `400`.
61///
62/// If `budget` is `Some`, the per-request cap is *also* reserved from a
63/// shared semaphore — when the server-wide budget is exhausted, the request
64/// is refused with `WebError::Busy` (→ 503) and a short `Retry-After`. The
65/// reservation is conservative (pre-reserves `max_bytes` even for requests
66/// whose body turns out to be smaller); the alternative — per-chunk byte
67/// accounting — is more code for the same effective ceiling.
68async fn collect_body_capped<B>(
69 body: B,
70 max_bytes: usize,
71 budget: Option<&Arc<Semaphore>>,
72) -> Result<Bytes, WebError>
73where
74 B: hyper::body::Body<Data = Bytes>,
75 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
76{
77 // Reserve from the global byte budget before touching the body. Tokio
78 // `Semaphore::acquire_many` takes a `u32`; we cap accordingly. The
79 // permit stays alive for the duration of this function — when we
80 // return (Ok or Err) it's released.
81 let _permit = match budget {
82 Some(s) => {
83 let n = u32::try_from(max_bytes).unwrap_or(u32::MAX);
84 match s.clone().try_acquire_many_owned(n) {
85 Ok(p) => Some(p),
86 Err(_) => return Err(WebError::Busy(Some(Duration::from_secs(1)))),
87 }
88 }
89 None => None,
90 };
91
92 match Limited::new(body, max_bytes).collect().await {
93 Ok(collected) => Ok(collected.to_bytes()),
94 Err(e) if e.downcast_ref::<LengthLimitError>().is_some() => Err(WebError::PayloadTooLarge),
95 Err(e) => Err(WebError::BadRequest(format!(
96 "could not read request body: {e}"
97 ))),
98 }
99}
100
101impl Request {
102 /// Build the `Request` skeleton from a hyper request *without*
103 /// consuming the body. The body stream is returned alongside; the
104 /// caller passes it back to [`Request::collect_body`] once it has
105 /// resolved the right body-size cap (which may depend on the matched
106 /// route — see `Server::handle_request_inner`).
107 ///
108 /// This is the cheap half of [`Request::from_hyper`]: the per-request
109 /// allocations are just the `path_parts` Vec and `query_params`
110 /// HashMap; no IO happens. Use it when you need to inspect the
111 /// request shape before deciding how (or whether) to read the body.
112 pub fn from_hyper_parts(
113 req: hyper::Request<hyper::body::Incoming>,
114 ) -> (Self, hyper::body::Incoming) {
115 let (parts, body) = req.into_parts();
116
117 let path_parts: Vec<String> = parts
118 .uri
119 .path()
120 .trim_matches('/')
121 .split('/')
122 .map(String::from)
123 .filter(|s| !s.is_empty())
124 .collect();
125
126 let query_params = parts
127 .uri
128 .query()
129 .map(|q| {
130 collect_pairs(
131 serde_urlencoded::from_str::<Vec<(String, String)>>(q).unwrap_or_default(),
132 )
133 })
134 .unwrap_or_default();
135
136 let skeleton = Self {
137 method: parts.method,
138 path_parts,
139 query_params,
140 // Filled by `collect_body`. Left empty on error so the
141 // skeleton-on-error contract holds.
142 body: Bytes::new(),
143 headers: parts.headers,
144 // Stamped by the server once the controller is matched (the
145 // skeleton predates routing, so it starts `None`).
146 rate_limit_class: None,
147 };
148 (skeleton, body)
149 }
150
151 /// Consume the body stream into this skeleton, capped at
152 /// `max_body_bytes` and (optionally) reserved against the
153 /// framework-wide inflight-bytes budget.
154 ///
155 /// Returns `Err((self, err))` on body failure (413, 400 truncated,
156 /// 503 budget exhausted) — `self` is the same skeleton (with `body`
157 /// still empty) the caller passed in, so the error response still
158 /// has the request headers etc. and flows through the after-chain
159 /// like every other reply.
160 pub async fn collect_body(
161 mut self,
162 body: hyper::body::Incoming,
163 max_body_bytes: usize,
164 inflight_budget: Option<&Arc<Semaphore>>,
165 ) -> Result<Self, (Self, WebError)> {
166 match collect_body_capped(body, max_body_bytes, inflight_budget).await {
167 Ok(body_bytes) => {
168 self.body = body_bytes;
169 Ok(self)
170 }
171 Err(e) => Err((self, e)),
172 }
173 }
174
175 /// Creates a new `Request` from a `hyper::Request`, buffering its body
176 /// (capped at `max_body_bytes` — see `Server::with_max_body_bytes`).
177 ///
178 /// Convenience wrapper around [`Request::from_hyper_parts`] +
179 /// [`Request::collect_body`]: useful for tests and for callers that
180 /// don't need to inspect the request shape before deciding the cap.
181 /// The server uses the two-step form directly so it can resolve the
182 /// cap from the matched controller (Phase 1) or route (Phase 2)
183 /// before reading the body.
184 ///
185 /// Returns `Err((skeleton, err))` when body collection fails (413
186 /// over the cap, 400 truncated, or 503 when the framework-level
187 /// `with_max_inflight_body_bytes` budget is exhausted). The returned
188 /// `Request` skeleton is populated with method / path / query /
189 /// headers — only `body` is empty — so the caller can route the
190 /// error through the normal reply pipeline.
191 pub async fn from_hyper(
192 req: hyper::Request<hyper::body::Incoming>,
193 max_body_bytes: usize,
194 inflight_budget: Option<&Arc<Semaphore>>,
195 ) -> Result<Self, (Self, WebError)> {
196 let (skeleton, body) = Self::from_hyper_parts(req);
197 skeleton
198 .collect_body(body, max_body_bytes, inflight_budget)
199 .await
200 }
201
202 /// Converts the request into a `Params` object for controller methods.
203 ///
204 /// Body handling is content-type discriminated. Each non-empty body
205 /// must declare its type via `Content-Type`; the four outcomes are:
206 ///
207 /// * `application/json` (or `application/...+json`) → parse JSON;
208 /// `body = Some(value)`, `raw_body = original bytes`. A parse
209 /// error becomes `WebError::BadRequest`.
210 /// * `application/x-www-form-urlencoded` → parse into fields and
211 /// *append* them to the query multimap (so a form field with the
212 /// same name as a query parameter accumulates rather than
213 /// clobbering); `body = None`, `raw_body = bytes`. A parse error
214 /// becomes `WebError::BadRequest`.
215 /// * any other content-type (`application/octet-stream`,
216 /// `application/zip`, …) → `body = None`, `raw_body = bytes`.
217 /// The handler reads `params.body_bytes()` directly.
218 /// * empty body → `body = None`, `raw_body = Bytes::new()`. The
219 /// content-type header is ignored when there's nothing to parse.
220 ///
221 /// A non-empty body with **no** `Content-Type` header is rejected:
222 /// "discipline tightening" per design — the framework refuses to
223 /// guess, and the handler never sees a body whose shape it can't
224 /// trust. (This is a behavior change from the previous
225 /// auto-JSON-sniff path; auto-sniff silently dropped binary
226 /// payloads on the floor.)
227 pub fn to_params(&self) -> Result<Params, WebError> {
228 let mut all_params = self.query_params.clone();
229
230 let content_type = self
231 .headers
232 .get(http::header::CONTENT_TYPE)
233 .and_then(|v| v.to_str().ok())
234 .map(|s| {
235 s.split(';')
236 .next()
237 .unwrap_or("")
238 .trim()
239 .to_ascii_lowercase()
240 });
241
242 let json_body = if self.body.is_empty() {
243 None
244 } else {
245 match content_type.as_deref() {
246 Some(ct) if is_json_content_type(ct) => {
247 Some(serde_json::from_slice(&self.body).map_err(|e| {
248 WebError::BadRequest(format!("body is not valid JSON: {e}"))
249 })?)
250 }
251 Some("application/x-www-form-urlencoded") => {
252 let form_pairs: Vec<(String, String)> =
253 serde_urlencoded::from_bytes(&self.body).map_err(|e| {
254 WebError::BadRequest(format!("body is not valid form-urlencoded: {e}"))
255 })?;
256 for (name, value) in form_pairs {
257 all_params.entry(name).or_default().push(value);
258 }
259 None
260 }
261 Some(_) => None,
262 None => {
263 return Err(WebError::BadRequest(
264 "non-empty request body requires a Content-Type header".into(),
265 ));
266 }
267 }
268 };
269
270 // Propagate headers as a lowercase-keyed multimap so controllers can
271 // do case-insensitive lookup *and* see every value when a header
272 // appears more than once (`Forwarded`, `Via`, etc. — common when a
273 // proxy chain prepends one entry per hop). Values that aren't valid
274 // UTF-8 are skipped. Receipt order within a name is preserved.
275 let mut headers: HashMap<String, Vec<String>> = HashMap::new();
276 for (name, value) in self.headers.iter() {
277 if let Ok(v) = value.to_str() {
278 headers
279 .entry(name.as_str().to_ascii_lowercase())
280 .or_default()
281 .push(v.to_string());
282 }
283 }
284
285 Ok(Params::new(
286 method_to_verb(&self.method),
287 all_params,
288 json_body,
289 self.body.clone(),
290 headers,
291 ))
292 }
293}
294
/// Decide whether a media type should be parsed as JSON.
///
/// Accepts bare `application/json` and anything carrying the `+json`
/// structured-syntax suffix (RFC 6839 §3.1), e.g.
/// `application/vnd.example+json`. The suffix check is not restricted to the
/// `application/` top-level type: any string ending in `+json` matches.
///
/// The comparison is exact and case-sensitive; `to_params` passes a value
/// that has already been stripped of parameters and lowercased.
fn is_json_content_type(ct: &str) -> bool {
    match ct {
        "application/json" => true,
        other => other.ends_with("+json"),
    }
}
302
303fn method_to_verb(method: &http::Method) -> Verb {
304 match method {
305 m if m == http::Method::GET => Verb::GET,
306 m if m == http::Method::POST => Verb::POST,
307 m if m == http::Method::PUT => Verb::PUT,
308 m if m == http::Method::DELETE => Verb::DELETE,
309 m if m == http::Method::PATCH => Verb::PATCH,
310 m if m == http::Method::HEAD => Verb::HEAD,
311 m if m == http::Method::OPTIONS => Verb::OPTIONS,
312 _ => Verb::GET,
313 }
314}
315
#[cfg(test)]
mod tests {
    //! Unit tests for body capping, query/form parameter folding and the
    //! content-type discrimination done by `Request::to_params`.

    use super::*;
    use http::HeaderValue;

    /// Build a `POST /whatever` request with the given body and, when
    /// `content_type` is `Some`, a `Content-Type` header. No query
    /// parameters and no rate-limit class are set.
    fn req(content_type: Option<&str>, body: &[u8]) -> Request {
        let mut headers = HeaderMap::new();
        if let Some(ct) = content_type {
            headers.insert(
                http::header::CONTENT_TYPE,
                HeaderValue::from_str(ct).unwrap(),
            );
        }
        Request {
            method: http::Method::POST,
            path_parts: vec!["whatever".into()],
            query_params: HashMap::new(),
            body: Bytes::copy_from_slice(body),
            headers,
            rate_limit_class: None,
        }
    }

    #[test]
    fn empty_body_no_content_type_is_ok() {
        let params = req(None, b"").to_params().expect("empty + no CT is fine");
        assert!(params.body_bytes().is_empty());
    }

    #[test]
    fn json_body_is_parsed() {
        let params = req(Some("application/json"), br#"{"x":1}"#)
            .to_params()
            .expect("valid JSON");
        // Round-trip through the parsed view.
        let v = params.json_body().expect("body present");
        assert_eq!(v["x"], 1);
        // Raw bytes are also preserved.
        assert_eq!(params.body_bytes().as_ref(), br#"{"x":1}"#);
    }

    #[test]
    fn json_body_with_charset_param_is_parsed() {
        // `Content-Type: application/json; charset=utf-8` is canonical.
        // The discriminator splits on `;` and trims, so the parameter is
        // ignored as long as the media type is json.
        let params = req(Some("application/json; charset=utf-8"), br#"{"x":1}"#)
            .to_params()
            .expect("valid JSON with charset");
        assert_eq!(params.json_body().unwrap()["x"], 1);
    }

    #[test]
    fn vendor_plus_json_subtype_is_parsed() {
        // RFC 6839 structured suffix: `application/vnd.example+json` is
        // semantically JSON. We support that family for compatibility
        // with API conventions that use vendor media types.
        let params = req(Some("application/vnd.example+json"), br#"{"x":1}"#)
            .to_params()
            .expect("valid +json");
        assert_eq!(params.json_body().unwrap()["x"], 1);
    }

    #[test]
    fn malformed_json_is_a_400() {
        match req(Some("application/json"), b"not json").to_params() {
            Err(WebError::BadRequest(msg)) => assert!(msg.contains("valid JSON"), "msg = {msg:?}"),
            Err(other) => panic!("expected BadRequest, got {other:?}"),
            Ok(_) => panic!("must reject malformed JSON"),
        }
    }

    #[test]
    fn form_body_merges_into_query() {
        let params = req(
            Some("application/x-www-form-urlencoded"),
            b"username=alice&kind=admin",
        )
        .to_params()
        .expect("valid form");
        // Form fields are merged into the parameter map ...
        assert_eq!(params.get_all("username").unwrap(), ["alice"]);
        assert_eq!(params.get_all("kind").unwrap(), ["admin"]);
        // ... while the parsed JSON view is absent because the body
        // wasn't JSON.
        assert!(params.json_body().is_err());
        // Raw body still travels for handlers that want it.
        assert_eq!(params.body_bytes().as_ref(), b"username=alice&kind=admin");
    }

    #[test]
    fn binary_body_round_trips() {
        let zip_bytes = b"PK\x03\x04\x00\x00fake-zip";
        let params = req(Some("application/zip"), zip_bytes)
            .to_params()
            .expect("opaque CT is fine");
        // Not parsed as JSON — the parsed view is empty.
        assert!(params.json_body().is_err());
        // Raw bytes are preserved verbatim for `params.body_bytes()`.
        assert_eq!(params.body_bytes().as_ref(), zip_bytes);
    }

    #[test]
    fn non_empty_body_without_content_type_is_a_400() {
        match req(None, b"some payload").to_params() {
            Err(WebError::BadRequest(msg)) => {
                assert!(msg.contains("Content-Type"), "msg = {msg:?}")
            }
            Err(other) => panic!("expected BadRequest, got {other:?}"),
            Ok(_) => panic!("must reject non-empty body without Content-Type"),
        }
    }

    #[test]
    fn collect_pairs_keeps_repeated_keys_in_order() {
        let m = collect_pairs(vec![
            ("tag".into(), "a".into()),
            ("page".into(), "1".into()),
            ("tag".into(), "b".into()),
        ]);
        assert_eq!(m.get("tag").unwrap(), &["a".to_string(), "b".to_string()]);
        assert_eq!(m.get("page").unwrap(), &["1".to_string()]);
    }

    #[tokio::test]
    async fn body_within_limit_round_trips() {
        let body = http_body_util::Full::new(Bytes::from_static(b"hello"));
        assert_eq!(
            collect_body_capped(body, 1024, None).await.unwrap(),
            Bytes::from_static(b"hello")
        );
    }

    #[tokio::test]
    async fn body_over_limit_is_413() {
        let body = http_body_util::Full::new(Bytes::from(vec![0u8; 2048]));
        match collect_body_capped(body, 1024, None).await {
            Err(WebError::PayloadTooLarge) => {}
            other => panic!("expected PayloadTooLarge, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn body_refused_when_inflight_budget_exhausted() {
        // Budget = 100 bytes total. An in-flight body already holds 80 of
        // them, so a request whose per-request cap is also 80 cannot
        // reserve and is refused with a 503.
        let budget = Arc::new(Semaphore::new(100));
        // Hold the permit to simulate the in-flight body.
        let hold = budget.clone().try_acquire_many_owned(80).expect("acquire");
        let body = http_body_util::Full::new(Bytes::from_static(b"hello"));
        match collect_body_capped(body, 80, Some(&budget)).await {
            Err(WebError::Busy(Some(d))) => assert_eq!(d, Duration::from_secs(1)),
            other => panic!("expected Busy, got {other:?}"),
        }
        // After the held permit drops, a request fits again.
        drop(hold);
        let body = http_body_util::Full::new(Bytes::from_static(b"hello"));
        assert!(collect_body_capped(body, 80, Some(&budget)).await.is_ok());
    }

    #[test]
    fn duplicate_request_headers_survive_into_params() {
        // hyper's HeaderMap preserves duplicates; `to_params` must too. A
        // proxy chain stamping `Forwarded` twice is the canonical example.
        let mut headers = HeaderMap::new();
        headers.append(
            http::HeaderName::from_static("forwarded"),
            HeaderValue::from_static("for=1.2.3.4"),
        );
        headers.append(
            http::HeaderName::from_static("forwarded"),
            HeaderValue::from_static("for=10.0.0.1"),
        );
        let request = Request {
            method: http::Method::GET,
            path_parts: vec![],
            query_params: HashMap::new(),
            body: Bytes::new(),
            headers,
            rate_limit_class: None,
        };
        let params = request.to_params().expect("ok");
        assert_eq!(params.header("Forwarded"), Some("for=1.2.3.4"));
        assert_eq!(
            params.header_all("Forwarded"),
            ["for=1.2.3.4", "for=10.0.0.1"]
        );
    }

    #[test]
    fn form_body_appends_to_query_multimap_instead_of_overwriting() {
        let mut query = HashMap::new();
        query.insert("tag".to_string(), vec!["from-query".to_string()]);
        let request = Request {
            method: http::Method::POST,
            path_parts: vec!["whatever".into()],
            query_params: query,
            // Two form fields: `tag` (collides with the query parameter)
            // and `note` (body only).
            body: Bytes::from_static(b"tag=from-body&note=hi"),
            headers: {
                let mut h = HeaderMap::new();
                h.insert(
                    http::header::CONTENT_TYPE,
                    HeaderValue::from_static("application/x-www-form-urlencoded"),
                );
                h
            },
            rate_limit_class: None,
        };
        let params = request.to_params().expect("valid form");
        // Query value survives; the form field with the same name is appended.
        assert_eq!(params.get_all("tag").unwrap(), ["from-query", "from-body"]);
        // First-wins for the scalar view.
        assert_eq!(params.require("tag").unwrap(), "from-query");
        assert_eq!(params.get_all("note").unwrap(), ["hi"]);
    }
}
529}