Skip to main content

actus_server/
server.rs

1//! [`Server`] — the hyper-based HTTP server that owns the request lifecycle:
2//! routing, body limiting, the middleware chain, CORS, compression, and the
3//! HTTP-correctness stamping (`Allow`, `Vary`), with graceful shutdown.
4
5#[cfg(feature = "compression")]
6use crate::compression::CompressionLayer;
7use crate::cors::CorsLayer;
8use crate::error::ServerError;
9use crate::middleware::{Middleware, MiddlewareChain, Outcome};
10use crate::request::Request;
11use crate::router::Router;
12#[cfg(feature = "websocket")]
13use crate::websocket;
14#[cfg(feature = "websocket")]
15use actus_reply::ProblemDetails;
16use actus_reply::{Finalizer, ReplyData, WebError};
17use bytes::Bytes;
18#[cfg(feature = "websocket")]
19use http::{HeaderValue, StatusCode, header};
20use http_body_util::combinators::BoxBody;
21use hyper::body::Incoming;
22use hyper::service::service_fn;
23use hyper::{Request as HyperRequest, Response as HyperResponse};
24use std::future::Future;
25use std::net::SocketAddr;
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::net::TcpListener;
29use tokio::sync::Semaphore;
30use tokio::task::JoinSet;
31use tracing::{Instrument, Level, error, info, span, warn};
32
/// Body type of every response this server produces: a boxed body that
/// yields [`Bytes`] frames and reports failures as [`WebError`].
type ResponseBody = BoxBody<Bytes, WebError>;
34
/// Bytes in one kibibyte (2^10 = 1024). Handy for spelling out byte limits,
/// e.g. `#[controller(max_body_bytes = 4 * KIB)]`.
pub const KIB: usize = 1 << 10;
/// Bytes in one mebibyte (2^20), e.g. `Server::with_max_body_bytes(2 * MIB)`.
pub const MIB: usize = KIB << 10;
/// Bytes in one gibibyte (2^30).
pub const GIB: usize = MIB << 10;

/// How much request body Actus buffers unless told otherwise: **2 MiB**.
/// That is a safe ceiling for the common case (JSON APIs, forms); endpoints
/// that take larger bodies (uploads) raise it with
/// [`Server::with_max_body_bytes`] or a per-controller
/// `#[controller(max_body_bytes = …)]`. Matches axum's default.
pub const DEFAULT_MAX_BODY_BYTES: usize = MIB * 2;

