1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
mod error;

use core::fmt::Debug;
use core::mem;
use core::task::Waker;

use parking_lot::const_mutex;
use parking_lot::Mutex;

pub use error::Error;

/// Internal state machine stored behind the mutex of an `Awaitable`.
///
/// The methods of `Awaitable` move it through
/// `Uninitialized` -> `Ongoing` -> `Done` -> `Consumed`,
/// and `reset` puts it back to `Ongoing` from any state.
#[derive(Debug)]
enum InnerState<Input, Output> {
    /// State of a freshly created `Awaitable`, before `reset` is called.
    Uninitialized,

    /// `reset` has been called and the output has not been provided yet.
    ///
    /// Holds the input given to `reset` (until `take_input` removes it) and
    /// the waker registered by `install_waker`, if any.
    Ongoing(Option<Input>, Option<Waker>),

    /// The awaitable is done
    ///
    /// Holds the output passed to `done` until `take_output` removes it.
    Done(Output),

    /// State installed by `take_output`, whether or not there was an output
    /// to take.
    Consumed,
}

/// A mutex-protected slot through which one side hands an `Input` to
/// another side and later receives an `Output` back.
///
/// Awaitable guarantees that there is no spurious wakeup: among the methods
/// defined here, the waker registered through `install_waker` is only woken
/// by `done`.
///
/// Every method takes `&self`; the state is kept inside a
/// `parking_lot::Mutex`.
#[derive(Debug)]
pub struct Awaitable<Input, Output>(Mutex<InnerState<Input, Output>>);

impl<Input, Output> Awaitable<Input, Output> {
    /// Create an uninitialized `Awaitable`.
    ///
    /// Must be `reset` before it can be used.
    pub const fn new() -> Self {
        Self(const_mutex(InnerState::Uninitialized))
    }
}

impl<Input, Output> Awaitable<Input, Output> {
    /// Put the `Awaitable` (back) into the ongoing state.
    ///
    /// The new state stores `input` and has no waker installed; whatever
    /// state was present before is discarded.
    ///
    /// Once this returns, `install_waker`, `take_input` and `done`
    /// can be called.
    pub fn reset(&self, input: Option<Input>) {
        let mut state = self.0.lock();
        *state = InnerState::Ongoing(input, None);
    }

    /// Return true if the task is already done.
    ///
    /// **
    /// `install_waker` must not be called after `take_output` is called.
    /// **
    pub fn install_waker(&self, waker: Waker) -> Result<bool, Error> {
        use InnerState::*;

        let mut guard = self.0.lock();

        match &mut *guard {
            Uninitialized => Err(Error::Uninitialized),

            Ongoing(_input, stored_waker) => {
                *stored_waker = Some(waker);
                Ok(false)
            }
            Done(_) => Ok(true),
            Consumed => Err(Error::AlreadyConsumed),
        }
    }

    /// **`take_input` must not be called after `take_output` is called.
    pub fn take_input(&self) -> Result<Option<Input>, Error> {
        use InnerState::*;

        let mut guard = self.0.lock();

        match &mut *guard {
            Uninitialized => Err(Error::Uninitialized),

            Ongoing(input, _stored_waker) => Ok(input.take()),
            Done(_) => Ok(None),
            Consumed => Err(Error::AlreadyConsumed),
        }
    }

    /// Mark the awaitable as done with `value` and wake the installed waker,
    /// if there is one.
    ///
    /// The state only changes when the awaitable is currently ongoing, that
    /// is, `reset` has been called and neither `done` nor `take_output` has
    /// been called since. On every error path the state is left untouched and
    /// `value` is dropped, so a misplaced call can neither overwrite an
    /// output that has not been taken yet nor turn a consumed awaitable back
    /// into a done one.
    ///
    /// `done` must be only called once on one instance of `Awaitable`.
    ///
    /// **`done` must not be called after `take_output` is called.**
    ///
    /// # Errors
    ///
    ///  - `Error::Uninitialized` if `reset` has not been called yet.
    ///  - `Error::AlreadyDone` if `done` has already been called since the
    ///    last `reset`.
    ///  - `Error::AlreadyConsumed` if the state is `Consumed`, i.e.
    ///    `take_output` has been called since the last `reset`.
    pub fn done(&self, value: Output) -> Result<(), Error> {
        use InnerState::*;

        let prev_state = {
            let mut guard = self.0.lock();

            // Validate the transition before touching the state, so that a
            // rejected call has no side effect on the stored state.
            let violation = match &*guard {
                Uninitialized => Some(Error::Uninitialized),
                Done(_) => Some(Error::AlreadyDone),
                Consumed => Some(Error::AlreadyConsumed),
                Ongoing(..) => None,
            };

            if let Some(error) = violation {
                return Err(error);
            }

            mem::replace(&mut *guard, Done(value))
        };

        // The guard went out of scope above: the waker is invoked without
        // holding the mutex, so code run by `wake` can call back into this
        // `Awaitable` without deadlocking.
        if let Ongoing(_input, Some(waker)) = prev_state {
            waker.wake();
        }

        Ok(())
    }

    /// Return `Some(output)` if the awaitable is done.
    pub fn take_output(&self) -> Option<Output> {
        use InnerState::*;

        let prev_state = mem::replace(&mut *self.0.lock(), Consumed);

        match prev_state {
            Done(value) => Some(value),
            _ => None,
        }
    }

    /// Report whether the state is currently `Done`, i.e. `done` has
    /// stored an output that has not been taken yet.
    pub fn is_done(&self) -> bool {
        let state = self.0.lock();
        matches!(*state, InnerState::Done(_))
    }

    /// Report whether the state is currently `Consumed`, i.e. `take_output`
    /// has been called since the last `reset`.
    pub fn is_consumed(&self) -> bool {
        let state = self.0.lock();
        matches!(*state, InnerState::Consumed)
    }
}