Skip to main content

actus_server/
request.rs

1//! [`Request`] — the incoming HTTP request as seen by middleware, and the
2//! body-limiting / parameter-extraction that turns it into a handler's
3//! [`Params`].
4
5use actus_controller::{Params, Verb};
6use actus_reply::WebError;
7use bytes::Bytes;
8use http::HeaderMap;
9use http_body_util::{BodyExt, LengthLimitError, Limited};
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Semaphore;
14
/// An incoming HTTP request, as seen by middleware and used to build the
/// typed [`Params`] handed to a handler.
///
/// [`Request::from_hyper_parts`] fills in everything except `body`;
/// [`Request::collect_body`] then buffers the body into it.
#[derive(Clone, Debug)]
pub struct Request {
    /// The HTTP method (`GET`, `POST`, …).
    pub method: http::Method,
    /// The request path split on `/` into non-empty segments (no leading
    /// empty segment); e.g. `/api/users` → `["api", "users"]`. Segments are
    /// copied verbatim from the URI path — `from_hyper_parts` does not
    /// percent-decode them.
    pub path_parts: Vec<String>,
    /// Query parameters as a multimap (each name → all its values, in order).
    /// `application/x-www-form-urlencoded` body fields are appended to a
    /// *copy* of this map by [`Request::to_params`]; the field itself is not
    /// mutated. A query string that fails to parse yields an empty map.
    pub query_params: HashMap<String, Vec<String>>,
    /// The raw request body. Empty for bodyless requests, and also empty on
    /// a skeleton whose body has not been (or could not be) collected.
    pub body: Bytes,
    /// The request headers.
    pub headers: HeaderMap,
    /// The rate-limit *class* of the controller this request matched, as
    /// declared by `#[controller(rate_limit = "…")]` (see
    /// [`actus_controller::Controller::actus_rate_limit`]). `None` until the
    /// server has matched a controller (it's set right after routing, before
    /// the middleware `before` chain runs) and for controllers that declared
    /// no class.
    ///
    /// This is the routing-derived projection a rate-limit [`crate::Middleware`]
    /// reads to apply per-class policy — the framework supplies the label and
    /// the `429` plumbing ([`WebError::TooManyRequests`]); the limiter and its
    /// store stay application code. Unlike the wire-derived fields above, it's
    /// populated by the framework rather than parsed from the request.
    pub rate_limit_class: Option<&'static str>,
}
46
/// Group `(name, value)` pairs by name into a multimap.
///
/// Values for a repeated name keep the order in which they appeared in
/// `pairs` (`a=1&a=2` → `{"a": ["1", "2"]}`). An empty input yields an
/// empty map.
fn collect_pairs(pairs: Vec<(String, String)>) -> HashMap<String, Vec<String>> {
    pairs.into_iter().fold(
        HashMap::new(),
        |mut grouped: HashMap<String, Vec<String>>, (key, val)| {
            grouped.entry(key).or_default().push(val);
            grouped
        },
    )
}
56
57/// Buffer a request body, refusing to hold more than `max_bytes` in memory:
58/// an over-limit body becomes `WebError::PayloadTooLarge` (→ 413) instead of
59/// growing unbounded. Any other read failure (a truncated/aborted body) is a
60/// `400`.
61///
62/// If `budget` is `Some`, the per-request cap is *also* reserved from a
63/// shared semaphore — when the server-wide budget is exhausted, the request
64/// is refused with `WebError::Busy` (→ 503) and a short `Retry-After`. The
65/// reservation is conservative (pre-reserves `max_bytes` even for requests
66/// whose body turns out to be smaller); the alternative — per-chunk byte
67/// accounting — is more code for the same effective ceiling.
68async fn collect_body_capped<B>(
69    body: B,
70    max_bytes: usize,
71    budget: Option<&Arc<Semaphore>>,
72) -> Result<Bytes, WebError>
73where
74    B: hyper::body::Body<Data = Bytes>,
75    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
76{
77    // Reserve from the global byte budget before touching the body. Tokio
78    // `Semaphore::acquire_many` takes a `u32`; we cap accordingly. The
79    // permit stays alive for the duration of this function — when we
80    // return (Ok or Err) it's released.
81    let _permit = match budget {
82        Some(s) => {
83            let n = u32::try_from(max_bytes).unwrap_or(u32::MAX);
84            match s.clone().try_acquire_many_owned(n) {
85                Ok(p) => Some(p),
86                Err(_) => return Err(WebError::Busy(Some(Duration::from_secs(1)))),
87            }
88        }
89        None => None,
90    };
91
92    match Limited::new(body, max_bytes).collect().await {
93        Ok(collected) => Ok(collected.to_bytes()),
94        Err(e) if e.downcast_ref::<LengthLimitError>().is_some() => Err(WebError::PayloadTooLarge),
95        Err(e) => Err(WebError::BadRequest(format!(
96            "could not read request body: {e}"
97        ))),
98    }
99}
100
101impl Request {
102    /// Build the `Request` skeleton from a hyper request *without*
103    /// consuming the body. The body stream is returned alongside; the
104    /// caller passes it back to [`Request::collect_body`] once it has
105    /// resolved the right body-size cap (which may depend on the matched
106    /// route — see `Server::handle_request_inner`).
107    ///
108    /// This is the cheap half of [`Request::from_hyper`]: the per-request
109    /// allocations are just the `path_parts` Vec and `query_params`
110    /// HashMap; no IO happens. Use it when you need to inspect the
111    /// request shape before deciding how (or whether) to read the body.
112    pub fn from_hyper_parts(
113        req: hyper::Request<hyper::body::Incoming>,
114    ) -> (Self, hyper::body::Incoming) {
115        let (parts, body) = req.into_parts();
116
117        let path_parts: Vec<String> = parts
118            .uri
119            .path()
120            .trim_matches('/')
121            .split('/')
122            .map(String::from)
123            .filter(|s| !s.is_empty())
124            .collect();
125
126        let query_params = parts
127            .uri
128            .query()
129            .map(|q| {
130                collect_pairs(
131                    serde_urlencoded::from_str::<Vec<(String, String)>>(q).unwrap_or_default(),
132                )
133            })
134            .unwrap_or_default();
135
136        let skeleton = Self {
137            method: parts.method,
138            path_parts,
139            query_params,
140            // Filled by `collect_body`. Left empty on error so the
141            // skeleton-on-error contract holds.
142            body: Bytes::new(),
143            headers: parts.headers,
144            // Stamped by the server once the controller is matched (the
145            // skeleton predates routing, so it starts `None`).
146            rate_limit_class: None,
147        };
148        (skeleton, body)
149    }
150
151    /// Consume the body stream into this skeleton, capped at
152    /// `max_body_bytes` and (optionally) reserved against the
153    /// framework-wide inflight-bytes budget.
154    ///
155    /// Returns `Err((self, err))` on body failure (413, 400 truncated,
156    /// 503 budget exhausted) — `self` is the same skeleton (with `body`
157    /// still empty) the caller passed in, so the error response still
158    /// has the request headers etc. and flows through the after-chain
159    /// like every other reply.
160    pub async fn collect_body(
161        mut self,
162        body: hyper::body::Incoming,
163        max_body_bytes: usize,
164        inflight_budget: Option<&Arc<Semaphore>>,
165    ) -> Result<Self, (Self, WebError)> {
166        match collect_body_capped(body, max_body_bytes, inflight_budget).await {
167            Ok(body_bytes) => {
168                self.body = body_bytes;
169                Ok(self)
170            }
171            Err(e) => Err((self, e)),
172        }
173    }
174
175    /// Creates a new `Request` from a `hyper::Request`, buffering its body
176    /// (capped at `max_body_bytes` — see `Server::with_max_body_bytes`).
177    ///
178    /// Convenience wrapper around [`Request::from_hyper_parts`] +
179    /// [`Request::collect_body`]: useful for tests and for callers that
180    /// don't need to inspect the request shape before deciding the cap.
181    /// The server uses the two-step form directly so it can resolve the
182    /// cap from the matched controller (Phase 1) or route (Phase 2)
183    /// before reading the body.
184    ///
185    /// Returns `Err((skeleton, err))` when body collection fails (413
186    /// over the cap, 400 truncated, or 503 when the framework-level
187    /// `with_max_inflight_body_bytes` budget is exhausted). The returned
188    /// `Request` skeleton is populated with method / path / query /
189    /// headers — only `body` is empty — so the caller can route the
190    /// error through the normal reply pipeline.
191    pub async fn from_hyper(
192        req: hyper::Request<hyper::body::Incoming>,
193        max_body_bytes: usize,
194        inflight_budget: Option<&Arc<Semaphore>>,
195    ) -> Result<Self, (Self, WebError)> {
196        let (skeleton, body) = Self::from_hyper_parts(req);
197        skeleton
198            .collect_body(body, max_body_bytes, inflight_budget)
199            .await
200    }
201
202    /// Converts the request into a `Params` object for controller methods.
203    ///
204    /// Body handling is content-type discriminated. Each non-empty body
205    /// must declare its type via `Content-Type`; the four outcomes are:
206    ///
207    /// * `application/json` (or `application/...+json`) → parse JSON;
208    ///   `body = Some(value)`, `raw_body = original bytes`. A parse
209    ///   error becomes `WebError::BadRequest`.
210    /// * `application/x-www-form-urlencoded` → parse into fields and
211    ///   *append* them to the query multimap (so a form field with the
212    ///   same name as a query parameter accumulates rather than
213    ///   clobbering); `body = None`, `raw_body = bytes`. A parse error
214    ///   becomes `WebError::BadRequest`.
215    /// * any other content-type (`application/octet-stream`,
216    ///   `application/zip`, …) → `body = None`, `raw_body = bytes`.
217    ///   The handler reads `params.body_bytes()` directly.
218    /// * empty body → `body = None`, `raw_body = Bytes::new()`. The
219    ///   content-type header is ignored when there's nothing to parse.
220    ///
221    /// A non-empty body with **no** `Content-Type` header is rejected:
222    /// "discipline tightening" per design — the framework refuses to
223    /// guess, and the handler never sees a body whose shape it can't
224    /// trust. (This is a behavior change from the previous
225    /// auto-JSON-sniff path; auto-sniff silently dropped binary
226    /// payloads on the floor.)
227    pub fn to_params(&self) -> Result<Params, WebError> {
228        let mut all_params = self.query_params.clone();
229
230        let content_type = self
231            .headers
232            .get(http::header::CONTENT_TYPE)
233            .and_then(|v| v.to_str().ok())
234            .map(|s| {
235                s.split(';')
236                    .next()
237                    .unwrap_or("")
238                    .trim()
239                    .to_ascii_lowercase()
240            });
241
242        let json_body = if self.body.is_empty() {
243            None
244        } else {
245            match content_type.as_deref() {
246                Some(ct) if is_json_content_type(ct) => {
247                    Some(serde_json::from_slice(&self.body).map_err(|e| {
248                        WebError::BadRequest(format!("body is not valid JSON: {e}"))
249                    })?)
250                }
251                Some("application/x-www-form-urlencoded") => {
252                    let form_pairs: Vec<(String, String)> =
253                        serde_urlencoded::from_bytes(&self.body).map_err(|e| {
254                            WebError::BadRequest(format!("body is not valid form-urlencoded: {e}"))
255                        })?;
256                    for (name, value) in form_pairs {
257                        all_params.entry(name).or_default().push(value);
258                    }
259                    None
260                }
261                Some(_) => None,
262                None => {
263                    return Err(WebError::BadRequest(
264                        "non-empty request body requires a Content-Type header".into(),
265                    ));
266                }
267            }
268        };
269
270        // Propagate headers as a lowercase-keyed multimap so controllers can
271        // do case-insensitive lookup *and* see every value when a header
272        // appears more than once (`Forwarded`, `Via`, etc. — common when a
273        // proxy chain prepends one entry per hop). Values that aren't valid
274        // UTF-8 are skipped. Receipt order within a name is preserved.
275        let mut headers: HashMap<String, Vec<String>> = HashMap::new();
276        for (name, value) in self.headers.iter() {
277            if let Ok(v) = value.to_str() {
278                headers
279                    .entry(name.as_str().to_ascii_lowercase())
280                    .or_default()
281                    .push(v.to_string());
282            }
283        }
284
285        Ok(Params::new(
286            method_to_verb(&self.method),
287            all_params,
288            json_body,
289            self.body.clone(),
290            headers,
291        ))
292    }
293}
294
/// Decide whether a media type should be parsed as JSON.
///
/// Accepts the bare `application/json` and any media type carrying the
/// `+json` structured-syntax suffix (RFC 6839 §3.1), e.g.
/// `application/vnd.example+json`. The suffix check looks only at the end
/// of the string, so it is not limited to the `application/` tree.
///
/// `ct` is compared as given: callers pass the media type already
/// lowercased and stripped of parameters.
fn is_json_content_type(ct: &str) -> bool {
    if ct.strip_suffix("+json").is_some() {
        return true;
    }
    matches!(ct, "application/json")
}
302
303fn method_to_verb(method: &http::Method) -> Verb {
304    match method {
305        m if m == http::Method::GET => Verb::GET,
306        m if m == http::Method::POST => Verb::POST,
307        m if m == http::Method::PUT => Verb::PUT,
308        m if m == http::Method::DELETE => Verb::DELETE,
309        m if m == http::Method::PATCH => Verb::PATCH,
310        m if m == http::Method::HEAD => Verb::HEAD,
311        m if m == http::Method::OPTIONS => Verb::OPTIONS,
312        _ => Verb::GET,
313    }
314}
315
/// Unit tests for body collection (`collect_body_capped`), pair folding
/// (`collect_pairs`) and the content-type-discriminated `Request::to_params`.
#[cfg(test)]
mod tests {
    use super::*;
    use http::HeaderValue;

