async_stm/queues/
tvecdequeue.rs

1use super::TQueueLike;
2use crate::test_queue_mod;
3use crate::{retry, Stm, TVar};
4use std::{any::Any, collections::VecDeque};
5
#[derive(Clone)]
/// Unbounded queue backed by a single [VecDeque] stored inside one [TVar].
///
/// The drawback is that reads and writes both touch the same [TVar].
pub struct TVecDequeue<T> {
    /// The complete queue contents, kept in a single transactional variable.
    queue: TVar<VecDeque<T>>,
}
13
14impl<T> TVecDequeue<T>
15where
16    T: Any + Sync + Send + Clone,
17{
18    /// Create an empty [TVecDequeue].
19    #[allow(dead_code)]
20    pub fn new() -> TVecDequeue<T> {
21        TVecDequeue {
22            queue: TVar::new(VecDeque::new()),
23        }
24    }
25}
26
27impl<T: Any + Send + Sync + Clone> Default for TVecDequeue<T> {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl<T> TQueueLike<T> for TVecDequeue<T>
34where
35    T: Any + Sync + Send + Clone,
36{
37    fn write(&self, value: T) -> Stm<()> {
38        let mut queue = self.queue.read_clone()?;
39        queue.push_back(value);
40        self.queue.write(queue)
41    }
42
43    fn read(&self) -> Stm<T> {
44        let mut queue = self.queue.read_clone()?;
45        match queue.pop_front() {
46            None => retry(),
47            Some(value) => {
48                self.queue.write(queue)?;
49                Ok(value)
50            }
51        }
52    }
53
54    fn is_empty(&self) -> Stm<bool> {
55        self.queue.read().map(|v| v.is_empty())
56    }
57}
58
// Invokes the crate's `test_queue_mod!` macro with a factory closure that builds
// an empty `TVecDequeue<i32>`. Presumably this instantiates the shared queue
// test suite for this implementation — confirm against the macro definition.
test_queue_mod!(|| { crate::queues::tvecdequeue::TVecDequeue::<i32>::new() });