async_stm/queues/
mod.rs

1pub mod tbqueue;
2pub mod tchan;
3pub mod tqueue;
4pub mod tvecdequeue;
5
6use crate::Stm;
7
/// Transactional queue-like structure.
///
/// This is a common interface between the various implementations in Simon Marlow's book.
///
/// Every operation returns an [`Stm`] value rather than acting immediately; the helpers in
/// this file execute them by calling them inside a closure passed to `crate::atomically`.
///
/// Implementors must be `Clone + Send`. The shared tests in this file expect a clone to
/// observe values written through the original.
pub trait TQueueLike<T>: Clone + Send {
    /// Pop the head of the queue, or retry until there is an element if it's empty.
    fn read(&self) -> Stm<T>;
    /// Push to the end of the queue.
    ///
    /// NOTE(review): whether this can itself retry (e.g. for a bounded implementation)
    /// is not specified by this trait — confirm against each implementation.
    fn write(&self, value: T) -> Stm<()>;
    /// Check if the queue is empty.
    ///
    /// Yields `true` when there is no element to read and `false` otherwise.
    fn is_empty(&self) -> Stm<bool>;
}
19
20#[cfg(test)]
21mod test {
22    use std::time::Duration;
23    use tokio::sync::mpsc::{self, UnboundedReceiver};
24
25    use super::TQueueLike;
26    use crate::atomically;
27
28    async fn recv_timeout<T>(rx: &mut UnboundedReceiver<T>, t: Duration) -> T {
29        tokio::time::timeout(t, rx.recv()).await.unwrap().unwrap()
30    }
31
32    pub async fn test_write_and_read_back<Q: 'static + TQueueLike<i32>>(mq: fn() -> Q) {
33        let queue = mq();
34        let (x, y) = atomically(|| {
35            queue.write(42)?;
36            queue.write(31)?;
37            let x = queue.read()?;
38            let y = queue.read()?;
39            Ok((x, y))
40        })
41        .await;
42
43        assert_eq!(42, x);
44        assert_eq!(31, y);
45    }
46
47    /// Run multiple threads.
48    ///
49    /// Thread 1: Read from the channel, block until it's non-empty, then return the value.
50    ///
51    /// Thread 2: Wait a bit, then write a value.
52    ///
53    /// Check that Thread 1 has been woken up to read the value written by Thread 2.
54    pub async fn test_threaded<Q: TQueueLike<i32> + Sync + 'static>(mq: fn() -> Q) {
55        let queue1 = mq();
56        // Clone for Thread 2
57        let queue2 = queue1.clone();
58
59        let (sender, mut receiver) = mpsc::unbounded_channel();
60
61        tokio::spawn(async move {
62            let x = atomically(|| queue2.read()).await;
63            sender.send(x).unwrap();
64        });
65
66        tokio::time::sleep(Duration::from_millis(100)).await;
67        atomically(|| queue1.write(42)).await;
68
69        let x = recv_timeout(&mut receiver, Duration::from_millis(500)).await;
70
71        assert_eq!(42, x);
72    }
73
74    pub async fn test_is_empty<Q: 'static + TQueueLike<i32>>(mq: fn() -> Q) {
75        let queue = mq();
76        let is_empty = atomically(|| queue.is_empty()).await;
77
78        assert!(is_empty);
79    }
80
81    pub async fn test_non_empty<Q: 'static + TQueueLike<i32>>(mq: fn() -> Q) {
82        let queue = mq();
83        atomically(|| queue.write(42)).await;
84        let is_empty = atomically(|| queue.is_empty()).await;
85        assert!(!is_empty);
86    }
87}
88
89#[cfg(all(test, feature = "unstable"))]
90mod bench {
91    use super::TQueueLike;
92    use crate::atomically;
93    use std::time::Duration;
94
95    #[cfg(all(test, feature = "unstable"))]
96    use etest::Bencher;
97    // Benchmarks based on https://github.com/simonmar/parconc-examples/blob/master/chanbench.hs
98
99    fn new_bench_runtime() -> tokio::runtime::Runtime {
100        tokio::runtime::Builder::new_multi_thread()
101            .enable_time()
102            .build()
103            .unwrap()
104    }
105
106    /// Two threads, one reading from and one writing to the channel.
107    pub fn bench_two_threads_read_write<Q: TQueueLike<i32> + Sync + 'static>(
108        b: &mut Bencher,
109        mq: fn() -> Q,
110    ) {
111        let rt = new_bench_runtime();
112
113        b.iter(|| {
114            rt.block_on(async {
115                let n = 1000;
116                let queue1 = mq();
117                let queue2 = queue1.clone();
118
119                let sender = tokio::spawn(async move {
120                    for i in 1..n {
121                        atomically(|| queue1.write(i)).await;
122                    }
123                });
124
125                let receiver = tokio::spawn(async move {
126                    for i in 1..n {
127                        let r = atomically(|| queue2.read()).await;
128                        assert_eq!(i, r);
129                    }
130                });
131
132                tokio::time::timeout(Duration::from_secs(10), sender)
133                    .await
134                    .unwrap()
135                    .unwrap();
136
137                tokio::time::timeout(Duration::from_secs(10), receiver)
138                    .await
139                    .unwrap()
140                    .unwrap();
141            });
142        });
143    }
144
145    /// One thread, writing a large number of values then reading them.
146    pub fn bench_one_thread_write_many_then_read<Q: TQueueLike<i32>>(
147        b: &mut Bencher,
148        mq: fn() -> Q,
149    ) {
150        let rt = new_bench_runtime();
151
152        b.iter(|| {
153            rt.block_on(async {
154                let n = 1000;
155                let chan = mq();
156
157                for i in 1..n {
158                    atomically(|| chan.write(i)).await;
159                }
160                for i in 1..n {
161                    let r = atomically(|| chan.read()).await;
162                    assert_eq!(i, r);
163                }
164            });
165        });
166    }
167
168    // One thread, repeatedly writing and then reading a number of values.
169    pub fn bench_one_thread_repeat_write_read<Q: TQueueLike<i32>>(b: &mut Bencher, mq: fn() -> Q) {
170        let rt = new_bench_runtime();
171
172        b.iter(|| {
173            rt.block_on(async {
174                let n = 1000;
175                let m = 100;
176                let chan = mq();
177
178                for i in 1..(n / m) {
179                    for j in 1..m {
180                        atomically(|| chan.write(i * m + j)).await;
181                    }
182                    for j in 1..m {
183                        let r = atomically(|| chan.read()).await;
184                        assert_eq!(i * m + j, r);
185                    }
186                }
187            });
188        });
189    }
190}
191
/// Reuse the same test definitions for each implementation of the [TQueueLike] trait
/// by calling this macro with a function to create a new instance of the queue.
///
/// The expansion defines a `test_queue` module (under `cfg(test)`) with one
/// `#[tokio::test]` per shared test helper, and a `bench_queue` module (under
/// `cfg(all(test, feature = "unstable"))`) with one `#[bench]` per shared benchmark helper.
/// `$make` is passed to every helper as its queue constructor.
///
/// Both modules reach the shared helpers through `$crate::queues::…`, so the expansion
/// does not depend on how deeply the invoking module is nested.
///
/// For example:
/// ```text
/// test_queue_mod!(|| { crate::queues::tchan::TChan::<i32>::new() });
/// ```
#[macro_export]
macro_rules! test_queue_mod {
    ($make:expr) => {
        #[cfg(test)]
        mod test_queue {
            use $crate::queues::test::*;

            #[tokio::test]
            async fn write_and_read_back() {
                test_write_and_read_back($make).await;
            }

            #[tokio::test]
            async fn threaded() {
                test_threaded($make).await;
            }

            #[tokio::test]
            async fn is_empty() {
                test_is_empty($make).await;
            }

            #[tokio::test]
            async fn non_empty() {
                test_non_empty($make).await;
            }
        }

        #[cfg(all(test, feature = "unstable"))]
        mod bench_queue {
            // Absolute path, matching `test_queue` above; `super::super` would only
            // resolve when the macro is invoked exactly one module below `queues`.
            use $crate::queues::bench::*;
            use etest::Bencher;

            #[bench]
            fn two_threads_read_write(b: &mut Bencher) {
                bench_two_threads_read_write(b, $make);
            }

            #[bench]
            fn one_thread_write_many_then_read(b: &mut Bencher) {
                bench_one_thread_write_many_then_read(b, $make);
            }

            #[bench]
            fn one_thread_repeat_write_read(b: &mut Bencher) {
                bench_one_thread_repeat_write_read(b, $make);
            }
        }
    };
}