Skip to main content

aranya_util/
ready.rs

1use std::sync::{
2    atomic::{AtomicUsize, Ordering},
3    Arc,
4};
5
6/// All [`Notifier`]s were dropped before notifying.
7#[derive(Copy, Clone, Debug, Default, Hash, PartialEq, Eq, PartialOrd, Ord, thiserror::Error)]
8#[error("ready notifiers dropped before notifying")]
9pub struct WaitError;
10
11/// Waits for `n` tasks to be ready.
12#[derive(Debug)]
13pub struct Waiter {
14    rx: tokio::sync::mpsc::Receiver<()>,
15    notifier: Notifier,
16}
17
18impl Waiter {
19    /// Create a waiter that will wait for `count` ready notifications.
20    pub fn new(count: usize) -> Self {
21        let (tx, rx) = tokio::sync::mpsc::channel(1);
22        Self {
23            rx,
24            notifier: Notifier {
25                count: Arc::new(AtomicUsize::new(count)),
26                tx,
27            },
28        }
29    }
30
31    /// Get a notifier associated with this waiter.
32    pub fn notifier(&self) -> Notifier {
33        self.notifier.clone()
34    }
35
36    /// Wait for `count` ready notifications.
37    pub async fn wait(mut self) -> Result<(), WaitError> {
38        drop(self.notifier);
39        self.rx.recv().await.ok_or(WaitError)
40    }
41}
42
43/// Notifies that a task is ready.
44#[derive(Clone, Debug)]
45pub struct Notifier {
46    count: Arc<AtomicUsize>,
47    tx: tokio::sync::mpsc::Sender<()>,
48}
49
50impl Notifier {
51    /// Notifies that one task is ready.
52    ///
53    /// After `count` calls, [`Waiter::wait`] will resolve.
54    pub fn notify(self) {
55        if self.count.fetch_sub(1, Ordering::AcqRel) == 1 {
56            self.tx.try_send(()).ok();
57        }
58    }
59}