// rmcp_server_kit/cancel.rs
//! Cancellation primitives that detach in-flight async work on the
//! cancel/timeout branch instead of dropping it mid-`.await`.
//!
//! # Motivation
//!
//! `tokio::select!` arms that race a long-running future against
//! [`tokio_util::sync::CancellationToken`] or `tokio::time::sleep` drop
//! the losing future when another branch wins. For work that owns a
//! remote-side resource (an SSH channel, an in-flight HTTP body, a DB
//! transaction), dropping mid-flight leaves that resource half-open
//! until some outer lifetime ends -- the inner future's own
//! `close().await` calls only run on its own early-return paths, never
//! on outer Drop.
//!
//! The fix: hand the future to [`tokio::spawn`] so it owns its own task
//! frame. Race the resulting [`tokio::task::JoinHandle`] -- not the
//! future itself -- against the cancel/timeout sources. When the
//! cancel/timeout branch wins, the `JoinHandle` is dropped (NOT
//! `.abort()`); the detached task keeps running to completion and
//! drives the inner close path. The client gets its cancel/timeout
//! response immediately, and the remote-side resource is released as
//! soon as the spawned future finishes its current work.
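//!
//! A minimal sketch of the difference between the two shapes, using
//! only `tokio` primitives rather than this module's helper. The
//! `work`, `race_inline`, and `race_detached` functions and all
//! durations are hypothetical stand-ins chosen for illustration:
//!
//! ```rust
//! use std::sync::Arc;
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::time::Duration;
//!
//! /// Hypothetical work whose last step stands in for `close().await`.
//! async fn work(closed: Arc<AtomicBool>) {
//!     tokio::time::sleep(Duration::from_millis(50)).await;
//!     // Only reached if the future keeps being polled to the end.
//!     closed.store(true, Ordering::SeqCst);
//! }
//!
//! /// Races the future itself: the timeout arm drops it mid-`.await`.
//! async fn race_inline(closed: Arc<AtomicBool>) {
//!     tokio::select! {
//!         () = work(closed) => {}
//!         () = tokio::time::sleep(Duration::from_millis(10)) => {}
//!     }
//! }
//!
//! /// Races the `JoinHandle`: the timeout arm drops only the handle.
//! async fn race_detached(closed: Arc<AtomicBool>) {
//!     let mut handle = tokio::spawn(work(closed));
//!     tokio::select! {
//!         _ = &mut handle => {}
//!         () = tokio::time::sleep(Duration::from_millis(10)) => {}
//!     }
//! }
//!
//! fn main() {
//!     let rt = tokio::runtime::Builder::new_current_thread()
//!         .enable_time()
//!         .build()
//!         .expect("failed to build runtime");
//!     rt.block_on(async {
//!         let inline = Arc::new(AtomicBool::new(false));
//!         race_inline(Arc::clone(&inline)).await;
//!
//!         let detached = Arc::new(AtomicBool::new(false));
//!         race_detached(Arc::clone(&detached)).await;
//!
//!         // Give the detached task time to finish its close step.
//!         tokio::time::sleep(Duration::from_millis(100)).await;
//!         assert!(!inline.load(Ordering::SeqCst));
//!         assert!(detached.load(Ordering::SeqCst));
//!     });
//! }
//! ```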
//!
//! # Semantics
//!
//! - **Pre-cancel check**: if the token is already cancelled at entry,
//!   the future is NEVER spawned. Returns
//!   [`DetachOutcome::Cancelled`](crate::cancel::DetachOutcome::Cancelled)
//!   immediately. Avoids starting expensive (often mutating) work for
//!   requests the client has already abandoned.
//! - **Completion wins on tie**: if the spawned future and a
//!   cancel/timeout signal are both ready in the same poll, the
//!   [`DetachOutcome::Completed`](crate::cancel::DetachOutcome::Completed)
//!   arm wins. This prevents reporting cancel/timeout for an operation
//!   that actually succeeded (especially harmful for mutating tools
//!   where the client might then retry).
//! - **Panic surfacing**: a panic in the spawned future is exposed as
//!   [`DetachOutcome::Panicked`](crate::cancel::DetachOutcome::Panicked)
//!   carrying the [`tokio::task::JoinError`]. Callers decide how to
//!   translate it; the helper does not fold it into Cancelled/TimedOut.
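//!
//! The outcomes above could be translated into a client-facing result
//! along these lines. This is only a sketch: `call_tool` and its error
//! strings are hypothetical, and the wildcard arm is there because
//! `DetachOutcome` is `#[non_exhaustive]` for downstream crates:
//!
//! ```rust
//! use rmcp_server_kit::cancel::{DetachOutcome, run_with_cancel_and_timeout};
//! use std::time::Duration;
//! use tokio_util::sync::CancellationToken;
//!
//! /// Hypothetical handler that maps each outcome to a result.
//! async fn call_tool(ct: &CancellationToken) -> Result<u32, String> {
//!     let outcome =
//!         run_with_cancel_and_timeout(async { 42_u32 }, ct, Some(Duration::from_secs(5))).await;
//!     match outcome {
//!         // The work finished; pass its value through.
//!         DetachOutcome::Completed(value) => Ok(value),
//!         // The work may still be running detached; answer the client now.
//!         DetachOutcome::Cancelled => Err("cancelled by client".to_owned()),
//!         DetachOutcome::TimedOut => Err("timed out".to_owned()),
//!         // Keep panics distinct from cancel/timeout.
//!         DetachOutcome::Panicked(err) => Err(format!("internal error: {err}")),
//!         // Required outside the defining crate (`#[non_exhaustive]`).
//!         _ => Err("unhandled outcome".to_owned()),
//!     }
//! }
//! ```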
//!
//! # Lifetime
//!
//! Spawned tasks live on the tokio runtime. They are bounded by:
//! 1. The future's own completion (normal exit -- desired path).
//! 2. Tokio runtime shutdown (unavoidable -- TCP teardown forces the
//!    remote side to release resources regardless).
//!
//! They are NOT bounded by the request handler that started them, by
//! [`CancellationToken`](tokio_util::sync::CancellationToken) cancel,
//! or by any [`tokio::task::JoinHandle`] the caller might hold. That
//! is the entire point.
//!
//! # Caller obligations
//!
//! Detached tasks can accumulate if the inner future hangs forever
//! (dead channel, wedged HTTP body, deadlock, missing protocol-level
//! timeout). Callers MUST ensure the inner future has its own
//! eventual-completion guarantee. The `timeout` argument here is a
//! **response timeout**, not an operation timeout: it bounds how long
//! the client waits, not how long the work runs.
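//!
//! One way to meet this obligation is to put an operation timeout
//! inside the future and run the close step after it. The sketch below
//! assumes a hypothetical `Channel` type and arbitrary durations; it
//! is not an API of this crate:
//!
//! ```rust
//! use rmcp_server_kit::cancel::{self, DetachOutcome};
//! use std::time::Duration;
//! use tokio_util::sync::CancellationToken;
//!
//! /// Hypothetical remote resource (stand-in for an SSH channel).
//! struct Channel;
//!
//! impl Channel {
//!     async fn exec(&mut self) -> u32 {
//!         tokio::time::sleep(Duration::from_millis(20)).await;
//!         7
//!     }
//!
//!     async fn close(self) {}
//! }
//!
//! async fn example(ct: CancellationToken) -> Option<u32> {
//!     let fut = async {
//!         let mut channel = Channel;
//!         // Operation timeout: bounds how long the (possibly detached) work runs.
//!         let result = tokio::time::timeout(Duration::from_secs(30), channel.exec()).await;
//!         // Reached on success and on operation timeout alike.
//!         channel.close().await;
//!         result.ok()
//!     };
//!
//!     // Response timeout: bounds only how long the client waits.
//!     match cancel::run_with_cancel_and_timeout(fut, &ct, Some(Duration::from_secs(5))).await {
//!         DetachOutcome::Completed(value) => value,
//!         _ => None,
//!     }
//! }
//! ```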
//!
//! # Caveats
//!
//! Detaching work onto the runtime changes what the spawned future
//! inherits from the calling context:
//!
//! - **Task-local RBAC scope**: this is **lost**. The helper does NOT
//!   propagate RBAC task-locals into the spawned future. Inside the
//!   detached task, the accessors [`crate::rbac::current_role`],
//!   [`crate::rbac::current_identity`], [`crate::rbac::current_token`],
//!   and [`crate::rbac::current_sub`] will return `None` even if the
//!   originating request was authenticated. This is intentional:
//!   detached work should finish or close already-authorized
//!   resources, not initiate fresh RBAC-gated operations. Holding
//!   secrets and tokens alive past the request boundary would extend
//!   credential lifetime past the request that authorized them.
//!
//! If a caller genuinely needs RBAC context inside detached work
//! (e.g. emitting an audit event that names the originating
//! identity), it MUST capture the values before the spawn and rebind
//! them with [`crate::rbac::with_rbac_scope`]:
//!
//! ```no_run
//! use rmcp_server_kit::{cancel, rbac};
//! use std::time::Duration;
//! use tokio_util::sync::CancellationToken;
//!
//! # async fn example(ct: CancellationToken) {
//! // Capture BEFORE spawn.
//! let role = rbac::current_role().unwrap_or_default();
//! let identity = rbac::current_identity().unwrap_or_default();
//! let token = rbac::current_token().unwrap_or_else(|| {
//!     use rmcp_server_kit::secret::SecretString;
//!     SecretString::new(String::new().into())
//! });
//! let sub = rbac::current_sub().unwrap_or_default();
//!
//! let fut = async move {
//!     rbac::with_rbac_scope(role, identity, token, sub, async {
//!         // Detached work here can call current_role() etc.
//!     })
//!     .await;
//! };
//!
//! let _ = cancel::run_with_cancel_and_timeout(fut, &ct, Some(Duration::from_secs(5))).await;
//! # }
//! ```
//!
//! - **Tracing span**: unlike the RBAC scope, the originating request's
//!   span IS preserved. The helper wraps the spawned future in
//!   `.instrument(tracing::Span::current())`, so log lines from the
//!   detached task remain attached to the request span (matching the
//!   convention in [`crate::tool_hooks`]).

use std::time::Duration;

use tokio_util::sync::CancellationToken;
use tracing::Instrument;

/// Outcome of [`run_with_cancel_and_timeout`].
///
/// [`Self::Completed`] carries the future's own return value.
/// [`Self::Cancelled`] and [`Self::TimedOut`] indicate the future was
/// detached (still running on the tokio runtime) and the caller should
/// return a cancel/timeout response to the client immediately.
/// [`Self::Panicked`] indicates the spawned future panicked; the
/// [`tokio::task::JoinError`] is exposed so the caller can surface it.
#[derive(Debug)]
#[non_exhaustive]
#[must_use = "DetachOutcome carries the operation result; ignoring it discards either the value or the cancel/timeout signal"]
pub enum DetachOutcome<T> {
    /// The spawned future ran to completion and returned a value.
    Completed(T),
    /// The cancellation token fired before the future completed. The
    /// future was detached onto the runtime and keeps running.
    Cancelled,
    /// The `timeout` budget elapsed before the future completed. The
    /// future was detached onto the runtime and keeps running.
    TimedOut,
    /// The spawned future panicked. Carries the underlying
    /// [`tokio::task::JoinError`] so the caller can decide how to
    /// surface the panic (typical: log + return an internal-error
    /// tool response).
    Panicked(tokio::task::JoinError),
}

/// Race a `'static` future against client cancellation and an optional
/// timeout, detaching the future on cancel/timeout so it can complete
/// its own cleanup path.
///
/// # Pre-condition
///
/// If `ct.is_cancelled()` already holds at entry, this returns
/// [`DetachOutcome::Cancelled`] without spawning `fut`. The future will
/// NEVER start in that case -- do not begin expensive or mutating work
/// for already-abandoned requests.
///
/// # Behavior
///
/// Otherwise spawns `fut` onto the tokio runtime (wrapped in
/// `.instrument(Span::current())` so its log lines stay attached to
/// the originating request span) and races its
/// [`tokio::task::JoinHandle`] against `ct` and (optionally)
/// `tokio::time::sleep(timeout)`. On tie, completion wins (the handle
/// arm comes first under `biased;`). On cancel/timeout the spawned
/// task keeps running to completion -- dropping the [`tokio::task::JoinHandle`]
/// is a no-op for the task.
///
/// # Requirements on `fut`
///
/// `fut` MUST be cleanup-safe under self-driven completion: when
/// detached, only the future's own internal Drop and early-return
/// paths run; nothing else will help it clean up.
///
/// # Cancel-safety
///
/// Cancel-safe under composition: the inner future is moved into
/// `tokio::spawn` before anything is awaited, so the cancel/timeout
/// arms can only ever drop the `JoinHandle` (a no-op for the task),
/// never the inner future itself.
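///
/// # Example
///
/// A small sketch of the pre-cancel and completion paths, assuming a
/// current-thread runtime built by hand (the runtime setup and the
/// trivial futures are illustrative choices, not requirements):
///
/// ```rust
/// use rmcp_server_kit::cancel::{DetachOutcome, run_with_cancel_and_timeout};
/// use tokio_util::sync::CancellationToken;
///
/// let rt = tokio::runtime::Builder::new_current_thread()
///     .build()
///     .expect("failed to build runtime");
/// rt.block_on(async {
///     // Already cancelled at entry: the future is never spawned.
///     let ct = CancellationToken::new();
///     ct.cancel();
///     let outcome = run_with_cancel_and_timeout(async { 1_u32 }, &ct, None).await;
///     assert!(matches!(outcome, DetachOutcome::Cancelled));
///
///     // Live token, no timeout: the future's value comes back.
///     let ct = CancellationToken::new();
///     let outcome = run_with_cancel_and_timeout(async { 1_u32 }, &ct, None).await;
///     assert!(matches!(outcome, DetachOutcome::Completed(1)));
/// });
/// ```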
#[must_use = "DetachOutcome must be inspected to distinguish completion from cancel/timeout/panic"]
pub async fn run_with_cancel_and_timeout<F, T>(
    fut: F,
    ct: &CancellationToken,
    timeout: Option<Duration>,
) -> DetachOutcome<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    // Pre-cancel check: never start work for already-abandoned requests.
    if ct.is_cancelled() {
        return DetachOutcome::Cancelled;
    }

    let mut handle = tokio::spawn(fut.instrument(tracing::Span::current()));

    // `biased;` evaluates arms top-down. The `&mut handle` arm comes
    // FIRST so a ready completion wins over a simultaneously-ready
    // cancel/timeout. Dropping the `JoinHandle` does NOT abort the
    // task -- the spawned future runs to its own completion and cleans
    // up via its own Drop / early-return paths.
    if let Some(t) = timeout {
        tokio::select! {
            biased;
            joined = &mut handle => map_join(joined),
            () = ct.cancelled() => DetachOutcome::Cancelled,
            () = tokio::time::sleep(t) => DetachOutcome::TimedOut,
        }
    } else {
        tokio::select! {
            biased;
            joined = &mut handle => map_join(joined),
            () = ct.cancelled() => DetachOutcome::Cancelled,
        }
    }
}

/// Translate a [`tokio::task::JoinHandle`] result into a
/// [`DetachOutcome`]. Panics are surfaced distinctly via
/// [`DetachOutcome::Panicked`] so the caller can distinguish them from
/// cancel/timeout -- do not fold panics into the cancel path, that
/// loses real failure info.
fn map_join<T>(joined: Result<T, tokio::task::JoinError>) -> DetachOutcome<T> {
    match joined {
        Ok(v) => DetachOutcome::Completed(v),
        // A `JoinError` can also report task cancellation (e.g. on
        // runtime shutdown); it travels in the same variant, and callers
        // can tell the cases apart with `JoinError::is_panic`.
        Err(join_err) => DetachOutcome::Panicked(join_err),
    }
}

#[cfg(test)]
mod tests {
    #![allow(
        clippy::unwrap_used,
        clippy::expect_used,
        clippy::panic,
        reason = "test-only relaxations; production code uses ? and tracing"
    )]

    use std::sync::{
        Arc,
        atomic::{AtomicBool, AtomicU32, Ordering},
    };

    use tokio::time::Duration;

    use super::*;

    #[tokio::test]
    async fn completed_returns_value_when_future_wins() {
        let ct = CancellationToken::new();
        let out =
            run_with_cancel_and_timeout(async { 42_u32 }, &ct, Some(Duration::from_secs(5))).await;
        assert!(matches!(out, DetachOutcome::Completed(42)));
    }

    #[tokio::test]
    async fn cancel_outcome_and_detached_future_runs_to_completion() {
        // The cancel branch wins, but the future must still complete in
        // the background -- this is the central contract.
        //
        // The pre-cancel check short-circuits if the token is cancelled
        // at entry, so we delay-cancel after the helper has spawned the
        // future.
        let done = Arc::new(AtomicBool::new(false));
        let done_clone = Arc::clone(&done);
        let ct = CancellationToken::new();
        let fut = async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            done_clone.store(true, Ordering::SeqCst);
        };

        let ct_for_cancel = ct.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ct_for_cancel.cancel();
        });
        let out = run_with_cancel_and_timeout(fut, &ct, None).await;
        assert!(matches!(out, DetachOutcome::Cancelled));

        // The detached task is still running. Give it time to finish.
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(
            done.load(Ordering::SeqCst),
            "detached future must run to completion after cancel"
        );
    }

    #[tokio::test]
    async fn timeout_outcome_and_detached_future_runs_to_completion() {
        let done = Arc::new(AtomicBool::new(false));
        let done_clone = Arc::clone(&done);
        let ct = CancellationToken::new();
        let fut = async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            done_clone.store(true, Ordering::SeqCst);
        };

        let out = run_with_cancel_and_timeout(fut, &ct, Some(Duration::from_millis(10))).await;
        assert!(matches!(out, DetachOutcome::TimedOut));

        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(
            done.load(Ordering::SeqCst),
            "detached future must run to completion after timeout"
        );
    }

    #[tokio::test]
    async fn panic_in_detached_future_surfaces_as_panicked() {
        let ct = CancellationToken::new();
        let out: DetachOutcome<()> = run_with_cancel_and_timeout(
            async { panic!("boom") },
            &ct,
            Some(Duration::from_secs(5)),
        )
        .await;
        // Panic is surfaced distinctly, not folded into Cancelled/TimedOut.
        assert!(
            matches!(out, DetachOutcome::Panicked(ref e) if e.is_panic()),
            "expected Panicked carrying a panic JoinError"
        );
    }

    /// Pre-cancel check: if the token is already cancelled at entry,
    /// the future MUST NOT be spawned. This avoids starting expensive
    /// or mutating work for already-abandoned requests.
    #[tokio::test]
    async fn pre_cancelled_token_skips_spawn() {
        let started = Arc::new(AtomicU32::new(0));
        let started_clone = Arc::clone(&started);
        let ct = CancellationToken::new();
        ct.cancel();

        let out = run_with_cancel_and_timeout(
            async move {
                started_clone.fetch_add(1, Ordering::SeqCst);
            },
            &ct,
            None,
        )
        .await;
        assert!(matches!(out, DetachOutcome::Cancelled));

        // Give the runtime a chance to run any errant spawn.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            started.load(Ordering::SeqCst),
            0,
            "pre-cancelled token must not spawn the future"
        );
    }

    /// Completion wins on tie: even when the token is cancelled
    /// concurrently with a ready future, the `Completed` arm must win
    /// because `biased;` puts it first. The caller must NEVER see
    /// `Cancelled` for an operation that actually completed
    /// successfully (would mislead clients into bad retries on
    /// mutating tools).
    ///
    /// We run many iterations to exercise the race; in any iteration
    /// where the pre-cancel check fires first (token cancel raced ahead
    /// of helper entry) we accept `Cancelled` as a non-tie path.
    #[tokio::test]
    async fn completion_wins_on_tie_with_cancel() {
        for _ in 0..50 {
            let ct = CancellationToken::new();
            let ct_for_cancel = ct.clone();
            tokio::spawn(async move {
                ct_for_cancel.cancel();
            });
            let out = run_with_cancel_and_timeout(async { 7_u32 }, &ct, None).await;
            match out {
                DetachOutcome::Completed(7) | DetachOutcome::Cancelled => {}
                DetachOutcome::Completed(other_val) => {
                    panic!("unexpected Completed value on tie race: {other_val}")
                }
                DetachOutcome::TimedOut => {
                    panic!("unexpected TimedOut on tie race (no timeout configured)")
                }
                DetachOutcome::Panicked(join_err) => {
                    panic!("unexpected Panicked on tie race: {join_err}")
                }
            }
        }
    }
}