lazy-queue 0.2.1

Futures-driven lazy queue processing
Documentation
//! Thread safe queue implementation.

use crate::inner::StreamProcessor;
use tokio::sync::mpsc::{self, error::SendError};

pub mod bounded {
    //! Bounded queue.
    //!
    //! The channel behind this queue is built with `mpsc::channel(queue_len)`, so the
    //! number of items that can be buffered is limited by the `queue_len` argument.
    use super::*;

    // `implement!` is a crate-local macro defined outside this file. The arguments passed
    // here are, in order: a description string, the sender type, the receiver type, the
    // extra parameter list (`queue_len`), and the expression that creates the channel.
    // NOTE(review): presumably the macro declares `LazyQueue` (with the `sender` field used
    // below) and attaches the string as its documentation -- confirm against the macro
    // definition.
    implement!(
        "A clonable thread safe sink-like queue.",
        mpsc::Sender<Item>,
        mpsc::Receiver<Item>,
        (queue_len: usize),
        mpsc::channel(queue_len),
    );

    // NOTE(review): the body below does not itself use `Debug`; the bound may be required
    // by the macro-generated type -- confirm before relaxing it.
    impl<Item: std::fmt::Debug> LazyQueue<Item> {
        /// Sends a value, waiting until there is capacity.
        ///
        /// This awaits the underlying channel sender's `send` and hands its result back
        /// unchanged.
        ///
        /// # Errors
        ///
        /// Returns the `SendError` reported by the underlying sender, if any.
        ///
        /// See [`tokio::sync::mpsc::Sender::send`] for more details.
        pub async fn send(&mut self, item: Item) -> Result<(), SendError<Item>> {
            self.sender.send(item).await
        }
    }
}

pub mod unbounded {
    //! Unbounded queue.
    //!
    //! The channel behind this queue is built with `mpsc::unbounded_channel()`, which
    //! takes no capacity argument.
    use super::*;

    // `implement!` is a crate-local macro defined outside this file. The arguments passed
    // here are, in order: a description string, the sender type, the receiver type, an
    // empty extra parameter list, and the expression that creates the channel.
    // NOTE(review): presumably the macro declares `LazyQueue` (with the `sender` field used
    // below) and attaches the string as its documentation -- confirm against the macro
    // definition.
    implement!(
        "A clonable thread safe sink-like queue.",
        mpsc::UnboundedSender<Item>,
        mpsc::UnboundedReceiver<Item>,
        (),
        mpsc::unbounded_channel(),
    );

    impl<Item> LazyQueue<Item> {
        /// Attempts to send a message on this queue without blocking.
        ///
        /// This is a synchronous call that forwards `item` to the underlying unbounded
        /// sender and hands its result back unchanged.
        ///
        /// # Errors
        ///
        /// Returns the `SendError` reported by the underlying sender, if any.
        ///
        /// See [`tokio::sync::mpsc::UnboundedSender::send`] for more details.
        pub fn send(&self, item: Item) -> Result<(), SendError<Item>> {
            self.sender.send(item)
        }
    }
}

#[cfg(test)]
mod tests {
    //! Tests for the bounded queue.
    use super::bounded::*;
    use futures::{FutureExt, StreamExt};
    use std::sync::mpsc::channel;
    use tokio::{runtime::Runtime, sync::watch};

    /// Drives a bounded queue on a runtime while the producer runs on the test thread.
    ///
    /// The test fills the queue while the processor is paused, checks that further sends
    /// do not complete immediately, then unpauses the processor and verifies that every
    /// item sent before the pause was lifted comes out in the order it went in.
    #[test]
    fn multithread() {
        // Gate that keeps every processor invocation parked until `true` is broadcast.
        let (unpause, unpause_rx) = watch::channel(false);

        // Std channel used to report processed items back to the test thread.
        let (events_sender, events_receiver) = channel();

        // This "processor" will simply store all the incoming items.
        // But it won't complete until `unpause.broadcast(true)` is run.
        let processor = move |item: u8| {
            let mut unpause_rx_clone = unpause_rx.clone();
            let events_sender = events_sender.clone();
            async move {
                // Keep polling the watch stream until it yields `true`.
                loop {
                    if let Some(true) = unpause_rx_clone.next().await {
                        break;
                    }
                }
                events_sender.send(item).expect("Unable to send an event");
            }
        };

        const CAPACITY: usize = 3;
        let (mut queue, driver) = LazyQueue::new(processor, CAPACITY);

        let mut rt = Runtime::new().unwrap();

        // `_handle` must stay bound for the whole test: it is deliberately not `_`, so it
        // is only dropped when the function returns.
        let (driver, _handle) = driver.remote_handle();
        rt.spawn(driver);

        // Fill up the queue by sending into it a CAPACITY + 1 number of items.
        let items = vec![0, 2, 1, 3];
        assert_eq!(CAPACITY, items.len() - 1); // just a sanity check.
        for &item in &items {
            rt.block_on(queue.send(item)).expect("Can't send an item");
        }

        // Now if we try to send anything else the call should block since the queue is already
        // filled up. We are sending 2 elements since the real queue's capacity might be
        // CAPACITY + 1.
        for &item in &[9, 10] {
            let attempt = queue.send(item).now_or_never();
            assert!(
                attempt.is_none(),
                "sending into a full queue must not complete immediately"
            );
        }

        unpause.broadcast(true).unwrap();

        // Check that the processor has received the same items we have sent to it.
        // The number of events to wait for is derived from `items` rather than hard-coded,
        // so the expectation stays in sync if the list above is edited.
        let processed: Vec<_> = events_receiver.iter().take(items.len()).collect();
        assert_eq!(items, processed);
    }
}