async_stm/queues/
tbqueue.rs1use super::TQueueLike;
2use crate::test_queue_mod;
3use crate::{guard, retry, Stm, TVar};
4use std::any::Any;
5
6#[derive(Clone)]
11pub struct TBQueue<T> {
12 capacity: TVar<u32>,
13 read: TVar<Vec<T>>,
14 write: TVar<Vec<T>>,
15}
16
17impl<T> TBQueue<T>
18where
19 T: Any + Sync + Send + Clone,
20{
21 pub fn new(capacity: u32) -> TBQueue<T> {
23 TBQueue {
24 capacity: TVar::new(capacity),
25 read: TVar::new(Vec::new()),
26 write: TVar::new(Vec::new()),
27 }
28 }
29}
30
31impl<T> TQueueLike<T> for TBQueue<T>
32where
33 T: Any + Sync + Send + Clone,
34{
35 fn write(&self, value: T) -> Stm<()> {
36 let capacity = self.capacity.read()?;
37 guard(*capacity > 0)?;
38 self.capacity.write(*capacity - 1)?;
39
40 let mut v = self.write.read_clone()?;
42 v.push(value);
43 self.write.write(v)
44 }
45
46 fn read(&self) -> Stm<T> {
47 let capacity = self.capacity.read()?;
48 self.capacity.write(*capacity + 1)?;
49
50 let mut rv = self.read.read_clone()?;
52 match rv.pop() {
54 Some(value) => {
55 self.read.write(rv)?; Ok(value)
57 }
58 None => {
59 let mut wv = self.write.read_clone()?;
60 if wv.is_empty() {
61 retry()
62 } else {
63 wv.reverse();
64 let value = wv.pop().unwrap();
65 self.read.write(wv)?;
66 self.write.write(Vec::new())?;
67 Ok(value)
68 }
69 }
70 }
71 }
72
73 fn is_empty(&self) -> Stm<bool> {
74 if self.read.read()?.is_empty() {
75 Ok(self.write.read()?.is_empty())
76 } else {
77 Ok(false)
78 }
79 }
80}
81
82test_queue_mod!(|| { crate::queues::tbqueue::TBQueue::<i32>::new(1_000_000) });
83
#[cfg(test)]
mod test {
    use super::{TBQueue, TQueueLike};
    use crate::atomically;
    use std::time::Duration;
    use tokio::sync::mpsc;

    /// A single transaction writing two items into a queue of capacity one
    /// must not finish: the completion signal is not seen within 100 ms.
    #[tokio::test]
    async fn threaded_bounded_blocks() {
        let queue = TBQueue::<i32>::new(1);
        let (done_tx, mut done_rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            atomically(|| {
                queue.write(1)?;
                queue.write(2)
            })
            .await;

            done_tx.send(()).unwrap();
        });

        // `Err` here means the timeout elapsed before anything arrived.
        let outcome = tokio::time::timeout(Duration::from_millis(100), done_rx.recv()).await;

        assert!(outcome.is_err());
    }

    /// A writer stuck on a full queue gets to finish once another task reads
    /// an item: the completion signal arrives within 500 ms.
    #[tokio::test]
    async fn threaded_bounded_unblocks() {
        let reader_handle = TBQueue::<i32>::new(1);
        let writer_handle = reader_handle.clone();
        let (done_tx, mut done_rx) = mpsc::unbounded_channel();

        // Writer: the second write exceeds the capacity of one.
        tokio::spawn(async move {
            atomically(|| writer_handle.write(1)).await;
            atomically(|| writer_handle.write(2)).await;
            done_tx.send(()).unwrap();
        });

        // Reader: frees a slot after a short delay.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            atomically(|| reader_handle.read()).await;
        });

        let outcome = tokio::time::timeout(Duration::from_millis(500), done_rx.recv()).await;

        assert!(outcome.is_ok());
    }
}