/// How long in-flight connections get to finish once shutdown is signalled,
/// unless overridden with [`Server::with_drain_deadline`]: 30 seconds.
pub const DEFAULT_DRAIN_DEADLINE: Duration = Duration::from_secs(30);
52
/// The main Actus server.
///
/// Built with [`Server::new`] and configured through the consuming `with_*`
/// builder methods; the `run*` methods then bind a listener and serve until
/// shutdown.
pub struct Server {
    /// The router handed to [`Server::new`], shared behind an `Arc`.
    router: Arc<Router>,
    /// Middleware chain; extended by [`Server::with_middleware`].
    middleware_chain: Arc<MiddlewareChain>,
    /// Turns `ReplyData` / `WebError` values into hyper responses.
    finalizer: Arc<Finalizer>,
    /// Per-request cap on buffered body bytes (see
    /// [`Server::with_max_body_bytes`]).
    max_body_bytes: usize,
    /// CORS policy; `None` until [`Server::with_cors`] is called.
    cors: Option<Arc<CorsLayer>>,
    /// Response compression layer; `None` until `with_compression` is called.
    #[cfg(feature = "compression")]
    compression: Option<Arc<CompressionLayer>>,
    /// `Some(d)` caps each request's total time (parse → middleware →
    /// handler → after-chain → finalize) at `d`; an over-budget request is
    /// aborted and replied with `504 Gateway Timeout`. `None` disables the
    /// per-request timer (the default).
    request_timeout: Option<Duration>,
    /// Grace period for in-flight connections to drain after shutdown.
    drain_deadline: Duration,
    /// Cap on concurrent connection tasks. `Some(n)` installs an
    /// `Arc<Semaphore>` of `n` permits in the accept loop; while at
    /// capacity, the loop pauses on permit acquisition and new SYNs queue
    /// in the kernel's accept backlog (`SOMAXCONN`). Once that backlog
    /// fills, the kernel drops further SYNs. `None` is unbounded.
    max_connections: Option<usize>,
    /// Cap on the total bytes being buffered across all in-flight body
    /// reads. `Some(n)` installs a byte-permit semaphore; each
    /// `collect_body_capped` reserves its per-request cap upfront and
    /// releases the permits when the body is buffered or rejected. Refuses
    /// excess requests with `503 Service Unavailable` (via `WebError::Busy`).
    /// `None` is unbounded.
    max_inflight_body_bytes: Option<Arc<Semaphore>>,
    /// `Some(d)` is forwarded to hyper's `http1::Builder::header_read_timeout`
    /// — bounds how long after starting to read request headers we'll wait
    /// before dropping the connection. Catches slowloris and clients that
    /// TCP-connect-and-send-nothing. `None` leaves hyper's default (none).
    header_read_timeout: Option<Duration>,
}
88
89impl Server {
90    /// Create a server for `router` with default settings: no middleware, no
91    /// CORS, the default body cap, and no DoS limits. Configure it with the
92    /// `with_*` builder methods, then call [`run`](Self::run).
93    pub fn new(router: Router) -> Self {
94        Self {
95            router: Arc::new(router),
96            middleware_chain: Arc::new(MiddlewareChain::new()),
97            finalizer: Arc::new(Finalizer::new()),
98            max_body_bytes: DEFAULT_MAX_BODY_BYTES,
99            cors: None,
100            #[cfg(feature = "compression")]
101            compression: None,
102            request_timeout: None,
103            drain_deadline: DEFAULT_DRAIN_DEADLINE,
104            max_connections: None,
105            max_inflight_body_bytes: None,
106            header_read_timeout: None,
107        }
108    }
109
110    /// Adds a middleware to the server's request processing chain.
111    pub fn with_middleware(mut self, middleware: impl Middleware + 'static) -> Self {
112        let mut chain = Arc::try_unwrap(self.middleware_chain).unwrap_or_else(|arc| (*arc).clone());
113        chain.add(middleware);
114        self.middleware_chain = Arc::new(chain);
115        self
116    }
117
118    /// Enables CORS with the given policy. The server then answers preflight
119    /// (`OPTIONS`) requests itself and adds the `Access-Control-*` headers to
120    /// every cross-origin response (including error responses). See
121    /// [`CorsLayer`].
122    pub fn with_cors(mut self, cors: CorsLayer) -> Self {
123        self.cors = Some(Arc::new(cors));
124        self
125    }
126
127    /// Enables response compression (gzip / brotli). For each response Actus
128    /// picks an encoding from the request's `Accept-Encoding` and compresses
129    /// buffered, compressible bodies above the layer's size threshold. See
130    /// [`CompressionLayer`]. *(Requires the `compression` feature.)*
131    #[cfg(feature = "compression")]
132    pub fn with_compression(mut self, layer: CompressionLayer) -> Self {
133        self.compression = Some(Arc::new(layer));
134        self
135    }
136
137    /// Caps the request body Actus will buffer (default
138    /// [`DEFAULT_MAX_BODY_BYTES`] = 2 MiB). A larger body is rejected with
139    /// `413 Payload Too Large` before it can exhaust memory — the limit
140    /// bounds buffered bytes, so it also covers chunked bodies that lie about
141    /// (or omit) `Content-Length`.
142    ///
143    /// `0` is accepted and means "reject every non-empty body" — typically
144    /// only useful on a strictly-GET surface that should never see a body.
145    pub fn with_max_body_bytes(mut self, max: usize) -> Self {
146        self.max_body_bytes = max;
147        self
148    }
149
150    /// Cap the total time any single request may take — body parse,
151    /// middleware `before`, handler, middleware `after`, and finalization
152    /// combined. An over-budget request is aborted (the handler's future
153    /// is dropped) and the client gets `504 Gateway Timeout`. No timeout
154    /// is set by default.
155    ///
156    /// **Scope.** The timer covers the request/response exchange. A
157    /// WebSocket upgrade succeeds inside the timer (the `101` is the
158    /// response); the post-upgrade conversation runs in its own task and
159    /// is not bound by this timeout.
160    ///
161    /// **Effect of an over-budget request.** When the timer elapses the
162    /// in-flight future is dropped, which cancels whatever the handler
163    /// was awaiting (DB query, channel recv, etc.). The 504 reply is
164    /// one-shot — the after-chain doesn't run on it (by definition,
165    /// some component upstream was unresponsive; running more risks
166    /// hanging again).
167    pub fn with_request_timeout(mut self, d: Duration) -> Self {
168        self.request_timeout = Some(d);
169        self
170    }
171
172    /// Override the grace period for in-flight connections after a
173    /// shutdown signal (default [`DEFAULT_DRAIN_DEADLINE`] = 30 s).
174    /// Anything still running at the deadline is hard-aborted via
175    /// `JoinSet::shutdown`. Use a longer value for surfaces that hold
176    /// long-lived connections (large file downloads, WebSockets);
177    /// a shorter value for fast-iteration dev workflows. `Duration::ZERO`
178    /// aborts every in-flight task immediately.
179    pub fn with_drain_deadline(mut self, d: Duration) -> Self {
180        self.drain_deadline = d;
181        self
182    }
183
184    /// Cap concurrent connection tasks at `n`. While the cap is held, the
185    /// accept loop pauses on permit acquisition; new SYNs queue in the
186    /// kernel's accept backlog and (once that fills, governed by
187    /// `SOMAXCONN`) get dropped at the OS level. No userland reject /
188    /// no-503-per-conn cost — the kernel handles the spillover.
189    ///
190    /// Each connection task holds its permit until it ends, including the
191    /// post-handshake WebSocket conversation. Size accordingly: a
192    /// `with_max_connections(N)` server can hold `N` long-lived WebSockets
193    /// *before* it stops accepting new connections of any kind.
194    ///
195    /// Unbounded by default (no semaphore installed).
196    pub fn with_max_connections(mut self, n: usize) -> Self {
197        self.max_connections = Some(n);
198        self
199    }
200
201    /// Cap the total bytes being buffered across all in-flight body reads
202    /// at `n`. Each request reserves its per-request cap (see
203    /// `with_max_body_bytes`) from this global budget upfront; if the
204    /// budget is exhausted, the request is refused with `503 Service
205    /// Unavailable` (via [`WebError::Busy`]) and a short `Retry-After`.
206    ///
207    /// Together with [`Self::with_max_connections`] this puts a hard
208    /// ceiling on the framework's memory under adversarial load:
209    /// `with_max_connections(C) * with_max_body_bytes(B)` is the *worst*
210    /// case absent this knob; with it, the ceiling is `min(C * B, this
211    /// value)`.
212    ///
213    /// Pre-reserving the per-request cap over-counts (a 1 KB request
214    /// reserves up to its full cap); the alternative — incremental
215    /// per-chunk byte accounting — is more code for the same effective
216    /// ceiling, and a request that has already started buffering can't
217    /// be sensibly aborted partway through anyway.
218    ///
219    /// `n` is clamped to `u32::MAX` internally (Tokio's `Semaphore`
220    /// permit count uses `u32`); for practical deployments this is no
221    /// limit (4 GiB).
222    ///
223    /// Unbounded by default.
224    pub fn with_max_inflight_body_bytes(mut self, n: usize) -> Self {
225        // u32 cap is a tokio Semaphore constraint, not a design choice.
226        let n_capped = n.min(u32::MAX as usize);
227        self.max_inflight_body_bytes = Some(Arc::new(Semaphore::new(n_capped)));
228        self
229    }
230
231    /// Bound how long after starting to read request headers we'll wait
232    /// before dropping the connection. Forwards to hyper's
233    /// `http1::Builder::header_read_timeout`. Catches slowloris (sending
234    /// headers one byte at a time) and clients that TCP-connect-and-send-
235    /// nothing — the most common file-descriptor-exhaustion attack on a
236    /// keep-alive HTTP server.
237    ///
238    /// Note: hyper 1.x doesn't have a separate "idle between requests"
239    /// timeout (after a complete request, an idle keep-alive connection
240    /// stays open until either side closes or the OS-level TCP keep-alive
241    /// fires). If that matters for your deployment, either disable
242    /// keep-alive entirely upstream of Actus or rely on the OS knobs.
243    ///
244    /// No timeout by default (hyper's default).
245    pub fn with_header_read_timeout(mut self, d: Duration) -> Self {
246        self.header_read_timeout = Some(d);
247        self
248    }
249
250    /// Runs the server on `127.0.0.1:port` (loopback only). For a different
251    /// bind address — e.g. `0.0.0.0:port` to accept connections from other
252    /// hosts in a container — use [`Server::run_on`].
253    ///
254    /// Listens for SIGTERM/SIGINT (Unix) or Ctrl-C (cross-platform) and
255    /// shuts down gracefully: stops accepting new connections, signals
256    /// in-flight connections to finish, and waits up to 30 seconds for
257    /// them to drain before returning.
258    pub async fn run(self, port: u16) -> Result<(), ServerError> {
259        self.run_on(SocketAddr::from(([127, 0, 0, 1], port))).await
260    }
261
262    /// Like [`Server::run`] but binds an arbitrary address. Pass
263    /// `0.0.0.0:port` (or `[::]:port`) to accept connections from other hosts.
264    pub async fn run_on(self, addr: SocketAddr) -> Result<(), ServerError> {
265        self.run_with_shutdown_on(addr, default_shutdown_signal())
266            .await
267    }
268
269    /// Like [`Server::run`] but with a custom shutdown trigger (a future that,
270    /// when it resolves, starts the graceful drain). Binds `127.0.0.1:port`;
271    /// see [`Server::run_with_shutdown_on`] for a custom bind address. Useful
272    /// for tests or for embedding the server in a larger supervision tree.
273    pub async fn run_with_shutdown(
274        self,
275        port: u16,
276        shutdown: impl Future<Output = ()> + Send + 'static,
277    ) -> Result<(), ServerError> {
278        self.run_with_shutdown_on(SocketAddr::from(([127, 0, 0, 1], port)), shutdown)
279            .await
280    }
281
282    /// The general form: bind `addr`, serve until `shutdown` resolves, then
283    /// drain. [`Server::run`], [`Server::run_on`], and
284    /// [`Server::run_with_shutdown`] are thin wrappers over this.
285    ///
286    /// **Drain bound.** Once `shutdown` resolves the server stops accepting
287    /// and signals every in-flight connection to wind down. The drain
288    /// deadline defaults to [`DEFAULT_DRAIN_DEADLINE`] (30 s); override
289    /// with [`Server::with_drain_deadline`]. Anything still running at
290    /// the deadline is hard-aborted via `JoinSet::shutdown`. In particular,
291    /// long-lived connections (WebSockets, slow downloads, kept-alive idle
292    /// clients) and any connection task that raced the shutdown notification
293    /// and missed it both get aborted at the deadline rather than draining
294    /// gracefully.
295    pub async fn run_with_shutdown_on(
296        self,
297        addr: SocketAddr,
298        shutdown: impl Future<Output = ()> + Send + 'static,
299    ) -> Result<(), ServerError> {
300        let listener = TcpListener::bind(addr).await?;
301        let addr = listener.local_addr().unwrap_or(addr);
302        info!("Server listening on http://{}", addr);
303
304        let app = Arc::new(self);
305        // Per-connection cancellation: once `Notify::notify_waiters` fires,
306        // every in-flight task wakes up and asks hyper to gracefully close
307        // its connection (finishing the current response, then exiting).
308        let notify = Arc::new(tokio::sync::Notify::new());
309        let mut tasks: JoinSet<()> = JoinSet::new();
310
311        // Optional cap on concurrent connections. When at-capacity the
312        // accept loop pauses on permit acquisition; new SYNs queue in the
313        // kernel accept backlog (SOMAXCONN) and get dropped at the OS
314        // level once that fills. Each spawned connection task moves its
315        // permit in; the permit releases when the task exits.
316        let conn_permits = app.max_connections.map(|n| Arc::new(Semaphore::new(n)));
317
318        tokio::pin!(shutdown);
319
320        loop {
321            tokio::select! {
322                // Accept-branch: acquire a connection permit first (when
323                // a cap is configured), then accept. The outer select
324                // races this against shutdown so a paused-at-capacity
325                // accept loop still notices the shutdown signal.
326                accept_with_permit = async {
327                    let permit = match &conn_permits {
328                        Some(s) => Some(s.clone().acquire_owned().await.expect("semaphore never closed")),
329                        None => None,
330                    };
331                    let result = listener.accept().await;
332                    (result, permit)
333                } => {
334                    let (accept_result, permit) = accept_with_permit;
335                    let (stream, _peer) = match accept_result {
336                        Ok(s) => s,
337                        Err(e) => {
338                            error!("accept error: {}", e);
339                            // permit released when this branch ends — fine
340                            continue;
341                        }
342                    };
343                    let io = hyper_util::rt::TokioIo::new(stream);
344                    let app = app.clone();
345                    let notify = notify.clone();
346                    let header_timeout = app.header_read_timeout;
347                    tasks.spawn(async move {
348                        // The permit (if any) lives for the connection's
349                        // lifetime; releasing happens at task drop.
350                        let _permit = permit;
351
352                        let mut builder = hyper::server::conn::http1::Builder::new();
353                        if let Some(d) = header_timeout {
354                            builder.header_read_timeout(d);
355                        }
356                        let conn = builder.serve_connection(
357                            io,
358                            service_fn(move |req| app.clone().handle_request(req)),
359                        );
360                        // With the `websocket` feature, allow `101 Switching
361                        // Protocols` responses to hand off the connection.
362                        #[cfg(feature = "websocket")]
363                        let conn = conn.with_upgrades();
364                        tokio::pin!(conn);
365                        tokio::select! {
366                            res = conn.as_mut() => {
367                                if let Err(err) = res {
368                                    error!("Error serving connection: {}", err);
369                                }
370                            }
371                            _ = notify.notified() => {
372                                conn.as_mut().graceful_shutdown();
373                                if let Err(err) = conn.await {
374                                    error!("Error during graceful shutdown: {}", err);
375                                }
376                            }
377                        }
378                    });
379                }
380                // Reap finished connection tasks so the `JoinSet` doesn't grow
381                // without bound over the server's lifetime — and so a panicked
382                // connection task is logged promptly, not only at shutdown.
383                joined = tasks.join_next(), if !tasks.is_empty() => {
384                    match joined {
385                        Some(Err(e)) if e.is_panic() => error!("Connection task panicked: {}", e),
386                        Some(Err(e)) => error!("Connection task failed: {}", e),
387                        Some(Ok(())) | None => {}
388                    }
389                }
390                _ = &mut shutdown => {
391                    info!("Shutdown signal received; draining in-flight requests");
392                    break;
393                }
394            }
395        }
396
397        // Stop accepting; signal connections to wind down.
398        drop(listener);
399        notify.notify_waiters();
400
401        // Drain. The grace period is configurable via
402        // `Server::with_drain_deadline` (default 30 s).
403        let drain_deadline = tokio::time::sleep(app.drain_deadline);
404        tokio::pin!(drain_deadline);
405        loop {
406            tokio::select! {
407                next = tasks.join_next() => {
408                    match next {
409                        Some(Ok(())) => {}
410                        Some(Err(e)) if e.is_panic() => {
411                            error!("Connection task panicked: {}", e);
412                        }
413                        Some(Err(e)) => {
414                            error!("Connection task failed: {}", e);
415                        }
416                        None => break,
417                    }
418                }
419                _ = &mut drain_deadline => {
420                    warn!("Drain deadline exceeded; aborting {} connection(s)", tasks.len());
421                    tasks.shutdown().await;
422                    break;
423                }
424            }
425        }
426
427        info!("Server shutdown complete");
428        Ok(())
429    }
430
431    /// Stamp the configured CORS response headers onto `response` (no-op when
432    /// CORS isn't enabled, or the request had no allowed `Origin`). Applied to
433    /// *every* outgoing response — success and error alike — so the browser
434    /// can read 4xx/5xx bodies.
435    fn with_cors_headers(
436        &self,
437        request: &Request,
438        mut response: HyperResponse<ResponseBody>,
439    ) -> HyperResponse<ResponseBody> {
440        if let Some(cors) = &self.cors {
441            cors.apply(&request.headers, response.headers_mut(), false);
442        }
443        response
444    }
445
446    /// Fulfil a `ReplyData::Upgrade` from a handler when the request was
447    /// genuinely a WebSocket handshake: send `101 Switching Protocols` and
448    /// spawn the handler on the upgraded connection. (The "handler returned
449    /// Upgrade but the request wasn't a handshake" case is rewritten to a
450    /// 426 reply in [`Self::finalize_reply`] before reaching this method, so
451    /// it can flow through the after-chain like any other error.)
452    ///
453    /// No CORS headers on the `101`: WebSocket handshakes are scoped by
454    /// browser origin checks (the handler inspects `Origin` itself before
455    /// calling `ws::upgrade`), not by the CORS protocol — `Access-Control-*`
456    /// on a `101` is meaningless to the browser.
457    #[cfg(feature = "websocket")]
458    async fn complete_ws_upgrade(
459        &self,
460        handler: Box<dyn std::any::Any + Send>,
461        ws_upgrade: (hyper::upgrade::OnUpgrade, HeaderValue),
462    ) -> HyperResponse<ResponseBody> {
463        // `ReplyData::Upgrade` is only constructible via `ws::upgrade(...)`,
464        // which always boxes an `UpgradeTask`. A failing downcast would mean
465        // a crate-internal invariant is broken; surface as a panic rather
466        // than silently producing a 500.
467        let task = handler
468            .downcast::<websocket::UpgradeTask>()
469            .expect("ReplyData::Upgrade always carries an UpgradeTask");
470        let (on_upgrade, accept) = ws_upgrade;
471        tokio::spawn(websocket::run_upgrade(on_upgrade, *task));
472        let mut resp = self.finalizer.build_response(ReplyData::Empty).await;
473        *resp.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
474        let h = resp.headers_mut();
475        h.insert(header::CONNECTION, HeaderValue::from_static("upgrade"));
476        h.insert(header::UPGRADE, HeaderValue::from_static("websocket"));
477        h.insert(header::SEC_WEBSOCKET_ACCEPT, accept);
478        resp
479    }
480
481    /// Build the error reply for `error` and route it through
482    /// [`finalize_reply`](Self::finalize_reply), so the after-chain,
483    /// compression, and CORS apply to errors exactly as they do to handler
484    /// successes. This is the canonical way to produce a `WebError`
485    /// response anywhere a `Request` exists.
486    async fn finalize_error(
487        &self,
488        error: WebError,
489        request: &Request,
490        #[cfg(feature = "websocket")] ws_upgrade: Option<(hyper::upgrade::OnUpgrade, HeaderValue)>,
491    ) -> HyperResponse<ResponseBody> {
492        let data = self.finalizer.error_to_reply(error);
493        self.finalize_reply(
494            data,
495            request,
496            #[cfg(feature = "websocket")]
497            ws_upgrade,
498        )
499        .await
500    }
501
502    /// Run the after-middleware chain, then turn the reply into a `Response`
503    /// via [`dispatch_reply`](Self::dispatch_reply).
504    ///
505    /// **After-chain runs on every reply with a body and a `Request`.** That
506    /// includes handler successes, `Outcome::Respond` short-circuits, *and*
507    /// every error the application produced (404 / 405 / 401 / 400 / a
508    /// handler-returned `Err(WebError)`, etc.). The README's promise that a
509    /// request-id stamper "still fires on a short-circuit" generalizes to
510    /// every reply — that's the contract this method enforces.
511    ///
512    /// **Exceptions** (the after-chain *doesn't* run):
513    /// - **101 Switching Protocols** — a WebSocket-handshake success has no
514    ///   HTTP body to decorate, and the upgrade machinery consumes the
515    ///   connection.
516    /// - **Pre-parse failures** — a request that fails before
517    ///   [`Request::from_hyper`] returns a skeleton (e.g. malformed HTTP
518    ///   from hyper itself) has no `Request` to give the hook. The body-cap
519    ///   413 and truncated-body 400 are *not* exceptions here: `from_hyper`
520    ///   now returns a skeleton `Request` on those, so they do run through
521    ///   the after-chain.
522    /// - **CORS preflight 204** — synthesized before middleware or routing;
523    ///   not an application request (see [`Self::handle_request`]).
524    async fn finalize_reply(
525        &self,
526        #[allow(unused_mut)] mut data: ReplyData,
527        request: &Request,
528        #[cfg(feature = "websocket")] ws_upgrade: Option<(hyper::upgrade::OnUpgrade, HeaderValue)>,
529    ) -> HyperResponse<ResponseBody> {
530        // If the handler returned `ws::upgrade(...)` but the request isn't a
531        // real WebSocket handshake, rewrite to a 426 error reply *here* so
532        // it flows through the same after-chain / compression / CORS path as
533        // any other error. Only the success-handshake path (Upgrade reply +
534        // ws_upgrade present) keeps the after-chain bypass — a 101 has no
535        // HTTP body to decorate.
536        #[cfg(feature = "websocket")]
537        if matches!(data, ReplyData::Upgrade(_)) && ws_upgrade.is_none() {
538            data = self.finalizer.error_to_reply(WebError::Problem(
539                ProblemDetails::new(StatusCode::UPGRADE_REQUIRED, "WebSocket Upgrade Required")
540                    .detail("this endpoint expects a WebSocket handshake"),
541            ));
542        }
543
544        let needs_after_chain = !matches!(data, ReplyData::Upgrade(_));
545        if needs_after_chain
546            && let Err(e) = self
547                .middleware_chain
548                .process_response(request, &mut data)
549                .await
550        {
551            // After-chain itself errored. Build a plain error response
552            // (no further after-chain — recursion prevention) so a buggy
553            // hook can't infinite-loop the request.
554            return self.with_cors_headers(request, self.finalizer.build_error(e).await);
555        }
556        self.dispatch_reply(
557            data,
558            request,
559            #[cfg(feature = "websocket")]
560            ws_upgrade,
561        )
562        .await
563    }
564
565    /// Turn a handler's (or a short-circuiting middleware's) `ReplyData` into a
566    /// fully processed response: WebSocket upgrade if it's an `Upgrade` reply;
567    /// otherwise compress (if enabled), finalize, and stamp CORS / `Vary`.
568    async fn dispatch_reply(
569        &self,
570        #[allow(unused_mut)] mut data: ReplyData,
571        request: &Request,
572        #[cfg(feature = "websocket")] ws_upgrade: Option<(hyper::upgrade::OnUpgrade, HeaderValue)>,
573    ) -> HyperResponse<ResponseBody> {
574        // A handler that returned `ws::upgrade(...)`: complete the handshake
575        // instead of finalizing a body. `finalize_reply` only lets us reach
576        // here for an `Upgrade` reply when the request *was* a real
577        // handshake (otherwise it rewrote the reply to a 426 error), so
578        // `ws_upgrade` is guaranteed `Some` on this branch.
579        #[cfg(feature = "websocket")]
580        if matches!(data, ReplyData::Upgrade(_)) {
581            let ReplyData::Upgrade(handler) = data else {
582                unreachable!()
583            };
584            let ws_upgrade =
585                ws_upgrade.expect("finalize_reply rewrites Upgrade-without-handshake to 426");
586            return self.complete_ws_upgrade(handler, ws_upgrade).await;
587        }
588        // Compression is the last transform — after response middleware,
589        // before the bytes leave. (Only buffered, compressible bodies above
590        // the threshold are touched.)
591        #[cfg(feature = "compression")]
592        if let Some(c) = &self.compression {
593            data = c.compress_reply(
594                data,
595                request
596                    .headers
597                    .get("accept-encoding")
598                    .and_then(|v| v.to_str().ok()),
599            );
600        }
601        let response = self.finalizer.build_response(data).await;
602        let response = self.with_cors_headers(request, response);
603        #[cfg(feature = "compression")]
604        let response = crate::compression::tag_vary_if_encoded(response);
605        response
606    }
607
608    /// Handles an individual incoming `hyper::Request`.
609    ///
610    /// Wraps [`handle_request_inner`](Self::handle_request_inner) in a
611    /// per-request timeout when one is configured (see
612    /// [`Server::with_request_timeout`]); a timed-out request gets a
613    /// one-shot `504 Gateway Timeout` (no after-chain, since by definition
614    /// something upstream was unresponsive).
615    ///
616    /// Every reply with a `Request` flows through
617    /// [`finalize_reply`](Self::finalize_reply) — handler successes,
618    /// `Outcome::Respond` short-circuits, and *every* error (middleware
619    /// `Err`, body parse failure, 404 / 405 from the router, handler-returned
620    /// `Err`, even the 413 / 400 from the body-cap path). The after-chain,
621    /// compression, and CORS apply uniformly. CORS preflight is the one
622    /// short-circuit that bypasses the pipeline — it's HTTP-protocol traffic
623    /// rather than an application request.
624    async fn handle_request(
625        self: Arc<Self>,
626        req: HyperRequest<Incoming>,
627    ) -> Result<HyperResponse<ResponseBody>, hyper::Error> {
628        let timeout = self.request_timeout;
629        let app = self.clone();
630        let inner = app.handle_request_inner(req);
631        match timeout {
632            None => inner.await,
633            Some(d) => match tokio::time::timeout(d, inner).await {
634                Ok(r) => r,
635                Err(_) => {
636                    warn!(timeout = ?d, "request exceeded configured timeout");
637                    Ok(self.finalizer.build_error(WebError::Timeout).await)
638                }
639            },
640        }
641    }
642
643    /// The actual request pipeline. Split from
644    /// [`handle_request`](Self::handle_request) so the latter can wrap it
645    /// in a timeout when one is configured.
646    ///
647    /// **Lifecycle order:**
648    ///
649    /// 1. capture WS upgrade (if request looks like a handshake)
650    /// 2. build the `Request` skeleton (no body yet)
651    /// 3. CORS preflight short-circuit (uses headers only)
652    /// 4. match controller — 404 short-circuits *without* buffering the
653    ///    body (efficiency win on adversarial bad-path requests); then stamp
654    ///    the matched controller's rate-limit class onto the request, so a
655    ///    `before` middleware (which only gets `&Request`) can read it
656    /// 5. buffer the body, capped per the resolved policy (today: server-
657    ///    wide; soon, per-controller / per-route)
658    /// 6. middleware `before`
659    /// 7. `to_params` (Content-Type-driven body parse)
660    /// 8. dispatch via the already-matched controller
661    /// 9. middleware `after` + finalize
662    ///
663    /// The "route before buffer" order is what lets the body cap depend on
664    /// the matched route — without it, the framework would have to commit
665    /// to a single cap before knowing where the request is headed.
666    async fn handle_request_inner(
667        self: Arc<Self>,
668        #[allow(unused_mut)] mut req: HyperRequest<Incoming>,
669    ) -> Result<HyperResponse<ResponseBody>, hyper::Error> {
670        let request_span = span!(Level::INFO, "request");
671        async move {
672            // 1. Capture the WS upgrade handshake (if any) before
673            //    `from_hyper_parts` consumes the request: the `OnUpgrade`
674            //    future and the derived `Sec-WebSocket-Accept`. (See
675            //    `websocket` module docs for why this happens up front.)
676            #[cfg(feature = "websocket")]
677            let ws_upgrade: Option<(hyper::upgrade::OnUpgrade, HeaderValue)> =
678                if websocket::is_upgrade_request(req.method(), req.headers()) {
679                    websocket::accept_key(req.headers())
680                        .map(|accept| (hyper::upgrade::on(&mut req), accept))
681                } else {
682                    None
683                };
684
685            // 2. Build the skeleton (method / path / query / headers); the
686            //    body stream is held aside for step 5.
687            let (mut request, body_stream) = Request::from_hyper_parts(req);
688
689            // 3. CORS preflight: synthesize the 204 ourselves before any
690            //    application-layer work. Preflights are HTTP-protocol
691            //    traffic; neither `before` nor `after` middleware runs on
692            //    them (see `CLAUDE.md` principle 1).
693            if let Some(cors) = &self.cors
694                && CorsLayer::is_preflight(&request.method, &request.headers)
695            {
696                let mut resp = self.finalizer.build_response(ReplyData::Empty).await;
697                cors.apply(&request.headers, resp.headers_mut(), true);
698                return Ok(resp);
699            }
700
701            // 4. Match controller. A path that hits nothing is 404 *before*
702            //    body buffering — a 10 MiB POST to a non-existent URL no
703            //    longer wastes 10 MiB of memory.
704            let route_match = match self.router.match_controller(&request.path_parts) {
705                Some(rm) => rm,
706                None => {
707                    return Ok(self
708                        .finalize_error(
709                            WebError::NotFound,
710                            &request,
711                            #[cfg(feature = "websocket")]
712                            ws_upgrade,
713                        )
714                        .await);
715                }
716            };
717
718            // 4b. Stamp the matched controller's rate-limit class onto the
719            //     request (the skeleton predates routing, so it was `None`).
720            //     This is the one piece of routing context a `before`
721            //     middleware can't otherwise see — it gets `&mut Request`,
722            //     not the matched controller. An application rate-limit
723            //     middleware reads `request.rate_limit_class` and applies its
724            //     own per-class policy; the framework owns the label and the
725            //     `429` response, not the limiter. (Set before the body
726            //     buffer so it survives `collect_body` and is present for the
727            //     whole pipeline, including the after-chain on error replies.)
728            request.rate_limit_class = route_match.controller.actus_rate_limit();
729
730            // 5. Resolve the effective body cap: the matched controller's
731            //    `#[controller(max_body_bytes = …)]` if it set one, otherwise
732            //    the server-wide `with_max_body_bytes` cap (default 2 MiB). A
733            //    future Phase 2 adds per-route overrides at the top of this
734            //    fall-through.
735            //
736            //    The error path returns the same skeleton so the after-chain
737            //    still has a `Request`.
738            let effective_cap = route_match
739                .controller
740                .actus_max_body_bytes()
741                .unwrap_or(self.max_body_bytes);
742            request = match request
743                .collect_body(
744                    body_stream,
745                    effective_cap,
746                    self.max_inflight_body_bytes.as_ref(),
747                )
748                .await
749            {
750                Ok(r) => r,
751                Err((request, e)) => {
752                    warn!("rejecting request before parse: {}", e);
753                    return Ok(self
754                        .finalize_error(
755                            e,
756                            &request,
757                            #[cfg(feature = "websocket")]
758                            ws_upgrade,
759                        )
760                        .await);
761                }
762            };
763
764            // 6. Middleware `before` chain.
765            let pre_data: Option<ReplyData> =
766                match self.middleware_chain.process_request(&mut request).await {
767                    Ok(Outcome::Continue) => None,
768                    Ok(Outcome::Respond(data)) => Some(data),
769                    Err(e) => {
770                        return Ok(self
771                            .finalize_error(
772                                e,
773                                &request,
774                                #[cfg(feature = "websocket")]
775                                ws_upgrade,
776                            )
777                            .await);
778                    }
779                };
780
781            // A `before` hook short-circuited with a reply — skip routing
782            // and the handler, but still run the after-chain.
783            if let Some(data) = pre_data {
784                return Ok(self
785                    .finalize_reply(
786                        data,
787                        &request,
788                        #[cfg(feature = "websocket")]
789                        ws_upgrade,
790                    )
791                    .await);
792            }
793
794            // 7. Body parse (JSON / form / opaque, per Content-Type).
795            //    Malformed body → 400 through the after-chain.
796            let params = match request.to_params() {
797                Ok(p) => p,
798                Err(e) => {
799                    return Ok(self
800                        .finalize_error(
801                            e,
802                            &request,
803                            #[cfg(feature = "websocket")]
804                            ws_upgrade,
805                        )
806                        .await);
807                }
808            };
809
810            // 8. Dispatch via the matched controller. 405 (verb mismatch
811            //    inside the controller) and handler-returned errors both
812            //    come back through here.
813            match route_match
814                .controller
815                .actus_dispatch(&route_match.action, params)
816                .await
817            {
818                Ok(data) => Ok(self
819                    .finalize_reply(
820                        data,
821                        &request,
822                        #[cfg(feature = "websocket")]
823                        ws_upgrade,
824                    )
825                    .await),
826                Err(e) => Ok(self
827                    .finalize_error(
828                        e,
829                        &request,
830                        #[cfg(feature = "websocket")]
831                        ws_upgrade,
832                    )
833                    .await),
834            }
835        }
836        .instrument(request_span)
837        .await
838    }
839}
840
841/// Default shutdown trigger: resolves on SIGTERM, SIGINT (Unix), or Ctrl-C
842/// (Windows). This is what [`Server::run`] uses; for tests or embedding,
843/// see [`Server::run_with_shutdown`].
844async fn default_shutdown_signal() {
845    #[cfg(unix)]
846    {
847        use tokio::signal::unix::{SignalKind, signal};
848        let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
849        let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
850        tokio::select! {
851            _ = sigterm.recv() => info!("Received SIGTERM"),
852            _ = sigint.recv() => info!("Received SIGINT"),
853        }
854    }
855    #[cfg(not(unix))]
856    {
857        tokio::signal::ctrl_c()
858            .await
859            .expect("install Ctrl-C handler");
860        info!("Received Ctrl-C");
861    }
862}