Skip to main content

tako_rs_plugins/middleware/
timeout.rs

1//! Per-request timeout middleware.
2//!
3//! Aborts the inner middleware chain when a configurable deadline is exceeded
4//! and returns `503 Service Unavailable` (or a caller-supplied status). The
5//! timer also covers any work the handler is still doing — `tokio::time::timeout`
6//! drops the inner future, which cancels in-flight async work tied to the
7//! request future tree.
8//!
9//! For per-route timeouts that bypass the middleware chain entirely, use
10//! [`Route::timeout`](tako_rs_core::route::Route::timeout) instead — this
11//! middleware exists for cases where the deadline is dynamic (per-tenant,
12//! per-IP, …) or composes with other middleware (e.g. retry).
13//!
14//! # Compio runtime
15//!
16//! The compio runtime ships `!Send` futures. The
17//! [`IntoMiddleware`](tako_rs_core::middleware::IntoMiddleware) contract is
18//! `+ Send + 'static`, which means we cannot host `compio::time::sleep` here —
19//! the wrapping `Box::pin(async move { ... })` would not satisfy `Send`. When
20//! the `compio` cargo feature is active, the
21//! [`IntoMiddleware`](tako_rs_core::middleware::IntoMiddleware) impl is gated
22//! off and `Timeout::into_middleware` is a compile error. Use
23//! [`Route::timeout`](tako_rs_core::route::Route::timeout) (per-route deadline,
24//! runtime-agnostic) on the compio path instead.
25//!
26//! # Examples
27//!
28//! ```rust,ignore
29//! use std::time::Duration;
30//! use tako::middleware::timeout::Timeout;
31//! use tako::middleware::IntoMiddleware;
32//!
33//! let mw = Timeout::new(Duration::from_secs(30)).into_middleware();
34//! ```
35
36#[cfg(not(feature = "compio"))]
37use std::future::Future;
38#[cfg(not(feature = "compio"))]
39use std::pin::Pin;
40use std::sync::Arc;
41use std::time::Duration;
42
43use http::StatusCode;
44#[cfg(not(feature = "compio"))]
45use tako_rs_core::body::TakoBody;
46#[cfg(not(feature = "compio"))]
47use tako_rs_core::middleware::IntoMiddleware;
48#[cfg(not(feature = "compio"))]
49use tako_rs_core::middleware::Next;
50use tako_rs_core::types::Request;
51#[cfg(not(feature = "compio"))]
52use tako_rs_core::types::Response;
53
54/// Per-request override closure for [`Timeout`].
55pub type TimeoutDynamicFn = Arc<dyn Fn(&Request) -> Option<Duration> + Send + Sync + 'static>;
56
57/// Per-request timeout middleware configuration.
58///
59/// All three fields stay populated even on the compio build so the struct
60/// remains constructible — there is just no [`IntoMiddleware`](tako_rs_core::middleware::IntoMiddleware)
61/// adapter for it. The `expect_used` allow keeps the compio compile clean
62/// while the fields wait for a `compio`-runtime adapter.
63#[cfg_attr(feature = "compio", allow(dead_code))]
64pub struct Timeout {
65  duration: Duration,
66  status: StatusCode,
67  dynamic: Option<TimeoutDynamicFn>,
68}
69
70impl Timeout {
71  /// Creates a timeout middleware with a static deadline.
72  pub fn new(duration: Duration) -> Self {
73    Self {
74      duration,
75      status: StatusCode::SERVICE_UNAVAILABLE,
76      dynamic: None,
77    }
78  }
79
80  /// Sets the response status used when the deadline elapses. Default: 503.
81  pub fn status(mut self, status: StatusCode) -> Self {
82    self.status = status;
83    self
84  }
85
86  /// Computes the deadline per request. Returning `None` disables the timeout
87  /// for that request.
88  pub fn dynamic<F>(mut self, f: F) -> Self
89  where
90    F: Fn(&Request) -> Option<Duration> + Send + Sync + 'static,
91  {
92    self.dynamic = Some(Arc::new(f));
93    self
94  }
95}
96
97#[cfg(not(feature = "compio"))]
98impl IntoMiddleware for Timeout {
99  fn into_middleware(
100    self,
101  ) -> impl Fn(Request, Next) -> Pin<Box<dyn Future<Output = Response> + Send + 'static>>
102  + Clone
103  + Send
104  + Sync
105  + 'static {
106    let default_duration = self.duration;
107    let status = self.status;
108    let dynamic = self.dynamic;
109
110    move |req: Request, next: Next| {
111      let dynamic = dynamic.clone();
112      Box::pin(async move {
113        // PMW-05: Per the documented `dynamic()` contract, the closure may
114        // return `None` to disable timeout for a specific request. The old
115        // logic `.and_then(|f| f(&req)).or(Some(default_duration))` overrode
116        // that intent — `None` always collapsed back to the default, so the
117        // `None => fut.await` match arm was unreachable. Match on the
118        // *closure presence* instead: if a dynamic fn was supplied, trust
119        // its decision (including a None per-request opt-out); if no
120        // dynamic fn, use the default.
121        let deadline = match dynamic.as_ref() {
122          Some(f) => f(&req),
123          None => Some(default_duration),
124        };
125
126        let fut = next.run(req);
127        match deadline {
128          Some(d) => match tokio::time::timeout(d, fut).await {
129            Ok(resp) => resp,
130            Err(_) => http::Response::builder()
131              .status(status)
132              .body(TakoBody::empty())
133              .expect("valid timeout response"),
134          },
135          None => fut.await,
136        }
137      })
138    }
139  }
140}