async_cancellation_token/lib.rs
1//! # async-cancellation-token
2//!
3//! `async-cancellation-token` is a lightweight **single-threaded** Rust library that provides
4//! **cancellation tokens** for cooperative cancellation of asynchronous tasks and callbacks.
5//!
6//! This crate is designed for **single-threaded async environments** such as `futures::executor::LocalPool`.
7//! It internally uses `Rc`, `Cell`, and `RefCell`, and is **not thread-safe**.
8//!
9//! Features:
10//! - `CancellationTokenSource` can cancel multiple associated `CancellationToken`s.
11//! - `CancellationToken` can be awaited via `.cancelled()` or checked synchronously.
12//! - Supports registration of **one-time callbacks** (`FnOnce`) that run on cancellation.
13//!
14//! ## Example
15//!
16//! ```rust
17//! use std::time::Duration;
18//! use async_cancellation_token::CancellationTokenSource;
19//! use futures::{FutureExt, executor::LocalPool, pin_mut, select, task::LocalSpawnExt};
20//! use futures_timer::Delay;
21//!
22//! let cts = CancellationTokenSource::new();
23//! let token = cts.token();
24//!
25//! let mut pool = LocalPool::new();
26//! let spawner = pool.spawner();
27//!
28//! // Spawn a task that performs 5 steps but can be cancelled
29//! spawner.spawn_local(async move {
30//! for i in 1..=5 {
31//! let delay = Delay::new(Duration::from_millis(100)).fuse();
32//! let cancelled = token.cancelled().fuse();
33//! pin_mut!(delay, cancelled);
34//!
35//! select! {
36//! _ = delay => println!("Step {i}"),
37//! _ = cancelled => {
38//! println!("Cancelled!");
39//! break;
40//! }
41//! }
42//! }
43//! }.map(|_| ())).unwrap();
44//!
45//! // Cancel after 250ms
46//! spawner.spawn_local(async move {
47//! Delay::new(Duration::from_millis(250)).await;
48//! cts.cancel();
49//! }.map(|_| ())).unwrap();
50//!
51//! pool.run();
52//! ```
53
54use std::{
55 cell::{Cell, RefCell},
56 error::Error,
57 fmt::Display,
58 future::Future,
59 pin::Pin,
60 rc::{Rc, Weak},
61 task::{Context, Poll, Waker},
62};
63
64use slab::Slab;
65
66/// Inner shared state for `CancellationToken` and `CancellationTokenSource`.
67///
68/// This is the single-threaded shared state. All fields are internal and should not
69/// be accessed directly outside the crate.
70///
71/// - `cancelled`: `true` once cancellation has occurred.
72/// - `wakers`: list of wakers for async futures awaiting cancellation.
73/// - `callbacks`: one-time callbacks (`FnOnce`) registered to run on cancellation.
74/// These are stored in a `Slab` to allow stable keys for `CancellationTokenRegistration`.
75#[derive(Default)]
76struct Inner {
77 /// Whether the token has been cancelled.
78 cancelled: Cell<bool>,
79 /// List of wakers to wake when cancellation occurs.
80 wakers: RefCell<Vec<Waker>>,
81 /// List of callbacks to call when cancellation occurs.
82 callbacks: RefCell<Slab<Box<dyn FnOnce()>>>,
83}
84
85/// A source that can cancel associated `CancellationToken`s.
86///
87/// Cancellation is **cooperative** and single-threaded. When cancelled:
88/// - All registered `FnOnce` callbacks are called (in registration order).
89/// - All futures waiting via `CancellationToken::cancelled()` are woken.
90///
91/// # Example
92///
93/// ```rust
94/// use async_cancellation_token::CancellationTokenSource;
95///
96/// let cts = CancellationTokenSource::new();
97/// let token = cts.token();
98///
99/// assert!(!cts.is_cancelled());
100/// cts.cancel();
101/// assert!(cts.is_cancelled());
102/// ```
103#[derive(Clone)]
104pub struct CancellationTokenSource {
105 inner: Rc<Inner>,
106}
107
108/// A token that can be checked for cancellation or awaited.
109///
110/// # Example
111///
112/// ```rust
113/// use async_cancellation_token::CancellationTokenSource;
114/// use futures::{FutureExt, executor::LocalPool, task::LocalSpawnExt};
115///
116/// let cts = CancellationTokenSource::new();
117/// let token = cts.token();
118///
119/// let mut pool = LocalPool::new();
120/// pool.spawner().spawn_local(async move {
121/// token.cancelled().await;
122/// println!("Cancelled!");
123/// }.map(|_| ())).unwrap();
124///
125/// cts.cancel();
126/// pool.run();
127/// ```
128#[derive(Clone)]
129pub struct CancellationToken {
130 inner: Rc<Inner>,
131}
132
133/// Error returned when a cancelled token is checked synchronously.
134#[derive(Copy, Clone, Debug, Default, Eq, Ord, PartialEq, PartialOrd, Hash)]
135pub struct Cancelled;
136
137impl Display for Cancelled {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.write_str("cancelled by CancellationTokenSource")
140 }
141}
142
143impl Error for Cancelled {}
144
145impl Default for CancellationTokenSource {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151impl CancellationTokenSource {
152 /// Create a new `CancellationTokenSource`.
153 pub fn new() -> Self {
154 Self {
155 inner: Rc::new(Inner::default()),
156 }
157 }
158
159 /// Get a `CancellationToken` associated with this source.
160 pub fn token(&self) -> CancellationToken {
161 CancellationToken {
162 inner: self.inner.clone(),
163 }
164 }
165
166 /// Cancel all associated tokens.
167 ///
168 /// This marks the source as cancelled. After cancellation:
169 /// - All registered callbacks are called exactly once.
170 /// - All waiting futures are woken.
171 ///
172 /// **Note:** Cancellation is **idempotent**; calling this method multiple times has no effect.
173 /// **FnOnce callbacks will only be called once**.
174 ///
175 /// Single-threaded only. Not safe to call concurrently from multiple threads.
176 pub fn cancel(&self) {
177 if !self.inner.cancelled.replace(true) {
178 // Call all registered callbacks
179 for cb in self.inner.callbacks.borrow_mut().drain() {
180 cb();
181 }
182
183 // Wake all tasks waiting for cancellation
184 for w in self.inner.wakers.borrow_mut().drain(..) {
185 w.wake();
186 }
187 }
188 }
189
190 /// Check if this source has been cancelled.
191 pub fn is_cancelled(&self) -> bool {
192 self.inner.cancelled.get()
193 }
194}
195
196impl CancellationToken {
197 /// Check if the token has been cancelled.
198 pub fn is_cancelled(&self) -> bool {
199 self.inner.cancelled.get()
200 }
201
202 /// Synchronously check cancellation and return `Err(Cancelled)` if cancelled.
203 pub fn check_cancelled(&self) -> Result<(), Cancelled> {
204 if self.is_cancelled() {
205 Err(Cancelled)
206 } else {
207 Ok(())
208 }
209 }
210
211 /// Returns a `Future` that completes when the token is cancelled.
212 ///
213 /// # Example
214 ///
215 /// ```rust
216 /// use async_cancellation_token::CancellationTokenSource;
217 /// use futures::{FutureExt, executor::LocalPool, task::LocalSpawnExt};
218 ///
219 /// let cts = CancellationTokenSource::new();
220 /// let token = cts.token();
221 ///
222 /// let mut pool = LocalPool::new();
223 /// pool.spawner().spawn_local(async move {
224 /// token.cancelled().await;
225 /// println!("Cancelled!");
226 /// }.map(|_| ())).unwrap();
227 ///
228 /// cts.cancel();
229 /// pool.run();
230 /// ```
231 pub fn cancelled(&self) -> CancelledFuture {
232 CancelledFuture {
233 token: self.clone(),
234 }
235 }
236
237 /// Register a callback to run when the token is cancelled.
238 ///
239 /// - If the token is **already cancelled**, the callback is called immediately.
240 /// - Otherwise, the callback is stored and will be called exactly once upon cancellation.
241 ///
242 /// Returns a `CancellationTokenRegistration`, which will **remove the callback if dropped
243 /// before cancellation**.
244 ///
245 /// # Example
246 ///
247 /// ```rust
248 /// use std::{cell::Cell, rc::Rc};
249 /// use async_cancellation_token::CancellationTokenSource;
250 ///
251 /// let cts = CancellationTokenSource::new();
252 /// let token = cts.token();
253 ///
254 /// let flag = Rc::new(Cell::new(false));
255 /// let flag_clone = Rc::clone(&flag);
256 ///
257 /// let reg = token.register(move || {
258 /// flag_clone.set(true);
259 /// });
260 ///
261 /// cts.cancel();
262 /// assert!(flag.get());
263 ///
264 /// drop(reg);
265 /// ```
266 pub fn register(&self, f: impl FnOnce() + 'static) -> Option<CancellationTokenRegistration> {
267 if self.is_cancelled() {
268 f();
269 None
270 } else {
271 CancellationTokenRegistration {
272 inner: Rc::downgrade(&self.inner),
273 key: self.inner.callbacks.borrow_mut().insert(Box::new(f)),
274 }
275 .into()
276 }
277 }
278}
279
280/// Represents a registered callback on a `CancellationToken`.
281///
282/// When this object is dropped **before the token is cancelled**, the callback
283/// is automatically removed. If the token is already cancelled, Drop does nothing.
284///
285/// This ensures that callbacks are **only called once** and resources are cleaned up.
286///
287/// **Single-threaded only.** Not safe to use concurrently.
288pub struct CancellationTokenRegistration {
289 inner: Weak<Inner>,
290 key: usize,
291}
292
293impl Drop for CancellationTokenRegistration {
294 fn drop(&mut self) {
295 if let Some(inner) = self.inner.upgrade() {
296 if inner.cancelled.get() {
297 // Callback was already removed
298 return;
299 }
300 let _ = inner.callbacks.borrow_mut().remove(self.key);
301 }
302 }
303}
304
305/// A future that completes when a `CancellationToken` is cancelled.
306///
307/// - If the token is already cancelled, poll returns `Poll::Ready` immediately.
308/// - Otherwise, the future registers its waker and returns `Poll::Pending`.
309///
310/// **Single-threaded only.** Not Send or Sync.
311/// The future will be woken exactly once when the token is cancelled.
312pub struct CancelledFuture {
313 token: CancellationToken,
314}
315
316impl Future for CancelledFuture {
317 type Output = ();
318
319 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
320 if self.token.is_cancelled() {
321 Poll::Ready(())
322 } else {
323 let mut wakers = self.token.inner.wakers.borrow_mut();
324 if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
325 wakers.push(cx.waker().clone());
326 }
327 Poll::Pending
328 }
329 }
330}
331
332#[cfg(test)]
333mod tests {
334 use std::cell::Cell;
335 use std::rc::Rc;
336 use std::time::Duration;
337
338 use futures::{FutureExt, executor::LocalPool, pin_mut, select, task::LocalSpawnExt};
339 use futures_timer::Delay;
340
341 use super::*;
342
343 /// Test cooperative cancellation of two tasks with different mechanisms.
344 #[test]
345 fn cancel_two_tasks() {
346 let cancelled_a = Rc::new(Cell::new(false));
347 let cancelled_b = Rc::new(Cell::new(false));
348
349 let task_a = |token: CancellationToken| {
350 let cancelled_a = Rc::clone(&cancelled_a);
351
352 async move {
353 for _ in 1..=5 {
354 let delay = Delay::new(Duration::from_millis(50)).fuse();
355 let cancelled = token.cancelled().fuse();
356
357 pin_mut!(delay, cancelled);
358
359 select! {
360 _ = delay => {},
361 _ = cancelled => {
362 cancelled_a.set(true);
363 break;
364 },
365 }
366 }
367 }
368 };
369
370 let task_b = |token: CancellationToken| {
371 let cancelled_b = Rc::clone(&cancelled_b);
372
373 async move {
374 for _ in 1..=5 {
375 Delay::new(Duration::from_millis(80)).await;
376
377 if token.check_cancelled().is_err() {
378 cancelled_b.set(true);
379 break;
380 }
381 }
382 }
383 };
384
385 let cts = CancellationTokenSource::new();
386 let mut pool = LocalPool::new();
387 let spawner = pool.spawner();
388
389 spawner
390 .spawn_local(task_a(cts.token()).map(|_| ()))
391 .unwrap();
392 spawner
393 .spawn_local(task_b(cts.token()).map(|_| ()))
394 .unwrap();
395
396 // Cancel after 200ms
397 {
398 let cts_clone = cts.clone();
399 spawner
400 .spawn_local(
401 async move {
402 Delay::new(Duration::from_millis(200)).await;
403 cts_clone.cancel();
404 }
405 .map(|_| ()),
406 )
407 .unwrap();
408 }
409
410 pool.run();
411
412 // Cancelled flags should be set
413 assert!(cts.is_cancelled());
414 assert!(cancelled_a.get());
415 assert!(cancelled_b.get());
416
417 // Calling cancel again should not panic or change state
418 cts.cancel();
419 assert!(cts.is_cancelled());
420 }
421
422 /// Test registering callbacks before and after cancellation, including Drop behavior.
423 #[test]
424 fn cancellation_register_callbacks() {
425 let cts = CancellationTokenSource::new();
426 let token = cts.token();
427
428 let flag_before = Rc::new(Cell::new(false));
429 let flag_after = Rc::new(Cell::new(false));
430 let flag_drop = Rc::new(Cell::new(false));
431
432 // 1. Callback registered before cancel → should execute
433 let reg_before = {
434 let flag = Rc::clone(&flag_before);
435 token
436 .register(move || {
437 flag.set(true);
438 })
439 .unwrap()
440 };
441
442 cts.cancel();
443 assert!(flag_before.get());
444
445 drop(reg_before);
446
447 // 2. Callback registered after cancel → executes immediately
448 {
449 let flag = Rc::clone(&flag_after);
450 token.register(move || {
451 flag.set(true);
452 });
453 }
454 assert!(flag_after.get());
455
456 // 3. Callback registered but dropped before cancel → should NOT execute
457 let token2 = CancellationTokenSource::new().token();
458 let reg_drop = {
459 let flag = Rc::clone(&flag_drop);
460 token2
461 .register(move || {
462 flag.set(true);
463 })
464 .unwrap()
465 };
466 drop(reg_drop); // dropped before cancel
467 token2.inner.cancelled.set(true); // force cancel
468 assert!(!flag_drop.get());
469 }
470
471 /// Test that CancelledFuture returns Poll::Ready after cancellation
472 #[test]
473 fn cancelled_future_poll_ready() {
474 let cts = CancellationTokenSource::new();
475 let token = cts.token();
476 let mut pool = LocalPool::new();
477 let spawner = pool.spawner();
478
479 let finished = Rc::new(Cell::new(false));
480 let finished_clone = Rc::clone(&finished);
481
482 spawner
483 .spawn_local(
484 async move {
485 token.cancelled().await;
486 finished_clone.set(true);
487 }
488 .map(|_| ()),
489 )
490 .unwrap();
491
492 // Cancel token
493 cts.cancel();
494
495 pool.run();
496 assert!(finished.get());
497 }
498
499 /// Test multiple callbacks and idempotent cancellation
500 #[test]
501 fn multiple_callbacks_and_idempotent_cancel() {
502 let cts = CancellationTokenSource::new();
503 let token = cts.token();
504
505 let flags: Vec<_> = (0..3).map(|_| Rc::new(Cell::new(false))).collect();
506
507 let regs: Vec<_> = flags
508 .iter()
509 .map(|flag| {
510 let f = Rc::clone(flag);
511 token
512 .register(move || {
513 f.set(true);
514 })
515 .unwrap()
516 })
517 .collect();
518
519 // Cancel once
520 cts.cancel();
521 for flag in &flags {
522 assert!(flag.get());
523 }
524
525 // Cancel again → should not panic, flags remain true
526 cts.cancel();
527 for flag in &flags {
528 assert!(flag.get());
529 }
530
531 drop(regs); // dropping after cancel → nothing happens, still safe
532 }
533}