async_cancellation_token/
lib.rs

1//! # async-cancellation-token
2//!
3//! `async-cancellation-token` is a lightweight **single-threaded** Rust library that provides
4//! **cancellation tokens** for cooperative cancellation of asynchronous tasks and callbacks.
5//!
6//! This crate is designed for **single-threaded async environments** such as `futures::executor::LocalPool`.
7//! It internally uses `Rc`, `Cell`, and `RefCell`, and is **not thread-safe**.
8//!
9//! Features:
10//! - `CancellationTokenSource` can cancel multiple associated `CancellationToken`s.
11//! - `CancellationToken` can be awaited via `.cancelled()` or checked synchronously.
12//! - Supports registration of **one-time callbacks** (`FnOnce`) that run on cancellation.
13//!
14//! ## Example
15//!
16//! ```rust
17//! use std::time::Duration;
18//! use async_cancellation_token::CancellationTokenSource;
19//! use futures::{FutureExt, executor::LocalPool, pin_mut, select, task::LocalSpawnExt};
20//! use futures_timer::Delay;
21//!
22//! let cts = CancellationTokenSource::new();
23//! let token = cts.token();
24//!
25//! let mut pool = LocalPool::new();
26//! let spawner = pool.spawner();
27//!
28//! // Spawn a task that performs 5 steps but can be cancelled
29//! spawner.spawn_local(async move {
30//!     for i in 1..=5 {
31//!         let delay = Delay::new(Duration::from_millis(100)).fuse();
32//!         let cancelled = token.cancelled().fuse();
33//!         pin_mut!(delay, cancelled);
34//!
35//!         select! {
36//!             _ = delay => println!("Step {i}"),
37//!             _ = cancelled => {
38//!                 println!("Cancelled!");
39//!                 break;
40//!             }
41//!         }
42//!     }
43//! }.map(|_| ())).unwrap();
44//!
45//! // Cancel after 250ms
46//! spawner.spawn_local(async move {
47//!     Delay::new(Duration::from_millis(250)).await;
48//!     cts.cancel();
49//! }.map(|_| ())).unwrap();
50//!
51//! pool.run();
52//! ```
53
54use std::{
55    cell::{Cell, RefCell},
56    error::Error,
57    fmt::Display,
58    future::Future,
59    pin::Pin,
60    rc::{Rc, Weak},
61    task::{Context, Poll, Waker},
62};
63
64use slab::Slab;
65
66/// Inner shared state for `CancellationToken` and `CancellationTokenSource`.
67///
68/// This is the single-threaded shared state. All fields are internal and should not
69/// be accessed directly outside the crate.
70///
71/// - `cancelled`: `true` once cancellation has occurred.
72/// - `wakers`: list of wakers for async futures awaiting cancellation.
73/// - `callbacks`: one-time callbacks (`FnOnce`) registered to run on cancellation.
74///   These are stored in a `Slab` to allow stable keys for `CancellationTokenRegistration`.
75#[derive(Default)]
76struct Inner {
77    /// Whether the token has been cancelled.
78    cancelled: Cell<bool>,
79    /// List of wakers to wake when cancellation occurs.
80    wakers: RefCell<Vec<Waker>>,
81    /// List of callbacks to call when cancellation occurs.
82    callbacks: RefCell<Slab<Box<dyn FnOnce()>>>,
83}
84
85/// A source that can cancel associated `CancellationToken`s.
86///
87/// Cancellation is **cooperative** and single-threaded. When cancelled:
88/// - All registered `FnOnce` callbacks are called (in registration order).
89/// - All futures waiting via `CancellationToken::cancelled()` are woken.
90///
91/// # Example
92///
93/// ```rust
94/// use async_cancellation_token::CancellationTokenSource;
95///
96/// let cts = CancellationTokenSource::new();
97/// let token = cts.token();
98///
99/// assert!(!cts.is_cancelled());
100/// cts.cancel();
101/// assert!(cts.is_cancelled());
102/// ```
103#[derive(Clone)]
104pub struct CancellationTokenSource {
105    inner: Rc<Inner>,
106}
107
108/// A token that can be checked for cancellation or awaited.
109///
110/// # Example
111///
112/// ```rust
113/// use async_cancellation_token::CancellationTokenSource;
114/// use futures::{FutureExt, executor::LocalPool, task::LocalSpawnExt};
115///
116/// let cts = CancellationTokenSource::new();
117/// let token = cts.token();
118///
119/// let mut pool = LocalPool::new();
120/// pool.spawner().spawn_local(async move {
121///     token.cancelled().await;
122///     println!("Cancelled!");
123/// }.map(|_| ())).unwrap();
124///
125/// cts.cancel();
126/// pool.run();
127/// ```
128#[derive(Clone)]
129pub struct CancellationToken {
130    inner: Rc<Inner>,
131}
132
/// Error value produced by a synchronous cancellation check on a token whose
/// source has already been cancelled.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Cancelled;

