// async_stm/queues/tqueue.rs
use super::TQueueLike;
use crate::test_queue_mod;
use crate::{retry, Stm, TVar};
use std::any::Any;

6#[derive(Clone)]
13pub struct TQueue<T> {
14 read: TVar<Vec<T>>,
15 write: TVar<Vec<T>>,
16}
17
18impl<T> TQueue<T>
19where
20 T: Any + Sync + Send + Clone,
21{
22 pub fn new() -> TQueue<T> {
24 TQueue {
25 read: TVar::new(Vec::new()),
26 write: TVar::new(Vec::new()),
27 }
28 }
29}
30
31impl<T: Any + Send + Sync + Clone> Default for TQueue<T> {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl<T> TQueueLike<T> for TQueue<T>
38where
39 T: Any + Sync + Send + Clone,
40{
41 fn write(&self, value: T) -> Stm<()> {
42 let mut v = self.write.read_clone()?;
43 v.push(value);
44 self.write.write(v)
45 }
46
47 fn read(&self) -> Stm<T> {
48 let mut rv = self.read.read_clone()?;
49 match rv.pop() {
51 Some(value) => {
52 self.read.write(rv)?;
53 Ok(value)
54 }
55 None => {
56 let mut wv = self.write.read_clone()?;
57 if wv.is_empty() {
58 retry()
59 } else {
60 wv.reverse();
61 let value = wv.pop().unwrap();
62 self.read.write(wv)?;
63 self.write.write(Vec::new())?;
64 Ok(value)
65 }
66 }
67 }
68 }
69
70 fn is_empty(&self) -> Stm<bool> {
71 if self.read.read()?.is_empty() {
72 Ok(self.write.read()?.is_empty())
73 } else {
74 Ok(false)
75 }
76 }
77}
78
79test_queue_mod!(|| { crate::queues::tqueue::TQueue::<i32>::new() });