use std::sync::Arc;
use tokio::sync::Semaphore;
#[derive(Debug)]
pub struct NotifyOnce {
inner: Arc<Semaphore>,
have_recved: bool,
}
impl NotifyOnce {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let inner = Arc::new(Semaphore::new(0));
Self {
inner,
have_recved: false,
}
}
pub fn send(&self) {
self.inner.close();
}
pub async fn recv(&mut self) {
if self.have_recved {
std::future::pending().await
} else {
self.inner
.acquire()
.await
.map_err(|_| ())
.expect_err("Shouldn't've been able to acquire a permit");
self.have_recved = true;
}
}
pub async fn recv_owned(mut self) {
self.recv().await
}
#[must_use]
pub fn try_recv(&self) -> bool {
self.inner.is_closed()
}
}
impl Clone for NotifyOnce {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
have_recved: false,
}
}
}
#[cfg(test)]
mod test {
use std::time::Duration;
use tokio::time;
use tokio_test::{assert_pending, assert_ready};
use super::*;
#[test]
fn multiple_sends_doesnt_panic() {
let shutdown = NotifyOnce::new();
shutdown.send();
shutdown.send();
shutdown.send();
}
#[test]
fn only_yields_shutdown_once() {
let shutdown1 = NotifyOnce::new();
let mut shutdown2 = shutdown1.clone();
let mut recv_task2_1 = tokio_test::task::spawn(shutdown2.recv());
assert_pending!(recv_task2_1.poll());
shutdown1.send();
assert!(recv_task2_1.is_woken());
assert_ready!(recv_task2_1.poll());
drop(recv_task2_1);
let mut recv_task2_2 = tokio_test::task::spawn(shutdown2.recv());
assert_pending!(recv_task2_2.poll());
assert_pending!(recv_task2_2.poll());
shutdown1.send();
assert_pending!(recv_task2_2.poll());
assert_pending!(recv_task2_2.poll());
drop(recv_task2_2);
let mut shutdown3 = shutdown2.clone();
let mut recv_task3 = tokio_test::task::spawn(shutdown3.recv());
assert_ready!(recv_task3.poll());
}
#[tokio::test(start_paused = true)]
async fn subscribe_after_close_is_ok() {
let shutdown1 = NotifyOnce::new();
let mut shutdown2 = shutdown1.clone();
time::sleep(Duration::from_secs(1)).await;
shutdown1.send();
time::timeout(Duration::from_nanos(1), shutdown2.recv())
.await
.expect("Did not finish immediately");
let mut shutdown3 = shutdown2.clone();
assert!(shutdown3.try_recv());
time::timeout(Duration::from_nanos(1), shutdown3.recv())
.await
.expect("Did not finish immediately");
}
}