Skip to main content

chartml_core/resolver/
cancel.rs

1//! Cooperative cancellation for in-flight provider work.
2//!
3//! `CancellationToken` is a thin `Arc<AtomicBool>` plus a waker list so async
4//! tasks can yield until cancellation flips. No external runtime dependency:
5//! `Arc<AtomicBool>` + `std::sync::Mutex<Vec<Waker>>` works equivalently on
6//! native and WASM (`?Send`).
7//!
8//! Phase 3 always passes `None` from the resolver — providers that opt into
9//! listening for cancellation can do so without breaking the trait signature
10//! when chartml 5.x starts emitting tokens (e.g. for tab-close cleanup).
11
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll, Waker};
17
18/// Inner state shared by every clone of a token. Cheap to construct; the
19/// waker list stays empty unless somebody actually awaits `cancelled()`.
20struct CancelInner {
21    cancelled: AtomicBool,
22    wakers: Mutex<Vec<Waker>>,
23}
24
25/// Cooperative cancellation handle. `Clone` is cheap (just an `Arc`).
26///
27/// `is_cancelled()` is a non-blocking check that providers can poll between
28/// chunks of work. `cancelled()` returns a future that resolves the moment
29/// cancellation flips, suitable for `tokio::select!` or `futures::select!`
30/// against an upstream operation.
31#[derive(Clone)]
32pub struct CancellationToken(Arc<CancelInner>);
33
34impl Default for CancellationToken {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl CancellationToken {
41    /// Create a fresh, un-cancelled token.
42    pub fn new() -> Self {
43        Self(Arc::new(CancelInner {
44            cancelled: AtomicBool::new(false),
45            wakers: Mutex::new(Vec::new()),
46        }))
47    }
48
49    /// Flip the cancellation flag and wake every pending `cancelled()` future.
50    /// Idempotent — calling twice is harmless.
51    pub fn cancel(&self) {
52        // Use `swap` so we only wake wakers on the first flip; subsequent
53        // calls observe the already-true flag and return immediately.
54        if !self.0.cancelled.swap(true, Ordering::SeqCst) {
55            let mut wakers = self.0.wakers.lock().expect("cancel waker lock poisoned");
56            for waker in wakers.drain(..) {
57                waker.wake();
58            }
59        }
60    }
61
62    /// Non-blocking check — returns `true` once `cancel()` has been called.
63    pub fn is_cancelled(&self) -> bool {
64        self.0.cancelled.load(Ordering::SeqCst)
65    }
66
67    /// Future that resolves when cancellation flips. Pending until then.
68    /// Cheap to construct; stores at most one waker per polling task.
69    pub fn cancelled(&self) -> Cancelled<'_> {
70        Cancelled { token: self }
71    }
72}
73
74impl std::fmt::Debug for CancellationToken {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct("CancellationToken")
77            .field("cancelled", &self.is_cancelled())
78            .finish()
79    }
80}
81
82/// Future returned by `CancellationToken::cancelled()`.
83pub struct Cancelled<'a> {
84    token: &'a CancellationToken,
85}
86
87impl Future for Cancelled<'_> {
88    type Output = ();
89
90    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
91        if self.token.is_cancelled() {
92            return Poll::Ready(());
93        }
94        // Register the current waker so `cancel()` can wake us up later.
95        let mut wakers = self
96            .token
97            .0
98            .wakers
99            .lock()
100            .expect("cancel waker lock poisoned");
101        // Re-check after acquiring the lock so we don't miss a cancel that
102        // raced between the load above and the lock acquisition.
103        if self.token.is_cancelled() {
104            return Poll::Ready(());
105        }
106        // Replace any stale waker for this task (poll may move tasks).
107        if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
108            wakers.push(cx.waker().clone());
109        }
110        Poll::Pending
111    }
112}
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117
118    #[test]
119    fn cancel_flips_flag() {
120        let t = CancellationToken::new();
121        assert!(!t.is_cancelled());
122        t.cancel();
123        assert!(t.is_cancelled());
124    }
125
126    #[test]
127    fn cancel_is_idempotent() {
128        let t = CancellationToken::new();
129        t.cancel();
130        t.cancel(); // must not panic / poison
131        assert!(t.is_cancelled());
132    }
133
134    #[test]
135    fn clones_share_state() {
136        let t = CancellationToken::new();
137        let t2 = t.clone();
138        t.cancel();
139        assert!(t2.is_cancelled());
140    }
141
142    #[tokio::test]
143    async fn cancelled_future_resolves_after_cancel() {
144        let t = CancellationToken::new();
145        let t2 = t.clone();
146        // Spawn a task that flips cancellation after a short delay.
147        tokio::spawn(async move {
148            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
149            t2.cancel();
150        });
151        t.cancelled().await;
152        assert!(t.is_cancelled());
153    }
154
155    #[tokio::test]
156    async fn cancelled_returns_immediately_if_already_cancelled() {
157        let t = CancellationToken::new();
158        t.cancel();
159        // Should not block — if it does, the test will time out.
160        t.cancelled().await;
161    }
162}