1use std::sync::{
2 atomic::{AtomicUsize, Ordering},
3 Arc,
4};
5
6#[derive(Copy, Clone, Debug, Default, Hash, PartialEq, Eq, PartialOrd, Ord, thiserror::Error)]
8#[error("ready notifiers dropped before notifying")]
9pub struct WaitError;
10
11#[derive(Debug)]
13pub struct Waiter {
14 rx: tokio::sync::mpsc::Receiver<()>,
15 notifier: Notifier,
16}
17
18impl Waiter {
19 pub fn new(count: usize) -> Self {
21 let (tx, rx) = tokio::sync::mpsc::channel(1);
22 Self {
23 rx,
24 notifier: Notifier {
25 count: Arc::new(AtomicUsize::new(count)),
26 tx,
27 },
28 }
29 }
30
31 pub fn notifier(&self) -> Notifier {
33 self.notifier.clone()
34 }
35
36 pub async fn wait(mut self) -> Result<(), WaitError> {
38 drop(self.notifier);
39 self.rx.recv().await.ok_or(WaitError)
40 }
41}
42
43#[derive(Clone, Debug)]
45pub struct Notifier {
46 count: Arc<AtomicUsize>,
47 tx: tokio::sync::mpsc::Sender<()>,
48}
49
50impl Notifier {
51 pub fn notify(self) {
55 if self.count.fetch_sub(1, Ordering::AcqRel) == 1 {
56 self.tx.try_send(()).ok();
57 }
58 }
59}