1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
//! An `async-std` version of `Notify`, similar to the `tokio` `Notify` but implementing `Clone`.

use std::future::Future;
use std::task::Context;

use async_std::sync::{channel, Receiver, Sender};
use futures_util::pin_mut;
use futures_util::task::noop_waker;

/// Notify a single task to wake up.
///
/// `Notify` provides a basic mechanism to notify a single task of an event.
/// `Notify` itself does not carry any data. Instead, it is to be used to signal
/// another task to perform an operation.
///
/// If [`notify()`] is called **before** [`notified().await`], then the next call to
/// [`notified().await`] will complete immediately, consuming the permit. Any
/// subsequent calls to [`notified().await`] will wait for a new permit.
///
/// If [`notify()`] is called **multiple** times before [`notified().await`], only a
/// **single** permit is stored. The next call to [`notified().await`] will
/// complete immediately, but the one after will wait for a new permit.
///
/// A `Notify` can be cloned; a permit stored through one handle can be
/// consumed through a clone, as the example below shows.
///
/// [`notify()`]: Notify::notify
/// [`notified().await`]: Notify::notified()
///
/// # Examples
///
/// Basic usage.
///
/// ```
/// use async_notify::Notify;
///
/// #[async_std::main]
/// async fn main() {
///     let notify = Notify::new();
///     let notify2 = notify.clone();
///
///     let handle = async_std::task::spawn(async move {
///         notify2.notified().await;
///         println!("received notification");
///     });
///
///     println!("sending notification");
///     notify.notify();
///
///     // Wait for the spawned task so `main` does not return before it runs.
///     handle.await;
/// }
/// ```
pub struct Notify {
    // Sending half of the internal channel; `notify()` sends a `()` through it.
    sender: Sender<()>,
    // Receiving half of the same channel; `notified()` awaits a value from it.
    receiver: Receiver<()>,
}

/// An `async-std` counterpart of the tokio `Notify` that also implements `Clone`.
impl Notify {
    /// Creates a new `Notify` with no permit stored.
    ///
    /// The value is backed by a channel of capacity 1, which is what limits
    /// the number of stored permits to one.
    pub fn new() -> Self {
        let (tx, rx) = channel(1);

        Self {
            sender: tx,
            receiver: rx,
        }
    }

    /// Notifies a waiting task.
    ///
    /// If a task is currently waiting, that task is notified. Otherwise, a
    /// permit is stored in this `Notify` value and the **next** call to
    /// [`notified().await`] will complete immediately, consuming the permit
    /// made available by this call to `notify()`.
    ///
    /// At most one permit may be stored by `Notify`. Many sequential calls to
    /// `notify` will result in a single permit being stored. The next call to
    /// `notified().await` will complete immediately, but the one after that
    /// will wait.
    ///
    /// This method never waits: the send is attempted exactly once and is
    /// abandoned if it cannot complete right away.
    ///
    /// [`notified().await`]: Notify::notified()
    ///
    /// # Examples
    ///
    /// ```
    /// use async_notify::Notify;
    ///
    /// #[async_std::main]
    /// async fn main() {
    ///     let notify = Notify::new();
    ///     let notify2 = notify.clone();
    ///
    ///     let handle = async_std::task::spawn(async move {
    ///         notify2.notified().await;
    ///         println!("received notification");
    ///     });
    ///
    ///     println!("sending notification");
    ///     notify.notify();
    ///
    ///     // Wait for the spawned task so `main` does not return before it runs.
    ///     handle.await;
    /// }
    /// ```
    #[inline]
    pub fn notify(&self) {
        let send = self.sender.send(());
        pin_mut!(send);

        // A no-op waker is enough: the send future is polled a single time and
        // then dropped, so nobody ever needs to be woken up for it.
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // `Pending` means the send could not complete right away; the outcome
        // is ignored on purpose and the future is dropped on return.
        let _ = send.poll(&mut cx);
    }

    /// Waits for a notification.
    ///
    /// Each `Notify` value holds a single permit. If a permit is available from
    /// an earlier call to [`notify()`], then `notified().await` will complete
    /// immediately, consuming that permit. Otherwise, `notified().await` waits
    /// for a permit to be made available by the next call to `notify()`.
    ///
    /// [`notify()`]: Notify::notify
    ///
    /// # Examples
    ///
    /// ```
    /// use async_notify::Notify;
    ///
    /// #[async_std::main]
    /// async fn main() {
    ///     let notify = Notify::new();
    ///     let notify2 = notify.clone();
    ///
    ///     let handle = async_std::task::spawn(async move {
    ///         notify2.notified().await;
    ///         println!("received notification");
    ///     });
    ///
    ///     println!("sending notification");
    ///     notify.notify();
    ///
    ///     // Wait for the spawned task so `main` does not return before it runs.
    ///     handle.await;
    /// }
    /// ```
    #[inline]
    pub async fn notified(&self) {
        // The received value is deliberately discarded. `self` holds a sender
        // for the same channel, so the channel is not expected to be closed
        // while this method can still be called.
        let _ = self.receiver.recv().await;
    }
}

impl Default for Notify {
    fn default() -> Notify {
        Notify::new()
    }
}

impl Clone for Notify {
    /// Returns a new handle whose sender and receiver are clones of this
    /// value's channel halves.
    fn clone(&self) -> Self {
        let Self { sender, receiver } = self;

        Self {
            sender: sender.clone(),
            receiver: receiver.clone(),
        }
    }
}

#[cfg(test)]
mod tests {
    use futures_util::FutureExt;

    use super::*;

    /// A permit stored through one handle must be immediately available to a
    /// clone of that handle.
    #[async_std::test]
    async fn test() {
        let notifier = Notify::new();
        let waiter = notifier.clone();

        notifier.notify();

        // Poll the waiting future exactly once: it has to be ready already,
        // otherwise the notification was lost.
        waiter
            .notified()
            .now_or_never()
            .expect("should be notified");
    }
}