chartml_core/resolver/
cancel.rs1use std::future::Future;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll, Waker};
17
/// State shared by every clone of a `CancellationToken`.
struct CancelInner {
    /// `true` once the token has been cancelled; this module only ever sets it, never clears it.
    cancelled: AtomicBool,
    /// Wakers of tasks currently waiting for cancellation; emptied when the token is cancelled.
    wakers: Mutex<Vec<Waker>>,
}
24
25#[derive(Clone)]
32pub struct CancellationToken(Arc<CancelInner>);
33
34impl Default for CancellationToken {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl CancellationToken {
41 pub fn new() -> Self {
43 Self(Arc::new(CancelInner {
44 cancelled: AtomicBool::new(false),
45 wakers: Mutex::new(Vec::new()),
46 }))
47 }
48
49 pub fn cancel(&self) {
52 if !self.0.cancelled.swap(true, Ordering::SeqCst) {
55 let mut wakers = self.0.wakers.lock().expect("cancel waker lock poisoned");
56 for waker in wakers.drain(..) {
57 waker.wake();
58 }
59 }
60 }
61
62 pub fn is_cancelled(&self) -> bool {
64 self.0.cancelled.load(Ordering::SeqCst)
65 }
66
67 pub fn cancelled(&self) -> Cancelled<'_> {
70 Cancelled { token: self }
71 }
72}
73
74impl std::fmt::Debug for CancellationToken {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("CancellationToken")
77 .field("cancelled", &self.is_cancelled())
78 .finish()
79 }
80}
81
82pub struct Cancelled<'a> {
84 token: &'a CancellationToken,
85}
86
87impl Future for Cancelled<'_> {
88 type Output = ();
89
90 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
91 if self.token.is_cancelled() {
92 return Poll::Ready(());
93 }
94 let mut wakers = self
96 .token
97 .0
98 .wakers
99 .lock()
100 .expect("cancel waker lock poisoned");
101 if self.token.is_cancelled() {
104 return Poll::Ready(());
105 }
106 if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
108 wakers.push(cx.waker().clone());
109 }
110 Poll::Pending
111 }
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A fresh token starts un-cancelled and `cancel` flips the flag.
    #[test]
    fn cancel_flips_flag() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());
        token.cancel();
        assert!(token.is_cancelled());
    }

    /// Calling `cancel` twice leaves the token cancelled and does not panic.
    #[test]
    fn cancel_is_idempotent() {
        let token = CancellationToken::new();
        token.cancel();
        token.cancel();
        assert!(token.is_cancelled());
    }

    /// Cancelling one handle is observed through a clone of it.
    #[test]
    fn clones_share_state() {
        let original = CancellationToken::new();
        let copy = original.clone();
        original.cancel();
        assert!(copy.is_cancelled());
    }

    /// A pending `cancelled()` future completes after another task cancels the token.
    #[tokio::test]
    async fn cancelled_future_resolves_after_cancel() {
        let waiter = CancellationToken::new();
        let canceller = waiter.clone();
        tokio::spawn(async move {
            // Short delay before cancelling, giving the awaiting side a chance to start
            // waiting first.
            tokio::time::sleep(Duration::from_millis(10)).await;
            canceller.cancel();
        });
        waiter.cancelled().await;
        assert!(waiter.is_cancelled());
    }

    /// Awaiting `cancelled()` on an already-cancelled token completes without hanging.
    #[tokio::test]
    async fn cancelled_returns_immediately_if_already_cancelled() {
        let token = CancellationToken::new();
        token.cancel();
        token.cancelled().await;
    }
}