1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
mod errors;
mod receiver;
mod sender;
pub use errors::Error;
pub use receiver::Receiver;
pub use sender::Sender;
use std::time::Duration;
/// Creates a channel backed by [`crossbeam_channel::unbounded`], i.e. one
/// with no capacity limit on queued messages.
///
/// The raw crossbeam endpoints are wrapped in this crate's [`Sender`] and
/// [`Receiver`]; both wrappers are handed the same `delay`.
///
/// NOTE(review): `delay` is presumably the interval the wrappers wait between
/// retries on the underlying channel — confirm against `Sender`/`Receiver`.
pub fn unbounded<T>(delay: Duration) -> (Sender<T>, Receiver<T>) {
    let (raw_tx, raw_rx) = crossbeam_channel::unbounded();
    let sender = Sender::new(raw_tx, delay);
    let receiver = Receiver::new(raw_rx, delay);
    (sender, receiver)
}
/// Creates a channel backed by [`crossbeam_channel::bounded`], holding at
/// most `cap` queued messages.
///
/// The raw crossbeam endpoints are wrapped in this crate's [`Sender`] and
/// [`Receiver`]; both wrappers are handed the same `delay`.
///
/// NOTE(review): `delay` is presumably the interval the wrappers wait between
/// retries on the underlying channel — confirm against `Sender`/`Receiver`.
///
/// NOTE(review): crossbeam documents `cap == 0` as a rendezvous channel where
/// a send and a receive must happen at the same time; verify how the wrappers
/// behave in that case.
pub fn bounded<T>(delay: Duration, cap: usize) -> (Sender<T>, Receiver<T>) {
    let (raw_tx, raw_rx) = crossbeam_channel::bounded(cap);
    let sender = Sender::new(raw_tx, delay);
    let receiver = Receiver::new(raw_rx, delay);
    (sender, receiver)
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    /// Pushes 100 values through an unbounded channel (100 ms delay) from a
    /// spawned task and checks that the receiver collects all of them.
    #[tokio::test]
    async fn send_receive() {
        let (sender, receiver) = unbounded(Duration::from_millis(100));

        // The producer runs as its own task so the test body can drain the
        // receiving side concurrently.
        tokio::spawn(async move {
            for value in 0..100usize {
                sender.send(value).await.expect("Failed to send");
            }
        });

        let received: Vec<_> = receiver.collect().await;
        assert_eq!(received.len(), 100);
    }

    /// Same as `send_receive`, but the producer pauses for 100 ms before every
    /// tenth value and the channel uses a 10 ms delay.
    #[tokio::test(single_thread)]
    async fn send_receive_slow_sender_single_thread() {
        let (sender, receiver) = unbounded(Duration::from_millis(10));

        tokio::spawn(async move {
            for value in 0..100usize {
                // Stall at the start of each batch of ten so the receiver
                // repeatedly finds the channel empty.
                if value % 10 == 0 {
                    tokio_timer::sleep(Duration::from_millis(100)).await;
                }
                sender.send(value).await.expect("Failed to send");
            }
        });

        let received: Vec<_> = receiver.collect().await;
        assert_eq!(received.len(), 100);
    }

    /// Slow-producer scenario again, this time under the `multi_thread` test
    /// attribute.
    ///
    /// NOTE(review): the channel delay here is 10 *seconds*, whereas the
    /// single-thread variant uses 10 ms — confirm that this is intentional.
    #[tokio::test(multi_thread)]
    async fn send_receive_slow_sender_multi_thread() {
        let (sender, receiver) = unbounded(Duration::from_secs(10));

        tokio::spawn(async move {
            for value in 0..100usize {
                // Stall at the start of each batch of ten so the receiver
                // repeatedly finds the channel empty.
                if value % 10 == 0 {
                    tokio_timer::sleep(Duration::from_millis(100)).await;
                }
                sender.send(value).await.expect("Failed to send");
            }
        });

        let received: Vec<_> = receiver.collect().await;
        assert_eq!(received.len(), 100);
    }
}