tako_rs_plugins/middleware/timeout.rs
1//! Per-request timeout middleware.
2//!
3//! Aborts the inner middleware chain when a configurable deadline is exceeded
4//! and returns `503 Service Unavailable` (or a caller-supplied status). The
5//! timer also covers any work the handler is still doing — `tokio::time::timeout`
6//! drops the inner future, which cancels in-flight async work tied to the
7//! request future tree.
8//!
9//! For per-route timeouts that bypass the middleware chain entirely, use
10//! [`Route::timeout`](tako_rs_core::route::Route::timeout) instead — this
11//! middleware exists for cases where the deadline is dynamic (per-tenant,
12//! per-IP, …) or composes with other middleware (e.g. retry).
13//!
14//! # Compio runtime
15//!
16//! The compio runtime ships `!Send` futures. The
17//! [`IntoMiddleware`](tako_rs_core::middleware::IntoMiddleware) contract is
18//! `+ Send + 'static`, which means we cannot host `compio::time::sleep` here —
19//! the wrapping `Box::pin(async move { ... })` would not satisfy `Send`. When
20//! the `compio` cargo feature is active, the
21//! [`IntoMiddleware`](tako_rs_core::middleware::IntoMiddleware) impl is gated
22//! off and `Timeout::into_middleware` is a compile error. Use
23//! [`Route::timeout`](tako_rs_core::route::Route::timeout) (per-route deadline,
24//! runtime-agnostic) on the compio path instead.
25//!
26//! # Examples
27//!
28//! ```rust,ignore
29//! use std::time::Duration;
30//! use tako::middleware::timeout::Timeout;
31//! use tako::middleware::IntoMiddleware;
32//!
33//! let mw = Timeout::new(Duration::from_secs(30)).into_middleware();
34//! ```
35
36#[cfg(not(feature = "compio"))]
37use std::future::Future;
38#[cfg(not(feature = "compio"))]
39use std::pin::Pin;
40use std::sync::Arc;
41use std::time::Duration;
42
43use http::StatusCode;
44#[cfg(not(feature = "compio"))]
45use tako_rs_core::body::TakoBody;
46#[cfg(not(feature = "compio"))]
47use tako_rs_core::middleware::IntoMiddleware;
48#[cfg(not(feature = "compio"))]
49use tako_rs_core::middleware::Next;
50use tako_rs_core::types::Request;
51#[cfg(not(feature = "compio"))]
52use tako_rs_core::types::Response;
53
/// Per-request override closure for [`Timeout`].
///
/// The closure receives a shared reference to the incoming request and
/// returns the deadline to apply to it. When the middleware adapter is
/// available, `Some(duration)` is used as the deadline for that request and
/// `None` lets the request run with no deadline at all.
///
/// The closure is stored behind an [`Arc`] so the middleware can hand a cheap
/// clone of it to every request future.
pub type TimeoutDynamicFn = Arc<dyn Fn(&Request) -> Option<Duration> + Send + Sync + 'static>;
56
/// Per-request timeout middleware configuration.
///
/// Holds the static deadline, the status returned when the deadline elapses,
/// and an optional per-request override closure.
///
/// All three fields stay populated even on the compio build so the struct
/// remains constructible — there is just no [`IntoMiddleware`](tako_rs_core::middleware::IntoMiddleware)
/// adapter for it. The `dead_code` allow keeps the compio compile clean
/// while the fields wait for a `compio`-runtime adapter.
#[cfg_attr(feature = "compio", allow(dead_code))]
pub struct Timeout {
    /// Deadline applied when no `dynamic` closure is configured.
    duration: Duration,
    /// Status of the response produced when the deadline elapses.
    status: StatusCode,
    /// Optional per-request deadline override; see [`TimeoutDynamicFn`].
    dynamic: Option<TimeoutDynamicFn>,
}
69
70impl Timeout {
71 /// Creates a timeout middleware with a static deadline.
72 pub fn new(duration: Duration) -> Self {
73 Self {
74 duration,
75 status: StatusCode::SERVICE_UNAVAILABLE,
76 dynamic: None,
77 }
78 }
79
80 /// Sets the response status used when the deadline elapses. Default: 503.
81 pub fn status(mut self, status: StatusCode) -> Self {
82 self.status = status;
83 self
84 }
85
86 /// Computes the deadline per request. Returning `None` disables the timeout
87 /// for that request.
88 pub fn dynamic<F>(mut self, f: F) -> Self
89 where
90 F: Fn(&Request) -> Option<Duration> + Send + Sync + 'static,
91 {
92 self.dynamic = Some(Arc::new(f));
93 self
94 }
95}
96
97#[cfg(not(feature = "compio"))]
98impl IntoMiddleware for Timeout {
99 fn into_middleware(
100 self,
101 ) -> impl Fn(Request, Next) -> Pin<Box<dyn Future<Output = Response> + Send + 'static>>
102 + Clone
103 + Send
104 + Sync
105 + 'static {
106 let default_duration = self.duration;
107 let status = self.status;
108 let dynamic = self.dynamic;
109
110 move |req: Request, next: Next| {
111 let dynamic = dynamic.clone();
112 Box::pin(async move {
113 // PMW-05: Per the documented `dynamic()` contract, the closure may
114 // return `None` to disable timeout for a specific request. The old
115 // logic `.and_then(|f| f(&req)).or(Some(default_duration))` overrode
116 // that intent — `None` always collapsed back to the default, so the
117 // `None => fut.await` match arm was unreachable. Match on the
118 // *closure presence* instead: if a dynamic fn was supplied, trust
119 // its decision (including a None per-request opt-out); if no
120 // dynamic fn, use the default.
121 let deadline = match dynamic.as_ref() {
122 Some(f) => f(&req),
123 None => Some(default_duration),
124 };
125
126 let fut = next.run(req);
127 match deadline {
128 Some(d) => match tokio::time::timeout(d, fut).await {
129 Ok(resp) => resp,
130 Err(_) => http::Response::builder()
131 .status(status)
132 .body(TakoBody::empty())
133 .expect("valid timeout response"),
134 },
135 None => fut.await,
136 }
137 })
138 }
139 }
140}