bbqueue/traits/coordination/cas.rs
1//! Lock-free coordination based on Compare and Swap atomics
2
3use const_init::ConstInit;
4
5use super::{Coord, ReadGrantError, WriteGrantError};
6use core::{
7 cmp::min,
8 sync::atomic::{AtomicBool, AtomicUsize, Ordering},
9};
10
11/// Coordination using CAS atomics
12pub struct AtomicCoord {
13 /// Where the next byte will be written
14 write: AtomicUsize,
15
16 /// Where the next byte will be read from
17 read: AtomicUsize,
18
19 /// Used in the inverted case to mark the end of the
20 /// readable streak. Otherwise will == sizeof::<self.buf>().
21 /// Writer is responsible for placing this at the correct
22 /// place when entering an inverted condition, and Reader
23 /// is responsible for moving it back to sizeof::<self.buf>()
24 /// when exiting the inverted condition
25 last: AtomicUsize,
26
27 /// Used by the Writer to remember what bytes are currently
28 /// allowed to be written to, but are not yet ready to be
29 /// read from
30 reserve: AtomicUsize,
31
32 /// Is there an active read grant?
33 read_in_progress: AtomicBool,
34
35 /// Is there an active write grant?
36 write_in_progress: AtomicBool,
37}
38
39impl AtomicCoord {
40 /// Create a new coordination structure that uses atomic CAS operations
41 pub const fn new() -> Self {
42 Self {
43 write: AtomicUsize::new(0),
44 read: AtomicUsize::new(0),
45 last: AtomicUsize::new(0),
46 reserve: AtomicUsize::new(0),
47 read_in_progress: AtomicBool::new(false),
48 write_in_progress: AtomicBool::new(false),
49 }
50 }
51}
52
53impl Default for AtomicCoord {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl ConstInit for AtomicCoord {
60 #[allow(clippy::declare_interior_mutable_const)]
61 const INIT: Self = Self::new();
62}
63
64unsafe impl Coord for AtomicCoord {
65 fn reset(&self) {
66 // Re-initialize the buffer (not totally needed, but nice to do)
67 self.write.store(0, Ordering::Release);
68 self.read.store(0, Ordering::Release);
69 self.reserve.store(0, Ordering::Release);
70 self.last.store(0, Ordering::Release);
71 }
72
73 fn grant_max_remaining(
74 &self,
75 capacity: usize,
76 mut sz: usize,
77 ) -> Result<(usize, usize), WriteGrantError> {
78 if self.write_in_progress.swap(true, Ordering::AcqRel) {
79 return Err(WriteGrantError::GrantInProgress);
80 }
81
82 // Writer component. Must never write to `read`,
83 // be careful writing to `load`
84 let write = self.write.load(Ordering::Acquire);
85 let read = self.read.load(Ordering::Acquire);
86 let max = capacity;
87
88 let already_inverted = write < read;
89
90 let start = if already_inverted {
91 // In inverted case, read is always > write
92 let remain = read - write - 1;
93
94 if remain != 0 {
95 sz = min(remain, sz);
96 write
97 } else {
98 // Inverted, no room is available
99 self.write_in_progress.store(false, Ordering::Release);
100 return Err(WriteGrantError::InsufficientSize);
101 }
102 } else {
103 #[allow(clippy::collapsible_if)]
104 if write != max {
105 // Some (or all) room remaining in un-inverted case
106 sz = min(max - write, sz);
107 write
108 } else {
109 // Not inverted, but need to go inverted
110
111 // NOTE: We check read > 1, NOT read >= 1, because
112 // write must never == read in an inverted condition, since
113 // we will then not be able to tell if we are inverted or not
114 if read > 1 {
115 sz = min(read - 1, sz);
116 0
117 } else {
118 // Not invertible, no space
119 self.write_in_progress.store(false, Ordering::Release);
120 return Err(WriteGrantError::InsufficientSize);
121 }
122 }
123 };
124
125 // Safe write, only viewed by this task
126 self.reserve.store(start + sz, Ordering::Release);
127
128 Ok((start, sz))
129 }
130
131 fn grant_exact(&self, capacity: usize, sz: usize) -> Result<usize, WriteGrantError> {
132 if self.write_in_progress.swap(true, Ordering::AcqRel) {
133 return Err(WriteGrantError::GrantInProgress);
134 }
135
136 // Writer component. Must never write to `read`,
137 // be careful writing to `load`
138 let write = self.write.load(Ordering::Acquire);
139 let read = self.read.load(Ordering::Acquire);
140 let max = capacity;
141 let already_inverted = write < read;
142
143 let start = if already_inverted {
144 if (write + sz) < read {
145 // Inverted, room is still available
146 write
147 } else {
148 // Inverted, no room is available
149 self.write_in_progress.store(false, Ordering::Release);
150 return Err(WriteGrantError::InsufficientSize);
151 }
152 } else {
153 #[allow(clippy::collapsible_if)]
154 if write + sz <= max {
155 // Non inverted condition
156 write
157 } else {
158 // Not inverted, but need to go inverted
159
160 // NOTE: We check sz < read, NOT <=, because
161 // write must never == read in an inverted condition, since
162 // we will then not be able to tell if we are inverted or not
163 if sz < read {
164 // Invertible situation
165 0
166 } else {
167 // Not invertible, no space
168 self.write_in_progress.store(false, Ordering::Release);
169 return Err(WriteGrantError::InsufficientSize);
170 }
171 }
172 };
173
174 // Safe write, only viewed by this task
175 self.reserve.store(start + sz, Ordering::Release);
176
177 Ok(start)
178 }
179
180 fn read(&self) -> Result<(usize, usize), ReadGrantError> {
181 if self.read_in_progress.swap(true, Ordering::AcqRel) {
182 return Err(ReadGrantError::GrantInProgress);
183 }
184
185 let write = self.write.load(Ordering::Acquire);
186 let last = self.last.load(Ordering::Acquire);
187 let mut read = self.read.load(Ordering::Acquire);
188
189 // Resolve the inverted case or end of read
190 if (read == last) && (write < read) {
191 read = 0;
192 // This has some room for error, the other thread reads this
193 // Impact to Grant:
194 // Grant checks if read < write to see if inverted. If not inverted, but
195 // no space left, Grant will initiate an inversion, but will not trigger it
196 // Impact to Commit:
197 // Commit does not check read, but if Grant has started an inversion,
198 // grant could move Last to the prior write position
199 // MOVING READ BACKWARDS!
200 self.read.store(0, Ordering::Release);
201 }
202
203 let sz = if write < read {
204 // Inverted, only believe last
205 last
206 } else {
207 // Not inverted, only believe write
208 write
209 } - read;
210
211 if sz == 0 {
212 self.read_in_progress.store(false, Ordering::Release);
213 return Err(ReadGrantError::Empty);
214 }
215
216 Ok((read, sz))
217 }
218
219 fn commit_inner(&self, capacity: usize, grant_len: usize, used: usize) {
220 // If there is no grant in progress, return early. This
221 // generally means we are dropping the grant within a
222 // wrapper structure
223 if !self.write_in_progress.load(Ordering::Acquire) {
224 return;
225 }
226
227 // Writer component. Must never write to READ,
228 // be careful writing to LAST
229
230 // Saturate the grant commit
231 let len = grant_len;
232 let used = min(len, used);
233
234 let write = self.write.load(Ordering::Acquire);
235 self.reserve.fetch_sub(len - used, Ordering::AcqRel);
236
237 let max = capacity;
238 let last = self.last.load(Ordering::Acquire);
239 let new_write = self.reserve.load(Ordering::Acquire);
240
241 if (new_write < write) && (write != max) {
242 // We have already wrapped, but we are skipping some bytes at the end of the ring.
243 // Mark `last` where the write pointer used to be to hold the line here
244 self.last.store(write, Ordering::Release);
245 } else if new_write > last {
246 // We're about to pass the last pointer, which was previously the artificial
247 // end of the ring. Now that we've passed it, we can "unlock" the section
248 // that was previously skipped.
249 //
250 // Since new_write is strictly larger than last, it is safe to move this as
251 // the other thread will still be halted by the (about to be updated) write
252 // value
253 self.last.store(max, Ordering::Release);
254 }
255 // else: If new_write == last, either:
256 // * last == max, so no need to write, OR
257 // * If we write in the end chunk again, we'll update last to max next time
258 // * If we write to the start chunk in a wrap, we'll update last when we
259 // move write backwards
260
261 // Write must be updated AFTER last, otherwise read could think it was
262 // time to invert early!
263 self.write.store(new_write, Ordering::Release);
264
265 // Allow subsequent grants
266 self.write_in_progress.store(false, Ordering::Release);
267 }
268
269 fn release_inner(&self, used: usize) {
270 // If there is no grant in progress, return early. This
271 // generally means we are dropping the grant within a
272 // wrapper structure
273 if !self.read_in_progress.load(Ordering::Acquire) {
274 return;
275 }
276
277 // // This should always be checked by the public interfaces
278 // debug_assert!(used <= self.buf.len());
279
280 // This should be fine, purely incrementing
281 let _ = self.read.fetch_add(used, Ordering::Release);
282
283 self.read_in_progress.store(false, Ordering::Release);
284 }
285}