pub(crate) mod guard;
pub(crate) mod guard_ref;
mod tree_node;
use crate::loom::sync::Arc;
use crate::util::MaybeDangling;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use guard::DropGuard;
use guard_ref::DropGuardRef;
use pin_project_lite::pin_project;
/// A handle used to request cancellation and to observe whether it was requested.
///
/// Each handle wraps a shared `tree_node::TreeNode`. Clones of a token share the same
/// node, while [`CancellationToken::child_token`] returns a token backed by a node that
/// `tree_node::child_node` derives from this one.
pub struct CancellationToken {
// Shared node holding this token's state; every operation is forwarded to `tree_node`.
inner: Arc<tree_node::TreeNode>,
}
// Explicitly opt the public token type in to the unwind-safety marker traits.
// NOTE(review): presumably `TreeNode` holds state that prevents these from being
// implemented automatically — confirm against `tree_node` before removing.
impl std::panic::UnwindSafe for CancellationToken {}
impl std::panic::RefUnwindSafe for CancellationToken {}
pin_project! {
/// Future returned by [`CancellationToken::cancelled`].
///
/// It borrows the token and resolves to `()` once the token reports that it has been
/// cancelled.
#[must_use = "futures do nothing unless polled"]
pub struct WaitForCancellationFuture<'a> {
// Token whose cancelled flag is re-checked on every poll.
cancellation_token: &'a CancellationToken,
// Notification future obtained from the token's node. It is replaced with a fresh one
// whenever it completes while the token is still not cancelled.
#[pin]
future: tokio::sync::futures::Notified<'a>,
}
}
pin_project! {
/// Future returned by [`CancellationToken::cancelled_owned`].
///
/// It owns its token and resolves to `()` once that token reports that it has been
/// cancelled.
#[must_use = "futures do nothing unless polled"]
pub struct WaitForCancellationFutureOwned {
// `future` is created by dereferencing a raw pointer into `cancellation_token.inner`
// and only *claims* to be `'static`. Struct fields are dropped in declaration order,
// so keeping `future` as the FIRST field guarantees it is destroyed before the token
// it points into. Do not reorder these fields.
//
// NOTE(review): `MaybeDangling` presumably tells the compiler that the reference
// hidden inside `future` may stop being valid while this struct still exists —
// confirm against `crate::util`.
#[pin]
future: MaybeDangling<tokio::sync::futures::Notified<'static>>,
// Keeps the node referenced by `future` alive for as long as the future exists.
cancellation_token: CancellationToken,
}
}
impl core::fmt::Debug for CancellationToken {
    /// Formats the token as a `CancellationToken` struct with a single
    /// `is_cancelled` field holding the current cancellation state.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut out = f.debug_struct("CancellationToken");
        // The flag is sampled at formatting time; it is not cached anywhere.
        let cancelled = self.is_cancelled();
        out.field("is_cancelled", &cancelled);
        out.finish()
    }
}
impl Clone for CancellationToken {
fn clone(&self) -> Self {
tree_node::increase_handle_refcount(&self.inner);
CancellationToken {
inner: self.inner.clone(),
}
}
}
impl Drop for CancellationToken {
    /// Informs the tree that one handle to this token's node is going away.
    fn drop(&mut self) {
        let node = &self.inner;
        tree_node::decrease_handle_refcount(node);
    }
}
impl Default for CancellationToken {
fn default() -> CancellationToken {
CancellationToken::new()
}
}
impl CancellationToken {
pub fn new() -> CancellationToken {
CancellationToken {
inner: Arc::new(tree_node::TreeNode::new()),
}
}
pub fn child_token(&self) -> CancellationToken {
CancellationToken {
inner: tree_node::child_node(&self.inner),
}
}
pub fn cancel(&self) {
tree_node::cancel(&self.inner);
}
pub fn is_cancelled(&self) -> bool {
tree_node::is_cancelled(&self.inner)
}
pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
WaitForCancellationFuture {
cancellation_token: self,
future: self.inner.notified(),
}
}
pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned {
WaitForCancellationFutureOwned::new(self)
}
pub fn drop_guard(self) -> DropGuard {
DropGuard { inner: Some(self) }
}
pub fn drop_guard_ref(&self) -> DropGuardRef<'_> {
DropGuardRef { inner: Some(self) }
}
pub async fn run_until_cancelled<F>(&self, fut: F) -> Option<F::Output>
where
F: Future,
{
if self.is_cancelled() {
None
} else {
RunUntilCancelledFuture {
cancellation: self.cancelled(),
future: fut,
}
.await
}
}
pub async fn run_until_cancelled_owned<F>(self, fut: F) -> Option<F::Output>
where
F: Future,
{
self.run_until_cancelled(fut).await
}
}
impl core::fmt::Debug for WaitForCancellationFuture<'_> {
    /// Prints only the type name; none of the fields are included in the output.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut out = f.debug_struct("WaitForCancellationFuture");
        out.finish()
    }
}
impl Future for WaitForCancellationFuture<'_> {
    type Output = ();

    /// Resolves to `()` as soon as the borrowed token reports cancellation.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut this = self.project();

        // The cancelled flag is always consulted before the notification future is polled.
        // NOTE(review): avoiding lost wakeups relies on `tree_node::cancel` setting the
        // flag before it wakes notified waiters — confirm against `tree_node`.
        while !this.cancellation_token.is_cancelled() {
            if this.future.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }

            // The notification fired but the token is not (yet) seen as cancelled:
            // arm a fresh notification and check the flag again.
            this.future.set(this.cancellation_token.inner.notified());
        }

        Poll::Ready(())
    }
}
impl core::fmt::Debug for WaitForCancellationFutureOwned {
    /// Prints only the type name; none of the fields are included in the output.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut out = f.debug_struct("WaitForCancellationFutureOwned");
        out.finish()
    }
}
impl WaitForCancellationFutureOwned {
// Builds the owned future: the notification future is derived from the token first,
// then the token itself is moved into the struct next to it.
fn new(cancellation_token: CancellationToken) -> Self {
WaitForCancellationFutureOwned {
// SAFETY: the token is moved into this very struct, and `future` is declared before
// `cancellation_token`, so the future is dropped before the token. Moving the token
// afterwards is fine because `new_future` points into the `Arc`'s heap allocation,
// not into the `CancellationToken` value itself.
future: MaybeDangling::new(unsafe { Self::new_future(&cancellation_token) }),
cancellation_token,
}
}
/// Creates a notification future whose lifetime is detached from the borrow of
/// `cancellation_token`.
///
/// # Safety
///
/// The returned future must be destroyed before the node behind
/// `cancellation_token.inner` is freed, i.e. while a token keeping that node alive
/// still exists. The `'static` lifetime in the return type is not enforced by the
/// compiler.
unsafe fn new_future(
cancellation_token: &CancellationToken,
) -> tokio::sync::futures::Notified<'static> {
let inner_ptr = Arc::as_ptr(&cancellation_token.inner);
// SAFETY: `inner_ptr` points at the value inside the `Arc`, which stays valid while a
// strong handle to it exists; the caller promises to drop the returned future before
// that stops being true. Going through the raw pointer is what erases the borrow's
// lifetime.
(*inner_ptr).notified()
}
}
impl Future for WaitForCancellationFutureOwned {
    type Output = ();

