// chartml_core/resolver/cancel.rs

//! Cooperative cancellation for in-flight provider work.
//!
//! `CancellationToken` is a thin `Arc` around an `AtomicBool` flag plus a
//! waker list, so async tasks can wait until cancellation flips. It has no
//! external runtime dependency: `AtomicBool` + `std::sync::Mutex<Vec<Waker>>`
//! works equivalently on native and WASM (`?Send`).
//!
//! Phase 3 always passes `None` from the resolver. Providers that opt into
//! listening for cancellation can do so without breaking the trait signature
//! when chartml 5.x starts emitting tokens (e.g. for tab-close cleanup).

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

/// State shared by every clone of a [`CancellationToken`].
///
/// Cheap to construct: the waker list starts empty and only grows when a
/// `cancelled()` future is polled while the token is still un-cancelled.
struct CancelInner {
    /// Cancellation flag. Starts `false`; `cancel()` sets it to `true` and
    /// nothing in this module ever clears it again.
    cancelled: AtomicBool,
    /// Wakers of tasks parked on `cancelled()`, emptied by the first
    /// `cancel()` call.
    wakers: Mutex<Vec<Waker>>,
}

25/// Cooperative cancellation handle. `Clone` is cheap (just an `Arc`).
26///
27/// `is_cancelled()` is a non-blocking check that providers can poll between
28/// chunks of work. `cancelled()` returns a future that resolves the moment
29/// cancellation flips, suitable for `tokio::select!` or `futures::select!`
30/// against an upstream operation.
31#[derive(Clone)]
32pub struct CancellationToken(Arc<CancelInner>);
33
34impl Default for CancellationToken {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl CancellationToken {
41    /// Create a fresh, un-cancelled token.
42    pub fn new() -> Self {
43        Self(Arc::new(CancelInner {
44            cancelled: AtomicBool::new(false),
45            wakers: Mutex::new(Vec::new()),
46        }))
47    }
48
49    /// Flip the cancellation flag and wake every pending `cancelled()` future.
50    /// Idempotent — calling twice is harmless.
51    pub fn cancel(&self) {
52        // Use `swap` so we only wake wakers on the first flip; subsequent
53        // calls observe the already-true flag and return immediately.
54        if !self.0.cancelled.swap(true, Ordering::SeqCst) {
55            let mut wakers = self.0.wakers.lock().expect("cancel waker lock poisoned");
56            for waker in wakers.drain(..) {
57                waker.wake();
58            }
59        }
60    }
61
62    /// Non-blocking check — returns `true` once `cancel()` has been called.
63    pub fn is_cancelled(&self) -> bool {
64        self.0.cancelled.load(Ordering::SeqCst)
65    }
66
67    /// Future that resolves when cancellation flips. Pending until then.
68    /// Cheap to construct; stores at most one waker per polling task.
69    pub fn cancelled(&self) -> Cancelled<'_> {
70        Cancelled { token: self }
71    }
72}
73
74impl std::fmt::Debug for CancellationToken {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct("CancellationToken")
77            .field("cancelled", &self.is_cancelled())
78            .finish()
79    }
80}
81
82/// Future returned by `CancellationToken::cancelled()`.
83pub struct Cancelled<'a> {
84    token: &'a CancellationToken,
85}
86
87impl Future for Cancelled<'_> {
88    type Output = ();
89
90    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
91        if self.token.is_cancelled() {
92            return Poll::Ready(());
93        }
94        // Register the current waker so `cancel()` can wake us up later.
95        let mut wakers = self
96            .token
97            .0
98            .wakers
99            .lock()
100            .expect("cancel waker lock poisoned");
101        // Re-check after acquiring the lock so we don't miss a cancel that
102        // raced between the load above and the lock acquisition.
103        if self.token.is_cancelled() {
104            return Poll::Ready(());
105        }
106        // Replace any stale waker for this task (poll may move tasks).
107        if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
108            wakers.push(cx.waker().clone());
109        }
110        Poll::Pending
111    }
112}
113
#[cfg(test)]
mod tests {
    // Unwrapping is fine in tests: a panic is the failure signal we want.
    #![allow(clippy::unwrap_used)]
    use super::*;
    use std::time::Duration;

    /// Upper bound on how long an async test may wait for `cancelled()`.
    /// Without it, a regression would hang the suite instead of failing.
    const WAIT_LIMIT: Duration = Duration::from_secs(5);

    /// `cancel()` flips `is_cancelled()` from `false` to `true`.
    #[test]
    fn cancel_flips_flag() {
        let t = CancellationToken::new();
        assert!(!t.is_cancelled());
        t.cancel();
        assert!(t.is_cancelled());
    }

    /// A second `cancel()` is a harmless no-op.
    #[test]
    fn cancel_is_idempotent() {
        let t = CancellationToken::new();
        t.cancel();
        t.cancel(); // must not panic / poison
        assert!(t.is_cancelled());
    }

    /// Cancelling one clone is observed through every other clone.
    #[test]
    fn clones_share_state() {
        let t = CancellationToken::new();
        let t2 = t.clone();
        t.cancel();
        assert!(t2.is_cancelled());
    }

    /// A pending `cancelled()` future is woken by a later `cancel()`.
    #[tokio::test]
    async fn cancelled_future_resolves_after_cancel() {
        let t = CancellationToken::new();
        let t2 = t.clone();
        // Spawn a task that flips cancellation after a short delay.
        let canceller = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            t2.cancel();
        });
        tokio::time::timeout(WAIT_LIMIT, t.cancelled())
            .await
            .expect("cancelled() did not resolve after cancel()");
        assert!(t.is_cancelled());
        // Join the helper so a panic inside it fails this test.
        canceller.await.expect("canceller task panicked");
    }

    /// `cancelled()` on an already-cancelled token completes without waiting.
    #[tokio::test]
    async fn cancelled_returns_immediately_if_already_cancelled() {
        let t = CancellationToken::new();
        t.cancel();
        // Should not block — the timeout turns a hang into a failure.
        tokio::time::timeout(WAIT_LIMIT, t.cancelled())
            .await
            .expect("cancelled() blocked on an already-cancelled token");
    }
}