rmcp_server_kit/cancel.rs
//! Cancellation primitives that detach in-flight async work on the
//! cancel/timeout branch instead of dropping it mid-`.await`.
//!
//! # Motivation
//!
//! `tokio::select!` arms that race a long-running future against
//! [`tokio_util::sync::CancellationToken`] or `tokio::time::sleep` drop
//! the losing future when another branch wins. For work that owns a
//! remote-side resource (an SSH channel, an in-flight HTTP body, a DB
//! transaction), dropping mid-flight leaves that resource half-open
//! until some outer lifetime ends -- the inner future's own
//! `close().await` calls only run on its own early-return paths, never
//! on outer Drop.
//!
//! The fix: hand the future to [`tokio::spawn`] so it owns its own task
//! frame. Race the resulting [`tokio::task::JoinHandle`] -- not the
//! future itself -- against the cancel/timeout sources. When the
//! cancel/timeout branch wins, the `JoinHandle` is dropped (NOT
//! `.abort()`); the detached task keeps running to completion and
//! drives the inner close path. The client gets its cancel/timeout
//! response immediately, and the remote-side resource is released as
//! soon as the spawned future finishes its current work.
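//!
//! The same detach-instead-of-drop idea can be shown without an async
//! runtime. The sketch below is a std-only analogue, not this module's
//! implementation. `std::thread::spawn` stands in for `tokio::spawn`,
//! `mpsc::Receiver::recv_timeout` stands in for the `select!` race, and
//! `run_detached` and `Outcome` are hypothetical names. Dropping a
//! `std::thread::JoinHandle` detaches the thread, just as dropping a
//! tokio `JoinHandle` detaches the task.
//!
//! ```rust
//! use std::sync::mpsc::{self, RecvTimeoutError};
//! use std::thread;
//! use std::time::Duration;
//!
//! // Hypothetical outcome type for this sketch only.
//! #[derive(Debug, PartialEq)]
//! enum Outcome<T> {
//!     Completed(T),
//!     TimedOut,
//!     Panicked,
//! }
//!
//! // Hypothetical helper: runs `work` on its own thread and waits at most
//! // `timeout` for the result. On timeout the `JoinHandle` is dropped,
//! // which detaches the thread; it keeps running until `work` returns.
//! fn run_detached<T, W>(work: W, timeout: Duration) -> Outcome<T>
//! where
//!     T: Send + 'static,
//!     W: FnOnce() -> T + Send + 'static,
//! {
//!     let (tx, rx) = mpsc::channel();
//!     let handle = thread::spawn(move || {
//!         // The send fails harmlessly if the caller has already given up.
//!         let _ = tx.send(work());
//!     });
//!     match rx.recv_timeout(timeout) {
//!         Ok(value) => Outcome::Completed(value),
//!         Err(RecvTimeoutError::Timeout) => {
//!             // Detach, do not stop: the worker runs to completion.
//!             drop(handle);
//!             Outcome::TimedOut
//!         }
//!         Err(RecvTimeoutError::Disconnected) => {
//!             // The sender was dropped without sending, so `work` panicked;
//!             // `join` returns the panic payload as an `Err`.
//!             let _ = handle.join();
//!             Outcome::Panicked
//!         }
//!     }
//! }
//! ```
//!
//! With a worker that sleeps for 100 ms and a 10 ms timeout,
//! `run_detached` returns `Outcome::TimedOut` after about 10 ms. The
//! detached worker still finishes its work afterwards.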
//!
//! # Semantics
//!
//! - **Pre-cancel check**: if the token is already cancelled at entry,
//!   the future is NEVER spawned, and
//!   [`DetachOutcome::Cancelled`](crate::cancel::DetachOutcome::Cancelled)
//!   is returned immediately. This avoids starting expensive (often
//!   mutating) work for requests the client has already abandoned.
//! - **Completion wins on tie**: if the spawned future and a
//!   cancel/timeout signal are both ready in the same poll, the
//!   [`DetachOutcome::Completed`](crate::cancel::DetachOutcome::Completed)
//!   arm wins. This prevents reporting cancel/timeout for an operation
//!   that actually succeeded, which is especially harmful for mutating
//!   tools where the client might then retry.
//! - **Panic surfacing**: a panic in the spawned future is exposed as
//!   [`DetachOutcome::Panicked`](crate::cancel::DetachOutcome::Panicked)
//!   carrying the [`tokio::task::JoinError`]. Callers decide how to
//!   translate it; the helper does not fold it into Cancelled/TimedOut.
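//!
//! The tie and panic rules above reduce to a fixed priority order. The
//! sketch below is a hypothetical synchronous model, not the helper's
//! code. `resolve` takes a snapshot of which sources are ready and
//! checks them in the same top-down order as the `biased;` arms (join
//! handle, then token, then timer). `enter` models the pre-cancel check.
//! `Outcome`, `resolve`, and `enter` are illustrative names, and a panic
//! is modeled as `Err(String)` in place of [`tokio::task::JoinError`].
//!
//! ```rust
//! // Hypothetical model type for this sketch only.
//! #[derive(Debug, PartialEq)]
//! enum Outcome<T> {
//!     Completed(T),
//!     Cancelled,
//!     TimedOut,
//!     Panicked(String),
//! }
//!
//! // One poll of the race, given which sources are ready. `joined` is
//! // `Some` once the spawned work has finished: `Ok` for a value, `Err`
//! // (standing in for a panic `JoinError`) if it panicked. Returns
//! // `None` when nothing is ready and the caller keeps waiting.
//! fn resolve<T>(
//!     joined: Option<Result<T, String>>,
//!     cancelled: bool,
//!     timed_out: bool,
//! ) -> Option<Outcome<T>> {
//!     // Checked top-down, like the `biased;` arms: completion first.
//!     if let Some(result) = joined {
//!         return Some(match result {
//!             Ok(value) => Outcome::Completed(value),
//!             // Surfaced as-is, never folded into Cancelled/TimedOut.
//!             Err(msg) => Outcome::Panicked(msg),
//!         });
//!     }
//!     if cancelled {
//!         return Some(Outcome::Cancelled);
//!     }
//!     if timed_out {
//!         return Some(Outcome::TimedOut);
//!     }
//!     None
//! }
//!
//! // Entry check: an already-cancelled token returns before `start`
//! // (standing in for `tokio::spawn`) is ever called.
//! fn enter(cancelled_at_entry: bool, start: impl FnOnce()) -> Option<Outcome<()>> {
//!     if cancelled_at_entry {
//!         return Some(Outcome::Cancelled);
//!     }
//!     start();
//!     None
//! }
//! ```
//!
//! The arm order has one further consequence. If a cancel and the timer
//! are ready in the same poll, the result is `Cancelled`, not `TimedOut`.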
//!
//! # Lifetime
//!
//! Spawned tasks live on the tokio runtime. They are bounded by:
//!
//! 1. The future's own completion (normal exit -- the desired path).
//! 2. Tokio runtime shutdown (unavoidable -- TCP teardown forces the
//!    remote side to release resources regardless).
//!
//! They are NOT bounded by the request handler that started them, by
//! cancellation of the
//! [`CancellationToken`](tokio_util::sync::CancellationToken), or by
//! any [`tokio::task::JoinHandle`] the caller might hold. That is the
//! entire point.
//!
//! # Caller obligations
//!
//! Detached tasks can accumulate if the inner future hangs forever
//! (dead channel, wedged HTTP body, deadlock, missing protocol-level
//! timeout). Callers MUST ensure the inner future has its own
//! eventual-completion guarantee. The `timeout` argument here is a
//! **response timeout**, not an operation timeout: it bounds how long
//! the client waits, not how long the work runs.
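//!
//! The sketch below is a std-only illustration of the distinction. It
//! uses the hypothetical names `bounded_work` and `call`. The caller
//! waits at most a response timeout. The worker checks its own operation
//! deadline and exits on its own, so detaching it cannot leak it
//! indefinitely. In async code, the analogous guarantee would typically
//! come from wrapping the inner future in `tokio::time::timeout` (or a
//! protocol-level deadline) before passing it to
//! `run_with_cancel_and_timeout`.
//!
//! ```rust
//! use std::sync::mpsc;
//! use std::thread;
//! use std::time::{Duration, Instant};
//!
//! // Hypothetical worker that bounds its OWN run time: it performs small
//! // units of work and gives up once `op_deadline` has passed, so even a
//! // detached copy cannot run forever.
//! fn bounded_work(op_deadline: Instant, steps: u32) -> Result<u32, &'static str> {
//!     let mut done = 0;
//!     while done < steps {
//!         if Instant::now() >= op_deadline {
//!             return Err("operation timeout");
//!         }
//!         thread::sleep(Duration::from_millis(10)); // one unit of work
//!         done += 1;
//!     }
//!     Ok(done)
//! }
//!
//! // Hypothetical caller: waits at most `response_timeout` for an answer,
//! // while the worker is bounded separately by `op_timeout`. Also returns
//! // a receiver that fires once the (possibly detached) worker has exited.
//! fn call(
//!     response_timeout: Duration,
//!     op_timeout: Duration,
//!     steps: u32,
//! ) -> (Option<Result<u32, &'static str>>, mpsc::Receiver<()>) {
//!     let (answer_tx, answer_rx) = mpsc::channel();
//!     let (exit_tx, exit_rx) = mpsc::channel();
//!     let op_deadline = Instant::now() + op_timeout;
//!     // The JoinHandle is dropped immediately, so the thread is detached.
//!     drop(thread::spawn(move || {
//!         // Ignored if the caller has already stopped waiting.
//!         let _ = answer_tx.send(bounded_work(op_deadline, steps));
//!         let _ = exit_tx.send(());
//!     }));
//!     (answer_rx.recv_timeout(response_timeout).ok(), exit_rx)
//! }
//! ```
//!
//! With a 20 ms response timeout, the caller gets `None` almost at once.
//! The 100 ms operation deadline makes the detached worker exit long
//! before 1000 steps of 10 ms each would finish.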
//!
//! # Caveats
//!
//! Detaching work onto the runtime changes what it inherits from the
//! calling context:
//!
//! - **Task-local RBAC scope is lost.** The helper does NOT propagate
//!   RBAC task-locals into the spawned future. Inside the detached task,
//!   the accessors [`crate::rbac::current_role`],
//!   [`crate::rbac::current_identity`], [`crate::rbac::current_token`],
//!   and [`crate::rbac::current_sub`] will return `None` even if the
//!   originating request was authenticated. This is intentional:
//!   detached work should finish or close already-authorized
//!   resources, not initiate fresh RBAC-gated operations. Carrying
//!   secrets and tokens into detached work would also keep those
//!   credentials alive after the request that authorized them has
//!   ended.
//!
//!   If a caller genuinely needs RBAC context inside detached work
//!   (e.g. emitting an audit event that names the originating
//!   identity), it MUST capture the values before the spawn and rebind
//!   them with [`crate::rbac::with_rbac_scope`]:
//!
//!   ```no_run
//!   use rmcp_server_kit::{cancel, rbac};
//!   use std::time::Duration;
//!   use tokio_util::sync::CancellationToken;
//!
//!   # async fn example(ct: CancellationToken) {
//!   // Capture BEFORE spawn.
//!   let role = rbac::current_role().unwrap_or_default();
//!   let identity = rbac::current_identity().unwrap_or_default();
//!   let token = rbac::current_token().unwrap_or_else(|| {
//!       use rmcp_server_kit::secret::SecretString;
//!       SecretString::new(String::new().into())
//!   });
//!   let sub = rbac::current_sub().unwrap_or_default();
//!
//!   let fut = async move {
//!       rbac::with_rbac_scope(role, identity, token, sub, async {
//!           // Detached work here can call current_role() etc.
//!       })
//!       .await;
//!   };
//!
//!   let _ = cancel::run_with_cancel_and_timeout(fut, &ct, Some(Duration::from_secs(5))).await;
//!   # }
//!   ```
//!
//! - **Tracing span is preserved.** The helper wraps the spawned future
//!   in `.instrument(tracing::Span::current())`, so log lines from the
//!   detached task remain attached to the originating request span
//!   (matching the convention in [`crate::tool_hooks`]).

use std::time::Duration;

use tokio_util::sync::CancellationToken;
use tracing::Instrument;

/// Outcome of [`run_with_cancel_and_timeout`].
///
/// [`Self::Completed`] carries the future's own return value.
/// [`Self::Cancelled`] and [`Self::TimedOut`] indicate the future was
/// detached (still running on the tokio runtime) and the caller should
/// return a cancel/timeout response to the client immediately.
/// [`Self::Panicked`] indicates the spawned future panicked; the
/// [`tokio::task::JoinError`] is exposed so the caller can surface it.
#[derive(Debug)]
#[non_exhaustive]
#[must_use = "DetachOutcome carries the operation result; ignoring it discards either the value or the cancel/timeout signal"]
pub enum DetachOutcome<T> {
    /// The spawned future ran to completion and returned a value.
    Completed(T),
    /// The cancellation token fired before the future completed. The
    /// future was detached onto the runtime and keeps running.
    Cancelled,
    /// The `timeout` budget elapsed before the future completed. The
    /// future was detached onto the runtime and keeps running.
    TimedOut,
    /// The spawned future panicked. Carries the underlying
    /// [`tokio::task::JoinError`] so the caller can decide how to
    /// surface the panic (typically: log it and return an
    /// internal-error tool response).
    Panicked(tokio::task::JoinError),
}

/// Race a `'static` future against client cancellation and an optional
/// timeout, detaching the future on cancel/timeout so it can complete
/// its own cleanup path.
///
/// # Pre-condition
///
/// If `ct.is_cancelled()` already holds at entry, this returns
/// [`DetachOutcome::Cancelled`] without spawning `fut`. The future will
/// NEVER start in that case, so no expensive or mutating work begins
/// for an already-abandoned request.
///
/// # Behavior
///
/// Otherwise spawns `fut` onto the tokio runtime (wrapped in
/// `.instrument(Span::current())` so its log lines stay attached to
/// the originating request span) and races its
/// [`tokio::task::JoinHandle`] against `ct` and (optionally)
/// `tokio::time::sleep(timeout)`. On tie, completion wins (the handle
/// arm comes first under `biased;`). On cancel/timeout the spawned
/// task keeps running to completion; dropping the
/// [`tokio::task::JoinHandle`] is a no-op for the task.
///
/// # Requirements on `fut`
///
/// `fut` MUST be cleanup-safe under self-driven completion: when
/// detached, only the future's own internal Drop and early-return
/// paths run; nothing else will help it clean up.
///
/// # Cancel-safety
///
/// Cancel-safe under composition: `fut` is moved into `tokio::spawn`
/// before anything is awaited, so the cancel/timeout arms can only ever
/// drop the `JoinHandle` (a no-op for the task), never the inner future
/// itself. The same holds if the caller drops the future returned by
/// this function: before its first poll `fut` has not started, and
/// afterwards only the `JoinHandle` is dropped.
#[must_use = "DetachOutcome must be inspected to distinguish completion from cancel/timeout/panic"]
pub async fn run_with_cancel_and_timeout<F, T>(
    fut: F,
    ct: &CancellationToken,
    timeout: Option<Duration>,
) -> DetachOutcome<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    // Pre-cancel check: never start work for already-abandoned requests.
    if ct.is_cancelled() {
        return DetachOutcome::Cancelled;
    }

    let mut handle = tokio::spawn(fut.instrument(tracing::Span::current()));

    // `biased;` evaluates arms top-down. The `&mut handle` arm comes
    // FIRST so a ready completion wins over a simultaneously-ready
    // cancel/timeout. Dropping the `JoinHandle` does NOT abort the
    // task -- the spawned future runs to its own completion and cleans
    // up via its own Drop / early-return paths.
    if let Some(t) = timeout {
        tokio::select! {
            biased;
            joined = &mut handle => map_join(joined),
            () = ct.cancelled() => DetachOutcome::Cancelled,
            () = tokio::time::sleep(t) => DetachOutcome::TimedOut,
        }
    } else {
        tokio::select! {
            biased;
            joined = &mut handle => map_join(joined),
            () = ct.cancelled() => DetachOutcome::Cancelled,
        }
    }
}

/// Translate a [`tokio::task::JoinHandle`] result into a
/// [`DetachOutcome`]. Panics are surfaced distinctly via
/// [`DetachOutcome::Panicked`] so the caller can distinguish them from
/// cancel/timeout. Do not fold panics into the cancel path; that loses
/// real failure info.
///
/// The helper never calls `.abort()`, so the `JoinError` is normally a
/// panic. It is passed through unchanged, so callers can still check
/// [`tokio::task::JoinError::is_panic`] or
/// [`tokio::task::JoinError::is_cancelled`] (e.g. for a task cancelled
/// during runtime shutdown).
fn map_join<T>(joined: Result<T, tokio::task::JoinError>) -> DetachOutcome<T> {
    match joined {
        Ok(v) => DetachOutcome::Completed(v),
        Err(join_err) => DetachOutcome::Panicked(join_err),
    }
}

#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]

    use std::sync::{
        Arc,
        atomic::{AtomicBool, AtomicU32, Ordering},
    };

    use tokio::time::Duration;

    use super::*;

    #[tokio::test]
    async fn completed_returns_value_when_future_wins() {
        let ct = CancellationToken::new();
        let out =
            run_with_cancel_and_timeout(async { 42_u32 }, &ct, Some(Duration::from_secs(5))).await;
        assert!(matches!(out, DetachOutcome::Completed(42)));
    }

    #[tokio::test]
    async fn cancel_outcome_and_detached_future_runs_to_completion() {
        // The cancel branch wins, but the future must still complete in
        // the background -- this is the central contract.
        //
        // The pre-cancel check short-circuits if the token is cancelled
        // at entry, so we delay-cancel after the helper has spawned the
        // future.
        let done = Arc::new(AtomicBool::new(false));
        let done_clone = Arc::clone(&done);
        let ct = CancellationToken::new();
        let fut = async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            done_clone.store(true, Ordering::SeqCst);
        };

        let ct_for_cancel = ct.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ct_for_cancel.cancel();
        });
        let out = run_with_cancel_and_timeout(fut, &ct, None).await;
        assert!(matches!(out, DetachOutcome::Cancelled));

        // The detached task is still running. Give it time to finish.
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(
            done.load(Ordering::SeqCst),
            "detached future must run to completion after cancel"
        );
    }

    #[tokio::test]
    async fn timeout_outcome_and_detached_future_runs_to_completion() {
        let done = Arc::new(AtomicBool::new(false));
        let done_clone = Arc::clone(&done);
        let ct = CancellationToken::new();
        let fut = async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            done_clone.store(true, Ordering::SeqCst);
        };

        let out = run_with_cancel_and_timeout(fut, &ct, Some(Duration::from_millis(10))).await;
        assert!(matches!(out, DetachOutcome::TimedOut));

        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(
            done.load(Ordering::SeqCst),
            "detached future must run to completion after timeout"
        );
    }

    #[tokio::test]
    async fn panic_in_detached_future_surfaces_as_panicked() {
        let ct = CancellationToken::new();
        let out: DetachOutcome<()> = run_with_cancel_and_timeout(
            async { panic!("boom") },
            &ct,
            Some(Duration::from_secs(5)),
        )
        .await;
        // Panic is surfaced distinctly, not folded into Cancelled/TimedOut.
        assert!(
            matches!(out, DetachOutcome::Panicked(ref e) if e.is_panic()),
            "expected Panicked carrying a panic JoinError"
        );
    }

    /// Pre-cancel check: if the token is already cancelled at entry,
    /// the future MUST NOT be spawned. This avoids starting expensive
    /// or mutating work for already-abandoned requests.
    #[tokio::test]
    async fn pre_cancelled_token_skips_spawn() {
        let started = Arc::new(AtomicU32::new(0));
        let started_clone = Arc::clone(&started);
        let ct = CancellationToken::new();
        ct.cancel();

        let out = run_with_cancel_and_timeout(
            async move {
                started_clone.fetch_add(1, Ordering::SeqCst);
            },
            &ct,
            None,
        )
        .await;
        assert!(matches!(out, DetachOutcome::Cancelled));

        // Give the runtime a chance to run any errant spawn.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            started.load(Ordering::SeqCst),
            0,
            "pre-cancelled token must not spawn the future"
        );
    }

    /// Completion wins on tie: even when the token is cancelled
    /// concurrently with a ready future, the `Completed` arm must win
    /// because `biased;` puts it first. The caller must NEVER see
    /// `Cancelled` for an operation that actually completed
    /// successfully (would mislead clients into bad retries on
    /// mutating tools).
    ///
    /// We run many iterations to exercise the race; in any iteration
    /// where the pre-cancel check fires first (token cancel raced ahead
    /// of helper entry) we accept `Cancelled` as a non-tie path.
    #[tokio::test]
    async fn completion_wins_on_tie_with_cancel() {
        for _ in 0..50 {
            let ct = CancellationToken::new();
            let ct_for_cancel = ct.clone();
            tokio::spawn(async move {
                ct_for_cancel.cancel();
            });
            let out = run_with_cancel_and_timeout(async { 7_u32 }, &ct, None).await;
            match out {
                DetachOutcome::Completed(7) | DetachOutcome::Cancelled => {}
                DetachOutcome::Completed(other_val) => {
                    panic!("unexpected Completed value on tie race: {other_val}")
                }
                DetachOutcome::TimedOut => {
                    panic!("unexpected TimedOut on tie race (no timeout configured)")
                }
                DetachOutcome::Panicked(join_err) => {
                    panic!("unexpected Panicked on tie race: {join_err}")
                }
            }
        }
    }
}