use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use crate::runtime::executor::{BoxedCancelToken, CancelToken, EventFuture};
/// Cancellation token handle whose state lives in a shared, reference-counted
/// `TokenInner`.
///
/// `Clone` is derived, so cloning copies the `Arc` rather than the state: every
/// clone refers to the same `TokenInner`.
#[derive(Clone, Debug)]
pub struct SimpleCancelToken {
/// Shared state holding the cancellation flag, the waker list and the callback slot.
inner: Arc<TokenInner>,
}
/// State shared by all clones of a `SimpleCancelToken`.
struct TokenInner {
/// Cancellation flag; written with `Release` by `cancel` and read with
/// `Acquire` by `is_cancelled`.
cancelled: AtomicBool,
/// Wakers pushed by `CancelledFuture::poll` while the token is not yet
/// cancelled; `cancel` removes and wakes all of them.
wakers: Mutex<Vec<Waker>>,
/// Slot for a single callback; `cancel` takes it out of the slot and calls it.
callback: Mutex<Option<Box<dyn FnOnce() + Send + Sync>>>,
}
/// `Debug` is written by hand because the boxed callback is not `Debug`; only
/// whether a callback is currently registered is reported.
impl std::fmt::Debug for TokenInner {
    /// Writes the `cancelled` flag, the `wakers` mutex and a `has_callback`
    /// indicator.
    ///
    /// The callback mutex is probed with `try_lock` instead of `lock`, so
    /// formatting never blocks, cannot deadlock when the formatting thread
    /// already holds the lock, and does not panic on a poisoned mutex (the
    /// poisoned value is still inspected). When the lock is held elsewhere the
    /// field is rendered as `<locked>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `None` means the lock is currently held and the slot cannot be inspected.
        let has_callback = match self.callback.try_lock() {
            Ok(slot) => Some(slot.is_some()),
            Err(std::sync::TryLockError::Poisoned(poisoned)) => {
                Some(poisoned.into_inner().is_some())
            }
            Err(std::sync::TryLockError::WouldBlock) => None,
        };

        let mut out = f.debug_struct("TokenInner");
        out.field("cancelled", &self.cancelled)
            .field("wakers", &self.wakers);
        match has_callback {
            Some(present) => out.field("has_callback", &present),
            None => out.field("has_callback", &format_args!("<locked>")),
        };
        out.finish()
    }
}
impl SimpleCancelToken {
    /// Creates a token that is not cancelled and has no wakers or callback.
    pub fn new() -> Self {
        SimpleCancelToken {
            inner: Arc::new(TokenInner {
                cancelled: AtomicBool::new(false),
                wakers: Mutex::new(Vec::new()),
                callback: Mutex::new(None),
            }),
        }
    }

    /// Creates a new token and returns it boxed as a [`BoxedCancelToken`].
    pub fn new_boxed() -> BoxedCancelToken {
        Box::new(Self::new())
    }

    /// Cancels the token.
    ///
    /// Sets the flag, wakes every registered waker, then takes the callback (if
    /// any) out of its slot and runs it. Because the callback is taken, a
    /// second call to `cancel` does not run it again.
    ///
    /// # Panics
    ///
    /// Panics if the waker or callback mutex is poisoned.
    pub fn cancel(&self) {
        self.inner.cancelled.store(true, Ordering::Release);

        // Move the wakers out while holding the lock, but wake them only after
        // it is released: a waker that polls inline would otherwise re-enter
        // `CancelledFuture::poll` and try to take this same mutex.
        let pending = {
            let mut wakers = self.inner.wakers.lock().unwrap();
            std::mem::take(&mut *wakers)
        };
        for waker in pending {
            waker.wake();
        }

        // The callback is likewise invoked with its lock already released.
        let callback = self.inner.callback.lock().unwrap().take();
        if let Some(cb) = callback {
            cb();
        }
    }

    /// Returns `true` once [`cancel`](Self::cancel) has set the flag.
    pub fn is_cancelled(&self) -> bool {
        self.inner.cancelled.load(Ordering::Acquire)
    }

    /// Returns a future, backed by a clone of this token, that becomes ready
    /// once the token is cancelled.
    pub fn cancelled(&self) -> CancelledFuture {
        CancelledFuture {
            token: self.clone(),
        }
    }
}
/// Future returned by [`SimpleCancelToken::cancelled`]; it becomes ready once
/// the token it holds reports that it has been cancelled.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CancelledFuture {
    /// Token whose cancellation this future waits for.
    token: SimpleCancelToken,
}
impl Future for CancelledFuture {
    type Output = ();

    /// Returns `Poll::Ready(())` once the token is cancelled; otherwise
    /// registers the current task's waker and returns `Poll::Pending`.
    ///
    /// A waker is pushed only when no already-registered waker would wake the
    /// same task, so polling repeatedly from one task does not grow the waker
    /// list without bound.
    ///
    /// # Panics
    ///
    /// Panics if the waker mutex is poisoned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Fast path: no lock needed once the flag is set.
        if self.token.is_cancelled() {
            return Poll::Ready(());
        }

        let mut wakers = self.token.inner.wakers.lock().unwrap();
        // Re-check under the lock: `cancel` sets the flag before it takes this
        // lock, so a waker pushed after this check is still seen by `cancel`.
        if self.token.is_cancelled() {
            return Poll::Ready(());
        }

        let current = cx.waker();
        if !wakers.iter().any(|registered| registered.will_wake(current)) {
            wakers.push(current.clone());
        }
        Poll::Pending
    }
}
impl Default for SimpleCancelToken {
fn default() -> Self {
Self::new()
}
}
impl CancelToken for SimpleCancelToken {
fn cancel(&self) {
self.cancel();
}
fn is_cancelled(&self) -> bool {
self.is_cancelled()
}
fn wait(&self) -> Pin<Box<dyn EventFuture>> {
Box::pin(self.cancelled())
}
fn on_cancel(&self, callback: Box<dyn FnOnce() + Send + Sync>) {
if self.is_cancelled() {
callback();
return;
}
let mut slot = self.inner.callback.lock().unwrap();
if self.is_cancelled() {
drop(slot);
callback();
} else {
debug_assert!(slot.is_none(), "a callback has already been registered");
*slot = Some(callback);
}
}
fn clone_box(&self) -> Box<dyn CancelToken> {
Box::new(self.clone())
}
}