actus_server/server.rs
1//! [`Server`] — the hyper-based HTTP server that owns the request lifecycle:
2//! routing, body limiting, the middleware chain, CORS, compression, and the
3//! HTTP-correctness stamping (`Allow`, `Vary`), with graceful shutdown.
4
5#[cfg(feature = "compression")]
6use crate::compression::CompressionLayer;
7use crate::cors::CorsLayer;
8use crate::error::ServerError;
9use crate::middleware::{Middleware, MiddlewareChain, Outcome};
10use crate::request::Request;
11use crate::router::Router;
12#[cfg(feature = "websocket")]
13use crate::websocket;
14#[cfg(feature = "websocket")]
15use actus_reply::ProblemDetails;
16use actus_reply::{Finalizer, ReplyData, WebError};
17use bytes::Bytes;
18#[cfg(feature = "websocket")]
19use http::{HeaderValue, StatusCode, header};
20use http_body_util::combinators::BoxBody;
21use hyper::body::Incoming;
22use hyper::service::service_fn;
23use hyper::{Request as HyperRequest, Response as HyperResponse};
24use std::future::Future;
25use std::net::SocketAddr;
26use std::sync::Arc;
27use std::time::Duration;
28use tokio::net::TcpListener;
29use tokio::sync::Semaphore;
30use tokio::task::JoinSet;
31use tracing::{Instrument, Level, error, info, span, warn};
32
/// Body type of every response this server produces: a boxed, type-erased
/// body yielding `Bytes` chunks, whose error type is [`WebError`].
type ResponseBody = BoxBody<Bytes, WebError>;
34
/// Bytes in one kibibyte (2^10 = 1024). Handy for readable size limits such as
/// `#[controller(max_body_bytes = 4 * KIB)]`.
pub const KIB: usize = 1 << 10;
/// Bytes in one mebibyte (2^20), e.g. `Server::with_max_body_bytes(2 * MIB)`.
pub const MIB: usize = KIB << 10;
/// Bytes in one gibibyte (2^30).
pub const GIB: usize = MIB << 10;

/// Request-body ceiling used when nothing else is configured: **2 MiB**.
/// That comfortably covers typical JSON APIs and forms; endpoints that accept
/// bigger payloads (uploads) raise it via [`Server::with_max_body_bytes`] or a
/// per-controller `#[controller(max_body_bytes = …)]`. Same value as axum's
/// default.
pub const DEFAULT_MAX_BODY_BYTES: usize = MIB * 2;

