Skip to main content

ftui_runtime/
cancellation.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Cooperative cancellation tokens for commands and tasks.
3//!
4//! [`CancellationToken`] provides a thread-safe, cloneable signal that tasks
5//! can poll to detect cooperative cancellation requests. It extends the
6//! subscription [`StopSignal`](crate::StopSignal) pattern to the command/task
7//! domain, enabling bounded-lifetime effects and graceful teardown.
8//!
9//! # Migration rationale
10//!
11//! Web frameworks use `AbortController` / `AbortSignal` for effect cancellation.
12//! This module provides an equivalent Rust-native primitive that the migration
13//! code emitter can target when translating cancellable async workflows.
14//!
15//! # Example
16//!
17//! ```
18//! use ftui_runtime::cancellation::{CancellationSource, CancellationToken};
19//! use std::time::Duration;
20//!
21//! let source = CancellationSource::new();
22//! let token = source.token();
23//!
24//! // Pass token to a background task
25//! std::thread::spawn(move || {
26//!     while !token.is_cancelled() {
27//!         // do work...
28//!         std::thread::sleep(Duration::from_millis(10));
29//!     }
30//! });
31//!
32//! // Cancel from the control side
33//! source.cancel();
34//! ```
35
36#![forbid(unsafe_code)]
37
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::sync::{Arc, Condvar, Mutex};
40use web_time::Duration;
41
42/// A thread-safe, cloneable cancellation token.
43///
44/// Tasks and effects receive a token and poll [`is_cancelled`](Self::is_cancelled)
45/// to detect cancellation requests. Tokens are cheap to clone and share across
46/// thread boundaries.
47#[derive(Clone)]
48pub struct CancellationToken {
49    inner: Arc<CancellationInner>,
50}
51
52/// The control handle that triggers cancellation.
53///
54/// Dropping the source does **not** cancel the token — call [`cancel`](Self::cancel)
55/// explicitly. This prevents accidental cancellation on scope exit.
56pub struct CancellationSource {
57    inner: Arc<CancellationInner>,
58}
59
60struct CancellationInner {
61    cancelled: AtomicBool,
62    notify: (Mutex<()>, Condvar),
63}
64
65impl CancellationSource {
66    /// Create a new cancellation source with an uncancelled token.
67    pub fn new() -> Self {
68        Self {
69            inner: Arc::new(CancellationInner {
70                cancelled: AtomicBool::new(false),
71                notify: (Mutex::new(()), Condvar::new()),
72            }),
73        }
74    }
75
76    /// Obtain a cloneable token that observes this source's state.
77    pub fn token(&self) -> CancellationToken {
78        CancellationToken {
79            inner: Arc::clone(&self.inner),
80        }
81    }
82
83    /// Signal cancellation. All tokens derived from this source will observe
84    /// `is_cancelled() == true` and any pending `wait_timeout` calls will wake.
85    pub fn cancel(&self) {
86        self.inner.cancelled.store(true, Ordering::Release);
87        let (lock, cvar) = &self.inner.notify;
88        let _guard = lock.lock().unwrap_or_else(|e| e.into_inner());
89        cvar.notify_all();
90    }
91
92    /// Check whether cancellation has already been requested.
93    pub fn is_cancelled(&self) -> bool {
94        self.inner.cancelled.load(Ordering::Acquire)
95    }
96}
97
98impl Default for CancellationSource {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104impl CancellationToken {
105    /// Returns `true` if cancellation has been requested.
106    #[inline]
107    pub fn is_cancelled(&self) -> bool {
108        self.inner.cancelled.load(Ordering::Acquire)
109    }
110
111    /// Block until either cancellation is requested or the timeout elapses.
112    ///
113    /// Returns `true` if cancelled, `false` if timed out.
114    pub fn wait_timeout(&self, duration: Duration) -> bool {
115        if self.is_cancelled() {
116            return true;
117        }
118        let (lock, cvar) = &self.inner.notify;
119        let mut guard = lock.lock().unwrap_or_else(|e| e.into_inner());
120        let start = web_time::Instant::now();
121        let mut remaining = duration;
122        loop {
123            if self.is_cancelled() {
124                return true;
125            }
126            let (new_guard, result) = cvar
127                .wait_timeout(guard, remaining)
128                .unwrap_or_else(|e| e.into_inner());
129            guard = new_guard;
130            if self.is_cancelled() {
131                return true;
132            }
133            if result.timed_out() {
134                return false;
135            }
136            let elapsed = start.elapsed();
137            if elapsed >= duration {
138                return false;
139            }
140            remaining = duration - elapsed;
141        }
142    }
143}
144
145#[cfg(test)]
146mod tests {
147    use super::*;
148    use std::sync::atomic::Ordering as AO;
149    use std::thread;
150
151    #[test]
152    fn token_starts_uncancelled() {
153        let source = CancellationSource::new();
154        let token = source.token();
155        assert!(!token.is_cancelled());
156        assert!(!source.is_cancelled());
157    }
158
159    #[test]
160    fn cancel_propagates_to_token() {
161        let source = CancellationSource::new();
162        let token = source.token();
163        source.cancel();
164        assert!(token.is_cancelled());
165    }
166
167    #[test]
168    fn cancel_propagates_to_all_clones() {
169        let source = CancellationSource::new();
170        let t1 = source.token();
171        let t2 = t1.clone();
172        let t3 = source.token();
173        source.cancel();
174        assert!(t1.is_cancelled());
175        assert!(t2.is_cancelled());
176        assert!(t3.is_cancelled());
177    }
178
179    #[test]
180    fn drop_source_does_not_cancel() {
181        let source = CancellationSource::new();
182        let token = source.token();
183        drop(source);
184        assert!(!token.is_cancelled());
185    }
186
187    #[test]
188    fn wait_timeout_returns_true_when_already_cancelled() {
189        let source = CancellationSource::new();
190        let token = source.token();
191        source.cancel();
192        assert!(token.wait_timeout(Duration::from_secs(10)));
193    }
194
195    #[test]
196    fn wait_timeout_returns_false_on_timeout() {
197        let source = CancellationSource::new();
198        let token = source.token();
199        assert!(!token.wait_timeout(Duration::from_millis(10)));
200    }
201
202    #[test]
203    fn wait_timeout_wakes_on_cancel() {
204        let source = CancellationSource::new();
205        let token = source.token();
206
207        let handle = thread::spawn(move || token.wait_timeout(Duration::from_secs(10)));
208
209        thread::sleep(Duration::from_millis(20));
210        source.cancel();
211
212        let result = handle.join().unwrap();
213        assert!(result);
214    }
215
216    #[test]
217    fn cancel_is_idempotent() {
218        let source = CancellationSource::new();
219        let token = source.token();
220        source.cancel();
221        source.cancel();
222        source.cancel();
223        assert!(token.is_cancelled());
224    }
225
226    #[test]
227    fn token_works_across_threads() {
228        let source = CancellationSource::new();
229        let token = source.token();
230        let flag = Arc::new(AtomicBool::new(false));
231        let flag_clone = flag.clone();
232
233        let handle = thread::spawn(move || {
234            while !token.is_cancelled() {
235                thread::sleep(Duration::from_millis(5));
236            }
237            flag_clone.store(true, AO::SeqCst);
238        });
239
240        thread::sleep(Duration::from_millis(20));
241        source.cancel();
242        handle.join().unwrap();
243        assert!(flag.load(AO::SeqCst));
244    }
245
246    #[test]
247    fn default_creates_uncancelled_source() {
248        let source = CancellationSource::default();
249        assert!(!source.is_cancelled());
250    }
251}