1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use core::{cell::UnsafeCell, fmt::Debug, sync::atomic};
/// The possible States of a Node
#[derive(Debug, PartialEq, Eq)]
pub enum NodeState {
/// The Node is either empty or currently being written to
Empty,
/// The Node contains a value
Set,
/// The Node's value has already been handled by the consumer
Handled,
}
impl NodeState {
/// Converts the State to its u8 representation for Storage
pub const fn to_u8(&self) -> u8 {
match self {
Self::Empty => 0,
Self::Set => 1,
Self::Handled => 2,
}
}
/// Decodes the stored Value into an actual State
pub const fn from_u8(val: u8) -> Option<Self> {
match val {
0 => Some(Self::Empty),
1 => Some(Self::Set),
2 => Some(Self::Handled),
_ => None,
}
}
}
/// A single Entry in the Queue
pub struct Node<T> {
/// The actual Datat itself that is stored in the Node
data: UnsafeCell<Option<T>>,
/// This holds one of three Values indicating the "State" of
/// the Value
is_set: atomic::AtomicU8,
}
impl<T> Node<T> {
pub fn new() -> Self {
Self {
data: UnsafeCell::new(None),
is_set: atomic::AtomicU8::new(NodeState::Empty.to_u8()),
}
}
/// Atomically loads the current `is_set` State and decodes it
/// as a NodeState enum
pub fn get_state(&self) -> NodeState {
let raw = self.is_set.load(atomic::Ordering::Acquire);
NodeState::from_u8(raw).unwrap()
}
/// Stores the given Data into the Node updating its Data-Field
/// as well as its `is_set` State to `NodeState::Set`
pub fn store(&self, data: T) {
// # Safety:
// This is safe because every Cell is only ever written to by a single
// Producer and will not be read by the Consumer until the State of the
// Node is changed to Set.
// This means that we have exclusive access to the current Data in the
// Node and therefore it is safe to mutate it directly without other
// forms of synchronization
let raw_ptr = self.data.get();
let mut_data = unsafe { &mut *raw_ptr };
mut_data.replace(data);
// Update the State of the Node to indicate to the Consumer that this
// node is now ready to be read/consumed
self.is_set
.store(NodeState::Set.to_u8(), atomic::Ordering::Release);
}
/// Attempts to load the Data from the Node itself, this can only be
/// done once, and automatically sets the node to being handled
pub fn load(&self) -> Option<T> {
if self.get_state() != NodeState::Set {
return None;
}
// # Safety:
// This is save to do, because the value is only ever consumed by a
// single Consumer and after the value has been set, no more procuder
// will touch this entire Node again.
let raw_ptr = self.data.get();
let mut_data = unsafe { &mut *raw_ptr };
// We can safely unwrap this value as well, because this function
// is only ever called once and before it is called, the consumer will
// check that the Node is marked as Set. After this Node was visited it
// will never again be visited and therefore this wont be called again
// with a now empty data entry.
let data = mut_data.take().unwrap();
self.is_set
.store(NodeState::Handled.to_u8(), atomic::Ordering::Release);
Some(data)
}
}
impl<T> Default for Node<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Debug for Node<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(
f,
"Node ( is_set = {} )",
self.is_set.load(atomic::Ordering::SeqCst)
)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn node_store_load() {
let node: Node<u64> = Default::default();
node.store(15);
assert_eq!(Some(15), node.load());
}
#[test]
fn node_store_load_multiple() {
let node: Node<u64> = Default::default();
node.store(15);
assert_eq!(Some(15), node.load());
assert_eq!(None, node.load());
}
#[test]
fn node_state_store_state() {
let node: Node<u64> = Default::default();
assert_eq!(NodeState::Empty, node.get_state());
node.store(13);
assert_eq!(NodeState::Set, node.get_state());
}
}