use std::{
future::Future,
marker::PhantomData,
pin::{pin, Pin},
task::{Context, Poll},
};
use futures::{future::FusedFuture, select, FutureExt};
use crate::value::Notifier;
#[derive(Debug, Clone, Default)]
pub struct CancellationToken {
parent_notifier: Option<Notifier>,
notifier: Notifier,
}
impl CancellationToken {
pub fn new() -> CancellationToken {
CancellationToken {
parent_notifier: None,
notifier: Notifier::new(),
}
}
pub fn child_token(&self) -> CancellationToken {
CancellationToken {
parent_notifier: Some(self.notifier.clone()),
notifier: Notifier::new(),
}
}
pub fn cancel(&self) {
self.notifier.notify();
}
pub fn is_cancelled(&self) -> bool {
self.notifier.already_notified()
|| self
.parent_notifier
.as_ref()
.map(Notifier::already_notified)
.unwrap_or_default()
}
pub fn cancelled(&self) -> Cancelled<'_> {
Cancelled {
cancelled_owned: CancelledOwned {
cancellation_token: self.clone(),
},
_lifetime: PhantomData,
}
}
pub fn cancelled_owned(self) -> CancelledOwned {
CancelledOwned {
cancellation_token: self,
}
}
pub async fn run_until_cancelled<F>(&self, fut: F) -> Option<F::Output>
where
F: Future,
{
if self.is_cancelled() {
None
} else {
let mut fut = pin!(fut.fuse());
select! {
result = fut => Some(result),
_ = self.cancelled() => None,
}
}
}
pub async fn run_until_cancelled_owned<F>(self, fut: F) -> Option<F::Output>
where
F: Future,
{
self.run_until_cancelled(fut).await
}
}
#[derive(Debug)]
pub struct Cancelled<'a> {
cancelled_owned: CancelledOwned,
_lifetime: PhantomData<&'a ()>,
}
impl<'a> Future for Cancelled<'a> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.cancelled_owned.poll_unpin(cx)
}
}
impl<'a> FusedFuture for Cancelled<'a> {
fn is_terminated(&self) -> bool {
self.cancelled_owned.is_terminated()
}
}
#[derive(Debug)]
pub struct CancelledOwned {
cancellation_token: CancellationToken,
}
impl Future for CancelledOwned {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.cancellation_token.notifier.poll_unpin(cx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => {
if let Some(notifier) = self.cancellation_token.parent_notifier.as_mut() {
notifier.poll_unpin(cx)
} else {
Poll::Pending
}
}
}
}
}
impl FusedFuture for CancelledOwned {
fn is_terminated(&self) -> bool {
self.cancellation_token.notifier.is_terminated()
|| self
.cancellation_token
.parent_notifier
.as_ref()
.map(Notifier::is_terminated)
.unwrap_or_default()
}
}