async_stm/queues/
tqueue.rs

1use super::TQueueLike;
2use crate::test_queue_mod;
3use crate::{retry, Stm, TVar};
4use std::any::Any;
5
6/// Unbounded queue using two vectors.
7///
8/// This implementation writes to one vector and reads from the other
9/// until the reads vector becomes empty and the two need to be swapped.
10/// Again reads don't block writes most of the time. It has an amortised
11/// cost of O(1).
12#[derive(Clone)]
13pub struct TQueue<T> {
14    read: TVar<Vec<T>>,
15    write: TVar<Vec<T>>,
16}
17
18impl<T> TQueue<T>
19where
20    T: Any + Sync + Send + Clone,
21{
22    /// Create an empty [TQueue].
23    pub fn new() -> TQueue<T> {
24        TQueue {
25            read: TVar::new(Vec::new()),
26            write: TVar::new(Vec::new()),
27        }
28    }
29}
30
31impl<T: Any + Send + Sync + Clone> Default for TQueue<T> {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl<T> TQueueLike<T> for TQueue<T>
38where
39    T: Any + Sync + Send + Clone,
40{
41    fn write(&self, value: T) -> Stm<()> {
42        let mut v = self.write.read_clone()?;
43        v.push(value);
44        self.write.write(v)
45    }
46
47    fn read(&self) -> Stm<T> {
48        let mut rv = self.read.read_clone()?;
49        // Elements are stored in reverse order.
50        match rv.pop() {
51            Some(value) => {
52                self.read.write(rv)?;
53                Ok(value)
54            }
55            None => {
56                let mut wv = self.write.read_clone()?;
57                if wv.is_empty() {
58                    retry()
59                } else {
60                    wv.reverse();
61                    let value = wv.pop().unwrap();
62                    self.read.write(wv)?;
63                    self.write.write(Vec::new())?;
64                    Ok(value)
65                }
66            }
67        }
68    }
69
70    fn is_empty(&self) -> Stm<bool> {
71        if self.read.read()?.is_empty() {
72            Ok(self.write.read()?.is_empty())
73        } else {
74            Ok(false)
75        }
76    }
77}
78
79test_queue_mod!(|| { crate::queues::tqueue::TQueue::<i32>::new() });