impl Display for Cancelled {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "cancelled by CancellationTokenSource")
    }
}

// No underlying cause to report, so the default `Error` methods are sufficient.
impl Error for Cancelled {}
144
145impl Default for CancellationTokenSource {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151impl CancellationTokenSource {
152    /// Create a new `CancellationTokenSource`.
153    pub fn new() -> Self {
154        Self {
155            inner: Rc::new(Inner::default()),
156        }
157    }
158
159    /// Get a `CancellationToken` associated with this source.
160    pub fn token(&self) -> CancellationToken {
161        CancellationToken {
162            inner: self.inner.clone(),
163        }
164    }
165
166    /// Cancel all associated tokens.
167    ///
168    /// This marks the source as cancelled. After cancellation:
169    /// - All registered callbacks are called exactly once.
170    /// - All waiting futures are woken.
171    ///
172    /// **Note:** Cancellation is **idempotent**; calling this method multiple times has no effect.
173    /// **FnOnce callbacks will only be called once**.
174    ///
175    /// Single-threaded only. Not safe to call concurrently from multiple threads.
176    pub fn cancel(&self) {
177        if !self.inner.cancelled.replace(true) {
178            // Call all registered callbacks
179            for cb in self.inner.callbacks.borrow_mut().drain() {
180                cb();
181            }
182
183            // Wake all tasks waiting for cancellation
184            for w in self.inner.wakers.borrow_mut().drain(..) {
185                w.wake();
186            }
187        }
188    }
189
190    /// Check if this source has been cancelled.
191    pub fn is_cancelled(&self) -> bool {
192        self.inner.cancelled.get()
193    }
194}
195
196impl CancellationToken {
197    /// Check if the token has been cancelled.
198    pub fn is_cancelled(&self) -> bool {
199        self.inner.cancelled.get()
200    }
201
202    /// Synchronously check cancellation and return `Err(Cancelled)` if cancelled.
203    pub fn check_cancelled(&self) -> Result<(), Cancelled> {
204        if self.is_cancelled() {
205            Err(Cancelled)
206        } else {
207            Ok(())
208        }
209    }
210
211    /// Returns a `Future` that completes when the token is cancelled.
212    ///
213    /// # Example
214    ///
215    /// ```rust
216    /// use async_cancellation_token::CancellationTokenSource;
217    /// use futures::{FutureExt, executor::LocalPool, task::LocalSpawnExt};
218    ///
219    /// let cts = CancellationTokenSource::new();
220    /// let token = cts.token();
221    ///
222    /// let mut pool = LocalPool::new();
223    /// pool.spawner().spawn_local(async move {
224    ///     token.cancelled().await;
225    ///     println!("Cancelled!");
226    /// }.map(|_| ())).unwrap();
227    ///
228    /// cts.cancel();
229    /// pool.run();
230    /// ```
231    pub fn cancelled(&self) -> CancelledFuture {
232        CancelledFuture {
233            token: self.clone(),
234        }
235    }
236
237    /// Register a callback to run when the token is cancelled.
238    ///
239    /// - If the token is **already cancelled**, the callback is called immediately.
240    /// - Otherwise, the callback is stored and will be called exactly once upon cancellation.
241    ///
242    /// Returns a `CancellationTokenRegistration`, which will **remove the callback if dropped
243    /// before cancellation**.
244    ///
245    /// # Example
246    ///
247    /// ```rust
248    /// use std::{cell::Cell, rc::Rc};
249    /// use async_cancellation_token::CancellationTokenSource;
250    ///
251    /// let cts = CancellationTokenSource::new();
252    /// let token = cts.token();
253    ///
254    /// let flag = Rc::new(Cell::new(false));
255    /// let flag_clone = Rc::clone(&flag);
256    ///
257    /// let reg = token.register(move || {
258    ///     flag_clone.set(true);
259    /// });
260    ///
261    /// cts.cancel();
262    /// assert!(flag.get());
263    ///
264    /// drop(reg);
265    /// ```
266    pub fn register(&self, f: impl FnOnce() + 'static) -> Option<CancellationTokenRegistration> {
267        if self.is_cancelled() {
268            f();
269            None
270        } else {
271            CancellationTokenRegistration {
272                inner: Rc::downgrade(&self.inner),
273                key: self.inner.callbacks.borrow_mut().insert(Box::new(f)),
274            }
275            .into()
276        }
277    }
278}
279
280/// Represents a registered callback on a `CancellationToken`.
281///
282/// When this object is dropped **before the token is cancelled**, the callback
283/// is automatically removed. If the token is already cancelled, Drop does nothing.
284///
285/// This ensures that callbacks are **only called once** and resources are cleaned up.
286///
287/// **Single-threaded only.** Not safe to use concurrently.
288pub struct CancellationTokenRegistration {
289    inner: Weak<Inner>,
290    key: usize,
291}
292
293impl Drop for CancellationTokenRegistration {
294    fn drop(&mut self) {
295        if let Some(inner) = self.inner.upgrade() {
296            if inner.cancelled.get() {
297                // Callback was already removed
298                return;
299            }
300            let _ = inner.callbacks.borrow_mut().remove(self.key);
301        }
302    }
303}
304
305/// A future that completes when a `CancellationToken` is cancelled.
306///
307/// - If the token is already cancelled, poll returns `Poll::Ready` immediately.
308/// - Otherwise, the future registers its waker and returns `Poll::Pending`.
309///
310/// **Single-threaded only.** Not Send or Sync.
311/// The future will be woken exactly once when the token is cancelled.
312pub struct CancelledFuture {
313    token: CancellationToken,
314}
315
316impl Future for CancelledFuture {
317    type Output = ();
318
319    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
320        if self.token.is_cancelled() {
321            Poll::Ready(())
322        } else {
323            let mut wakers = self.token.inner.wakers.borrow_mut();
324            if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
325                wakers.push(cx.waker().clone());
326            }
327            Poll::Pending
328        }
329    }
330}
331
#[cfg(test)]
mod tests {
    use std::cell::Cell;
    use std::rc::Rc;
    use std::time::Duration;

