async_stm/queues/
tbqueue.rs

1use super::TQueueLike;
2use crate::test_queue_mod;
3use crate::{guard, retry, Stm, TVar};
4use std::any::Any;
5
6/// Bounded queue using two vectors.
7///
8/// Similar to [TQueue] but every read and write touches a common [TVar]
9/// to track the current capacity, retrying if the queue is full.
10#[derive(Clone)]
11pub struct TBQueue<T> {
12    capacity: TVar<u32>,
13    read: TVar<Vec<T>>,
14    write: TVar<Vec<T>>,
15}
16
17impl<T> TBQueue<T>
18where
19    T: Any + Sync + Send + Clone,
20{
21    /// Create an empty [TBQueue].
22    pub fn new(capacity: u32) -> TBQueue<T> {
23        TBQueue {
24            capacity: TVar::new(capacity),
25            read: TVar::new(Vec::new()),
26            write: TVar::new(Vec::new()),
27        }
28    }
29}
30
31impl<T> TQueueLike<T> for TBQueue<T>
32where
33    T: Any + Sync + Send + Clone,
34{
35    fn write(&self, value: T) -> Stm<()> {
36        let capacity = self.capacity.read()?;
37        guard(*capacity > 0)?;
38        self.capacity.write(*capacity - 1)?;
39
40        // Same as TQueue.
41        let mut v = self.write.read_clone()?;
42        v.push(value);
43        self.write.write(v)
44    }
45
46    fn read(&self) -> Stm<T> {
47        let capacity = self.capacity.read()?;
48        self.capacity.write(*capacity + 1)?;
49
50        // Same as TQueue.
51        let mut rv = self.read.read_clone()?;
52        // Elements are stored in reverse order.
53        match rv.pop() {
54            Some(value) => {
55                self.read.write(rv)?; // XXX
56                Ok(value)
57            }
58            None => {
59                let mut wv = self.write.read_clone()?;
60                if wv.is_empty() {
61                    retry()
62                } else {
63                    wv.reverse();
64                    let value = wv.pop().unwrap();
65                    self.read.write(wv)?;
66                    self.write.write(Vec::new())?;
67                    Ok(value)
68                }
69            }
70        }
71    }
72
73    fn is_empty(&self) -> Stm<bool> {
74        if self.read.read()?.is_empty() {
75            Ok(self.write.read()?.is_empty())
76        } else {
77            Ok(false)
78        }
79    }
80}
81
// Invokes `test_queue_mod!` with a factory closure that builds a
// `TBQueue<i32>` of capacity 1_000_000.
// NOTE(review): the macro is defined elsewhere; presumably it generates the
// shared `TQueueLike` test suite, and the large capacity keeps those tests
// from ever hitting the bound — confirm against the macro definition.
test_queue_mod!(|| { crate::queues::tbqueue::TBQueue::<i32>::new(1_000_000) });
83
#[cfg(test)]
mod test {
    use super::{TBQueue, TQueueLike};
    use crate::atomically;
    use std::time::Duration;
    use tokio::sync::mpsc;

    /// Two writes in a single transaction against a capacity-1 queue must
    /// not complete: no completion signal may arrive within 100 ms.
    #[tokio::test]
    async fn threaded_bounded_blocks() {
        let bounded = TBQueue::<i32>::new(1);
        let (done_tx, mut done_rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            let both_writes = atomically(|| {
                bounded.write(1)?;
                bounded.write(2)
            });
            both_writes.await;

            // Only reached if the transaction above ever commits.
            done_tx.send(()).unwrap();
        });

        let wait = tokio::time::timeout(Duration::from_millis(100), done_rx.recv());
        let terminated = wait.await.is_ok();

        assert!(!terminated);
    }

    /// A second write blocked on a full capacity-1 queue must complete once
    /// another task reads an element: the completion signal has to arrive
    /// within 500 ms.
    #[tokio::test]
    async fn threaded_bounded_unblocks() {
        let reader_queue = TBQueue::<i32>::new(1);
        let writer_queue = reader_queue.clone();
        let (done_tx, mut done_rx) = mpsc::unbounded_channel();

        // Writer: the two writes are separate transactions on purpose. Put
        // into one transaction, both would be retried together and the
        // reader would in turn retry on an empty queue.
        tokio::spawn(async move {
            atomically(|| writer_queue.write(1)).await;
            atomically(|| writer_queue.write(2)).await;
            done_tx.send(()).unwrap();
        });

        // Reader: waits 100 ms, then takes one element out of the queue.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            atomically(|| reader_queue.read()).await;
        });

        let wait = tokio::time::timeout(Duration::from_millis(500), done_rx.recv());
        let terminated = wait.await.is_ok();

        assert!(terminated);
    }
}