notifies 0.1.0

various efficient async notifies
Documentation
//! # Single producer, multi-consumer notify
//!
//! See [`Master`] for more info on it.

use crate::spsc;

/// Single producer, multi-consumer notify. Essentially just collection of
/// [`spsc`] masters.
#[derive(Default)]
pub struct Master {
    masters: Vec<spsc::Master>,
}

impl Master {
    /// Create master.
    pub const fn new() -> Self {
        Self {
            masters: Vec::new(),
        }
    }

    /// Preallocate storage for `cap` slaves.
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            masters: Vec::with_capacity(cap),
        }
    }

    /// Reserve space for at least `additional` more slaves.
    pub fn reserve(&mut self, additional: usize) {
        self.masters.reserve(additional);
    }
}

impl Master {
    /// Register slave and return handle to it. Dropping slave
    /// does nothing, as for now, space reclamation is not implemented.
    pub fn make_slave(&mut self) -> spsc::Slave {
        let (master, slave) = spsc::make();
        self.masters.push(master);

        slave
    }

    /// Get number of currently registered slaves.
    pub fn len(&self) -> usize {
        self.masters.len()
    }

    /// if [`Master::len`] == 0.
    pub fn is_empty(&self) -> bool {
        self.masters.is_empty()
    }

    /// Notify every registered slave. In tokio terms, this method stores
    /// permit for every slave, thus next await of [`spsc::Slave`] will wake-up
    /// immediately. Only one permit is stored.
    pub fn notify_waiters(&mut self) {
        for master in self.masters.iter_mut() {
            master.notify();
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::{sync::mpsc, time::timeout};

    use super::*;

    type Rx = mpsc::UnboundedReceiver<usize>;

    async fn task(mut slave: spsc::Slave, tx: impl Fn() + Send + 'static) {
        loop {
            (&mut slave).await;
            tx();
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_wakeups() {
        const WORKERS: usize = 2000;
        const TRIES: usize = 4;
        const TIMEOUT: Duration = Duration::from_millis(500);

        let correct: Vec<usize> = (0..WORKERS).collect();
        let mut buf = Vec::with_capacity(WORKERS);

        async fn case(rx: &mut Rx, correct: &[usize], buf: &mut Vec<usize>) {
            unsafe { buf.set_len(0) };
            for _ in 0..correct.len() {
                let item = timeout(TIMEOUT, rx.recv())
                    .await
                    .expect("timed out")
                    .unwrap();
                buf.push(item);
            }
            buf.sort_unstable();

            assert_eq!(buf, correct);
        }

        let mut master = Master::default();
        let (tx, mut rx) = mpsc::unbounded_channel();

        for worker_id in 0..WORKERS {
            let slave = master.make_slave();
            let tx = tx.clone();
            tokio::spawn(task(slave, move || tx.send(worker_id).unwrap()));
        }

        for try_no in 0..TRIES {
            master.notify_waiters();
            eprint!("try#{try_no}...");
            case(&mut rx, &correct, &mut buf).await;
            eprintln!(" success");
        }

        assert!(timeout(Duration::from_millis(100), rx.recv())
            .await
            .is_err());
    }
}