use std::{
cell::{Cell, RefCell},
collections::HashSet,
mem,
ops::DerefMut,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
use compio_driver::{Cancel, Key, OpCode};
use futures_util::{FutureExt, ready};
use synchrony::unsync::event::{Event, EventListener};
use crate::{ContextExt, Runtime};
#[derive(Debug)]
struct Inner {
tokens: RefCell<HashSet<Cancel>>,
is_cancelled: Cell<bool>,
runtime: Runtime,
notify: Event,
}
#[derive(Clone, Debug)]
pub struct CancelToken(Rc<Inner>);
impl PartialEq for CancelToken {
fn eq(&self, other: &Self) -> bool {
Rc::ptr_eq(&self.0, &other.0)
}
}
impl Eq for CancelToken {}
impl CancelToken {
pub fn new() -> Self {
Self(Rc::new(Inner {
tokens: RefCell::new(HashSet::new()),
is_cancelled: Cell::new(false),
runtime: Runtime::current(),
notify: Event::new(),
}))
}
pub(crate) fn listen(&self) -> EventListener {
self.0.notify.listen()
}
pub fn cancel(self) {
self.0.notify.notify_all();
if self.0.is_cancelled.replace(true) {
return;
}
let tokens = mem::take(self.0.tokens.borrow_mut().deref_mut());
for t in tokens {
self.0.runtime.cancel_token(t);
}
}
pub fn is_cancelled(&self) -> bool {
self.0.is_cancelled.get()
}
pub fn register<T: OpCode>(&self, key: &Key<T>) {
if self.0.is_cancelled.get() {
self.0.runtime.cancel(key.clone());
} else {
let token = self.0.runtime.register_cancel(key);
self.0.tokens.borrow_mut().insert(token);
}
}
pub fn wait(self) -> WaitFuture {
WaitFuture::new(self)
}
pub async fn current() -> Option<Self> {
std::future::poll_fn(|cx| Poll::Ready(cx.get_cancel().cloned())).await
}
}
impl Default for CancelToken {
fn default() -> Self {
Self::new()
}
}
pub struct WaitFuture {
listen: EventListener,
token: CancelToken,
}
impl WaitFuture {
fn new(token: CancelToken) -> WaitFuture {
WaitFuture {
listen: token.listen(),
token,
}
}
}
impl Future for WaitFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
loop {
if self.token.is_cancelled() {
return Poll::Ready(());
} else {
ready!(self.listen.poll_unpin(cx))
}
}
}
}