lazy_queue/
sync.rs

1//! Thread safe queue implementation.
2
3use crate::inner::StreamProcessor;
4use tokio::sync::mpsc::{self, error::SendError};
5
6pub mod bounded {
7    //! Bounded queue.
8    use super::*;
9
10    implement!(
11        "A clonable thread safe sink-like queue.",
12        mpsc::Sender<Item>,
13        mpsc::Receiver<Item>,
14        (queue_len: usize),
15        mpsc::channel(queue_len),
16    );
17
18    impl<Item: std::fmt::Debug> LazyQueue<Item> {
19        /// Sends a value, waiting until there is capacity.
20        ///
21        /// See [`tokio::sync::mpsc::Sender::send`] for more details.
22        pub async fn send(&mut self, item: Item) -> Result<(), SendError<Item>> {
23            self.sender.send(item).await
24        }
25    }
26}
27
28pub mod unbounded {
29    //! Unbounded queue.
30    use super::*;
31
32    implement!(
33        "A clonable thread safe sink-like queue.",
34        mpsc::UnboundedSender<Item>,
35        mpsc::UnboundedReceiver<Item>,
36        (),
37        mpsc::unbounded_channel(),
38    );
39
40    impl<Item> LazyQueue<Item> {
41        /// Attempts to send a message on this queue without blocking.
42        pub fn send(&self, item: Item) -> Result<(), SendError<Item>> {
43            self.sender.send(item)
44        }
45    }
46}
47
48#[cfg(test)]
49mod tests {
50    use super::bounded::*;
51    use futures::{FutureExt, StreamExt};
52    use std::sync::mpsc::channel;
53    use tokio::{runtime::Runtime, sync::watch};
54
55    #[test]
56    fn multithread() {
57        let (unpause, unpause_rx) = watch::channel(false);
58
59        let (events_sender, events_receiver) = channel();
60
61        // This "processor" will simply store all the incoming items.
62        // But it won't complete until `unpause.broadcast(true)` is run.
63        let processor = move |item: u8| {
64            let mut unpause_rx_clone = unpause_rx.clone();
65            let events_sender = events_sender.clone();
66            async move {
67                loop {
68                    if let Some(true) = unpause_rx_clone.next().await {
69                        break;
70                    }
71                }
72                events_sender.send(item).expect("Unable to send an event");
73            }
74        };
75
76        const CAPACITY: usize = 3;
77        let (mut queue, driver) = LazyQueue::new(processor, CAPACITY);
78
79        let mut rt = Runtime::new().unwrap();
80
81        let (driver, _handle) = driver.remote_handle();
82        rt.spawn(driver);
83
84        // Fill up the queue by sending into it a CAPACITY + 1 number of items.
85        let items = vec![0, 2, 1, 3];
86        assert_eq!(CAPACITY, items.len() - 1); // just a sanity check.
87        for &item in &items {
88            rt.block_on(queue.send(item)).expect("Can't send an item");
89        }
90
91        // Now if we try to send anything else the call should block since the queue is already
92        // filled up. We are sending 2 elements since the real queue's capacity might be
93        // CAPACITY + 1.
94        for &item in &[9, 10] {
95            let x = queue.send(item).now_or_never();
96            assert!(x.is_none());
97        }
98
99        unpause.broadcast(true).unwrap();
100
101        // Check that the processor has received the same items we have sent to it.
102        assert_eq!(items, events_receiver.iter().take(4).collect::<Vec<_>>());
103    }
104}