futures-util 0.3.3

Common utilities and extension traits for the futures-rs library.
Documentation
use crate::task::AtomicWaker;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use core::fmt;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use alloc::sync::Arc;

/// A future which can be remotely short-circuited using an `AbortHandle`.
///
/// Polling resolves to `Ok(output)` when the wrapped future completes, or to
/// `Err(Aborted)` once an abort request has been observed.
///
/// Note that the derived `Clone` clones the `Arc`, so a cloned `Abortable`
/// shares its abort state with the original.
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Abortable<Fut> {
    // The wrapped future; `poll` drives it through a pinned projection.
    future: Fut,
    // Abort flag and waker taken from the `AbortRegistration` passed to `new`.
    inner: Arc<AbortInner>,
}

// `future` is the only field projected as pinned in this file, so an
// `Abortable` may be treated as `Unpin` exactly when the wrapped future is.
impl<Fut> Unpin for Abortable<Fut> where Fut: Unpin {}

impl<Fut> Abortable<Fut> where Fut: Future {
    unsafe_pinned!(future: Fut);

    /// Creates a new `Abortable` future using an existing `AbortRegistration`.
    /// `AbortRegistration`s can be acquired through `AbortHandle::new`.
    ///
    /// When `abort` is called on the handle tied to `reg` or if `abort` has
    /// already been called, the future will complete immediately without making
    /// any further progress.
    ///
    /// Example:
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::future::{Abortable, AbortHandle, Aborted};
    ///
    /// let (abort_handle, abort_registration) = AbortHandle::new_pair();
    /// let future = Abortable::new(async { 2 }, abort_registration);
    /// abort_handle.abort();
    /// assert_eq!(future.await, Err(Aborted));
    /// # });
    /// ```
    pub fn new(future: Fut, reg: AbortRegistration) -> Self {
        Abortable {
            future,
            inner: reg.inner,
        }
    }
}

/// A registration handle for an `Abortable` future.
///
/// Values of this type can be acquired from `AbortHandle::new_pair` and are
/// used in calls to `Abortable::new`.
#[derive(Debug)]
pub struct AbortRegistration {
    // Shared abort state; moved into the `Abortable` by `Abortable::new`.
    inner: Arc<AbortInner>,
}

/// A handle to an `Abortable` future.
///
/// The handle is `Clone`; every clone refers to the same shared abort state.
#[derive(Debug, Clone)]
pub struct AbortHandle {
    // Abort flag and waker shared with the paired `AbortRegistration`.
    inner: Arc<AbortInner>,
}

impl AbortHandle {
    /// Builds a linked (`AbortHandle`, `AbortRegistration`) pair that can be
    /// used to abort a running future.
    ///
    /// Both halves point at the same freshly allocated state, which starts
    /// out in the "not aborted" condition. The registration is normally
    /// passed straight to `Abortable::new`.
    ///
    /// # Examples
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::future::{Abortable, AbortHandle, Aborted};
    ///
    /// let (abort_handle, abort_registration) = AbortHandle::new_pair();
    /// let future = Abortable::new(async { 2 }, abort_registration);
    /// abort_handle.abort();
    /// assert_eq!(future.await, Err(Aborted));
    /// # });
    /// ```
    pub fn new_pair() -> (Self, AbortRegistration) {
        // One allocation shared by both halves of the pair.
        let shared = Arc::new(AbortInner {
            waker: AtomicWaker::new(),
            cancel: AtomicBool::new(false),
        });

        let handle = Self {
            inner: Arc::clone(&shared),
        };
        let registration = AbortRegistration { inner: shared };

        (handle, registration)
    }
}

// Inner type storing the waker to awaken and a bool indicating that it
// should be cancelled. A single instance is shared (via `Arc`) between an
// `AbortHandle` and the `AbortRegistration` / `Abortable` it is paired with.
#[derive(Debug)]
struct AbortInner {
    // Registered with the task's waker in `Abortable::poll`; woken by
    // `AbortHandle::abort`.
    waker: AtomicWaker,
    // Starts as `false`; set to `true` by `AbortHandle::abort` and read by
    // `Abortable::poll`.
    cancel: AtomicBool,
}

/// Wraps `future` in a new `Abortable` and returns it together with an
/// `AbortHandle` which can be used to stop it.
///
/// This function is a convenient (but less flexible) alternative to calling
/// `AbortHandle::new_pair` and `Abortable::new` manually.
///
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle)
where
    Fut: Future,
{
    let (abort_handle, registration) = AbortHandle::new_pair();
    let wrapped = Abortable::new(future, registration);
    (wrapped, abort_handle)
}

/// Error value produced by an `Abortable` future that was aborted before the
/// wrapped future completed.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Aborted;

impl fmt::Display for Aborted {
    /// Writes a fixed, human-readable description of the abort.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The message has no arguments, so it is written out directly.
        f.write_str("`Abortable` future has been aborted")
    }
}

// `std::error::Error` is only implemented when the `std` feature is enabled.
#[cfg(feature = "std")]
impl std::error::Error for Aborted {}

impl<Fut> Future for Abortable<Fut>
where
    Fut: Future,
{
    type Output = Result<Fut::Output, Aborted>;

    /// Polls the wrapped future unless an abort has been requested.
    ///
    /// Returns `Ready(Err(Aborted))` if the abort flag is set either before
    /// the inner poll or after the waker has been registered,
    /// `Ready(Ok(output))` if the wrapped future completes, and `Pending`
    /// otherwise.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Bail out early when the abort was requested before this poll.
        if self.inner.cancel.load(Ordering::Relaxed) {
            return Poll::Ready(Err(Aborted));
        }

        // Give the wrapped future a chance to finish.
        match self.as_mut().future().poll(cx) {
            Poll::Ready(output) => return Poll::Ready(Ok(output)),
            Poll::Pending => {}
        }

        // Register so that a later `abort` can wake this task.
        self.inner.waker.register(cx.waker());

        // The abort may have been requested between the first check and the
        // registration above, in which case the wake-up could have been
        // missed; look at the flag once more. A `Relaxed` load is sufficient
        // here because `register` introduces an `AcqRel` barrier.
        if self.inner.cancel.load(Ordering::Relaxed) {
            Poll::Ready(Err(Aborted))
        } else {
            Poll::Pending
        }
    }
}

impl AbortHandle {
    /// Abort the `Abortable` future associated with this handle.
    ///
    /// Notifies the Abortable future associated with this handle that it
    /// should abort. Note that if the future is currently being polled on
    /// another thread, it will not immediately stop running. Instead, it will
    /// continue to run until its poll method returns.
    pub fn abort(&self) {
        self.inner.cancel.store(true, Ordering::Relaxed);
        self.inner.waker.wake();
    }
}