1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/// Snooze is a crate that allows you to trigger the sending of a message with a specific delay or
/// interval. This is usefull to handle TTL for example.
/// 
/// ## Usage
/// 
/// Basically, you just need to instanciate an instance of a `Notifier`, to which you register events to trigger
/// at a certain time:
/// 
/// ```rust
/// let mut notifier = Nofifier::new();
/// // do not drop the handle
/// notifier.notify_afer(Duration::from_secs(1), "world");
/// let _handle = notifier.notify_interval(Duration::from_secs(2), "hello");
/// 
/// assert_eq!(notifier.next().await, Some("hello"));
/// assert_eq!(notifier.next().await, Some("world"));
/// assert_eq!(notifier.next().await, Some("hello"));
/// ```
///
use std::time::Duration;

use tokio::sync::{mpsc, oneshot};
use tokio::time;

pub struct Nofifier<T> {
    sender: mpsc::UnboundedSender<T>,
    receiver: mpsc::UnboundedReceiver<T>
}

impl<T: Sync + Send + 'static> Nofifier<T> {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();
        Self { sender, receiver }
    }

    pub fn notify_afer(&self, duration: Duration, message: T) {
        let sender = self.sender.clone();
        tokio::spawn(async move {
            time::sleep(duration).await;
            let _ =  sender.clone().send(message);
        });
    }

    pub async fn next(&mut self) -> Option<T> {
        self.receiver.recv().await
    }
}

pub struct NotifyIntervalHandle(oneshot::Sender<()>);

impl NotifyIntervalHandle {
    pub fn close(self) { }
}

impl<T: Sync + Send + 'static + Clone> Nofifier<T> {
    /// Sends a message `T` every `duration`. Returns a handle to the task that shutsdown the task
    /// on drop.
    pub fn notify_interval(&self, duration: Duration, message: T) -> NotifyIntervalHandle {
        let sender = self.sender.clone();
        let (tx, mut rx) = oneshot::channel();
        tokio::spawn(async move {
            let mut interval = time::interval(duration);
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(_) = sender.clone().send(message.clone()) {
                            break
                        }
                    }
                    _ = &mut rx => break,
                }
            }
        });
        NotifyIntervalHandle(tx)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn test_notify_after() {
        let mut notifier = Nofifier::new();
        notifier.notify_afer(Duration::from_secs(2), "hello");
        notifier.notify_afer(Duration::from_secs(1), "world");

        assert_eq!(notifier.next().await, Some("world"));
        assert_eq!(notifier.next().await, Some("hello"));
    }

    #[tokio::test]
    async fn test_notify_interfal() {
        let mut notifier = Nofifier::new();
        // do not drop the handle
        let _handle = notifier.notify_interval(Duration::from_secs(1), "hello");

        assert_eq!(notifier.next().await, Some("hello"));
        assert_eq!(notifier.next().await, Some("hello"));
    }

    #[tokio::test]
    async fn test_mixed() {
        let mut notifier = Nofifier::new();
        // do not drop the handle
        notifier.notify_afer(Duration::from_secs(1), "world");
        let _handle = notifier.notify_interval(Duration::from_secs(2), "hello");

        assert_eq!(notifier.next().await, Some("hello"));
        assert_eq!(notifier.next().await, Some("world"));
        assert_eq!(notifier.next().await, Some("hello"));
    }
}