ftui_runtime/
cancellation.rs1#![forbid(unsafe_code)]
37
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::sync::{Arc, Condvar, Mutex};
40use web_time::Duration;
41
42#[derive(Clone)]
48pub struct CancellationToken {
49 inner: Arc<CancellationInner>,
50}
51
52pub struct CancellationSource {
57 inner: Arc<CancellationInner>,
58}
59
60struct CancellationInner {
61 cancelled: AtomicBool,
62 notify: (Mutex<()>, Condvar),
63}
64
65impl CancellationSource {
66 pub fn new() -> Self {
68 Self {
69 inner: Arc::new(CancellationInner {
70 cancelled: AtomicBool::new(false),
71 notify: (Mutex::new(()), Condvar::new()),
72 }),
73 }
74 }
75
76 pub fn token(&self) -> CancellationToken {
78 CancellationToken {
79 inner: Arc::clone(&self.inner),
80 }
81 }
82
83 pub fn cancel(&self) {
86 self.inner.cancelled.store(true, Ordering::Release);
87 let (lock, cvar) = &self.inner.notify;
88 let _guard = lock.lock().unwrap_or_else(|e| e.into_inner());
89 cvar.notify_all();
90 }
91
92 pub fn is_cancelled(&self) -> bool {
94 self.inner.cancelled.load(Ordering::Acquire)
95 }
96}
97
98impl Default for CancellationSource {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104impl CancellationToken {
105 #[inline]
107 pub fn is_cancelled(&self) -> bool {
108 self.inner.cancelled.load(Ordering::Acquire)
109 }
110
111 pub fn wait_timeout(&self, duration: Duration) -> bool {
115 if self.is_cancelled() {
116 return true;
117 }
118 let (lock, cvar) = &self.inner.notify;
119 let mut guard = lock.lock().unwrap_or_else(|e| e.into_inner());
120 let start = web_time::Instant::now();
121 let mut remaining = duration;
122 loop {
123 if self.is_cancelled() {
124 return true;
125 }
126 let (new_guard, result) = cvar
127 .wait_timeout(guard, remaining)
128 .unwrap_or_else(|e| e.into_inner());
129 guard = new_guard;
130 if self.is_cancelled() {
131 return true;
132 }
133 if result.timed_out() {
134 return false;
135 }
136 let elapsed = start.elapsed();
137 if elapsed >= duration {
138 return false;
139 }
140 remaining = duration - elapsed;
141 }
142 }
143}
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148 use std::sync::atomic::Ordering as AO;
149 use std::thread;
150
151 #[test]
152 fn token_starts_uncancelled() {
153 let source = CancellationSource::new();
154 let token = source.token();
155 assert!(!token.is_cancelled());
156 assert!(!source.is_cancelled());
157 }
158
159 #[test]
160 fn cancel_propagates_to_token() {
161 let source = CancellationSource::new();
162 let token = source.token();
163 source.cancel();
164 assert!(token.is_cancelled());
165 }
166
167 #[test]
168 fn cancel_propagates_to_all_clones() {
169 let source = CancellationSource::new();
170 let t1 = source.token();
171 let t2 = t1.clone();
172 let t3 = source.token();
173 source.cancel();
174 assert!(t1.is_cancelled());
175 assert!(t2.is_cancelled());
176 assert!(t3.is_cancelled());
177 }
178
179 #[test]
180 fn drop_source_does_not_cancel() {
181 let source = CancellationSource::new();
182 let token = source.token();
183 drop(source);
184 assert!(!token.is_cancelled());
185 }
186
187 #[test]
188 fn wait_timeout_returns_true_when_already_cancelled() {
189 let source = CancellationSource::new();
190 let token = source.token();
191 source.cancel();
192 assert!(token.wait_timeout(Duration::from_secs(10)));
193 }
194
195 #[test]
196 fn wait_timeout_returns_false_on_timeout() {
197 let source = CancellationSource::new();
198 let token = source.token();
199 assert!(!token.wait_timeout(Duration::from_millis(10)));
200 }
201
202 #[test]
203 fn wait_timeout_wakes_on_cancel() {
204 let source = CancellationSource::new();
205 let token = source.token();
206
207 let handle = thread::spawn(move || token.wait_timeout(Duration::from_secs(10)));
208
209 thread::sleep(Duration::from_millis(20));
210 source.cancel();
211
212 let result = handle.join().unwrap();
213 assert!(result);
214 }
215
216 #[test]
217 fn cancel_is_idempotent() {
218 let source = CancellationSource::new();
219 let token = source.token();
220 source.cancel();
221 source.cancel();
222 source.cancel();
223 assert!(token.is_cancelled());
224 }
225
226 #[test]
227 fn token_works_across_threads() {
228 let source = CancellationSource::new();
229 let token = source.token();
230 let flag = Arc::new(AtomicBool::new(false));
231 let flag_clone = flag.clone();
232
233 let handle = thread::spawn(move || {
234 while !token.is_cancelled() {
235 thread::sleep(Duration::from_millis(5));
236 }
237 flag_clone.store(true, AO::SeqCst);
238 });
239
240 thread::sleep(Duration::from_millis(20));
241 source.cancel();
242 handle.join().unwrap();
243 assert!(flag.load(AO::SeqCst));
244 }
245
246 #[test]
247 fn default_creates_uncancelled_source() {
248 let source = CancellationSource::default();
249 assert!(!source.is_cancelled());
250 }
251}