1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crossbeam::epoch;
use pin_project::pin_project;

use crate::error::{AnyError, Error, ErrorPredicate};
use crate::recloser::Recloser;

/// Provides future aware method on top of a regular `Recloser`.
pub struct AsyncRecloser {
    inner: Arc<Recloser>,
}

impl AsyncRecloser {
    pub fn from(recloser: Recloser) -> Self {
        AsyncRecloser {
            inner: Arc::new(recloser),
        }
    }

    /// Same as `Recloser::call(...)` but with `Future`.
    pub fn call<F, T, E>(&self, f: F) -> RecloserFuture<F, AnyError>
    where
        F: Future<Output = Result<T, E>>,
    {
        self.call_with(AnyError, f)
    }

    /// Same as `Recloser::call_with(...)` but with `Future`.
    pub fn call_with<F, T, E, P>(&self, predicate: P, f: F) -> RecloserFuture<F, P>
    where
        F: Future<Output = Result<T, E>>,
        P: ErrorPredicate<E>,
    {
        let recloser = AsyncRecloser {
            inner: self.inner.clone(),
        };

        RecloserFuture {
            recloser,
            future: f,
            predicate,
            checked: false,
        }
    }
}

/// Custom `Future` returned by `AsyncRecloser` wrapped future calls.
#[pin_project]
pub struct RecloserFuture<F, P> {
    recloser: AsyncRecloser,
    #[pin]
    future: F,
    predicate: P,
    checked: bool,
}

impl<F, T, E, P> Future for RecloserFuture<F, P>
where
    F: Future<Output = Result<T, E>>,
    P: ErrorPredicate<E>,
{
    type Output = Result<T, Error<E>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let guard = &epoch::pin();
        let this = self.project();

        if !&*this.checked {
            *this.checked = true;
            if !this.recloser.inner.call_permitted(guard) {
                return Poll::Ready(Err(Error::Rejected));
            }
        }

        match this.future.poll(cx) {
            Poll::Ready(Ok(ok)) => {
                this.recloser.inner.on_success(guard);
                Poll::Ready(Ok(ok))
            }
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => {
                if this.predicate.is_err(&err) {
                    this.recloser.inner.on_error(guard);
                } else {
                    this.recloser.inner.on_success(guard);
                }
                Poll::Ready(Err(Error::Inner(err)))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use async_std::task;
    use futures::future;

    use super::*;

    #[test]
    fn multi_futures() {
        let guard = &epoch::pin();

        let recloser = Recloser::custom().closed_len(1).build();
        let recloser = AsyncRecloser::from(recloser);

        let future = future::lazy(|_| Err::<(), ()>(()));
        let future = recloser.call(future);

        assert!(matches!(task::block_on(future), Err(Error::Inner(()))));
        assert_eq!(true, recloser.inner.call_permitted(guard));

        let future = future::lazy(|_| Err::<usize, usize>(12));
        let future = recloser.call(future);

        assert!(matches!(task::block_on(future), Err(Error::Inner(12))));
        assert_eq!(false, recloser.inner.call_permitted(guard));
    }
}