1pub mod tbqueue;
2pub mod tchan;
3pub mod tqueue;
4pub mod tvecdequeue;
5
6use crate::Stm;
7
/// Common interface shared by the queue types declared in this module.
///
/// Implementors must be `Clone + Send`. Every method returns an [`Stm`]
/// value; the tests in this file always invoke them from inside a closure
/// passed to `atomically`.
pub trait TQueueLike<T>: Clone + Send {
    /// Take the next value out of the queue.
    ///
    /// NOTE(review): the tests in this file read values back in the order they
    /// were written, and a reader started on an empty queue only completes once
    /// another task has written — presumably an empty read retries the
    /// transaction; confirm against the concrete implementations.
    fn read(&self) -> Stm<T>;

    /// Put `value` into the queue.
    fn write(&self, value: T) -> Stm<()>;

    /// Report whether the queue currently holds no values.
    fn is_empty(&self) -> Stm<bool>;
}
19
/// Test cases that are generic over the queue implementation.
///
/// Each case takes a constructor `mq` for the queue under test; the
/// `test_queue_mod!` macro instantiates all of them for a concrete type.
#[cfg(test)]
mod test {
    use std::time::Duration;
    use tokio::sync::mpsc::{self, UnboundedReceiver};

    use super::TQueueLike;
    use crate::atomically;

    /// Wait up to `t` for the next value on `rx` and return it.
    ///
    /// Panics with a distinct message when the wait times out and when the
    /// channel is closed without a value, so the two failures can be told
    /// apart in test output.
    async fn recv_timeout<T>(rx: &mut UnboundedReceiver<T>, t: Duration) -> T {
        tokio::time::timeout(t, rx.recv())
            .await
            .expect("timed out waiting for a value on the channel")
            .expect("channel closed before a value was sent")
    }

    /// Write two values and read two values in a single transaction, then
    /// check they come back in the order they were written.
    pub async fn test_write_and_read_back<Q: 'static + TQueueLike<i32>>(mq: fn() -> Q) {
        let queue = mq();
        let (first, second) = atomically(|| {
            queue.write(42)?;
            queue.write(31)?;
            let first = queue.read()?;
            let second = queue.read()?;
            Ok((first, second))
        })
        .await;

        assert_eq!(42, first);
        assert_eq!(31, second);
    }

    /// Start a reader task on a clone of a fresh queue, write from the current
    /// task after a short pause, and check the reader observes the value.
    pub async fn test_threaded<Q: TQueueLike<i32> + Sync + 'static>(mq: fn() -> Q) {
        let queue1 = mq();
        let queue2 = queue1.clone();

        let (sender, mut receiver) = mpsc::unbounded_channel();

        // Keep the handle so a panic inside the reader is reported below
        // instead of being lost with a detached task.
        let reader = tokio::spawn(async move {
            let x = atomically(|| queue2.read()).await;
            sender
                .send(x)
                .expect("receiver dropped before the value was delivered");
        });

        // Give the reader time to start before anything is written.
        tokio::time::sleep(Duration::from_millis(100)).await;
        atomically(|| queue1.write(42)).await;

        let x = recv_timeout(&mut receiver, Duration::from_millis(500)).await;

        assert_eq!(42, x);
        reader.await.expect("reader task panicked");
    }

    /// A freshly constructed queue must report itself as empty.
    pub async fn test_is_empty<Q: 'static + TQueueLike<i32>>(mq: fn() -> Q) {
        let queue = mq();
        let is_empty = atomically(|| queue.is_empty()).await;

        assert!(is_empty);
    }

    /// After a single write the queue must no longer report itself as empty.
    pub async fn test_non_empty<Q: 'static + TQueueLike<i32>>(mq: fn() -> Q) {
        let queue = mq();
        atomically(|| queue.write(42)).await;
        let is_empty = atomically(|| queue.is_empty()).await;

        assert!(!is_empty);
    }
}
88
/// Benchmark bodies that are generic over the queue implementation.
///
/// Only compiled for test builds with the `unstable` feature enabled; the
/// `test_queue_mod!` macro wraps each function in a `#[bench]` item.
#[cfg(all(test, feature = "unstable"))]
mod bench {
    use super::TQueueLike;
    use crate::atomically;
    // No extra `#[cfg]` needed here: the enclosing module is already gated.
    use etest::Bencher;
    use std::time::Duration;

    /// Exclusive upper bound of the values pushed per iteration.
    /// The loops run over `1..N`, i.e. `N - 1` items.
    const N: i32 = 1000;

