async_stm/queues/
tchan.rs

1use super::TQueueLike;
2use crate::test_queue_mod;
3use crate::{retry, Stm, TVar};
4use std::any::Any;
5
6/// A [TVar] that can be empty, or be a cons cell of an item and
7/// the tail of the list, which is also a [TVarList].
8type TVarList<T> = TVar<TList<T>>;
9
10/// A linked list of [TVar]s.
11#[derive(Clone)]
12enum TList<T> {
13    TNil,
14    TCons(T, TVarList<T>),
15}
16
17/// Ubounded queue using a linked list of [TVar]s.
18///
19/// This implementation builds up a linked list of [TVar]s with a
20/// read and a write pointer. The good thing is that the reads don't
21/// cause retries in writes, unlike if it was just a single [TVar]
22/// with one data structure in it. It may also help that it's more
23/// granular, and `Transaction::downcast` will not clone a full
24/// data structure.
25#[derive(Clone)]
26pub struct TChan<T> {
27    read: TVar<TVarList<T>>,
28    write: TVar<TVarList<T>>,
29}
30
31impl<T> TChan<T>
32where
33    T: Any + Sync + Send + Clone,
34{
35    /// Create an empty [TChan].
36    ///
37    /// Both read and write [TVar]s will be pointing at a common [TVar]
38    /// containing an empty list.
39    /// ```text
40    ///    [TNil]
41    ///   / \
42    /// [*]  [*]
43    /// read write
44    /// ```
45    pub fn new() -> TChan<T> {
46        let hole = TVar::new(TList::TNil);
47        TChan {
48            read: TVar::new(hole.clone()),
49            write: TVar::new(hole),
50        }
51    }
52
53    fn is_empty_list(tvl: &TVar<TVarList<T>>) -> Stm<bool> {
54        let list_var = tvl.read()?;
55        let list = list_var.read()?;
56        match list.as_ref() {
57            TList::TNil => Ok(true),
58            _ => Ok(false),
59        }
60    }
61}
62
63impl<T: Any + Send + Sync + Clone> Default for TChan<T> {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69impl<T> TQueueLike<T> for TChan<T>
70where
71    T: Any + Sync + Send + Clone,
72{
73    /// Pop the head of the queue, or retry until there is an element if it's empty.
74    ///
75    /// Moves the read [TVar] down the list to point at the next item.
76    /// ```text
77    ///  [TCons(x, [TCons(y, [TNil])])]
78    ///  |         |         |
79    /// [ ]       [*]       [*]
80    /// read0 ->  read1     write
81    /// ```
82    fn read(&self) -> Stm<T> {
83        let var_list = self.read.read()?;
84        let list = var_list.read_clone()?;
85        match list {
86            TList::TNil => retry(),
87            TList::TCons(value, tail) => {
88                self.read.write(tail)?;
89                Ok(value)
90            }
91        }
92    }
93
94    /// Push to the end of the queue.
95    ///
96    /// Replaces the contents of the current write [TVar] with a [TList::TCons] and points
97    /// the write [TVar] at a new [TList::TNil].
98    /// ```text
99    ///  [TCons(x, [TCons(y, [TNil])])]
100    ///  |         |         |
101    /// [*]       [ ]       [*]
102    /// read      write0 -> write1
103    /// ```
104    fn write(&self, value: T) -> Stm<()> {
105        let new_list_end = TVar::new(TList::TNil);
106        let var_list = self.write.read()?;
107        var_list.write(TList::TCons(value, new_list_end.clone()))?;
108        self.write.write(new_list_end)?;
109        Ok(())
110    }
111
112    fn is_empty(&self) -> Stm<bool> {
113        if TChan::<T>::is_empty_list(&self.read)? {
114            TChan::<T>::is_empty_list(&self.write)
115        } else {
116            Ok(false)
117        }
118    }
119}
120
// Invokes `test_queue_mod!` with a factory closure that builds an empty
// `TChan<i32>`. Presumably the macro expands to the shared queue test
// module run against this implementation — confirm against its definition.
test_queue_mod!(|| { crate::queues::tchan::TChan::<i32>::new() });