async_stm/queues/tchan.rs
1use super::TQueueLike;
2use crate::test_queue_mod;
3use crate::{retry, Stm, TVar};
4use std::any::Any;
5
/// A [TVar] holding one cell of the linked list.
///
/// The cell is either empty ([TList::TNil]) or a cons cell
/// ([TList::TCons]) made of an item and the tail of the list,
/// where the tail is itself a [TVarList].
type TVarList<T> = TVar<TList<T>>;
9
/// A linked list of [TVar]s.
///
/// Each cell lives inside its own [TVar] (see [TVarList]), so cells can be
/// read and written independently of each other.
#[derive(Clone)]
enum TList<T> {
    /// The empty cell marking the end of the list.
    TNil,
    /// An item together with the [TVarList] holding the rest of the list.
    TCons(T, TVarList<T>),
}
16
/// Unbounded queue using a linked list of [TVar]s.
///
/// This implementation builds up a linked list of [TVar]s with a
/// read and a write pointer. The good thing is that the reads don't
/// cause retries in writes, unlike if it was just a single [TVar]
/// with one data structure in it. It may also help that it's more
/// granular, and `Transaction::downcast` will not clone a full
/// data structure.
#[derive(Clone)]
pub struct TChan<T> {
    /// Points at the list cell that the next read will inspect and consume.
    read: TVar<TVarList<T>>,
    /// Points at the cell at the end of the list that the next write fills in.
    write: TVar<TVarList<T>>,
}
30
31impl<T> TChan<T>
32where
33 T: Any + Sync + Send + Clone,
34{
35 /// Create an empty [TChan].
36 ///
37 /// Both read and write [TVar]s will be pointing at a common [TVar]
38 /// containing an empty list.
39 /// ```text
40 /// [TNil]
41 /// / \
42 /// [*] [*]
43 /// read write
44 /// ```
45 pub fn new() -> TChan<T> {
46 let hole = TVar::new(TList::TNil);
47 TChan {
48 read: TVar::new(hole.clone()),
49 write: TVar::new(hole),
50 }
51 }
52
53 fn is_empty_list(tvl: &TVar<TVarList<T>>) -> Stm<bool> {
54 let list_var = tvl.read()?;
55 let list = list_var.read()?;
56 match list.as_ref() {
57 TList::TNil => Ok(true),
58 _ => Ok(false),
59 }
60 }
61}
62
63impl<T: Any + Send + Sync + Clone> Default for TChan<T> {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69impl<T> TQueueLike<T> for TChan<T>
70where
71 T: Any + Sync + Send + Clone,
72{
73 /// Pop the head of the queue, or retry until there is an element if it's empty.
74 ///
75 /// Moves the read [TVar] down the list to point at the next item.
76 /// ```text
77 /// [TCons(x, [TCons(y, [TNil])])]
78 /// | | |
79 /// [ ] [*] [*]
80 /// read0 -> read1 write
81 /// ```
82 fn read(&self) -> Stm<T> {
83 let var_list = self.read.read()?;
84 let list = var_list.read_clone()?;
85 match list {
86 TList::TNil => retry(),
87 TList::TCons(value, tail) => {
88 self.read.write(tail)?;
89 Ok(value)
90 }
91 }
92 }
93
94 /// Push to the end of the queue.
95 ///
96 /// Replaces the contents of the current write [TVar] with a [TList::TCons] and points
97 /// the write [TVar] at a new [TList::TNil].
98 /// ```text
99 /// [TCons(x, [TCons(y, [TNil])])]
100 /// | | |
101 /// [*] [ ] [*]
102 /// read write0 -> write1
103 /// ```
104 fn write(&self, value: T) -> Stm<()> {
105 let new_list_end = TVar::new(TList::TNil);
106 let var_list = self.write.read()?;
107 var_list.write(TList::TCons(value, new_list_end.clone()))?;
108 self.write.write(new_list_end)?;
109 Ok(())
110 }
111
112 fn is_empty(&self) -> Stm<bool> {
113 if TChan::<T>::is_empty_list(&self.read)? {
114 TChan::<T>::is_empty_list(&self.write)
115 } else {
116 Ok(false)
117 }
118 }
119}
120
// Invokes the crate's `test_queue_mod!` macro with a closure that builds an
// empty `TChan<i32>`. NOTE(review): presumably this generates the shared queue
// test suite for this implementation; confirm against the macro definition.
test_queue_mod!(|| { crate::queues::tchan::TChan::<i32>::new() });