    /// Batch size used by the alternating write/read benchmark.
    const M: i32 = 100;

    /// How long a spawned benchmark task may take before the run is failed.
    const TASK_TIMEOUT: Duration = Duration::from_secs(10);

    /// Build the multi-threaded Tokio runtime (with the time driver enabled)
    /// that drives each benchmark.
    fn new_bench_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .build()
            .expect("failed to build the benchmark runtime")
    }

    /// One task writes `1..N` while another task reads the same range from a
    /// clone of the queue and checks the values arrive in order.
    pub fn bench_two_threads_read_write<Q: TQueueLike<i32> + Sync + 'static>(
        b: &mut Bencher,
        mq: fn() -> Q,
    ) {
        let rt = new_bench_runtime();

        b.iter(|| {
            rt.block_on(async {
                let queue1 = mq();
                let queue2 = queue1.clone();

                let sender = tokio::spawn(async move {
                    for i in 1..N {
                        atomically(|| queue1.write(i)).await;
                    }
                });

                let receiver = tokio::spawn(async move {
                    for i in 1..N {
                        let r = atomically(|| queue2.read()).await;
                        assert_eq!(i, r);
                    }
                });

                tokio::time::timeout(TASK_TIMEOUT, sender)
                    .await
                    .expect("sender task timed out")
                    .expect("sender task panicked");

                tokio::time::timeout(TASK_TIMEOUT, receiver)
                    .await
                    .expect("receiver task timed out")
                    .expect("receiver task panicked");
            });
        });
    }

    /// A single task writes `1..N`, then reads the same range back and checks
    /// the order.
    pub fn bench_one_thread_write_many_then_read<Q: TQueueLike<i32>>(
        b: &mut Bencher,
        mq: fn() -> Q,
    ) {
        let rt = new_bench_runtime();

        b.iter(|| {
            rt.block_on(async {
                let chan = mq();

                for i in 1..N {
                    atomically(|| chan.write(i)).await;
                }
                for i in 1..N {
                    let r = atomically(|| chan.read()).await;
                    assert_eq!(i, r);
                }
            });
        });
    }

    /// A single task alternates between writing a batch and reading it back:
    /// for each `i` in `1..(N / M)` it writes `i * M + j` for `j` in `1..M`
    /// and then reads those values back in order.
    pub fn bench_one_thread_repeat_write_read<Q: TQueueLike<i32>>(b: &mut Bencher, mq: fn() -> Q) {
        let rt = new_bench_runtime();

        b.iter(|| {
            rt.block_on(async {
                let chan = mq();

                for i in 1..(N / M) {
                    for j in 1..M {
                        atomically(|| chan.write(i * M + j)).await;
                    }
                    for j in 1..M {
                        let r = atomically(|| chan.read()).await;
                        assert_eq!(i * M + j, r);
                    }
                }
            });
        });
    }
}
191
/// Generate the shared test module (and, with the `unstable` feature, the
/// shared benchmark module) for one queue implementation.
///
/// `$make` is passed as the `mq` argument of every helper in `queues::test`
/// and `queues::bench`, so it must be an expression usable as `fn() -> Q`
/// for the queue type `Q` under test.
///
/// Both generated modules reach the helpers through `$crate::queues`, so the
/// expansion does not depend on how deeply the invoking module is nested.
#[macro_export]
macro_rules! test_queue_mod {
    ($make:expr) => {
        #[cfg(test)]
        mod test_queue {
            use $crate::queues::test::*;

            #[tokio::test]
            async fn write_and_read_back() {
                test_write_and_read_back($make).await;
            }

            #[tokio::test]
            async fn threaded() {
                test_threaded($make).await;
            }

            #[tokio::test]
            async fn is_empty() {
                test_is_empty($make).await;
            }

            #[tokio::test]
            async fn non_empty() {
                test_non_empty($make).await;
            }
        }

        #[cfg(all(test, feature = "unstable"))]
        mod bench_queue {
            // Same absolute path style as `test_queue` above, instead of a
            // `super::super` chain tied to the invocation site's depth.
            use $crate::queues::bench::*;
            use etest::Bencher;

            #[bench]
            fn two_threads_read_write(b: &mut Bencher) {
                bench_two_threads_read_write(b, $make);
            }

            #[bench]
            fn one_thread_write_many_then_read(b: &mut Bencher) {
                bench_one_thread_write_many_then_read(b, $make);
            }

            #[bench]
            fn one_thread_repeat_write_read(b: &mut Bencher) {
                bench_one_thread_repeat_write_read(b, $make);
            }
        }
    };
}