1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
// SPDX-License-Identifier: Apache-2.0
//! Cooperative cancellation tokens for commands and tasks.
//!
//! [`CancellationToken`] provides a thread-safe, cloneable signal that tasks
//! can poll to detect cooperative cancellation requests. It extends the
//! subscription [`StopSignal`](crate::StopSignal) pattern to the command/task
//! domain, enabling bounded-lifetime effects and graceful teardown.
//!
//! # Migration rationale
//!
//! Web frameworks use `AbortController` / `AbortSignal` for effect cancellation.
//! This module provides an equivalent Rust-native primitive that the migration
//! code emitter can target when translating cancellable async workflows.
//!
//! # Example
//!
//! ```
//! use ftui_runtime::cancellation::{CancellationSource, CancellationToken};
//! use std::time::Duration;
//!
//! let source = CancellationSource::new();
//! let token = source.token();
//!
//! // Pass token to a background task
//! std::thread::spawn(move || {
//!     while !token.is_cancelled() {
//!         // do work...
//!         std::thread::sleep(Duration::from_millis(10));
//!     }
//! });
//!
//! // Cancel from the control side
//! source.cancel();
//! ```
#![forbid(unsafe_code)]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use web_time::Duration;
/// A thread-safe, cloneable cancellation token.
///
/// Tasks and effects receive a token and poll [`is_cancelled`](Self::is_cancelled)
/// to detect cancellation requests, or block in
/// [`wait_timeout`](Self::wait_timeout) until cancellation or a deadline.
/// Cloning only bumps the reference count of the shared state, so tokens are
/// cheap to clone and share across thread boundaries.
#[derive(Clone)]
pub struct CancellationToken {
    /// State shared with the originating [`CancellationSource`] and with every
    /// other token obtained from it.
    inner: Arc<CancellationInner>,
}

/// Shows only the observable cancellation flag.
///
/// The wake-up mutex/condvar pair carries no user-meaningful state, so it is
/// elided (rendered as `..`) instead of being dumped.
impl std::fmt::Debug for CancellationToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CancellationToken")
            .field("cancelled", &self.inner.cancelled.load(Ordering::Acquire))
            .finish_non_exhaustive()
    }
}
/// The control handle that triggers cancellation.
///
/// Dropping the source does **not** cancel the token — call [`cancel`](Self::cancel)
/// explicitly. This prevents accidental cancellation on scope exit.
pub struct CancellationSource {
    /// State shared with every [`CancellationToken`] handed out by
    /// [`token`](Self::token).
    inner: Arc<CancellationInner>,
}

/// Shows only the observable cancellation flag.
///
/// The wake-up mutex/condvar pair carries no user-meaningful state, so it is
/// elided (rendered as `..`) instead of being dumped.
impl std::fmt::Debug for CancellationSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CancellationSource")
            .field("cancelled", &self.inner.cancelled.load(Ordering::Acquire))
            .finish_non_exhaustive()
    }
}
/// Shared state behind a [`CancellationSource`] and all of its tokens.
///
/// Both handle types hold this behind an `Arc`, so every handle observes the
/// same flag and waits on the same condition variable.
struct CancellationInner {
/// The cancellation flag. It starts as `false`; the only write in this
/// module is `CancellationSource::cancel` storing `true`.
cancelled: AtomicBool,
/// Mutex/condvar pair used to park `wait_timeout` callers and wake them on
/// cancel. The mutex guards no data (`()`); it only serialises the
/// flag check against the notification.
notify: (Mutex<()>, Condvar),
}
impl CancellationSource {
    /// Build a source whose tokens start out in the not-cancelled state.
    pub fn new() -> Self {
        let state = CancellationInner {
            cancelled: AtomicBool::new(false),
            notify: (Mutex::new(()), Condvar::new()),
        };
        Self {
            inner: Arc::new(state),
        }
    }

    /// Hand out a cloneable token bound to this source's shared state.
    pub fn token(&self) -> CancellationToken {
        let inner = Arc::clone(&self.inner);
        CancellationToken { inner }
    }

    /// Request cancellation.
    ///
    /// Every token derived from this source reports `is_cancelled() == true`
    /// afterwards, and threads blocked in `wait_timeout` are woken.
    pub fn cancel(&self) {
        let state = &self.inner;
        // Publish the flag first so that any thread woken below sees it.
        state.cancelled.store(true, Ordering::Release);

        let (mutex, condvar) = &state.notify;
        // Notify with the mutex held: waiters re-check the flag under this
        // same mutex before parking. A poisoned mutex is still usable here
        // because it protects no data.
        let _held = match mutex.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        condvar.notify_all();
    }

    /// Report whether cancellation has already been requested.
    pub fn is_cancelled(&self) -> bool {
        let flag = &self.inner.cancelled;
        flag.load(Ordering::Acquire)
    }
}
impl Default for CancellationSource {
fn default() -> Self {
Self::new()
}
}
impl CancellationToken {
    /// Report whether cancellation has been requested.
    #[inline]
    pub fn is_cancelled(&self) -> bool {
        let flag = &self.inner.cancelled;
        flag.load(Ordering::Acquire)
    }

