1use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll, Waker};
11
12use crate::runtime::executor::{BoxedCancelToken, CancelToken, EventFuture};
13
14#[derive(Clone, Debug)]
16pub struct SimpleCancelToken {
17 inner: Arc<TokenInner>,
18}
19
/// State shared by every clone of a `SimpleCancelToken`.
struct TokenInner {
    /// Cancellation flag; starts `false` and is set to `true` by `cancel`.
    cancelled: AtomicBool,
    /// Wakers registered by pending `CancelledFuture` polls, woken on cancel.
    wakers: Mutex<Vec<Waker>>,
    /// Callback registered through `on_cancel`; taken out and invoked on cancel.
    callback: Mutex<Option<Box<dyn FnOnce() + Send + Sync>>>,
}

/// Manual `Debug` because the boxed callback cannot be formatted; only its
/// presence is reported, as `has_callback`.
impl std::fmt::Debug for TokenInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("TokenInner");
        out.field("cancelled", &self.cancelled)
            .field("wakers", &self.wakers);

        // Formatting must never block or panic: use `try_lock` so a lock held
        // elsewhere (possibly by this very thread) cannot deadlock us, and
        // look through a poisoned lock instead of unwrapping it.
        match self.callback.try_lock() {
            Ok(slot) => out.field("has_callback", &slot.is_some()),
            Err(std::sync::TryLockError::Poisoned(poisoned)) => {
                out.field("has_callback", &poisoned.get_ref().is_some())
            }
            Err(std::sync::TryLockError::WouldBlock) => {
                out.field("has_callback", &format_args!("<locked>"))
            }
        };

        out.finish()
    }
}
35
36impl SimpleCancelToken {
37 pub fn new() -> Self {
38 SimpleCancelToken {
39 inner: Arc::new(TokenInner {
40 cancelled: AtomicBool::new(false),
41 wakers: Mutex::new(Vec::new()),
42 callback: Mutex::new(None),
43 }),
44 }
45 }
46
47 pub fn new_boxed() -> BoxedCancelToken {
48 Box::new(Self::new())
49 }
50
51 pub fn cancel(&self) {
52 self.inner.cancelled.store(true, Ordering::Release);
54
55 let mut wakers = self.inner.wakers.lock().unwrap();
57 for waker in wakers.drain(..) {
58 waker.wake();
59 }
60 drop(wakers);
61
62 let callback = self.inner.callback.lock().unwrap().take();
65 if let Some(cb) = callback {
66 cb();
67 }
68 }
69
70 pub fn is_cancelled(&self) -> bool {
71 self.inner.cancelled.load(Ordering::Acquire)
72 }
73
74 pub fn cancelled(&self) -> CancelledFuture {
76 CancelledFuture {
77 token: self.clone(),
78 }
79 }
80}
81
82pub struct CancelledFuture {
84 token: SimpleCancelToken,
85}
86
87impl Future for CancelledFuture {
88 type Output = ();
89
90 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91 if self.token.is_cancelled() {
92 return Poll::Ready(());
93 }
94
95 let mut wakers = self.token.inner.wakers.lock().unwrap();
97
98 if self.token.is_cancelled() {
100 return Poll::Ready(());
101 }
102
103 wakers.push(cx.waker().clone());
105 Poll::Pending
106 }
107}
108
109impl Default for SimpleCancelToken {
110 fn default() -> Self {
111 Self::new()
112 }
113}
114
115impl CancelToken for SimpleCancelToken {
117 fn cancel(&self) {
118 self.cancel();
119 }
120
121 fn is_cancelled(&self) -> bool {
122 self.is_cancelled()
123 }
124
125 fn wait(&self) -> Pin<Box<dyn EventFuture>> {
126 Box::pin(self.cancelled())
127 }
128
129 fn on_cancel(&self, callback: Box<dyn FnOnce() + Send + Sync>) {
130 if self.is_cancelled() {
131 callback();
132 return;
133 }
134 let mut slot = self.inner.callback.lock().unwrap();
135 if self.is_cancelled() {
137 drop(slot);
138 callback();
139 } else {
140 debug_assert!(slot.is_none(), "a callback has already been registered");
141 *slot = Some(callback);
142 }
143 }
144
145 fn clone_box(&self) -> Box<dyn CancelToken> {
146 Box::new(self.clone())
147 }
148}