1use crate::inner::StreamProcessor;
4use tokio::sync::mpsc::{self, error::SendError};
5
6pub mod bounded {
7 use super::*;
9
10 implement!(
11 "A clonable thread safe sink-like queue.",
12 mpsc::Sender<Item>,
13 mpsc::Receiver<Item>,
14 (queue_len: usize),
15 mpsc::channel(queue_len),
16 );
17
18 impl<Item: std::fmt::Debug> LazyQueue<Item> {
19 pub async fn send(&mut self, item: Item) -> Result<(), SendError<Item>> {
23 self.sender.send(item).await
24 }
25 }
26}
27
28pub mod unbounded {
29 use super::*;
31
32 implement!(
33 "A clonable thread safe sink-like queue.",
34 mpsc::UnboundedSender<Item>,
35 mpsc::UnboundedReceiver<Item>,
36 (),
37 mpsc::unbounded_channel(),
38 );
39
40 impl<Item> LazyQueue<Item> {
41 pub fn send(&self, item: Item) -> Result<(), SendError<Item>> {
43 self.sender.send(item)
44 }
45 }
46}
47
#[cfg(test)]
mod tests {
    use super::bounded::*;
    use futures::{FutureExt, StreamExt};
    use std::sync::mpsc::channel;
    use tokio::{runtime::Runtime, sync::watch};

    /// Drives a bounded queue on a multi-threaded runtime while the processor
    /// is paused: `CAPACITY + 1` sends must succeed, further sends must not
    /// complete immediately, and after unpausing every accepted item must be
    /// reported in the order it was sent.
    #[test]
    fn multithread() {
        const CAPACITY: usize = 3;

        // Flipping this watch value to `true` releases every processor future.
        let (unpause, unpause_rx) = watch::channel(false);
        // Processed items are reported back to the test thread over this channel.
        let (events_sender, events_receiver) = channel();

        let processor = move |item: u8| {
            let mut gate = unpause_rx.clone();
            let report = events_sender.clone();
            async move {
                // Keep waiting until the watch yields `true`.
                while gate.next().await != Some(true) {}
                report.send(item).expect("Unable to send an event");
            }
        };

        let (mut queue, driver) = LazyQueue::new(processor, CAPACITY);
        let mut rt = Runtime::new().unwrap();

        // The handle is kept alive until the end of the test: dropping a
        // `RemoteHandle` cancels the future it belongs to.
        let (driver, _driver_handle) = driver.remote_handle();
        rt.spawn(driver);

        let items: Vec<u8> = vec![0, 2, 1, 3];
        // One item more than the channel can buffer is expected to be accepted
        // (presumably because the driver already holds one item in the paused
        // processor).
        assert_eq!(CAPACITY, items.len() - 1);
        for item in items.iter().copied() {
            rt.block_on(queue.send(item)).expect("Can't send an item");
        }

        // The queue is full now, so a further send must not resolve on its
        // first poll.
        for &extra in &[9, 10] {
            assert!(queue.send(extra).now_or_never().is_none());
        }

        unpause.broadcast(true).unwrap();

        // Everything that was accepted comes out, in the order it was sent.
        let processed: Vec<u8> = events_receiver.iter().take(items.len()).collect();
        assert_eq!(items, processed);
    }
}