Skip to main content

indusagi_core/
cancel.rs

1//! Cooperative cancellation — the framework's single cancellation currency.
2//!
3//! Replaces the TypeScript framework's native `AbortController`/`AbortSignal`
4//! pair, which was chained **parent → round → per-call child** in
5//! `runtime/dispatch/scheduler.ts`. There is no `CancelToken` *class* in the TS
6//! source; the chaining convention is collapsed here onto a thin wrapper around
7//! [`tokio_util::sync::CancellationToken`], which is already cooperative,
8//! queryable, multi-listener, and drop-safe-chainable.
9//!
10//! ## The cooperative-cancel contract (the cross-cutting invariant)
11//!
12//! Cancellation is **cooperative**: holders poll [`CancellationToken::is_cancelled`]
13//! or `await` [`CancellationToken::cancelled`]; nothing is forcibly killed. The
14//! invariant every subsystem must honor:
15//!
16//! - A cancelled **tool** returns a typed `is_error` outcome — never `Err`,
17//!   never a panic.
18//! - A cancelled **connector** yields a terminal `Emission::Error` — it never
19//!   throws out of the stream.
//! - When core code itself needs to *signal* "I observed cancel", it yields the
//!   typed [`CoreError::Cancelled`] (see [`CancelExt::error_if_cancelled`] /
//!   [`CancelExt::guard`], the extension methods this module adds to the
//!   token). Cancellation is a typed value, never a panic and never silently
//!   folded into an `Ok`.
24//!
25//! Rust is safer here than the Python port: there is no ambient exception to
26//! swallow, so the sharpest Python footgun (the `except CancelledError: raise`
27//! discipline) simply does not exist. The typed-error discipline still has to be
28//! enforced — that is what this module's helpers are for.
29//!
30//! ## The chain
31//!
32//! ```text
33//!   parent  ── child_token() ──▶  round  ── child_token() ──▶  per-call child
34//! ```
35//!
36//! Cancellation propagates *down* the chain (cancelling a parent cancels every
37//! descendant) but never *up* (cancelling a child leaves the parent live). This
38//! is exactly the scheduler's parent→round→child semantics, and the chain is
39//! drop-safe: dropping a token does not cancel it, but a `DropGuard` can cancel
40//! on scope exit when wanted.
41
42use std::future::Future;
43
44pub use tokio_util::sync::CancellationToken;
45
46use crate::errors::CoreError;
47
48/// A root cancellation token. Equivalent to `new AbortController()` whose
49/// `.signal` is handed to the top of the chain.
50pub fn root_token() -> CancellationToken {
51    CancellationToken::new()
52}
53
54/// Derive a *round* token from a `parent`. Cancelling `parent` cancels the
55/// returned token (and everything chained beneath it); cancelling the returned
56/// token leaves `parent` live. This is `parent.child_token()`, named for the
57/// scheduler's parent→round step.
58pub fn round_token(parent: &CancellationToken) -> CancellationToken {
59    parent.child_token()
60}
61
62/// Derive a *per-call* token from a `round`. Same propagation rules; named for
63/// the scheduler's round→child step. (Functionally identical to
64/// [`round_token`]; the two names document intent at the call site.)
65pub fn child_token(round: &CancellationToken) -> CancellationToken {
66    round.child_token()
67}
68
69/// Extension helpers on [`CancellationToken`] that enforce the typed-error
70/// contract. Implemented as an extension trait so the canonical tokio-util type
71/// flows through the whole framework unchanged.
72pub trait CancelExt {
73    /// Return `Err(CoreError::Cancelled)` iff the token is already cancelled,
74    /// else `Ok(())`. The cooperative analogue of `signal.aborted` that yields a
75    /// *typed* error instead of a bool — call it at loop tops in find/grep walks
76    /// and framer reads.
77    fn error_if_cancelled(&self) -> Result<(), CoreError>;
78
79    /// Run `fut` to completion unless the token cancels first. On cancel, the
80    /// in-flight future is dropped (hard teardown of that future only) and a
81    /// typed [`CoreError::Cancelled`] is returned — never a panic, never a
82    /// swallowed value. This is the `select! { cancelled => Err(..), res => res }`
83    /// pattern packaged so call sites cannot accidentally forget the cancel arm.
84    fn guard<F, T>(&self, fut: F) -> impl Future<Output = Result<T, CoreError>> + Send
85    where
86        F: Future<Output = T> + Send;
87}
88
89impl CancelExt for CancellationToken {
90    fn error_if_cancelled(&self) -> Result<(), CoreError> {
91        if self.is_cancelled() {
92            Err(CoreError::cancelled())
93        } else {
94            Ok(())
95        }
96    }
97
98    async fn guard<F, T>(&self, fut: F) -> Result<T, CoreError>
99    where
100        F: Future<Output = T> + Send,
101    {
102        tokio::select! {
103            biased;
104            () = self.cancelled() => Err(CoreError::cancelled()),
105            out = fut => Ok(out),
106        }
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A fresh root token is live and passes the typed check.
    #[test]
    fn root_starts_uncancelled() {
        let t = root_token();
        assert!(!t.is_cancelled());
        assert!(t.error_if_cancelled().is_ok());
    }

    /// Cancelling the top of the chain reaches every descendant.
    #[test]
    fn cancel_down_the_chain_propagates_parent_to_child() {
        let parent = root_token();
        let round = round_token(&parent);
        let child = child_token(&round);

        assert!(!child.is_cancelled());
        parent.cancel();
        // Down-propagation: cancelling the parent cancels the whole chain.
        assert!(parent.is_cancelled());
        assert!(round.is_cancelled());
        assert!(child.is_cancelled());
    }

    /// Cancelling a leaf leaves its ancestors untouched.
    #[test]
    fn cancel_does_not_propagate_up() {
        let parent = root_token();
        let round = round_token(&parent);
        let child = child_token(&round);

        child.cancel();
        // Up-propagation must NOT happen: a failing per-call child never tears
        // down the round or the parent.
        assert!(child.is_cancelled());
        assert!(!round.is_cancelled());
        assert!(!parent.is_cancelled());
    }

    /// The synchronous check surfaces cancellation as the typed error.
    #[test]
    fn error_if_cancelled_yields_typed_error_not_panic() {
        let t = root_token();
        t.cancel();
        let err = t.error_if_cancelled().unwrap_err();
        assert!(err.is_cancelled());
        assert_eq!(err.to_string(), "cancelled");
    }

    /// With a live token, `guard` passes the future's output through.
    #[tokio::test]
    async fn guard_returns_ok_when_future_wins() {
        let t = root_token();
        let out = t.guard(async { 7 }).await;
        assert_eq!(out.unwrap(), 7);
    }

    /// A token that is already cancelled when `guard` is called wins even
    /// against a future that is immediately ready: the `biased` select polls
    /// the cancel arm first.
    #[tokio::test]
    async fn guard_returns_typed_cancel_when_token_already_cancelled() {
        let t = root_token();
        t.cancel();
        let out = t.guard(async { 7 }).await;
        assert!(out.unwrap_err().is_cancelled());
    }

    /// Cancelling while the guarded future is still pending resolves `guard`
    /// to the typed cancel error.
    #[tokio::test]
    async fn guard_returns_typed_cancel_when_token_wins() {
        let t = root_token();
        let t2 = t.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(5)).await;
            t2.cancel();
        });
        // A slow future that observes cancel resolves to a TYPED error, never a
        // panic and never an Err that isn't the cancel sentinel.
        let out: Result<(), CoreError> = t
            .guard(async {
                tokio::time::sleep(Duration::from_secs(3600)).await;
            })
            .await;
        assert!(out.unwrap_err().is_cancelled());
    }
}