apalis_core/
notify.rs

1use std::{
2    pin::Pin,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7use futures::{
8    channel::mpsc::{channel, Receiver, Sender, TrySendError},
9    Stream, StreamExt,
10};
11
12/// The `Notify` struct encapsulates asynchronous, multi-producer, single-consumer (MPSC) channel functionality.
13/// It is used to send notifications of type `T` from multiple producers to a single consumer.
14#[derive(Debug)]
15
16pub struct Notify<T> {
17    sender: Sender<T>,
18    receiver: Arc<futures::lock::Mutex<Receiver<T>>>,
19}
20
21impl<T> Clone for Notify<T> {
22    fn clone(&self) -> Self {
23        Self {
24            sender: self.sender.clone(),
25            receiver: self.receiver.clone(),
26        }
27    }
28}
29
30impl<T> Notify<T> {
31    /// Creates a new instance of `Notify`.
32    /// It initializes a channel with a buffer size of 1 and wraps the receiver in an `Arc<Mutex>`.
33    pub fn new() -> Self {
34        let (sender, receiver) = channel(1);
35
36        Self {
37            sender,
38            receiver: Arc::new(futures::lock::Mutex::new(receiver)),
39        }
40    }
41
42    /// Sends a notification of type `T` to the receiver.
43    pub fn notify(&self, value: T) -> Result<(), TrySendError<T>> {
44        self.sender.clone().try_send(value)
45    }
46
47    /// Waits for and retrieves the next notification.
48    /// This is an asynchronous method that awaits until a notification is available.
49    /// Panics if the sender is dropped, ensuring that `notified` is always eventually fulfilled.
50    pub async fn notified(&self) {
51        self.receiver
52            .lock()
53            .await
54            .next()
55            .await
56            .expect("sender is dropped");
57    }
58}
59
60impl<T> Default for Notify<T> {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66impl<T> Stream for Notify<T> {
67    type Item = T;
68
69    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70        if let Some(mut receiver) = self.receiver.try_lock() {
71            receiver.poll_next_unpin(cx)
72        } else {
73            Poll::Pending
74        }
75    }
76}