    use futures::{FutureExt, executor::LocalPool, pin_mut, select, task::LocalSpawnExt};
    use futures_timer::Delay;

    use super::*;

    /// Test cooperative cancellation of two tasks with different mechanisms:
    /// task A races `cancelled()` against a timer, task B polls `check_cancelled()`.
    #[test]
    fn cancel_two_tasks() {
        let cancelled_a = Rc::new(Cell::new(false));
        let cancelled_b = Rc::new(Cell::new(false));

        let task_a = |token: CancellationToken| {
            let cancelled_a = Rc::clone(&cancelled_a);

            async move {
                for _ in 1..=5 {
                    let delay = Delay::new(Duration::from_millis(50)).fuse();
                    let cancelled = token.cancelled().fuse();

                    pin_mut!(delay, cancelled);

                    select! {
                        _ = delay => {},
                        _ = cancelled => {
                            cancelled_a.set(true);
                            break;
                        },
                    }
                }
            }
        };

        let task_b = |token: CancellationToken| {
            let cancelled_b = Rc::clone(&cancelled_b);

            async move {
                for _ in 1..=5 {
                    Delay::new(Duration::from_millis(80)).await;

                    if token.check_cancelled().is_err() {
                        cancelled_b.set(true);
                        break;
                    }
                }
            }
        };

        let cts = CancellationTokenSource::new();
        let mut pool = LocalPool::new();
        let spawner = pool.spawner();

        spawner
            .spawn_local(task_a(cts.token()).map(|_| ()))
            .unwrap();
        spawner
            .spawn_local(task_b(cts.token()).map(|_| ()))
            .unwrap();

        // Cancel after 200ms
        {
            let cts_clone = cts.clone();
            spawner
                .spawn_local(
                    async move {
                        Delay::new(Duration::from_millis(200)).await;
                        cts_clone.cancel();
                    }
                    .map(|_| ()),
                )
                .unwrap();
        }

        pool.run();

        // Cancelled flags should be set
        assert!(cts.is_cancelled());
        assert!(cancelled_a.get());
        assert!(cancelled_b.get());

        // Calling cancel again should not panic or change state
        cts.cancel();
        assert!(cts.is_cancelled());
    }