    /// Resolves to `()` as soon as the owned token reports cancellation.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut this = self.project();

        // The cancelled flag is always consulted before the notification future is polled.
        while !this.cancellation_token.is_cancelled() {
            if this.future.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }

            // The notification fired but the token is not (yet) seen as cancelled:
            // arm a fresh notification and check the flag again.
            //
            // SAFETY: the replacement future lives in the `future` field, which is
            // declared before `cancellation_token` and is therefore dropped first.
            this.future.set(MaybeDangling::new(unsafe {
                Self::new_future(this.cancellation_token)
            }));
        }

        Poll::Ready(())
    }
}
pin_project! {
/// Future that drives a wrapped future until it completes or a borrowed
/// [`CancellationToken`] is cancelled, whichever is observed first.
///
/// Its `poll` implementation polls the wrapped future before the cancellation future,
/// so completion takes precedence when both are ready.
#[must_use = "futures do nothing unless polled"]
pub(crate) struct RunUntilCancelledFuture<'a, F: Future> {
// Resolves once the borrowed token is cancelled.
#[pin]
cancellation: WaitForCancellationFuture<'a>,
// The caller-supplied future being driven.
#[pin]
future: F,
}
}
impl<'a, F: Future> RunUntilCancelledFuture<'a, F> {
    /// Pairs `future` with a cancellation future borrowed from `cancellation_token`.
    pub(crate) fn new(cancellation_token: &'a CancellationToken, future: F) -> Self {
        let cancellation = cancellation_token.cancelled();
        Self {
            cancellation,
            future,
        }
    }
}
impl<F: Future> Future for RunUntilCancelledFuture<'_, F> {
    type Output = Option<F::Output>;

    /// Yields `Some(output)` when the wrapped future completes, `None` when the token is
    /// cancelled, and stays pending otherwise.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // The wrapped future goes first: if it is ready, cancellation is not consulted.
        if let Poll::Ready(output) = this.future.poll(cx) {
            return Poll::Ready(Some(output));
        }

        match this.cancellation.poll(cx) {
            Poll::Ready(()) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
pin_project! {
/// Future that drives a wrapped future until it completes or an owned
/// [`CancellationToken`] is cancelled, whichever is observed first.
///
/// Its `poll` implementation polls the wrapped future before the cancellation future,
/// so completion takes precedence when both are ready.
#[must_use = "futures do nothing unless polled"]
pub(crate) struct RunUntilCancelledFutureOwned<F: Future> {
// Resolves once the owned token is cancelled.
#[pin]
cancellation: WaitForCancellationFutureOwned,
// The caller-supplied future being driven.
#[pin]
future: F,
}
}
impl<F: Future> Future for RunUntilCancelledFutureOwned<F> {
    type Output = Option<F::Output>;

    /// Yields `Some(output)` when the wrapped future completes, `None` when the token is
    /// cancelled, and stays pending otherwise.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        match this.future.poll(cx) {
            Poll::Ready(output) => Poll::Ready(Some(output)),
            // Cancellation is only consulted when the wrapped future is not ready.
            Poll::Pending => this.cancellation.poll(cx).map(|()| None),
        }
    }
}
impl<F: Future> RunUntilCancelledFutureOwned<F> {
    /// Pairs `future` with a cancellation future that takes ownership of
    /// `cancellation_token`.
    pub(crate) fn new(cancellation_token: CancellationToken, future: F) -> Self {
        let cancellation = cancellation_token.cancelled_owned();
        Self {
            cancellation,
            future,
        }
    }
}