1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use super::TQueueLike;
use crate::test_queue_mod;
use crate::{guard, retry, StmResult, TVar};
use std::any::Any;

/// Bounded queue using two vectors.
///
/// Similar to `TQueue` but every read and write touches a common `TVar`
/// to track the current capacity, retrying if the queue is full.
#[derive(Clone)]
pub struct TBQueue<T> {
    capacity: TVar<u32>,
    read: TVar<Vec<T>>,
    write: TVar<Vec<T>>,
}

impl<T> TBQueue<T>
where
    T: Any + Sync + Send + Clone,
{
    /// Create an empty `TBQueue`.
    pub fn new(capacity: u32) -> TBQueue<T> {
        TBQueue {
            capacity: TVar::new(capacity),
            read: TVar::new(Vec::new()),
            write: TVar::new(Vec::new()),
        }
    }
}

impl<T> TQueueLike<T> for TBQueue<T>
where
    T: Any + Sync + Send + Clone,
{
    fn write(&self, value: T) -> StmResult<()> {
        let capacity = self.capacity.read()?;
        guard(*capacity > 0)?;
        self.capacity.write(*capacity - 1)?;

        // Same as TQueue.
        let mut v = self.write.read_clone()?;
        v.push(value);
        self.write.write(v)
    }

    fn read(&self) -> StmResult<T> {
        let capacity = self.capacity.read()?;
        self.capacity.write(*capacity + 1)?;

        // Same as TQueue.
        let mut rv = self.read.read_clone()?;
        // Elements are stored in reverse order.
        match rv.pop() {
            Some(value) => {
                self.read.write(rv)?; // XXX
                Ok(value)
            }
            None => {
                let mut wv = self.write.read_clone()?;
                if wv.is_empty() {
                    retry()
                } else {
                    wv.reverse();
                    let value = wv.pop().unwrap();
                    self.read.write(wv)?;
                    self.write.write(Vec::new())?;
                    Ok(value)
                }
            }
        }
    }

    fn is_empty(&self) -> StmResult<bool> {
        if self.read.read()?.is_empty() {
            Ok(self.write.read()?.is_empty())
        } else {
            Ok(false)
        }
    }
}

test_queue_mod!(|| { crate::queues::tbqueue::TBQueue::<i32>::new(1_000_000) });

#[cfg(test)]
mod test {
    use super::{TBQueue, TQueueLike};
    use crate::atomically;
    use std::time::Duration;
    use tokio::sync::mpsc;

    #[tokio::test]
    async fn threaded_bounded_blocks() {
        let queue = TBQueue::<i32>::new(1);

        let (sender, mut receiver) = mpsc::unbounded_channel();
        tokio::spawn(async move {
            atomically(|| {
                queue.write(1)?;
                queue.write(2)
            })
            .await;

            sender.send(()).unwrap();
        });

        let terminated = tokio::time::timeout(Duration::from_millis(100), receiver.recv())
            .await
            .is_ok();

        assert!(!terminated);
    }

    #[tokio::test]
    async fn threaded_bounded_unblocks() {
        let queue1 = TBQueue::<i32>::new(1);
        let queue2 = queue1.clone();

        let (sender, mut receiver) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            // Don't try to write 2 items at the same time or both will be retried,
            // and the reader will retry because of an empty queue.
            atomically(|| queue2.write(1)).await;
            atomically(|| queue2.write(2)).await;
            sender.send(()).unwrap();
        });

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            atomically(|| queue1.read()).await;
        });

        let terminated = tokio::time::timeout(Duration::from_millis(500), receiver.recv())
            .await
            .is_ok();

        assert!(terminated);
    }
}