Skip to main content

rmcp_server_kit/
cancel.rs

1//! Cancellation primitives that detach in-flight async work on the
2//! cancel/timeout branch instead of dropping it mid-`.await`.
3//!
4//! # Motivation
5//!
6//! `tokio::select!` arms that race a long-running future against
7//! [`tokio_util::sync::CancellationToken`] or `tokio::time::sleep` drop
8//! the losing future when another branch wins. For work that owns a
9//! remote-side resource (an SSH channel, an in-flight HTTP body, a DB
10//! transaction), dropping mid-flight leaves that resource half-open
11//! until some outer lifetime ends -- the inner future's own
12//! `close().await` calls only run on its own early-return paths, never
13//! on outer Drop.
14//!
15//! The fix: hand the future to [`tokio::spawn`] so it owns its own task
16//! frame. Race the resulting [`tokio::task::JoinHandle`] -- not the
17//! future itself -- against the cancel/timeout sources. When the
18//! cancel/timeout branch wins, the `JoinHandle` is dropped (NOT
19//! `.abort()`); the detached task keeps running to completion and
20//! drives the inner close path. The client gets its cancel/timeout
21//! response immediately, and the remote-side resource is released as
22//! soon as the spawned future finishes its current work.
23//!
24//! # Semantics
25//!
26//! - **Pre-cancel check**: if the token is already cancelled at entry,
27//!   the future is NEVER spawned. Returns
28//!   [`DetachOutcome::Cancelled`](crate::cancel::DetachOutcome::Cancelled)
29//!   immediately. Avoids starting expensive (often mutating) work for
30//!   requests the client has already abandoned.
31//! - **Completion wins on tie**: if the spawned future and a
32//!   cancel/timeout signal are both ready in the same poll, the
33//!   [`DetachOutcome::Completed`](crate::cancel::DetachOutcome::Completed)
34//!   arm wins. This prevents reporting cancel/timeout for an operation
35//!   that actually succeeded (especially harmful for mutating tools
36//!   where the client might then retry).
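//! - **Panic surfacing**: a panic in the spawned future is exposed as
//!   [`DetachOutcome::Panicked`](crate::cancel::DetachOutcome::Panicked)
//!   carrying the [`tokio::task::JoinError`]. Callers decide how to
//!   translate it; the helper does not fold it into Cancelled/TimedOut.
//!   Every `JoinError` takes this path, including a non-panic one
//!   (`JoinError::is_cancelled`, e.g. a task cancelled by runtime
//!   shutdown), so inspect `JoinError::is_panic` when the cause matters.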
41//!
42//! # Lifetime
43//!
44//! Spawned tasks live on the tokio runtime. They are bounded by:
45//! 1. The future's own completion (normal exit -- desired path).
46//! 2. Tokio runtime shutdown (unavoidable -- TCP teardown forces the
47//!    remote side to release resources regardless).
48//!
49//! They are NOT bounded by the request handler that started them, by
50//! [`CancellationToken`](tokio_util::sync::CancellationToken) cancel,
51//! or by any [`tokio::task::JoinHandle`] the caller might hold. That
52//! is the entire point.
53//!
54//! # Caller obligations
55//!
56//! Detached tasks can accumulate if the inner future hangs forever
57//! (dead channel, wedged HTTP body, deadlock, missing protocol-level
58//! timeout). Callers MUST ensure the inner future has its own
59//! eventual-completion guarantee. The `timeout` argument here is a
60//! **response timeout**, not an operation timeout: it bounds how long
61//! the client waits, not how long the work runs.
62//!
63//! # Caveats
64//!
65//! These properties of the calling context are **lost** when work
66//! detaches onto the runtime:
67//!
68//! - **Task-local RBAC scope**. The helper does NOT propagate RBAC
69//!   task-locals into the spawned future. Inside the detached task,
70//!   the accessors [`crate::rbac::current_role`],
71//!   [`crate::rbac::current_identity`], [`crate::rbac::current_token`],
72//!   and [`crate::rbac::current_sub`] will return `None` even if the
73//!   originating request was authenticated. This is intentional:
74//!   detached work should finish or close already-authorized
75//!   resources, not initiate fresh RBAC-gated operations. Holding
76//!   secrets and tokens alive past the request boundary would extend
77//!   credential lifetime past the request that authorized them.
78//!
79//!   If a caller genuinely needs RBAC context inside detached work
80//!   (e.g. emitting an audit event that names the originating
81//!   identity), it MUST capture the values before the spawn and rebind
82//!   them with [`crate::rbac::with_rbac_scope`]:
83//!
84//!   ```no_run
85//!   use rmcp_server_kit::{cancel, rbac};
86//!   use std::time::Duration;
87//!   use tokio_util::sync::CancellationToken;
88//!
89//!   # async fn example(ct: CancellationToken) {
90//!   // Capture BEFORE spawn.
91//!   let role = rbac::current_role().unwrap_or_default();
92//!   let identity = rbac::current_identity().unwrap_or_default();
93//!   let token = rbac::current_token().unwrap_or_else(|| {
94//!       use rmcp_server_kit::secret::SecretString;
95//!       SecretString::new(String::new().into())
96//!   });
97//!   let sub = rbac::current_sub().unwrap_or_default();
98//!
99//!   let fut = async move {
100//!       rbac::with_rbac_scope(role, identity, token, sub, async {
101//!           // Detached work here can call current_role() etc.
102//!       })
103//!       .await;
104//!   };
105//!
106//!   let _ = cancel::run_with_cancel_and_timeout(fut, &ct, Some(Duration::from_secs(5))).await;
107//!   # }
108//!   ```
109//!
110//! - **Tracing span**: the originating request's span IS preserved.
111//!   The helper wraps the spawned future in
112//!   `.instrument(tracing::Span::current())`, so log lines from the
113//!   detached task remain attached to the request span (matching the
114//!   convention in [`crate::tool_hooks`]).
115
116use std::time::Duration;
117
118use tokio_util::sync::CancellationToken;
119use tracing::Instrument;
120
121/// Outcome of [`run_with_cancel_and_timeout`].
122///
123/// [`Self::Completed`] carries the future's own return value.
124/// [`Self::Cancelled`] and [`Self::TimedOut`] indicate the future was
125/// detached (still running on the tokio runtime) and the caller should
126/// return a cancel/timeout response to the client immediately.
127/// [`Self::Panicked`] indicates the spawned future panicked; the
128/// [`tokio::task::JoinError`] is exposed so the caller can surface it.
129#[derive(Debug)]
130#[non_exhaustive]
131#[must_use = "DetachOutcome carries the operation result; ignoring it discards either the value or the cancel/timeout signal"]
132pub enum DetachOutcome<T> {
133    /// The spawned future ran to completion and returned a value.
134    Completed(T),
135    /// The cancellation token fired before the future completed. The
136    /// future was detached onto the runtime and keeps running.
137    Cancelled,
138    /// The `timeout` budget elapsed before the future completed. The
139    /// future was detached onto the runtime and keeps running.
140    TimedOut,
141    /// The spawned future panicked. Carries the underlying
142    /// [`tokio::task::JoinError`] so the caller can decide how to
143    /// surface the panic (typical: log + return an internal-error
144    /// tool response).
145    Panicked(tokio::task::JoinError),
146}
147
148/// Race a `'static` future against client cancellation and an optional
149/// timeout, detaching the future on cancel/timeout so it can complete
150/// its own cleanup path.
151///
152/// # Pre-condition
153///
154/// If `ct.is_cancelled()` already holds at entry, this returns
155/// [`DetachOutcome::Cancelled`] without spawning `fut`. The future will
156/// NEVER start in that case -- do not begin expensive or mutating work
157/// for already-abandoned requests.
158///
159/// # Behavior
160///
161/// Otherwise spawns `fut` onto the tokio runtime (wrapped in
162/// `.instrument(Span::current())` so its log lines stay attached to
163/// the originating request span) and races its
164/// [`tokio::task::JoinHandle`] against `ct` and (optionally)
165/// `tokio::time::sleep(timeout)`. On tie, completion wins (the handle
166/// arm comes first under `biased;`). On cancel/timeout the spawned
167/// task keeps running to completion -- dropping the [`tokio::task::JoinHandle`]
168/// is a no-op for the task.
169///
170/// # Requirements on `fut`
171///
172/// `fut` MUST be cleanup-safe under self-driven completion: when
173/// detached, only the future's own internal Drop and early-return
174/// paths run; nothing else will help it clean up.
175///
176/// # Cancel-safety
177///
178/// // cancel-safe under composition: the inner future is moved into a
179/// // `tokio::spawn` before we await anything, so the cancel/timeout
180/// // arms can only ever drop the `JoinHandle` (a no-op for the task)
181/// // -- never the inner future itself.
182#[must_use = "DetachOutcome must be inspected to distinguish completion from cancel/timeout/panic"]
183pub async fn run_with_cancel_and_timeout<F, T>(
184    fut: F,
185    ct: &CancellationToken,
186    timeout: Option<Duration>,
187) -> DetachOutcome<T>
188where
189    F: Future<Output = T> + Send + 'static,
190    T: Send + 'static,
191{
192    // Pre-cancel check: never start work for already-abandoned requests.
193    if ct.is_cancelled() {
194        return DetachOutcome::Cancelled;
195    }
196
197    let mut handle = tokio::spawn(fut.instrument(tracing::Span::current()));
198
199    // `biased;` evaluates arms top-down. The `&mut handle` arm comes
200    // FIRST so a ready completion wins over a simultaneously-ready
201    // cancel/timeout. Dropping the `JoinHandle` does NOT abort the
202    // task -- the spawned future runs to its own completion and cleans
203    // up via its own Drop / early-return paths.
204    if let Some(t) = timeout {
205        tokio::select! {
206            biased;
207            joined = &mut handle => map_join(joined),
208            () = ct.cancelled() => DetachOutcome::Cancelled,
209            () = tokio::time::sleep(t) => DetachOutcome::TimedOut,
210        }
211    } else {
212        tokio::select! {
213            biased;
214            joined = &mut handle => map_join(joined),
215            () = ct.cancelled() => DetachOutcome::Cancelled,
216        }
217    }
218}
219
220/// Translate a [`tokio::task::JoinHandle`] result into a
221/// [`DetachOutcome`]. Panics are surfaced distinctly via
222/// [`DetachOutcome::Panicked`] so the caller can distinguish them from
223/// cancel/timeout -- do not fold panics into the cancel path, that
224/// loses real failure info.
225fn map_join<T>(joined: Result<T, tokio::task::JoinError>) -> DetachOutcome<T> {
226    match joined {
227        Ok(v) => DetachOutcome::Completed(v),
228        Err(join_err) => DetachOutcome::Panicked(join_err),
229    }
230}
231
#[cfg(test)]
mod tests {
    #![allow(
        clippy::unwrap_used,
        clippy::expect_used,
        clippy::panic,
        reason = "test-only relaxations; production code uses ? and tracing"
    )]

    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    use tokio::time::Duration;

    use super::*;

    /// Upper bound on how long a test waits for a detached task to report
    /// completion. Generous on purpose: it is only reached when the
    /// contract under test is broken (e.g. the task was aborted instead of
    /// detached), so it never slows down a passing run.
    const DETACHED_COMPLETION_DEADLINE: Duration = Duration::from_secs(5);

    /// Wait until the detached future cancels `finished` (its final step).
    /// Fails the test with `msg` if that does not happen within
    /// [`DETACHED_COMPLETION_DEADLINE`].
    async fn expect_detached_completion(finished: &CancellationToken, msg: &str) {
        tokio::time::timeout(DETACHED_COMPLETION_DEADLINE, finished.cancelled())
            .await
            .expect(msg);
    }

    #[tokio::test]
    async fn completed_returns_value_when_future_wins() {
        let ct = CancellationToken::new();
        let out =
            run_with_cancel_and_timeout(async { 42_u32 }, &ct, Some(Duration::from_secs(5))).await;
        assert!(matches!(out, DetachOutcome::Completed(42)));
    }

    /// The cancel branch wins, but the future must still complete in the
    /// background -- this is the central contract.
    ///
    /// The test avoids wall-clock races. The future cancels `ct` itself,
    /// so the cancel necessarily lands after the helper's pre-cancel check
    /// and spawn. It then parks on `release`, so it cannot finish before
    /// the test has observed `Cancelled`.
    #[tokio::test]
    async fn cancel_outcome_and_detached_future_runs_to_completion() {
        let ct = CancellationToken::new();
        let release = CancellationToken::new();
        let finished = CancellationToken::new();

        let fut = {
            let ct_inner = ct.clone();
            let release_inner = release.clone();
            let finished_inner = finished.clone();
            async move {
                ct_inner.cancel();
                release_inner.cancelled().await;
                finished_inner.cancel();
            }
        };

        let out = run_with_cancel_and_timeout(fut, &ct, None).await;
        assert!(matches!(out, DetachOutcome::Cancelled));

        // The detached task is parked on `release`; let it finish.
        release.cancel();
        expect_detached_completion(
            &finished,
            "detached future must run to completion after cancel",
        )
        .await;
    }

    /// Same contract as the cancel case, driven by the response timeout.
    /// The future cannot finish until `release` fires, so `TimedOut` is
    /// the only possible outcome regardless of scheduler timing.
    #[tokio::test]
    async fn timeout_outcome_and_detached_future_runs_to_completion() {
        let ct = CancellationToken::new();
        let release = CancellationToken::new();
        let finished = CancellationToken::new();

        let fut = {
            let release_inner = release.clone();
            let finished_inner = finished.clone();
            async move {
                release_inner.cancelled().await;
                finished_inner.cancel();
            }
        };

        let out = run_with_cancel_and_timeout(fut, &ct, Some(Duration::from_millis(10))).await;
        assert!(matches!(out, DetachOutcome::TimedOut));

        // The detached task is parked on `release`; let it finish.
        release.cancel();
        expect_detached_completion(
            &finished,
            "detached future must run to completion after timeout",
        )
        .await;
    }

    #[tokio::test]
    async fn panic_in_detached_future_surfaces_as_panicked() {
        let ct = CancellationToken::new();
        let out: DetachOutcome<()> = run_with_cancel_and_timeout(
            async { panic!("boom") },
            &ct,
            Some(Duration::from_secs(5)),
        )
        .await;
        // Panic is surfaced distinctly, not folded into Cancelled/TimedOut.
        assert!(
            matches!(out, DetachOutcome::Panicked(ref e) if e.is_panic()),
            "expected Panicked carrying a panic JoinError"
        );
    }

    /// Pre-cancel check: if the token is already cancelled at entry,
    /// the future MUST NOT be spawned. This avoids starting expensive
    /// or mutating work for already-abandoned requests.
    #[tokio::test]
    async fn pre_cancelled_token_skips_spawn() {
        let started = Arc::new(AtomicU32::new(0));
        let started_clone = Arc::clone(&started);
        let ct = CancellationToken::new();
        ct.cancel();

        let out = run_with_cancel_and_timeout(
            async move {
                started_clone.fetch_add(1, Ordering::SeqCst);
            },
            &ct,
            None,
        )
        .await;
        assert!(matches!(out, DetachOutcome::Cancelled));

        // Give the runtime a chance to run any errant spawn.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            started.load(Ordering::SeqCst),
            0,
            "pre-cancelled token must not spawn the future"
        );
    }

    /// Completion wins on tie: when the task has finished and the token is
    /// cancelled by the time the helper's `select!` is polled, the
    /// `Completed` arm must win because `biased;` puts it first. The
    /// caller must NEVER see `Cancelled` for an operation that actually
    /// completed successfully; that would mislead clients into bad retries
    /// on mutating tools.
    ///
    /// The tie is constructed deterministically:
    /// - The spawned future cancels `ct` and returns within the same task
    ///   poll.
    /// - On the current-thread runtime that task cannot run until the
    ///   helper has yielded. The pre-cancel check therefore cannot fire,
    ///   and both arms are ready when the helper is re-polled.
    /// - `Completed(7)` is the only correct outcome. Without `biased;`,
    ///   `select!` would pick a random ready arm, and this loop would hit
    ///   `Cancelled` with overwhelming probability.
    #[tokio::test(flavor = "current_thread")]
    async fn completion_wins_on_tie_with_cancel() {
        for _ in 0..50 {
            let ct = CancellationToken::new();
            let ct_inner = ct.clone();
            let out = run_with_cancel_and_timeout(
                async move {
                    // Cancel and complete within the same task poll.
                    ct_inner.cancel();
                    7_u32
                },
                &ct,
                None,
            )
            .await;
            match out {
                DetachOutcome::Completed(7) => {}
                DetachOutcome::Completed(other_val) => {
                    panic!("unexpected Completed value on tie race: {other_val}")
                }
                DetachOutcome::Cancelled => {
                    panic!("Cancelled reported on tie race: completion must win under `biased;`")
                }
                DetachOutcome::TimedOut => {
                    panic!("unexpected TimedOut on tie race (no timeout configured)")
                }
                DetachOutcome::Panicked(join_err) => {
                    panic!("unexpected Panicked on tie race: {join_err}")
                }
            }
        }
    }
}