Skip to main content

mssf_core/sync/
token.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll, Waker};
11
12use crate::runtime::executor::{BoxedCancelToken, CancelToken, EventFuture};
13
/// A simple cancel token implementation.
///
/// The token is a thin handle around an [`Arc`]-shared `TokenInner`, so the
/// derived `Clone` yields another handle to the same underlying state rather
/// than an independent token.
#[derive(Clone, Debug)]
pub struct SimpleCancelToken {
    /// State shared by every clone of this token.
    inner: Arc<TokenInner>,
}
19
/// State shared by all clones of a cancel token.
struct TokenInner {
    /// Cancellation flag; starts out `false`.
    cancelled: AtomicBool,
    /// Wakers of tasks that are waiting for cancellation.
    wakers: Mutex<Vec<Waker>>,
    /// Optional one-shot callback slot.
    callback: Mutex<Option<Box<dyn FnOnce() + Send + Sync>>>,
}

/// Manual `Debug` because the boxed callback is not `Debug`; only its
/// presence is reported, under the `has_callback` field.
impl std::fmt::Debug for TokenInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Formatting must never block or panic: use `try_lock` so that a
        // held lock is reported instead of waited on, and look through a
        // poisoned lock instead of unwrapping it.
        let has_callback = match self.callback.try_lock() {
            Ok(slot) => Some(slot.is_some()),
            Err(std::sync::TryLockError::Poisoned(poisoned)) => {
                Some(poisoned.into_inner().is_some())
            }
            Err(std::sync::TryLockError::WouldBlock) => None,
        };

        let mut out = f.debug_struct("TokenInner");
        out.field("cancelled", &self.cancelled)
            .field("wakers", &self.wakers);
        match has_callback {
            Some(present) => out.field("has_callback", &present),
            None => out.field("has_callback", &format_args!("<locked>")),
        };
        out.finish()
    }
}
35
36impl SimpleCancelToken {
37    pub fn new() -> Self {
38        SimpleCancelToken {
39            inner: Arc::new(TokenInner {
40                cancelled: AtomicBool::new(false),
41                wakers: Mutex::new(Vec::new()),
42                callback: Mutex::new(None),
43            }),
44        }
45    }
46
47    pub fn new_boxed() -> BoxedCancelToken {
48        Box::new(Self::new())
49    }
50
51    pub fn cancel(&self) {
52        // Set the cancelled flag
53        self.inner.cancelled.store(true, Ordering::Release);
54
55        // Wake all waiting tasks
56        let mut wakers = self.inner.wakers.lock().unwrap();
57        for waker in wakers.drain(..) {
58            waker.wake();
59        }
60        drop(wakers);
61
62        // Take and invoke the callback, releasing the lock
63        // before calling it to avoid deadlock.
64        let callback = self.inner.callback.lock().unwrap().take();
65        if let Some(cb) = callback {
66            cb();
67        }
68    }
69
70    pub fn is_cancelled(&self) -> bool {
71        self.inner.cancelled.load(Ordering::Acquire)
72    }
73
74    /// Returns a future that completes when cancellation is triggered
75    pub fn cancelled(&self) -> CancelledFuture {
76        CancelledFuture {
77            token: self.clone(),
78        }
79    }
80}
81
82/// This future is cancel safe.
83pub struct CancelledFuture {
84    token: SimpleCancelToken,
85}
86
87impl Future for CancelledFuture {
88    type Output = ();
89
90    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91        if self.token.is_cancelled() {
92            return Poll::Ready(());
93        }
94
95        // Register this task's waker to be notified when cancelled
96        let mut wakers = self.token.inner.wakers.lock().unwrap();
97
98        // Double-check after acquiring the lock
99        if self.token.is_cancelled() {
100            return Poll::Ready(());
101        }
102
103        // Store the waker to be called when cancel() is invoked
104        wakers.push(cx.waker().clone());
105        Poll::Pending
106    }
107}
108
109impl Default for SimpleCancelToken {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
115/// Integrate with mssf trait system.
116impl CancelToken for SimpleCancelToken {
117    fn cancel(&self) {
118        self.cancel();
119    }
120
121    fn is_cancelled(&self) -> bool {
122        self.is_cancelled()
123    }
124
125    fn wait(&self) -> Pin<Box<dyn EventFuture>> {
126        Box::pin(self.cancelled())
127    }
128
129    fn on_cancel(&self, callback: Box<dyn FnOnce() + Send + Sync>) {
130        if self.is_cancelled() {
131            callback();
132            return;
133        }
134        let mut slot = self.inner.callback.lock().unwrap();
135        // Double-check after acquiring the lock
136        if self.is_cancelled() {
137            drop(slot);
138            callback();
139        } else {
140            debug_assert!(slot.is_none(), "a callback has already been registered");
141            *slot = Some(callback);
142        }
143    }
144
145    fn clone_box(&self) -> Box<dyn CancelToken> {
146        Box::new(self.clone())
147    }
148}