1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use super::TQueueLike;
use crate::test_queue_mod;
use crate::{retry, StmResult, TVar};
use std::any::Any;

/// A `TVar` holding one cell of the list: either empty (`TNil`), or a
/// cons cell (`TCons`) of an item and the tail of the list, where the
/// tail is itself a `TVarList`.
type TVarList<T> = TVar<TList<T>>;

/// One cell of a linked list whose links are `TVar`s.
#[derive(Clone)]
enum TList<T> {
    /// The empty list; a cell in this state holds no item.
    TNil,
    /// An item followed by the `TVar` that holds the rest of the list.
    TCons(T, TVarList<T>),
}

/// Unbounded queue using a linked list of `TVar`s.
///
/// This implementation builds up a linked list of `TVar`s with a
/// read and a write pointer. The good thing is that the reads don't
/// cause retries in writes, unlike if it were just a single `TVar`
/// with one data structure in it. It may also help that it's more
/// granular, and `Transaction::downcast` will not clone a full
/// data structure.
#[derive(Clone)]
pub struct TChan<T> {
    /// Points at the list cell from which the next item is popped.
    read: TVar<TVarList<T>>,
    /// Points at the empty cell at the end of the list, which the next
    /// pushed item is written into.
    write: TVar<TVarList<T>>,
}

impl<T> TChan<T>
where
    T: Any + Sync + Send + Clone,
{
    /// Build a channel that holds no items.
    ///
    /// A single `TVar` containing `TNil` is allocated, and both the read
    /// and the write pointer are initialised to refer to it.
    /// ```text
    ///    [TNil]
    ///   / \
    /// [*]  [*]
    /// read write
    /// ```
    pub fn new() -> TChan<T> {
        let empty_cell = TVar::new(TList::TNil);
        // Allocate the read pointer first, then the write pointer, each
        // referring to the same empty cell.
        let read = TVar::new(empty_cell.clone());
        let write = TVar::new(empty_cell);
        TChan { read, write }
    }

    /// Report whether the cell that `tvl` currently points at is `TNil`.
    ///
    /// Reads the pointer and then the cell it refers to; any error from
    /// either read is propagated to the caller.
    fn is_empty_list(tvl: &TVar<TVarList<T>>) -> StmResult<bool> {
        let cell_var = tvl.read()?;
        let cell = cell_var.read()?;
        Ok(matches!(cell.as_ref(), TList::TNil))
    }
}

impl<T: Any + Send + Sync + Clone> Default for TChan<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> TQueueLike<T> for TChan<T>
where
    T: Any + Sync + Send + Clone,
{
    /// Take the item at the front of the queue; calls `retry()` when the
    /// front cell is `TNil`, i.e. there is nothing to take.
    ///
    /// On success the read pointer is advanced to the tail of the cell
    /// that was just consumed.
    /// ```text
    ///  [TCons(x, [TCons(y, [TNil])])]
    ///  |         |         |
    /// [ ]       [*]       [*]
    /// read0 ->  read1     write
    /// ```
    fn read(&self) -> StmResult<T> {
        let head_var = self.read.read()?;
        match head_var.read_clone()? {
            TList::TCons(item, rest) => {
                // Step the read pointer past the consumed cell.
                self.read.write(rest)?;
                Ok(item)
            }
            TList::TNil => retry(),
        }
    }

    /// Append an item at the back of the queue.
    ///
    /// The cell the write pointer currently refers to is overwritten with a
    /// `TCons` of the value and a freshly created `TNil` cell, and the write
    /// pointer is then moved to that fresh cell.
    /// ```text
    ///  [TCons(x, [TCons(y, [TNil])])]
    ///  |         |         |
    /// [*]       [ ]       [*]
    /// read      write0 -> write1
    /// ```
    fn write(&self, value: T) -> StmResult<()> {
        let fresh_end = TVar::new(TList::TNil);
        let last_cell = self.write.read()?;
        let filled = TList::TCons(value, fresh_end.clone());
        last_cell.write(filled)?;
        self.write.write(fresh_end)?;
        Ok(())
    }

    /// Report whether the queue holds no items.
    ///
    /// Yields `true` only when the cell behind the read pointer is `TNil`
    /// and the cell behind the write pointer is `TNil` as well; the write
    /// side is consulted only if the read side turned out to be empty.
    fn is_empty(&self) -> StmResult<bool> {
        Ok(Self::is_empty_list(&self.read)? && Self::is_empty_list(&self.write)?)
    }
}

// Invoke the crate's `test_queue_mod!` macro with a closure that constructs an
// empty `TChan<i32>`.
// NOTE(review): presumably this expands to the queue test suite shared by the
// other queue implementations — confirm against the macro definition.
test_queue_mod!(|| { crate::queues::tchan::TChan::<i32>::new() });