    /// Test registering callbacks before and after cancellation, including Drop behavior.
    #[test]
    fn cancellation_register_callbacks() {
        let cts = CancellationTokenSource::new();
        let token = cts.token();

        let flag_before = Rc::new(Cell::new(false));
        let flag_after = Rc::new(Cell::new(false));
        let flag_drop = Rc::new(Cell::new(false));

        // 1. Callback registered before cancel → should execute
        let reg_before = {
            let flag = Rc::clone(&flag_before);
            token
                .register(move || {
                    flag.set(true);
                })
                .unwrap()
        };

        cts.cancel();
        assert!(flag_before.get());

        drop(reg_before);

        // 2. Callback registered after cancel → executes immediately and there is
        //    no registration to hand back
        {
            let flag = Rc::clone(&flag_after);
            let reg_after = token.register(move || {
                flag.set(true);
            });
            assert!(reg_after.is_none());
        }
        assert!(flag_after.get());

        // 3. Callback registered but dropped before cancel → should NOT execute
        let cts2 = CancellationTokenSource::new();
        let token2 = cts2.token();
        let reg_drop = {
            let flag = Rc::clone(&flag_drop);
            token2
                .register(move || {
                    flag.set(true);
                })
                .unwrap()
        };
        drop(reg_drop); // dropped before cancel

        // Go through the real cancel path: only this would run a callback that
        // was left behind, so the assertion below actually checks the removal.
        cts2.cancel();
        assert!(token2.is_cancelled());
        assert!(!flag_drop.get());
    }

    /// Test that CancelledFuture returns Poll::Ready after cancellation
    #[test]
    fn cancelled_future_poll_ready() {
        let cts = CancellationTokenSource::new();
        let token = cts.token();
        let mut pool = LocalPool::new();
        let spawner = pool.spawner();

        let finished = Rc::new(Cell::new(false));
        let finished_clone = Rc::clone(&finished);

        spawner
            .spawn_local(
                async move {
                    token.cancelled().await;
                    finished_clone.set(true);
                }
                .map(|_| ()),
            )
            .unwrap();

        // Cancel token
        cts.cancel();

        pool.run();
        assert!(finished.get());
    }

    /// Test multiple callbacks and idempotent cancellation
    #[test]
    fn multiple_callbacks_and_idempotent_cancel() {
        let cts = CancellationTokenSource::new();
        let token = cts.token();

        let flags: Vec<_> = (0..3).map(|_| Rc::new(Cell::new(false))).collect();

        let regs: Vec<_> = flags
            .iter()
            .map(|flag| {
                let f = Rc::clone(flag);
                token
                    .register(move || {
                        f.set(true);
                    })
                    .unwrap()
            })
            .collect();

        // Cancel once
        cts.cancel();
        for flag in &flags {
            assert!(flag.get());
        }

        // Cancel again → should not panic, flags remain true
        cts.cancel();
        for flag in &flags {
            assert!(flag.get());
        }

        drop(regs); // dropping after cancel → nothing happens, still safe
    }
}