    /// Build a `POST /whatever` request with the given body and, optionally,
    /// a `Content-Type` header; no query parameters.
    fn req(content_type: Option<&str>, body: &[u8]) -> Request {
        let mut headers = HeaderMap::new();
        if let Some(ct) = content_type {
            headers.insert(
                http::header::CONTENT_TYPE,
                HeaderValue::from_str(ct).unwrap(),
            );
        }
        Request {
            method: http::Method::POST,
            path_parts: vec!["whatever".into()],
            query_params: HashMap::new(),
            body: Bytes::copy_from_slice(body),
            headers,
            rate_limit_class: None,
        }
    }

    #[test]
    fn empty_body_no_content_type_is_ok() {
        let params = req(None, b"").to_params().expect("empty + no CT is fine");
        assert!(params.body_bytes().is_empty());
    }

    #[test]
    fn json_body_is_parsed() {
        let params = req(Some("application/json"), br#"{"x":1}"#)
            .to_params()
            .expect("valid JSON");
        // Round-trip through the parsed view.
        let v = params.json_body().expect("body present");
        assert_eq!(v["x"], 1);
        // Raw bytes are also preserved.
        assert_eq!(params.body_bytes().as_ref(), br#"{"x":1}"#);
    }

    #[test]
    fn json_body_with_charset_param_is_parsed() {
        // `Content-Type: application/json; charset=utf-8` is canonical.
        // The discriminator splits on `;` and trims, so the parameter is
        // ignored as long as the media type is json.
        let params = req(Some("application/json; charset=utf-8"), br#"{"x":1}"#)
            .to_params()
            .expect("valid JSON with charset");
        assert_eq!(params.json_body().unwrap()["x"], 1);
    }

    #[test]
    fn vendor_plus_json_subtype_is_parsed() {
        // RFC 6839 structured suffix: `application/vnd.example+json` is
        // semantically JSON. We support that family for compatibility
        // with API conventions that use vendor media types.
        let params = req(Some("application/vnd.example+json"), br#"{"x":1}"#)
            .to_params()
            .expect("valid +json");
        assert_eq!(params.json_body().unwrap()["x"], 1);
    }

    #[test]
    fn malformed_json_is_a_400() {
        match req(Some("application/json"), b"not json").to_params() {
            Err(WebError::BadRequest(msg)) => assert!(msg.contains("valid JSON"), "msg = {msg:?}"),
            Err(other) => panic!("expected BadRequest, got {other:?}"),
            Ok(_) => panic!("must reject malformed JSON"),
        }
    }

    #[test]
    fn form_body_merges_into_query() {
        let params = req(
            Some("application/x-www-form-urlencoded"),
            b"username=alice&kind=admin",
        )
        .to_params()
        .expect("valid form");
        // The form fields must actually be reachable as parameters — this
        // is the merge the test name promises.
        assert_eq!(params.get_all("username").unwrap(), ["alice"]);
        assert_eq!(params.get_all("kind").unwrap(), ["admin"]);
        // The parsed JSON view is absent because the body wasn't JSON.
        assert!(params.json_body().is_err());
        // Raw body still travels for handlers that want it.
        assert_eq!(params.body_bytes().as_ref(), b"username=alice&kind=admin");
    }

    #[test]
    fn binary_body_round_trips() {
        let zip_bytes = b"PK\x03\x04\x00\x00fake-zip";
        let params = req(Some("application/zip"), zip_bytes)
            .to_params()
            .expect("opaque CT is fine");
        // Not parsed as JSON — the parsed view is empty.
        assert!(params.json_body().is_err());
        // Raw bytes are preserved verbatim for `params.body_bytes()`.
        assert_eq!(params.body_bytes().as_ref(), zip_bytes);
    }

    #[test]
    fn non_empty_body_without_content_type_is_a_400() {
        match req(None, b"some payload").to_params() {
            Err(WebError::BadRequest(msg)) => {
                assert!(msg.contains("Content-Type"), "msg = {msg:?}")
            }
            Err(other) => panic!("expected BadRequest, got {other:?}"),
            Ok(_) => panic!("must reject non-empty body without Content-Type"),
        }
    }

    #[test]
    fn collect_pairs_keeps_repeated_keys_in_order() {
        let m = collect_pairs(vec![
            ("tag".into(), "a".into()),
            ("page".into(), "1".into()),
            ("tag".into(), "b".into()),
        ]);
        assert_eq!(m.get("tag").unwrap(), &["a".to_string(), "b".to_string()]);
        assert_eq!(m.get("page").unwrap(), &["1".to_string()]);
    }

    #[tokio::test]
    async fn body_within_limit_round_trips() {
        let body = http_body_util::Full::new(Bytes::from_static(b"hello"));
        assert_eq!(
            collect_body_capped(body, 1024, None).await.unwrap(),
            Bytes::from_static(b"hello")
        );
    }

    #[tokio::test]
    async fn body_over_limit_is_413() {
        let body = http_body_util::Full::new(Bytes::from(vec![0u8; 2048]));
        match collect_body_capped(body, 1024, None).await {
            Err(WebError::PayloadTooLarge) => {}
            other => panic!("expected PayloadTooLarge, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn body_refused_when_inflight_budget_exhausted() {
        // Budget = 100 permits total. Holding 80 of them simulates another
        // in-flight body, leaving 20 — too few for a request whose
        // per-request cap is 80, so it must be refused with a 503.
        let budget = Arc::new(Semaphore::new(100));
        let in_flight = budget.clone().try_acquire_many_owned(80).expect("acquire");
        let body = http_body_util::Full::new(Bytes::from_static(b"hello"));
        match collect_body_capped(body, 80, Some(&budget)).await {
            Err(WebError::Busy(Some(d))) => assert_eq!(d, Duration::from_secs(1)),
            other => panic!("expected Busy, got {other:?}"),
        }
        // After the held permit drops, a request fits again.
        drop(in_flight);
        let body = http_body_util::Full::new(Bytes::from_static(b"hello"));
        assert!(collect_body_capped(body, 80, Some(&budget)).await.is_ok());
    }

    #[test]
    fn duplicate_request_headers_survive_into_params() {
        // hyper's HeaderMap preserves duplicates; `to_params` must too. A
        // proxy chain stamping `Forwarded` twice is the canonical example.
        let mut headers = HeaderMap::new();
        headers.append(
            http::HeaderName::from_static("forwarded"),
            HeaderValue::from_static("for=1.2.3.4"),
        );
        headers.append(
            http::HeaderName::from_static("forwarded"),
            HeaderValue::from_static("for=10.0.0.1"),
        );
        let request = Request {
            method: http::Method::GET,
            path_parts: vec![],
            query_params: HashMap::new(),
            body: Bytes::new(),
            headers,
            rate_limit_class: None,
        };
        let params = request.to_params().expect("ok");
        assert_eq!(params.header("Forwarded"), Some("for=1.2.3.4"));
        assert_eq!(
            params.header_all("Forwarded"),
            ["for=1.2.3.4", "for=10.0.0.1"]
        );
    }

    #[test]
    fn form_body_appends_to_query_multimap_instead_of_overwriting() {
        let mut query = HashMap::new();
        query.insert("tag".to_string(), vec!["from-query".to_string()]);
        let request = Request {
            method: http::Method::POST,
            path_parts: vec!["whatever".into()],
            query_params: query,
            body: Bytes::from_static(b"tag=from-body&note=hi"),
            headers: {
                let mut h = HeaderMap::new();
                h.insert(
                    http::header::CONTENT_TYPE,
                    HeaderValue::from_static("application/x-www-form-urlencoded"),
                );
                h
            },
            rate_limit_class: None,
        };
        let params = request.to_params().expect("valid form");
        // Query value survives; the form field with the same name is appended.
        assert_eq!(params.get_all("tag").unwrap(), ["from-query", "from-body"]);
        // First-wins for the scalar view.
        assert_eq!(params.require("tag").unwrap(), "from-query");
        assert_eq!(params.get_all("note").unwrap(), ["hi"]);
    }
}