chartml_core/resolver/
cancel.rs1use std::future::Future;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll, Waker};
17
/// State shared between all clones of a `CancellationToken`.
struct CancelInner {
    /// Cancellation flag; starts `false` and is set to `true` by `cancel()`.
    cancelled: AtomicBool,
    /// Wakers registered by pending `Cancelled` futures, emptied when
    /// cancellation is requested.
    wakers: Mutex<Vec<Waker>>,
}
24
25#[derive(Clone)]
32pub struct CancellationToken(Arc<CancelInner>);
33
34impl Default for CancellationToken {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl CancellationToken {
41 pub fn new() -> Self {
43 Self(Arc::new(CancelInner {
44 cancelled: AtomicBool::new(false),
45 wakers: Mutex::new(Vec::new()),
46 }))
47 }
48
49 pub fn cancel(&self) {
52 if !self.0.cancelled.swap(true, Ordering::SeqCst) {
55 let mut wakers = self.0.wakers.lock().expect("cancel waker lock poisoned");
56 for waker in wakers.drain(..) {
57 waker.wake();
58 }
59 }
60 }
61
62 pub fn is_cancelled(&self) -> bool {
64 self.0.cancelled.load(Ordering::SeqCst)
65 }
66
67 pub fn cancelled(&self) -> Cancelled<'_> {
70 Cancelled { token: self }
71 }
72}
73
74impl std::fmt::Debug for CancellationToken {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("CancellationToken")
77 .field("cancelled", &self.is_cancelled())
78 .finish()
79 }
80}
81
82pub struct Cancelled<'a> {
84 token: &'a CancellationToken,
85}
86
87impl Future for Cancelled<'_> {
88 type Output = ();
89
90 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
91 if self.token.is_cancelled() {
92 return Poll::Ready(());
93 }
94 let mut wakers = self
96 .token
97 .0
98 .wakers
99 .lock()
100 .expect("cancel waker lock poisoned");
101 if self.token.is_cancelled() {
104 return Poll::Ready(());
105 }
106 if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
108 wakers.push(cx.waker().clone());
109 }
110 Poll::Pending
111 }
112}
113
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]
    use super::*;

    /// A fresh token is not cancelled; `cancel` sets the flag.
    #[test]
    fn cancel_flips_flag() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());

        token.cancel();
        assert!(token.is_cancelled());
    }

    /// Calling `cancel` a second time leaves the token cancelled.
    #[test]
    fn cancel_is_idempotent() {
        let token = CancellationToken::new();
        for _ in 0..2 {
            token.cancel();
        }
        assert!(token.is_cancelled());
    }

    /// Cancelling one handle is visible through its clone.
    #[test]
    fn clones_share_state() {
        let original = CancellationToken::new();
        let copy = original.clone();

        original.cancel();
        assert!(copy.is_cancelled());
    }

    /// A pending `cancelled()` future completes after another task cancels.
    #[tokio::test]
    async fn cancelled_future_resolves_after_cancel() {
        let token = CancellationToken::new();
        let canceller = token.clone();
        tokio::spawn(async move {
            let delay = std::time::Duration::from_millis(10);
            tokio::time::sleep(delay).await;
            canceller.cancel();
        });

        token.cancelled().await;
        assert!(token.is_cancelled());
    }

    /// Awaiting an already-cancelled token completes without being woken.
    #[tokio::test]
    async fn cancelled_returns_immediately_if_already_cancelled() {
        let token = CancellationToken::new();
        token.cancel();
        token.cancelled().await;
    }
}