1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use crate::inner::StreamProcessor;
use tokio::sync::mpsc::{self, error::SendError};
/// Queue flavor backed by a bounded `tokio::sync::mpsc` channel.
///
/// The `LazyQueue` type used here comes from the `implement!` macro, whose
/// definition lives outside this module; this module only supplies the channel
/// types to the macro and adds the `send` method.
pub mod bounded {
    use super::*;

    // Instantiate the shared queue implementation on top of
    // `mpsc::channel(queue_len)`, with `queue_len` as the extra constructor argument.
    implement!(
        "A clonable thread safe sink-like queue.",
        mpsc::Sender<Item>,
        mpsc::Receiver<Item>,
        (queue_len: usize),
        mpsc::channel(queue_len),
    );

    // No trait bound on `Item`: forwarding a value to the sender does not need
    // `Debug`, and the unbounded flavor's `send` has no bound either.
    impl<Item> LazyQueue<Item> {
        /// Hands `item` over to the underlying bounded sender and awaits the outcome.
        ///
        /// The returned future resolves once the wrapped `mpsc::Sender::send`
        /// call does. Per tokio's documentation that call waits for free
        /// capacity, so the future may stay pending while the channel is full.
        ///
        /// # Errors
        ///
        /// Propagates the `SendError` of the underlying sender unchanged. Per
        /// tokio's documentation this is reported when the receiving half is
        /// closed, and the error value gives the rejected item back.
        pub async fn send(&mut self, item: Item) -> Result<(), SendError<Item>> {
            self.sender.send(item).await
        }
    }
}
/// Queue flavor backed by an unbounded `tokio::sync::mpsc` channel.
///
/// The `LazyQueue` type used here comes from the `implement!` macro, whose
/// definition lives outside this module; this module only supplies the channel
/// types to the macro and adds the `send` method.
pub mod unbounded {
    use super::*;

    // Instantiate the shared queue implementation on top of
    // `mpsc::unbounded_channel()`; no extra constructor arguments are passed.
    implement!(
        "A clonable thread safe sink-like queue.",
        mpsc::UnboundedSender<Item>,
        mpsc::UnboundedReceiver<Item>,
        (),
        mpsc::unbounded_channel(),
    );

    impl<Item> LazyQueue<Item> {
        /// Hands `item` over to the underlying unbounded sender.
        ///
        /// Unlike the bounded flavor this is a plain synchronous call: the
        /// result of the wrapped `mpsc::UnboundedSender::send` is returned as is.
        ///
        /// # Errors
        ///
        /// Propagates the `SendError` of the underlying sender unchanged. Per
        /// tokio's documentation this is reported when the receiving half is
        /// closed, and the error value gives the rejected item back.
        pub fn send(&self, item: Item) -> Result<(), SendError<Item>> {
            self.sender.send(item)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::bounded::*;
    use futures::{FutureExt, StreamExt};
    use std::sync::mpsc::channel;
    use tokio::{runtime::Runtime, sync::watch};

    /// Fills a bounded queue while its processor is held back, checks that
    /// further sends do not complete immediately, then releases the processor
    /// and verifies that the items come out in the order they were sent.
    #[test]
    fn multithread() {
        const CAPACITY: usize = 3;

        // `unpause` flips the gate every processor invocation waits on;
        // the std channel collects the processed items for the final assertion.
        let (unpause, unpause_rx) = watch::channel(false);
        let (events_sender, events_receiver) = channel();

        let processor = move |item: u8| {
            let mut gate = unpause_rx.clone();
            let report = events_sender.clone();
            async move {
                // Keep polling the watch stream until it yields `true`.
                while gate.next().await != Some(true) {}
                report.send(item).expect("Unable to send an event");
            }
        };

        let (mut queue, driver) = LazyQueue::new(processor, CAPACITY);
        let mut rt = Runtime::new().unwrap();

        // The handle half is bound to a named variable so it lives until the
        // end of the test instead of being dropped right away.
        let (driver, _handle) = driver.remote_handle();
        rt.spawn(driver);

        // One item more than the channel capacity: these sends are all
        // expected to complete even though the processor is still paused.
        let items: Vec<u8> = vec![0, 2, 1, 3];
        assert_eq!(CAPACITY, items.len() - 1);
        for item in items.iter().copied() {
            rt.block_on(queue.send(item)).expect("Can't send an item");
        }

        // Any further send must not be able to finish on its first poll.
        for &overflow in &[9, 10] {
            let x = queue.send(overflow).now_or_never();
            assert!(x.is_none());
        }

        // Open the gate and make sure the items were processed in send order.
        unpause.broadcast(true).unwrap();
        let processed: Vec<_> = events_receiver.iter().take(items.len()).collect();
        assert_eq!(items, processed);
    }
}