1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
//! Cancellable future for futures 0.1
//!
//! # Example
//! ```rust
//! # fn main() {
//! #  use futures_01::future::{Future, IntoFuture};
//!  use kyansel::futures_01::{CancellableError, FutureCancellable};
//! #  use tokio_01::{sync::oneshot, timer::Delay, runtime::Builder};
//! #  fn run(fut: impl Future<Item = (), Error = ()> + Send + 'static) { Builder::new().panic_handler(|err| std::panic::resume_unwind(err)).build().unwrap().block_on(fut); }
//!
//!  let (tx, rx) = oneshot::channel::<()>();
//!
//!  let deadline = tokio::clock::now() + std::time::Duration::from_secs(1).into();
//!
//!  //simulate a long-running future
//!  let cancellable = Delay::new(deadline)
//!      .map_err(|_| ())
//!      .and_then(|_| Ok(()))
//!      //add a way to cancel it
//!      .cancel_with(rx)
//!      .map_err(|e| {
//!         assert_eq!(e, CancellableError::Cancelled(()));
//!       })
//!      .map_err(|_| ());
//!
//!  let canceller = tx.send(()).into_future();
//!
//!  run(
//!    //join it with the canceller
//!    cancellable.join(canceller)
//!    //it will cancel immediately returning err
//!    .and_then(|(_ok, _tx_send)| Ok(println!("unreachable"))),
//!  );
//! # }
//! ```

use futures_01::{Async, Future, Poll};

///Future for the [`cancel_with`](trait.FutureCancellable.html#method.cancel_with) combinator,
///allowing a computation to be cancelled if a second computation completes successfully.
///
///If the future is cancelled it will complete with the error
/// [`CancellableError::Cancelled`](enum.CancellableError.html#variant.Cancelled),
/// carrying the value the second computation resolved to.
///
///If the second computation fails instead, its error is discarded, it is never
/// polled again, and the wrapped future simply keeps running.
///
///Created with [`FutureCancellable::cancel_with`](trait.FutureCancellable.html#method.cancel_with)
/// or [`cancellable`](fn.cancellable.html)
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Cancellable<F, S>
where
    F: Future,
    S: Future,
{
    ///The wrapped computation; it is polled first on every call to `poll`
    inner: F,
    ///The future whose successful completion cancels `inner`;
    /// reset to `None` once it has failed so that it is not polled again
    stopper: Option<S>,
}

///Error returned by [`Cancellable`](struct.Cancellable.html)
///
///`C` is the value the stopper future resolved to and `E` is the error type
/// of the wrapped future.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CancellableError<C, E> {
    ///The stopper completed before the inner future produced a value;
    /// holds the value the stopper resolved to
    Cancelled(C),

    ///The inner future failed on its own; holds its error
    Errored(E),
}

impl<C, E> CancellableError<C, E> {
    ///Returns `true` if this is the `Cancelled` variant
    pub fn is_cancelled(&self) -> bool {
        match *self {
            CancellableError::Cancelled(_) => true,
            CancellableError::Errored(_) => false,
        }
    }

    ///Consumes the error and yields the inner future's error,
    /// or `None` if the future was cancelled instead
    pub fn errored(self) -> Option<E> {
        if let CancellableError::Errored(failure) = self {
            Some(failure)
        } else {
            None
        }
    }

    ///Consumes the error and yields the value produced by the canceller
    /// future, or `None` if the inner future errored instead
    pub fn cancelled(self) -> Option<C> {
        if let CancellableError::Cancelled(signal) = self {
            Some(signal)
        } else {
            None
        }
    }
}

impl<F, S> Future for Cancellable<F, S>
where
    F: Future,
    S: Future,
{
    type Item = F::Item;
    type Error = CancellableError<S::Item, F::Error>;

    ///Polls the wrapped future, then the stopper (if it is still present).
    ///
    ///* A ready inner future resolves immediately with its item; the stopper
    ///  is not polled in that call.
    ///* Otherwise, a ready stopper resolves this future with
    ///  `CancellableError::Cancelled`. This also applies when the inner future
    ///  errored during the same call: the cancellation is reported and the
    ///  inner error is dropped.
    ///* A failing stopper is discarded and never polled again.
    ///* In every remaining case the inner outcome is passed through, either
    ///  `NotReady` or `CancellableError::Errored`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        //the inner future is driven first on every call
        let pending = match self.inner.poll() {
            //a finished value wins outright, skip the stopper entirely
            Ok(Async::Ready(item)) => return Ok(Async::Ready(item)),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(failure) => Err(CancellableError::Errored(failure)),
        };

        if let Some(stopper) = self.stopper.as_mut() {
            match stopper.poll() {
                //only reachable when the inner future did not yield an item
                Ok(Async::Ready(signal)) => return Err(CancellableError::Cancelled(signal)),
                Ok(Async::NotReady) => {}
                //a failed stopper can no longer cancel anything; forget it
                //so that it is not polled again
                Err(_) => self.stopper = None,
            }
        }

        //either Ok(NotReady) or Err(Errored) at this point
        pending
    }
}

/// Extension trait that adds the [`Cancellable`](struct.Cancellable.html)
/// combinator to every `Future`.
///
/// There is no need to implement this trait by hand: a blanket implementation
/// covers all types that implement `Future`.
pub trait FutureCancellable: Future {
    ///Cancel this future if another one completes successfully
    ///
    ///`stopper` is the future whose completion triggers the cancellation.
    ///
    ///Note that this method takes the receiving future by value and hands back
    /// a wrapper around it
    fn cancel_with<S>(self, stopper: S) -> Cancellable<Self, S>
    where
        Self: Sized,
        S: Future,
    {
        let stopper = Some(stopper);
        Cancellable { inner: self, stopper }
    }
}

///Builds a [`Cancellable`](struct.Cancellable.html) from a future and the
/// future that may cancel it
///
///Equivalent to
/// [`FutureCancellable::cancel_with`](trait.FutureCancellable.html#method.cancel_with),
/// offered as a free function for call sites where method syntax is inconvenient:
/// `inner` is the computation to run and `stopper` is the future whose
/// completion cancels it
pub fn cancellable<Fut1, Fut2>(inner: Fut1, stopper: Fut2) -> Cancellable<Fut1, Fut2>
where
    Fut1: Future,
    Fut2: Future,
{
    let stopper = Some(stopper);
    Cancellable { inner, stopper }
}

///Blanket implementation: every `Future`, sized or not, is a `FutureCancellable`
impl<T> FutureCancellable for T where T: ?Sized + Future {}