/// How long in-flight connections get to finish after a shutdown signal when
/// no explicit value is set: 30 seconds. Change it with
/// [`Server::with_drain_deadline`].
pub const DEFAULT_DRAIN_DEADLINE: Duration = Duration::from_secs(30);
52
53/// The main Actus server.
54pub struct Server {
55 router: Arc<Router>,
56 middleware_chain: Arc<MiddlewareChain>,
57 finalizer: Arc<Finalizer>,
58 max_body_bytes: usize,
59 cors: Option<Arc<CorsLayer>>,
60 #[cfg(feature = "compression")]
61 compression: Option<Arc<CompressionLayer>>,
62 /// `Some(d)` caps each request's total time (parse → middleware →
63 /// handler → after-chain → finalize) at `d`; an over-budget request is
64 /// aborted and replied with `504 Gateway Timeout`. `None` disables the
65 /// per-request timer (the default).
66 request_timeout: Option<Duration>,
67 /// Grace period for in-flight connections to drain after shutdown.
68 drain_deadline: Duration,
69 /// Cap on concurrent connection tasks. `Some(n)` installs an
70 /// `Arc<Semaphore>` of `n` permits in the accept loop; while at
71 /// capacity, the loop pauses on permit acquisition and new SYNs queue
72 /// in the kernel's accept backlog (`SOMAXCONN`), at which point the
73 /// kernel drops them. `None` is unbounded.
74 max_connections: Option<usize>,
75 /// Cap on the total bytes being buffered across all in-flight body
76 /// reads. `Some(n)` installs a byte-permit semaphore; each
77 /// `collect_body_capped` reserves its per-request cap upfront and
78 /// releases the permits when the body is buffered or rejected. Refuses
79 /// excess requests with `503 Service Unavailable` (via `WebError::Busy`).
80 /// `None` is unbounded.
81 max_inflight_body_bytes: Option<Arc<Semaphore>>,
82 /// `Some(d)` is forwarded to hyper's `http1::Builder::header_read_timeout`
83 /// — bounds how long after starting to read request headers we'll wait
84 /// before dropping the connection. Catches slowloris and clients that
85 /// TCP-connect-and-send-nothing. `None` leaves hyper's default (none).
86 header_read_timeout: Option<Duration>,
87}
88
89impl Server {
90 /// Create a server for `router` with default settings: no middleware, no
91 /// CORS, the default body cap, and no DoS limits. Configure it with the
92 /// `with_*` builder methods, then call [`run`](Self::run).
93 pub fn new(router: Router) -> Self {
94 Self {
95 router: Arc::new(router),
96 middleware_chain: Arc::new(MiddlewareChain::new()),
97 finalizer: Arc::new(Finalizer::new()),
98 max_body_bytes: DEFAULT_MAX_BODY_BYTES,
99 cors: None,
100 #[cfg(feature = "compression")]
101 compression: None,
102 request_timeout: None,
103 drain_deadline: DEFAULT_DRAIN_DEADLINE,
104 max_connections: None,
105 max_inflight_body_bytes: None,
106 header_read_timeout: None,
107 }
108 }
109
110 /// Adds a middleware to the server's request processing chain.
111 pub fn with_middleware(mut self, middleware: impl Middleware + 'static) -> Self {
112 let mut chain = Arc::try_unwrap(self.middleware_chain).unwrap_or_else(|arc| (*arc).clone());
113 chain.add(middleware);
114 self.middleware_chain = Arc::new(chain);
115 self
116 }
117
118 /// Enables CORS with the given policy. The server then answers preflight
119 /// (`OPTIONS`) requests itself and adds the `Access-Control-*` headers to
120 /// every cross-origin response (including error responses). See
121 /// [`CorsLayer`].
122 pub fn with_cors(mut self, cors: CorsLayer) -> Self {
123 self.cors = Some(Arc::new(cors));
124 self
125 }
126
127 /// Enables response compression (gzip / brotli). For each response Actus
128 /// picks an encoding from the request's `Accept-Encoding` and compresses
129 /// buffered, compressible bodies above the layer's size threshold. See
130 /// [`CompressionLayer`]. *(Requires the `compression` feature.)*
131 #[cfg(feature = "compression")]
132 pub fn with_compression(mut self, layer: CompressionLayer) -> Self {
133 self.compression = Some(Arc::new(layer));
134 self
135 }
136
137 /// Caps the request body Actus will buffer (default
138 /// [`DEFAULT_MAX_BODY_BYTES`] = 2 MiB). A larger body is rejected with
139 /// `413 Payload Too Large` before it can exhaust memory — the limit
140 /// bounds buffered bytes, so it also covers chunked bodies that lie about
141 /// (or omit) `Content-Length`.
142 ///
143 /// `0` is accepted and means "reject every non-empty body" — typically
144 /// only useful on a strictly-GET surface that should never see a body.
145 pub fn with_max_body_bytes(mut self, max: usize) -> Self {
146 self.max_body_bytes = max;
147 self
148 }
149
150 /// Cap the total time any single request may take — body parse,
151 /// middleware `before`, handler, middleware `after`, and finalization
152 /// combined. An over-budget request is aborted (the handler's future
153 /// is dropped) and the client gets `504 Gateway Timeout`. No timeout
154 /// is set by default.
155 ///
156 /// **Scope.** The timer covers the request/response exchange. A
157 /// WebSocket upgrade succeeds inside the timer (the `101` is the
158 /// response); the post-upgrade conversation runs in its own task and
159 /// is not bound by this timeout.
160 ///
161 /// **Effect of an over-budget request.** When the timer elapses the
162 /// in-flight future is dropped, which cancels whatever the handler
163 /// was awaiting (DB query, channel recv, etc.). The 504 reply is
164 /// one-shot — the after-chain doesn't run on it (by definition,
165 /// some component upstream was unresponsive; running more risks
166 /// hanging again).
167 pub fn with_request_timeout(mut self, d: Duration) -> Self {
168 self.request_timeout = Some(d);
169 self
170 }
171
172 /// Override the grace period for in-flight connections after a
173 /// shutdown signal (default [`DEFAULT_DRAIN_DEADLINE`] = 30 s).
174 /// Anything still running at the deadline is hard-aborted via
175 /// `JoinSet::shutdown`. Use a longer value for surfaces that hold
176 /// long-lived connections (large file downloads, WebSockets);
177 /// a shorter value for fast-iteration dev workflows. `Duration::ZERO`
178 /// aborts every in-flight task immediately.
179 pub fn with_drain_deadline(mut self, d: Duration) -> Self {
180 self.drain_deadline = d;
181 self
182 }
183
184 /// Cap concurrent connection tasks at `n`. While the cap is held, the
185 /// accept loop pauses on permit acquisition; new SYNs queue in the
186 /// kernel's accept backlog and (once that fills, governed by
187 /// `SOMAXCONN`) get dropped at the OS level. No userland reject /
188 /// no-503-per-conn cost — the kernel handles the spillover.
189 ///
190 /// Each connection task holds its permit until it ends, including the
191 /// post-handshake WebSocket conversation. Size accordingly: a
192 /// `with_max_connections(N)` server can hold `N` long-lived WebSockets
193 /// *before* it stops accepting new connections of any kind.
194 ///
195 /// Unbounded by default (no semaphore installed).
196 pub fn with_max_connections(mut self, n: usize) -> Self {
197 self.max_connections = Some(n);
198 self
199 }
200
201 /// Cap the total bytes being buffered across all in-flight body reads
202 /// at `n`. Each request reserves its per-request cap (see
203 /// `with_max_body_bytes`) from this global budget upfront; if the
204 /// budget is exhausted, the request is refused with `503 Service
205 /// Unavailable` (via [`WebError::Busy`]) and a short `Retry-After`.
206 ///
207 /// Together with [`Self::with_max_connections`] this puts a hard
208 /// ceiling on the framework's memory under adversarial load:
209 /// `with_max_connections(C) * with_max_body_bytes(B)` is the *worst*
210 /// case absent this knob; with it, the ceiling is `min(C * B, this
211 /// value)`.
212 ///
213 /// Pre-reserving the per-request cap over-counts (a 1 KB request
214 /// reserves up to its full cap); the alternative — incremental
215 /// per-chunk byte accounting — is more code for the same effective
216 /// ceiling, and a request that has already started buffering can't
217 /// be sensibly aborted partway through anyway.
218 ///
219 /// `n` is clamped to `u32::MAX` internally (Tokio's `Semaphore`
220 /// permit count uses `u32`); for practical deployments this is no
221 /// limit (4 GiB).
222 ///
223 /// Unbounded by default.
224 pub fn with_max_inflight_body_bytes(mut self, n: usize) -> Self {
225 // u32 cap is a tokio Semaphore constraint, not a design choice.
226 let n_capped = n.min(u32::MAX as usize);
227 self.max_inflight_body_bytes = Some(Arc::new(Semaphore::new(n_capped)));
228 self
229 }
230
231 /// Bound how long after starting to read request headers we'll wait
232 /// before dropping the connection. Forwards to hyper's
233 /// `http1::Builder::header_read_timeout`. Catches slowloris (sending
234 /// headers one byte at a time) and clients that TCP-connect-and-send-
235 /// nothing — the most common file-descriptor-exhaustion attack on a
236 /// keep-alive HTTP server.
237 ///
238 /// Note: hyper 1.x doesn't have a separate "idle between requests"
239 /// timeout (after a complete request, an idle keep-alive connection
240 /// stays open until either side closes or the OS-level TCP keep-alive
241 /// fires). If that matters for your deployment, either disable
242 /// keep-alive entirely upstream of Actus or rely on the OS knobs.
243 ///
244 /// No timeout by default (hyper's default).
245 pub fn with_header_read_timeout(mut self, d: Duration) -> Self {
246 self.header_read_timeout = Some(d);
247 self
248 }
249
250 /// Runs the server on `127.0.0.1:port` (loopback only). For a different
251 /// bind address — e.g. `0.0.0.0:port` to accept connections from other
252 /// hosts in a container — use [`Server::run_on`].
253 ///
254 /// Listens for SIGTERM/SIGINT (Unix) or Ctrl-C (cross-platform) and
255 /// shuts down gracefully: stops accepting new connections, signals
256 /// in-flight connections to finish, and waits up to 30 seconds for
257 /// them to drain before returning.
258 pub async fn run(self, port: u16) -> Result<(), ServerError> {
259 self.run_on(SocketAddr::from(([127, 0, 0, 1], port))).await
260 }
261
262 /// Like [`Server::run`] but binds an arbitrary address. Pass
263 /// `0.0.0.0:port` (or `[::]:port`) to accept connections from other hosts.
264 pub async fn run_on(self, addr: SocketAddr) -> Result<(), ServerError> {
265 self.run_with_shutdown_on(addr, default_shutdown_signal())
266 .await
267 }
268
269 /// Like [`Server::run`] but with a custom shutdown trigger (a future that,
270 /// when it resolves, starts the graceful drain). Binds `127.0.0.1:port`;
271 /// see [`Server::run_with_shutdown_on`] for a custom bind address. Useful
272 /// for tests or for embedding the server in a larger supervision tree.
273 pub async fn run_with_shutdown(
274 self,
275 port: u16,
276 shutdown: impl Future<Output = ()> + Send + 'static,
277 ) -> Result<(), ServerError> {
278 self.run_with_shutdown_on(SocketAddr::from(([127, 0, 0, 1], port)), shutdown)
279 .await
280 }
281
282 /// The general form: bind `addr`, serve until `shutdown` resolves, then
283 /// drain. [`Server::run`], [`Server::run_on`], and
284 /// [`Server::run_with_shutdown`] are thin wrappers over this.
285 ///
286 /// **Drain bound.** Once `shutdown` resolves the server stops accepting
287 /// and signals every in-flight connection to wind down. The drain
288 /// deadline defaults to [`DEFAULT_DRAIN_DEADLINE`] (30 s); override
289 /// with [`Server::with_drain_deadline`]. Anything still running at
290 /// the deadline is hard-aborted via `JoinSet::shutdown`. In particular,
291 /// long-lived connections (WebSockets, slow downloads, kept-alive idle
292 /// clients) and any connection task that raced the shutdown notification
293 /// and missed it both get aborted at the deadline rather than draining
294 /// gracefully.
295 pub async fn run_with_shutdown_on(
296 self,
297 addr: SocketAddr,
298 shutdown: impl Future<Output = ()> + Send + 'static,
299 ) -> Result<(), ServerError> {
300 let listener = TcpListener::bind(addr).await?;
301 let addr = listener.local_addr().unwrap_or(addr);
302 info!("Server listening on http://{}", addr);
303
304 let app = Arc::new(self);
305 // Per-connection cancellation: once `Notify::notify_waiters` fires,
306 // every in-flight task wakes up and asks hyper to gracefully close
307 // its connection (finishing the current response, then exiting).
308 let notify = Arc::new(tokio::sync::Notify::new());
309 let mut tasks: JoinSet<()> = JoinSet::new();
310
311 // Optional cap on concurrent connections. When at-capacity the
312 // accept loop pauses on permit acquisition; new SYNs queue in the
313 // kernel accept backlog (SOMAXCONN) and get dropped at the OS
314 // level once that fills. Each spawned connection task moves its
315 // permit in; the permit releases when the task exits.
316 let conn_permits = app.max_connections.map(|n| Arc::new(Semaphore::new(n)));
317
318 tokio::pin!(shutdown);
319
320 loop {
321 tokio::select! {
322 // Accept-branch: acquire a connection permit first (when
323 // a cap is configured), then accept. The outer select
324 // races this against shutdown so a paused-at-capacity
325 // accept loop still notices the shutdown signal.
326 accept_with_permit = async {
327 let permit = match &conn_permits {
328 Some(s) => Some(s.clone().acquire_owned().await.expect("semaphore never closed")),
329 None => None,
330 };
331 let result = listener.accept().await;
332 (result, permit)
333 } => {
334 let (accept_result, permit) = accept_with_permit;
335 let (stream, _peer) = match accept_result {
336 Ok(s) => s,
337 Err(e) => {
338 error!("accept error: {}", e);
339 // permit released when this branch ends — fine
340 continue;
341 }
342 };
343 let io = hyper_util::rt::TokioIo::new(stream);
344 let app = app.clone();
345 let notify = notify.clone();
346 let header_timeout = app.header_read_timeout;
347 tasks.spawn(async move {
348 // The permit (if any) lives for the connection's
349 // lifetime; releasing happens at task drop.
350 let _permit = permit;
351
352 let mut builder = hyper::server::conn::http1::Builder::new();
353 if let Some(d) = header_timeout {
354 builder.header_read_timeout(d);
355 }
356 let conn = builder.serve_connection(
357 io,
358 service_fn(move |req| app.clone().handle_request(req)),
359 );
360 // With the `websocket` feature, allow `101 Switching
361 // Protocols` responses to hand off the connection.
362 #[cfg(feature = "websocket")]
363 let conn = conn.with_upgrades();
364 tokio::pin!(conn);
365 tokio::select! {
366 res = conn.as_mut() => {
367 if let Err(err) = res {
368 error!("Error serving connection: {}", err);
369 }
370 }
371 _ = notify.notified() => {
372 conn.as_mut().graceful_shutdown();
373 if let Err(err) = conn.await {
374 error!("Error during graceful shutdown: {}", err);
375 }
376 }
377 }
378 });
379 }
380 // Reap finished connection tasks so the `JoinSet` doesn't grow
381 // without bound over the server's lifetime — and so a panicked
382 // connection task is logged promptly, not only at shutdown.
383 joined = tasks.join_next(), if !tasks.is_empty() => {
384 match joined {
385 Some(Err(e)) if e.is_panic() => error!("Connection task panicked: {}", e),
386 Some(Err(e)) => error!("Connection task failed: {}", e),
387 Some(Ok(())) | None => {}
388 }
389 }
390 _ = &mut shutdown => {
391 info!("Shutdown signal received; draining in-flight requests");
392 break;
393 }
394 }
395 }
396
397 // Stop accepting; signal connections to wind down.
398 drop(listener);
399 notify.notify_waiters();
400
401 // Drain. The grace period is configurable via
402 // `Server::with_drain_deadline` (default 30 s).
403 let drain_deadline = tokio::time::sleep(app.drain_deadline);
404 tokio::pin!(drain_deadline);
405 loop {
406 tokio::select! {
407 next = tasks.join_next() => {
408 match next {
409 Some(Ok(())) => {}
410 Some(Err(e)) if e.is_panic() => {
411 error!("Connection task panicked: {}", e);
412 }
413 Some(Err(e)) => {
414 error!("Connection task failed: {}", e);
415 }
416 None => break,
417 }
418 }
419 _ = &mut drain_deadline => {
420 warn!("Drain deadline exceeded; aborting {} connection(s)", tasks.len());
421 tasks.shutdown().await;
422 break;
423 }
424 }
425 }
426
427 info!("Server shutdown complete");
428 Ok(())
429 }
430
431 /// Stamp the configured CORS response headers onto `response` (no-op when
432 /// CORS isn't enabled, or the request had no allowed `Origin`). Applied to
433 /// *every* outgoing response — success and error alike — so the browser
434 /// can read 4xx/5xx bodies.
435 fn with_cors_headers(
436 &self,
437 request: &Request,
438 mut response: HyperResponse<ResponseBody>,
439 ) -> HyperResponse<ResponseBody> {
440 if let Some(cors) = &self.cors {
441 cors.apply(&request.headers, response.headers_mut(), false);
442 }
443 response
444 }
445
446 /// Fulfil a `ReplyData::Upgrade` from a handler when the request was
447 /// genuinely a WebSocket handshake: send `101 Switching Protocols` and
448 /// spawn the handler on the upgraded connection. (The "handler returned
449 /// Upgrade but the request wasn't a handshake" case is rewritten to a
450 /// 426 reply in [`Self::finalize_reply`] before reaching this method, so
451 /// it can flow through the after-chain like any other error.)
452 ///
453 /// No CORS headers on the `101`: WebSocket handshakes are scoped by
454 /// browser origin checks (the handler inspects `Origin` itself before
455 /// calling `ws::upgrade`), not by the CORS protocol — `Access-Control-*`
456 /// on a `101` is meaningless to the browser.
457 #[cfg(feature = "websocket")]
458 async fn complete_ws_upgrade(
459 &self,
460 handler: Box<dyn std::any::Any + Send>,
461 ws_upgrade: (hyper::upgrade::OnUpgrade, HeaderValue),
462 ) -> HyperResponse<ResponseBody> {
463 // `ReplyData::Upgrade` is only constructible via `ws::upgrade(...)`,
464 // which always boxes an `UpgradeTask`. A failing downcast would mean
465 // a crate-internal invariant is broken; surface as a panic rather
466 // than silently producing a 500.
467 let task = handler
468 .downcast::<websocket::UpgradeTask>()
469 .expect("ReplyData::Upgrade always carries an UpgradeTask");
470 let (on_upgrade, accept) = ws_upgrade;
471 tokio::spawn(websocket::run_upgrade(on_upgrade, *task));
472 let mut resp = self.finalizer.build_response(ReplyData::Empty).await;
473 *resp.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
474 let h = resp.headers_mut();
475 h.insert(header::CONNECTION, HeaderValue::from_static("upgrade"));
476 h.insert(header::UPGRADE, HeaderValue::from_static("websocket"));
477 h.insert(header::SEC_WEBSOCKET_ACCEPT, accept);
478 resp
479 }
480
481 /// Build the error reply for `error` and route it through
482 /// [`finalize_reply`](Self::finalize_reply), so the after-chain,
483 /// compression, and CORS apply to errors exactly as they do to handler
484 /// successes. This is the canonical way to produce a `WebError`
485 /// response anywhere a `Request` exists.
486 async fn finalize_error(
487 &self,
488 error: WebError,
489 request: &Request,
490 #[cfg(feature = "websocket")] ws_upgrade: Option<(hyper::upgrade::OnUpgrade, HeaderValue)>,
491 ) -> HyperResponse<ResponseBody> {
492 let data = self.finalizer.error_to_reply(error);
493 self.finalize_reply(
494 data,
495 request,
496 #[cfg(feature = "websocket")]
497 ws_upgrade,
498 )
499 .await
500 }
501
502 /// Run the after-middleware chain, then turn the reply into a `Response`
503 /// via [`dispatch_reply`](Self::dispatch_reply).
504 ///
505 /// **After-chain runs on every reply with a body and a `Request`.** That
506 /// includes handler successes, `Outcome::Respond` short-circuits, *and*
507 /// every error the application produced (404 / 405 / 401 / 400 / a
508 /// handler-returned `Err(WebError)`, etc.). The README's promise that a
509 /// request-id stamper "still fires on a short-circuit" generalizes to
510 /// every reply — that's the contract this method enforces.
511 ///
512 /// **Exceptions** (the after-chain *doesn't* run):
513 /// - **101 Switching Protocols** — a WebSocket-handshake success has no
514 /// HTTP body to decorate, and the upgrade machinery consumes the
515 /// connection.
516 /// - **Pre-parse failures** — a request that fails before
517 /// [`Request::from_hyper`] returns a skeleton (e.g. malformed HTTP
518 /// from hyper itself) has no `Request` to give the hook. The body-cap
519 /// 413 and truncated-body 400 are *not* exceptions here: `from_hyper`
520 /// now returns a skeleton `Request` on those, so they do run through
521 /// the after-chain.
522 /// - **CORS preflight 204** — synthesized before middleware or routing;
523 /// not an application request (see [`Self::handle_request`]).
524 async fn finalize_reply(
525 &self,
526 #[allow(unused_mut)] mut data: ReplyData,
527 request: &Request,
528 #[cfg(feature = "websocket")] ws_upgrade: Option<(hyper::upgrade::OnUpgrade, HeaderValue)>,
529 ) -> HyperResponse<ResponseBody> {
530 // If the handler returned `ws::upgrade(...)` but the request isn't a
531 // real WebSocket handshake, rewrite to a 426 error reply *here* so
532 // it flows through the same after-chain / compression / CORS path as
533 // any other error. Only the success-handshake path (Upgrade reply +
534 // ws_upgrade present) keeps the after-chain bypass — a 101 has no
535 // HTTP body to decorate.
536 #[cfg(feature = "websocket")]
537 if matches!(data, ReplyData::Upgrade(_)) && ws_upgrade.is_none() {
538 data = self.finalizer.error_to_reply(WebError::Problem(
539 ProblemDetails::new(StatusCode::UPGRADE_REQUIRED, "WebSocket Upgrade Required")
540 .detail("this endpoint expects a WebSocket handshake"),
541 ));
542 }
543
544 let needs_after_chain = !matches!(data, ReplyData::Upgrade(_));
545 if needs_after_chain
546 && let Err(e) = self
547 .middleware_chain
548 .process_response(request, &mut data)
549 .await
550 {
551 // After-chain itself errored. Build a plain error response
552 // (no further after-chain — recursion prevention) so a buggy
553 // hook can't infinite-loop the request.
554 return self.with_cors_headers(request, self.finalizer.build_error(e).await);
555 }
556 self.dispatch_reply(
557 data,
558 request,
559 #[cfg(feature = "websocket")]
560 ws_upgrade,
561 )
562 .await
563 }
564
565 /// Turn a handler's (or a short-circuiting middleware's) `ReplyData` into a
566 /// fully processed response: WebSocket upgrade if it's an `Upgrade` reply;
567 /// otherwise compress (if enabled), finalize, and stamp CORS / `Vary`.
568 async fn dispatch_reply(
569 &self,
570 #[allow(unused_mut)] mut data: ReplyData,
571 request: &Request,
572 #[cfg(feature = "websocket")] ws_upgrade: Option<(hyper::upgrade::OnUpgrade, HeaderValue)>,
573 ) -> HyperResponse<ResponseBody> {
574 // A handler that returned `ws::upgrade(...)`: complete the handshake
575 // instead of finalizing a body. `finalize_reply` only lets us reach
576 // here for an `Upgrade` reply when the request *was* a real
577 // handshake (otherwise it rewrote the reply to a 426 error), so
578 // `ws_upgrade` is guaranteed `Some` on this branch.
579 #[cfg(feature = "websocket")]
580 if matches!(data, ReplyData::Upgrade(_)) {
581 let ReplyData::Upgrade(handler) = data else {
582 unreachable!()
583 };
584 let ws_upgrade =
585 ws_upgrade.expect("finalize_reply rewrites Upgrade-without-handshake to 426");
586 return self.complete_ws_upgrade(handler, ws_upgrade).await;
587 }
588 // Compression is the last transform — after response middleware,
589 // before the bytes leave. (Only buffered, compressible bodies above
590 // the threshold are touched.)
591 #[cfg(feature = "compression")]
592 if let Some(c) = &self.compression {
593 data = c.compress_reply(
594 data,
595 request
596 .headers
597 .get("accept-encoding")
598 .and_then(|v| v.to_str().ok()),
599 );
600 }
601 let response = self.finalizer.build_response(data).await;
602 let response = self.with_cors_headers(request, response);
603 #[cfg(feature = "compression")]
604 let response = crate::compression::tag_vary_if_encoded(response);
605 response
606 }
607
608 /// Handles an individual incoming `hyper::Request`.
609 ///
610 /// Wraps [`handle_request_inner`](Self::handle_request_inner) in a
611 /// per-request timeout when one is configured (see
612 /// [`Server::with_request_timeout`]); a timed-out request gets a
613 /// one-shot `504 Gateway Timeout` (no after-chain, since by definition
614 /// something upstream was unresponsive).
615 ///
616 /// Every reply with a `Request` flows through
617 /// [`finalize_reply`](Self::finalize_reply) — handler successes,
618 /// `Outcome::Respond` short-circuits, and *every* error (middleware
619 /// `Err`, body parse failure, 404 / 405 from the router, handler-returned
620 /// `Err`, even the 413 / 400 from the body-cap path). The after-chain,
621 /// compression, and CORS apply uniformly. CORS preflight is the one
622 /// short-circuit that bypasses the pipeline — it's HTTP-protocol traffic
623 /// rather than an application request.
624 async fn handle_request(
625 self: Arc<Self>,
626 req: HyperRequest<Incoming>,
627 ) -> Result<HyperResponse<ResponseBody>, hyper::Error> {
628 let timeout = self.request_timeout;
629 let app = self.clone();
630 let inner = app.handle_request_inner(req);
631 match timeout {
632 None => inner.await,
633 Some(d) => match tokio::time::timeout(d, inner).await {
634 Ok(r) => r,
635 Err(_) => {
636 warn!(timeout = ?d, "request exceeded configured timeout");
637 Ok(self.finalizer.build_error(WebError::Timeout).await)
638 }
639 },
640 }
641 }
642
    /// The actual request pipeline. It is split from
    /// [`handle_request`](Self::handle_request) so the latter can wrap it
    /// in a timeout when one is configured. Every path here returns `Ok`
    /// with a response; errors are turned into replies, never propagated as
    /// `hyper::Error`.
    ///
    /// **Lifecycle order:**
    ///
    /// 1. capture the WS upgrade (if the request looks like a handshake)
    /// 2. build the `Request` skeleton (no body yet)
    /// 3. CORS preflight short-circuit (uses headers only)
    /// 4. match the controller — a miss is a 404 *without* buffering the
    ///    body (an efficiency win on adversarial bad-path requests); then
    ///    stamp the matched controller's rate-limit class onto the request,
    ///    so a `before` middleware (which gets `&mut Request`, not the
    ///    matched controller) can read it
    /// 5. buffer the body, capped at the matched controller's
    ///    `max_body_bytes` if it sets one, else the server-wide cap
    ///    (per-route overrides are planned, not implemented)
    /// 6. middleware `before`
    /// 7. `to_params` (Content-Type-driven body parse)
    /// 8. dispatch via the already-matched controller
    /// 9. middleware `after` + finalize
    ///
    /// Routing before buffering is what lets the body cap depend on the
    /// matched route — otherwise the framework would have to commit to a
    /// single cap before knowing where the request is headed.
    async fn handle_request_inner(
        self: Arc<Self>,
        #[allow(unused_mut)] mut req: HyperRequest<Incoming>,
    ) -> Result<HyperResponse<ResponseBody>, hyper::Error> {
        let request_span = span!(Level::INFO, "request");
        async move {
            // 1. Capture the WS upgrade handshake (if any) before
            //    `from_hyper_parts` consumes the request: the `OnUpgrade`
            //    future and the derived `Sec-WebSocket-Accept`. (See
            //    `websocket` module docs for why this happens up front.)
            #[cfg(feature = "websocket")]
            let ws_upgrade: Option<(hyper::upgrade::OnUpgrade, HeaderValue)> =
                if websocket::is_upgrade_request(req.method(), req.headers()) {
                    websocket::accept_key(req.headers())
                        .map(|accept| (hyper::upgrade::on(&mut req), accept))
                } else {
                    None
                };

            // 2. Build the skeleton (method / path / query / headers); the
            //    body stream is held aside for step 5.
            let (mut request, body_stream) = Request::from_hyper_parts(req);

            // 3. CORS preflight: synthesize the response ourselves before
            //    any application-layer work. Preflights are HTTP-protocol
            //    traffic; neither `before` nor `after` middleware runs on
            //    them (see `CLAUDE.md` principle 1).
            if let Some(cors) = &self.cors
                && CorsLayer::is_preflight(&request.method, &request.headers)
            {
                let mut resp = self.finalizer.build_response(ReplyData::Empty).await;
                cors.apply(&request.headers, resp.headers_mut(), true);
                return Ok(resp);
            }

            // 4. Match controller. A path that hits nothing is a 404 *before*
            //    body buffering, so a large POST to a non-existent URL never
            //    gets buffered into memory.
            let route_match = match self.router.match_controller(&request.path_parts) {
                Some(rm) => rm,
                None => {
                    return Ok(self
                        .finalize_error(
                            WebError::NotFound,
                            &request,
                            #[cfg(feature = "websocket")]
                            ws_upgrade,
                        )
                        .await);
                }
            };

            // 4b. Stamp the matched controller's rate-limit class onto the
            //     request (the skeleton predates routing, so it was `None`).
            //     This is the one piece of routing context a `before`
            //     middleware can't otherwise see — it gets `&mut Request`,
            //     not the matched controller. An application rate-limit
            //     middleware reads `request.rate_limit_class` and applies its
            //     own per-class policy; the framework owns the label and the
            //     `429` response, not the limiter. (Set before the body
            //     buffer so it survives `collect_body` and is present for the
            //     whole pipeline, including the after-chain on error replies.)
            request.rate_limit_class = route_match.controller.actus_rate_limit();

            // 5. Resolve the effective body cap: the matched controller's
            //    `#[controller(max_body_bytes = …)]` if it set one, otherwise
            //    the server-wide `with_max_body_bytes` cap (default 2 MiB). A
            //    future Phase 2 adds per-route overrides at the top of this
            //    fall-through.
            //
            //    The error path returns the same skeleton so the after-chain
            //    still has a `Request`.
            let effective_cap = route_match
                .controller
                .actus_max_body_bytes()
                .unwrap_or(self.max_body_bytes);
            request = match request
                .collect_body(
                    body_stream,
                    effective_cap,
                    self.max_inflight_body_bytes.as_ref(),
                )
                .await
            {
                Ok(r) => r,
                Err((request, e)) => {
                    warn!("rejecting request before parse: {}", e);
                    return Ok(self
                        .finalize_error(
                            e,
                            &request,
                            #[cfg(feature = "websocket")]
                            ws_upgrade,
                        )
                        .await);
                }
            };

            // 6. Middleware `before` chain. `Respond` short-circuits with a
            //    reply; `Err` becomes an error reply (both still get the
            //    after-chain via `finalize_*`).
            let pre_data: Option<ReplyData> =
                match self.middleware_chain.process_request(&mut request).await {
                    Ok(Outcome::Continue) => None,
                    Ok(Outcome::Respond(data)) => Some(data),
                    Err(e) => {
                        return Ok(self
                            .finalize_error(
                                e,
                                &request,
                                #[cfg(feature = "websocket")]
                                ws_upgrade,
                            )
                            .await);
                    }
                };

            // A `before` hook short-circuited with a reply — skip the body
            // parse and the handler, but still run the after-chain.
            if let Some(data) = pre_data {
                return Ok(self
                    .finalize_reply(
                        data,
                        &request,
                        #[cfg(feature = "websocket")]
                        ws_upgrade,
                    )
                    .await);
            }

            // 7. Body parse (JSON / form / opaque, per Content-Type).
            //    Malformed body → 400 through the after-chain.
            let params = match request.to_params() {
                Ok(p) => p,
                Err(e) => {
                    return Ok(self
                        .finalize_error(
                            e,
                            &request,
                            #[cfg(feature = "websocket")]
                            ws_upgrade,
                        )
                        .await);
                }
            };

            // 8. Dispatch via the matched controller. 405 (verb mismatch
            //    inside the controller) and handler-returned errors both
            //    come back through here.
            match route_match
                .controller
                .actus_dispatch(&route_match.action, params)
                .await
            {
                Ok(data) => Ok(self
                    .finalize_reply(
                        data,
                        &request,
                        #[cfg(feature = "websocket")]
                        ws_upgrade,
                    )
                    .await),
                Err(e) => Ok(self
                    .finalize_error(
                        e,
                        &request,
                        #[cfg(feature = "websocket")]
                        ws_upgrade,
                    )
                    .await),
            }
        }
        .instrument(request_span)
        .await
    }
839}
840
841/// Default shutdown trigger: resolves on SIGTERM, SIGINT (Unix), or Ctrl-C
842/// (Windows). This is what [`Server::run`] uses; for tests or embedding,
843/// see [`Server::run_with_shutdown`].
844async fn default_shutdown_signal() {
845 #[cfg(unix)]
846 {
847 use tokio::signal::unix::{SignalKind, signal};
848 let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
849 let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
850 tokio::select! {
851 _ = sigterm.recv() => info!("Received SIGTERM"),
852 _ = sigint.recv() => info!("Received SIGINT"),
853 }
854 }
855 #[cfg(not(unix))]
856 {
857 tokio::signal::ctrl_c()
858 .await
859 .expect("install Ctrl-C handler");
860 info!("Received Ctrl-C");
861 }
862}