#![cfg_attr(not(test), expect(dead_code))]
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use educe::Educe;
use futures::stream::{Fuse, FusedStream};
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use postage::watch;
/// The sending half of a payload-free notification channel.
///
/// Wraps a `postage::watch::Sender<()>`: the watched value is always `()`,
/// so the only information conveyed to receivers is that a notification
/// happened. Receivers are created with `subscribe` and woken with `notify`.
///
/// The type parameter `T` is a compile-time marker only; no `T` is ever
/// stored. It is propagated to the [`NotifyReceiver<T>`] returned by
/// `subscribe`.
// NOTE(review): presumably `T` exists so that different kinds of notification
// cannot be mixed up at the type level; confirm against callers.
#[derive(Educe)]
#[educe(Debug)]
pub(crate) struct NotifySender<T = ()> {
/// The underlying watch channel sender, carrying the unit value.
sender: watch::Sender<()>,
/// Zero-sized marker tying this sender to `T`.
///
/// `fn() -> T` is used rather than `T` so that the marker does not act as
/// if the struct owned a `T`. Excluded from the `Debug` output.
#[educe(Debug(ignore))]
_marker: PhantomData<fn() -> T>,
}
/// The receiving half of a payload-free notification channel.
///
/// Wraps a fused `postage::watch::Receiver<()>`. It implements
/// `futures::Stream` with `Item = ()` and `futures::stream::FusedStream`,
/// in both cases by delegating to the wrapped receiver.
///
/// The type parameter `T` is a compile-time marker only; no `T` is ever
/// stored.
#[derive(Educe)]
#[educe(Debug)]
#[pin_project]
pub(crate) struct NotifyReceiver<T = ()> {
/// The underlying watch channel receiver, wrapped in `Fuse`.
///
/// Marked `#[pin]` so that `poll_next` can obtain a pinned projection of it.
#[pin]
receiver: Fuse<watch::Receiver<()>>,
/// Zero-sized marker tying this receiver to `T`.
///
/// `fn() -> T` is used rather than `T` so that the marker does not act as
/// if the struct owned a `T`. Excluded from the `Debug` output.
#[educe(Debug(ignore))]
_marker: PhantomData<fn() -> T>,
}
impl NotifySender {
pub(crate) fn new() -> Self {
Self::new_typed()
}
}
impl<T> NotifySender<T> {
pub(crate) fn new_typed() -> Self {
let (sender, _receiver) = watch::channel();
Self {
sender,
_marker: Default::default(),
}
}
pub(crate) fn notify(&mut self) {
self.sender.borrow_mut();
}
pub(crate) fn subscribe(&mut self) -> NotifyReceiver<T> {
let mut receiver = self.sender.subscribe();
use postage::stream::Stream as PostageStream;
use postage::stream::TryRecvError;
assert_eq!(PostageStream::try_recv(&mut receiver), Ok(()));
assert_eq!(
PostageStream::try_recv(&mut receiver),
Err(TryRecvError::Pending),
);
NotifyReceiver {
receiver: receiver.fuse(),
_marker: Default::default(),
}
}
}
impl<T> Stream for NotifyReceiver<T> {
    /// Notifications carry no payload.
    type Item = ();

    /// Poll for the next notification by delegating to the wrapped, fused
    /// watch receiver.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Project the pin through to the `#[pin]` field.
        let this = self.project();
        this.receiver.poll_next(cx)
    }
}
impl<T> FusedStream for NotifyReceiver<T> {
fn is_terminated(&self) -> bool {
self.receiver.is_terminated()
}
}
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use futures::FutureExt;

    /// Result of polling a receiver exactly once without blocking.
    type Polled = Option<Option<()>>;

    /// The receiver had nothing ready.
    const PENDING: Polled = None;
    /// The receiver yielded a notification.
    const NOTIFIED: Polled = Some(Some(()));
    /// The receiver's stream has ended.
    const CLOSED: Polled = Some(None);

    /// Poll `receiver` once and report what, if anything, was ready.
    fn poll_once(receiver: &mut NotifyReceiver) -> Polled {
        receiver.next().now_or_never()
    }

    /// A single receiver: pending until notified, repeated notifications
    /// collapse into one, and dropping the sender ends the stream.
    #[test]
    fn notify() {
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
            let mut sender = NotifySender::new();
            let mut receiver = sender.subscribe();

            // Nothing has been sent yet.
            assert_eq!(poll_once(&mut receiver), PENDING);
            assert_eq!(poll_once(&mut receiver), PENDING);

            // One notification is seen exactly once.
            sender.notify();
            assert_eq!(poll_once(&mut receiver), NOTIFIED);
            assert_eq!(poll_once(&mut receiver), PENDING);

            // Several notifications before a poll are seen as one.
            for _ in 0..3 {
                sender.notify();
            }
            assert_eq!(poll_once(&mut receiver), NOTIFIED);
            assert_eq!(poll_once(&mut receiver), PENDING);

            // A notification sent just before the sender is dropped is still
            // delivered; after that the stream stays terminated.
            sender.notify();
            drop(sender);
            assert_eq!(poll_once(&mut receiver), NOTIFIED);
            assert_eq!(poll_once(&mut receiver), CLOSED);
            assert_eq!(poll_once(&mut receiver), CLOSED);
        });
    }

    /// Every receiver subscribed before a notification sees it; one
    /// subscribed afterwards does not.
    #[test]
    fn notify_multiple_receivers() {
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
            let mut sender = NotifySender::new();
            let mut receiver_1 = sender.subscribe();
            let mut receiver_2 = sender.subscribe();

            sender.notify();

            // Subscribed only after the notification was sent.
            let mut receiver_3 = sender.subscribe();

            assert_eq!(poll_once(&mut receiver_1), NOTIFIED);
            assert_eq!(poll_once(&mut receiver_2), NOTIFIED);
            assert_eq!(poll_once(&mut receiver_3), PENDING);
        });
    }
}