indusagi_core/cancel.rs
1//! Cooperative cancellation — the framework's single cancellation currency.
2//!
3//! Replaces the TypeScript framework's native `AbortController`/`AbortSignal`
4//! pair, which was chained **parent → round → per-call child** in
5//! `runtime/dispatch/scheduler.ts`. There is no `CancelToken` *class* in the TS
6//! source; the chaining convention is collapsed here onto a thin wrapper around
7//! [`tokio_util::sync::CancellationToken`], which is already cooperative,
8//! queryable, multi-listener, and drop-safe-chainable.
9//!
10//! ## The cooperative-cancel contract (the cross-cutting invariant)
11//!
12//! Cancellation is **cooperative**: holders poll [`CancellationToken::is_cancelled`]
13//! or `await` [`CancellationToken::cancelled`]; nothing is forcibly killed. The
14//! invariant every subsystem must honor:
15//!
16//! - A cancelled **tool** returns a typed `is_error` outcome — never `Err`,
17//! never a panic.
18//! - A cancelled **connector** yields a terminal `Emission::Error` — it never
19//! throws out of the stream.
//! - When core code itself needs to *signal* "I observed cancel", it yields the
//!   typed [`CoreError::Cancelled`] (see [`CancelExt::error_if_cancelled`] /
//!   [`CancelExt::guard`], the extension methods this module adds to
//!   [`CancellationToken`]). Cancellation is a typed value, never a panic and
//!   never silently folded into an `Ok`.
24//!
25//! Rust is safer here than the Python port: there is no ambient exception to
26//! swallow, so the sharpest Python footgun (the `except CancelledError: raise`
27//! discipline) simply does not exist. The typed-error discipline still has to be
28//! enforced — that is what this module's helpers are for.
29//!
30//! ## The chain
31//!
32//! ```text
33//! parent ── child_token() ──▶ round ── child_token() ──▶ per-call child
34//! ```
35//!
36//! Cancellation propagates *down* the chain (cancelling a parent cancels every
37//! descendant) but never *up* (cancelling a child leaves the parent live). This
38//! is exactly the scheduler's parent→round→child semantics, and the chain is
39//! drop-safe: dropping a token does not cancel it, but a `DropGuard` can cancel
40//! on scope exit when wanted.
41
42use std::future::Future;
43
44pub use tokio_util::sync::CancellationToken;
45
46use crate::errors::CoreError;
47
48/// A root cancellation token. Equivalent to `new AbortController()` whose
49/// `.signal` is handed to the top of the chain.
50pub fn root_token() -> CancellationToken {
51 CancellationToken::new()
52}
53
54/// Derive a *round* token from a `parent`. Cancelling `parent` cancels the
55/// returned token (and everything chained beneath it); cancelling the returned
56/// token leaves `parent` live. This is `parent.child_token()`, named for the
57/// scheduler's parent→round step.
58pub fn round_token(parent: &CancellationToken) -> CancellationToken {
59 parent.child_token()
60}
61
62/// Derive a *per-call* token from a `round`. Same propagation rules; named for
63/// the scheduler's round→child step. (Functionally identical to
64/// [`round_token`]; the two names document intent at the call site.)
65pub fn child_token(round: &CancellationToken) -> CancellationToken {
66 round.child_token()
67}
68
69/// Extension helpers on [`CancellationToken`] that enforce the typed-error
70/// contract. Implemented as an extension trait so the canonical tokio-util type
71/// flows through the whole framework unchanged.
72pub trait CancelExt {
73 /// Return `Err(CoreError::Cancelled)` iff the token is already cancelled,
74 /// else `Ok(())`. The cooperative analogue of `signal.aborted` that yields a
75 /// *typed* error instead of a bool — call it at loop tops in find/grep walks
76 /// and framer reads.
77 fn error_if_cancelled(&self) -> Result<(), CoreError>;
78
79 /// Run `fut` to completion unless the token cancels first. On cancel, the
80 /// in-flight future is dropped (hard teardown of that future only) and a
81 /// typed [`CoreError::Cancelled`] is returned — never a panic, never a
82 /// swallowed value. This is the `select! { cancelled => Err(..), res => res }`
83 /// pattern packaged so call sites cannot accidentally forget the cancel arm.
84 fn guard<F, T>(&self, fut: F) -> impl Future<Output = Result<T, CoreError>> + Send
85 where
86 F: Future<Output = T> + Send;
87}
88
89impl CancelExt for CancellationToken {
90 fn error_if_cancelled(&self) -> Result<(), CoreError> {
91 if self.is_cancelled() {
92 Err(CoreError::cancelled())
93 } else {
94 Ok(())
95 }
96 }
97
98 async fn guard<F, T>(&self, fut: F) -> Result<T, CoreError>
99 where
100 F: Future<Output = T> + Send,
101 {
102 tokio::select! {
103 biased;
104 () = self.cancelled() => Err(CoreError::cancelled()),
105 out = fut => Ok(out),
106 }
107 }
108}
109
/// Unit tests for the token chain helpers and the [`CancelExt`] typed-error
/// contract.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn root_starts_uncancelled() {
        let t = root_token();
        assert!(!t.is_cancelled());
        assert!(t.error_if_cancelled().is_ok());
    }

    #[test]
    fn cancel_down_the_chain_propagates_parent_to_child() {
        let parent = root_token();
        let round = round_token(&parent);
        let child = child_token(&round);

        assert!(!child.is_cancelled());
        parent.cancel();
        // Down-propagation: cancelling the parent cancels the whole chain.
        assert!(parent.is_cancelled());
        assert!(round.is_cancelled());
        assert!(child.is_cancelled());
    }

    #[test]
    fn cancel_does_not_propagate_up() {
        let parent = root_token();
        let round = round_token(&parent);
        let child = child_token(&round);

        child.cancel();
        // Up-propagation must NOT happen: a failing per-call child never tears
        // down the round or the parent.
        assert!(child.is_cancelled());
        assert!(!round.is_cancelled());
        assert!(!parent.is_cancelled());
    }

    #[test]
    fn cancelling_round_cancels_child_but_not_parent() {
        let parent = root_token();
        let round = round_token(&parent);
        let child = child_token(&round);

        round.cancel();
        // Mid-chain cancel: everything below the round goes down with it,
        // everything above stays live.
        assert!(round.is_cancelled());
        assert!(child.is_cancelled());
        assert!(!parent.is_cancelled());
    }

    #[test]
    fn error_if_cancelled_yields_typed_error_not_panic() {
        let t = root_token();
        t.cancel();
        let err = t.error_if_cancelled().unwrap_err();
        assert!(err.is_cancelled());
        assert_eq!(err.to_string(), "cancelled");
    }

    #[tokio::test]
    async fn guard_returns_ok_when_future_wins() {
        let t = root_token();
        let out = t.guard(async { 7 }).await;
        assert_eq!(out.unwrap(), 7);
    }

    #[tokio::test]
    async fn guard_short_circuits_when_already_cancelled() {
        let t = root_token();
        t.cancel();
        // The cancel arm is polled first (`biased`), so even a future that is
        // immediately ready loses to a token that is already cancelled.
        let out = t.guard(async { 7 }).await;
        assert!(out.unwrap_err().is_cancelled());
    }

    #[tokio::test]
    async fn guard_returns_typed_cancel_when_token_wins() {
        let t = root_token();
        let t2 = t.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(5)).await;
            t2.cancel();
        });
        // A slow future that observes cancel resolves to a TYPED error, never a
        // panic and never an Err that isn't the cancel sentinel.
        let out: Result<(), CoreError> = t
            .guard(async {
                tokio::time::sleep(Duration::from_secs(3600)).await;
            })
            .await;
        assert!(out.unwrap_err().is_cancelled());
    }
}