1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
//! Thread safe queue implementation.

use crate::inner::StreamProcessor;
use tokio::sync::mpsc::{self, error::SendError};

pub mod bounded {
    //! Bounded queue.
    use super::*;

    implement!(
        "A clonable thread safe sink-like queue.",
        mpsc::Sender<Item>,
        mpsc::Receiver<Item>,
        (queue_len: usize),
        mpsc::channel(queue_len),
    );

    impl<Item: std::fmt::Debug> LazyQueue<Item> {
        /// Sends a value, waiting until there is capacity.
        ///
        /// See [`tokio::sync::mpsc::Sender::send`] for more details.
        pub async fn send(&mut self, item: Item) -> Result<(), SendError<Item>> {
            self.sender.send(item).await
        }
    }
}

pub mod unbounded {
    //! Unbounded queue.
    use super::*;

    implement!(
        "A clonable thread safe sink-like queue.",
        mpsc::UnboundedSender<Item>,
        mpsc::UnboundedReceiver<Item>,
        (),
        mpsc::unbounded_channel(),
    );

    impl<Item> LazyQueue<Item> {
        /// Attempts to send a message on this queue without blocking.
        pub fn send(&self, item: Item) -> Result<(), SendError<Item>> {
            self.sender.send(item)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::bounded::*;
    use futures::{FutureExt, StreamExt};
    use std::sync::mpsc::channel;
    use tokio::{runtime::Runtime, sync::watch};

    #[test]
    fn multithread() {
        let (unpause, unpause_rx) = watch::channel(false);

        let (events_sender, events_receiver) = channel();

        // This "processor" will simply store all the incoming items.
        // But it won't complete until `unpause.broadcast(true)` is run.
        let processor = move |item: u8| {
            let mut unpause_rx_clone = unpause_rx.clone();
            let events_sender = events_sender.clone();
            async move {
                loop {
                    if let Some(true) = unpause_rx_clone.next().await {
                        break;
                    }
                }
                events_sender.send(item).expect("Unable to send an event");
            }
        };

        const CAPACITY: usize = 3;
        let (mut queue, driver) = LazyQueue::new(processor, CAPACITY);

        let mut rt = Runtime::new().unwrap();

        let (driver, _handle) = driver.remote_handle();
        rt.spawn(driver);

        // Fill up the queue by sending into it a CAPACITY + 1 number of items.
        let items = vec![0, 2, 1, 3];
        assert_eq!(CAPACITY, items.len() - 1); // just a sanity check.
        for &item in &items {
            rt.block_on(queue.send(item)).expect("Can't send an item");
        }

        // Now if we try to send anything else the call should block since the queue is already
        // filled up. We are sending 2 elements since the real queue's capacity might be
        // CAPACITY + 1.
        for &item in &[9, 10] {
            let x = queue.send(item).now_or_never();
            assert!(x.is_none());
        }

        unpause.broadcast(true).unwrap();

        // Check that the processor has received the same items we have sent to it.
        assert_eq!(items, events_receiver.iter().take(4).collect::<Vec<_>>());
    }
}