1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use std::{
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use futures::{
    channel::mpsc::{channel, Receiver, Sender, TrySendError},
    Stream, StreamExt,
};

/// The `Notify` struct encapsulates asynchronous, multi-producer, single-consumer (MPSC) channel functionality.
/// It is used to send notifications of type `T` from multiple producers to a single consumer.
#[derive(Debug)]

pub struct Notify<T> {
    sender: Sender<T>,
    receiver: Arc<futures::lock::Mutex<Receiver<T>>>,
}

impl<T> Clone for Notify<T> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            receiver: self.receiver.clone(),
        }
    }
}

impl<T> Notify<T> {
    /// Creates a new instance of `Notify`.
    /// It initializes a channel with a buffer size of 1 and wraps the receiver in an `Arc<Mutex>`.
    pub fn new() -> Self {
        let (sender, receiver) = channel(1);

        Self {
            sender,
            receiver: Arc::new(futures::lock::Mutex::new(receiver)),
        }
    }

    /// Sends a notification of type `T` to the receiver.
    pub fn notify(&self, value: T) -> Result<(), TrySendError<T>> {
        self.sender.clone().try_send(value)
    }

    /// Waits for and retrieves the next notification.
    /// This is an asynchronous method that awaits until a notification is available.
    /// Panics if the sender is dropped, ensuring that `notified` is always eventually fulfilled.
    pub async fn notified(&self) {
        self.receiver
            .lock()
            .await
            .next()
            .await
            .expect("sender is dropped");
    }
}

impl<T> Default for Notify<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> Stream for Notify<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(mut receiver) = self.receiver.try_lock() {
            receiver.poll_next_unpin(cx)
        } else {
            Poll::Pending
        }
    }
}