    /// Block the calling thread until cancellation is requested or `duration`
    /// has elapsed, whichever comes first.
    ///
    /// Returns `true` if cancelled, `false` if timed out.
    pub fn wait_timeout(&self, duration: Duration) -> bool {
        // Fast path: no need to touch the mutex when already cancelled.
        if self.is_cancelled() {
            return true;
        }

        let (mutex, condvar) = &self.inner.notify;
        // The mutex guards no data, so a poisoned lock is still safe to use.
        let mut held = match mutex.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        let began = web_time::Instant::now();
        let mut budget = duration;

        // The flag is re-checked with the mutex held before every park.
        while !self.is_cancelled() {
            let (reacquired, outcome) = match condvar.wait_timeout(held, budget) {
                Ok(pair) => pair,
                Err(poisoned) => poisoned.into_inner(),
            };
            held = reacquired;

            if self.is_cancelled() {
                return true;
            }
            if outcome.timed_out() {
                return false;
            }

            // Woken without cancellation and without the condvar reporting a
            // timeout: park again only for whatever part of the original
            // budget is left.
            let spent = began.elapsed();
            if spent < duration {
                budget = duration - spent;
            } else {
                return false;
            }
        }
        true
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering as AO;
    use std::thread;

    /// A new source and the token it hands out both report "not cancelled".
    #[test]
    fn token_starts_uncancelled() {
        let src = CancellationSource::new();
        let tok = src.token();
        assert!(!tok.is_cancelled());
        assert!(!src.is_cancelled());
    }

    /// Cancelling the source is visible through a token taken beforehand.
    #[test]
    fn cancel_propagates_to_token() {
        let src = CancellationSource::new();
        let tok = src.token();
        src.cancel();
        assert!(tok.is_cancelled());
    }

    /// Tokens obtained separately and tokens cloned from one another all
    /// share the same state.
    #[test]
    fn cancel_propagates_to_all_clones() {
        let src = CancellationSource::new();
        let first = src.token();
        let cloned = first.clone();
        let second = src.token();
        src.cancel();
        for tok in [&first, &cloned, &second] {
            assert!(tok.is_cancelled());
        }
    }

    /// Dropping the source without calling `cancel` leaves tokens untouched.
    #[test]
    fn drop_source_does_not_cancel() {
        let src = CancellationSource::new();
        let tok = src.token();
        drop(src);
        assert!(!tok.is_cancelled());
    }

    /// With cancellation already requested, the wait reports `true` even
    /// though the timeout is long.
    #[test]
    fn wait_timeout_returns_true_when_already_cancelled() {
        let src = CancellationSource::new();
        let tok = src.token();
        src.cancel();
        let cancelled = tok.wait_timeout(Duration::from_secs(10));
        assert!(cancelled);
    }

    /// Without cancellation the wait reports `false` once the timeout passes.
    #[test]
    fn wait_timeout_returns_false_on_timeout() {
        let src = CancellationSource::new();
        let tok = src.token();
        let cancelled = tok.wait_timeout(Duration::from_millis(10));
        assert!(!cancelled);
    }

    /// A thread waiting with a long timeout returns `true` after `cancel`.
    #[test]
    fn wait_timeout_wakes_on_cancel() {
        let src = CancellationSource::new();
        let tok = src.token();
        let waiter = thread::spawn(move || tok.wait_timeout(Duration::from_secs(10)));
        // Give the waiter a moment to start before cancelling.
        thread::sleep(Duration::from_millis(20));
        src.cancel();
        assert!(waiter.join().unwrap());
    }

    /// Repeated `cancel` calls are harmless.
    #[test]
    fn cancel_is_idempotent() {
        let src = CancellationSource::new();
        let tok = src.token();
        for _ in 0..3 {
            src.cancel();
        }
        assert!(tok.is_cancelled());
    }

    /// A polling worker thread observes cancellation issued from the
    /// spawning thread and then runs to completion.
    #[test]
    fn token_works_across_threads() {
        let src = CancellationSource::new();
        let tok = src.token();
        let finished = Arc::new(AtomicBool::new(false));
        let finished_in_worker = Arc::clone(&finished);

        let worker = thread::spawn(move || {
            while !tok.is_cancelled() {
                thread::sleep(Duration::from_millis(5));
            }
            finished_in_worker.store(true, AO::SeqCst);
        });

        thread::sleep(Duration::from_millis(20));
        src.cancel();
        worker.join().unwrap();
        assert!(finished.load(AO::SeqCst));
    }

    /// `Default` yields a source that has not been cancelled.
    #[test]
    fn default_creates_uncancelled_source() {
        let src = CancellationSource::default();
        assert!(!src.is_cancelled());
    }
}