1use std::{
2 pin::Pin,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use futures::{
8 channel::mpsc::{channel, Receiver, Sender, TrySendError},
9 Stream, StreamExt,
10};
11
12#[derive(Debug)]
15
16pub struct Notify<T> {
17 sender: Sender<T>,
18 receiver: Arc<futures::lock::Mutex<Receiver<T>>>,
19}
20
21impl<T> Clone for Notify<T> {
22 fn clone(&self) -> Self {
23 Self {
24 sender: self.sender.clone(),
25 receiver: self.receiver.clone(),
26 }
27 }
28}
29
30impl<T> Notify<T> {
31 pub fn new() -> Self {
34 let (sender, receiver) = channel(1);
35
36 Self {
37 sender,
38 receiver: Arc::new(futures::lock::Mutex::new(receiver)),
39 }
40 }
41
42 pub fn notify(&self, value: T) -> Result<(), TrySendError<T>> {
44 self.sender.clone().try_send(value)
45 }
46
47 pub async fn notified(&self) {
51 self.receiver
52 .lock()
53 .await
54 .next()
55 .await
56 .expect("sender is dropped");
57 }
58}
59
60impl<T> Default for Notify<T> {
61 fn default() -> Self {
62 Self::new()
63 }
64}
65
66impl<T> Stream for Notify<T> {
67 type Item = T;
68
69 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70 if let Some(mut receiver) = self.receiver.try_lock() {
71 receiver.poll_next_unpin(cx)
72 } else {
73 Poll::Pending
74 }
75 }
76}