bbqueue/traits/coordination/cas.rs

//! Lock-free coordination based on Compare and Swap atomics

use const_init::ConstInit;

use super::{Coord, ReadGrantError, WriteGrantError};
use core::{
    cmp::min,
    sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};

/// Coordination using CAS atomics
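///
/// A minimal sketch of the grant → commit → read → release cycle, using
/// the `Coord` trait methods implemented below (capacity and sizes are
/// illustrative; error handling elided):
///
/// ```rust,ignore
/// let coord = AtomicCoord::new();
/// // Writer: reserve up to 16 bytes within a 64-byte region, then
/// // publish everything that was granted.
/// let (start, len) = coord.grant_max_remaining(64, 16).unwrap();
/// // ... write `len` bytes at offset `start` ...
/// coord.commit_inner(64, len, len);
/// // Reader: locate the readable run, then consume all of it.
/// let (read_at, avail) = coord.read().unwrap();
/// // ... read `avail` bytes at offset `read_at` ...
/// coord.release_inner(avail);
/// ```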
pub struct AtomicCoord {
    /// Where the next byte will be written
    write: AtomicUsize,

    /// Where the next byte will be read from
    read: AtomicUsize,

    /// Used in the inverted case to mark the end of the
    /// readable streak. Otherwise it will be equal to the buffer's
    /// capacity. The Writer is responsible for placing this at
    /// the correct position when entering an inverted condition,
    /// and the Reader is responsible for moving it back to the
    /// capacity when exiting the inverted condition. See the
    /// worked example after this struct.
    last: AtomicUsize,

    /// Used by the Writer to remember which bytes are currently
    /// allowed to be written to, but are not yet ready to be
    /// read from
    reserve: AtomicUsize,

    /// Is there an active read grant?
    read_in_progress: AtomicBool,

    /// Is there an active write grant?
    write_in_progress: AtomicBool,
}
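
// Worked example of the inverted condition described above, with
// illustrative numbers (not part of the original source). Take a
// capacity of 8, write == 6, read == 5:
//
// * `grant_exact(8, 3)` cannot fit 3 bytes in indices 6..8, but
//   `sz < read` holds, so the grant starts at index 0 (inverted).
// * Committing all 3 bytes moves `last` to 6 (the old `write`) and
//   then `write` to 3. Indices 6..8 are skipped for this lap.
// * The reader consumes index 5, reaching `read == last` with
//   `write < read`, so it wraps `read` back to 0 and the queue
//   leaves the inverted condition.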

impl AtomicCoord {
    /// Create a new coordination structure that uses atomic CAS operations
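    ///
    /// Because this is a `const fn`, an `AtomicCoord` can, for example,
    /// back a `static` directly:
    ///
    /// ```rust,ignore
    /// static COORD: AtomicCoord = AtomicCoord::new();
    /// ```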
    pub const fn new() -> Self {
        Self {
            write: AtomicUsize::new(0),
            read: AtomicUsize::new(0),
            last: AtomicUsize::new(0),
            reserve: AtomicUsize::new(0),
            read_in_progress: AtomicBool::new(false),
            write_in_progress: AtomicBool::new(false),
        }
    }
}

impl Default for AtomicCoord {
    fn default() -> Self {
        Self::new()
    }
}

impl ConstInit for AtomicCoord {
    #[allow(clippy::declare_interior_mutable_const)]
    const INIT: Self = Self::new();
}
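
// `INIT` provides a constant initializer, which (for example) allows an
// array of coordination structures via a const repeat expression. An
// illustrative sketch, not part of the original source:
//
//     static COORDS: [AtomicCoord; 4] = [AtomicCoord::INIT; 4];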

unsafe impl Coord for AtomicCoord {
    fn reset(&self) {
        // Re-initialize the buffer (not totally needed, but nice to do)
        self.write.store(0, Ordering::Release);
        self.read.store(0, Ordering::Release);
        self.reserve.store(0, Ordering::Release);
        self.last.store(0, Ordering::Release);
    }

    fn grant_max_remaining(
        &self,
        capacity: usize,
        mut sz: usize,
    ) -> Result<(usize, usize), WriteGrantError> {
        if self.write_in_progress.swap(true, Ordering::AcqRel) {
            return Err(WriteGrantError::GrantInProgress);
        }

        // Writer component. Must never write to `read`,
        // be careful writing to `last`
        let write = self.write.load(Ordering::Acquire);
        let read = self.read.load(Ordering::Acquire);
        let max = capacity;

        let already_inverted = write < read;
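
        // Illustrative numbers (not part of the original source): with
        // capacity == 8, write == 2, read == 6, the queue is inverted and
        // at most read - write - 1 == 3 bytes (indices 2..5) may be
        // granted; write must never catch up to read, or the inverted and
        // un-inverted states would be indistinguishable.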

        let start = if already_inverted {
            // In inverted case, read is always > write
            let remain = read - write - 1;

            if remain != 0 {
                sz = min(remain, sz);
                write
            } else {
                // Inverted, no room is available
                self.write_in_progress.store(false, Ordering::Release);
                return Err(WriteGrantError::InsufficientSize);
            }
        } else {
            #[allow(clippy::collapsible_if)]
            if write != max {
                // Some (or all) room remaining in un-inverted case
                sz = min(max - write, sz);
                write
            } else {
                // Not inverted, but need to go inverted

                // NOTE: We check read > 1, NOT read >= 1, because
                // write must never == read in an inverted condition, since
                // we will then not be able to tell if we are inverted or not
                if read > 1 {
                    sz = min(read - 1, sz);
                    0
                } else {
                    // Not invertible, no space
                    self.write_in_progress.store(false, Ordering::Release);
                    return Err(WriteGrantError::InsufficientSize);
                }
            }
        };

        // Safe write, only viewed by this task
        self.reserve.store(start + sz, Ordering::Release);

        Ok((start, sz))
    }

    fn grant_exact(&self, capacity: usize, sz: usize) -> Result<usize, WriteGrantError> {
        if self.write_in_progress.swap(true, Ordering::AcqRel) {
            return Err(WriteGrantError::GrantInProgress);
        }

        // Writer component. Must never write to `read`,
        // be careful writing to `last`
        let write = self.write.load(Ordering::Acquire);
        let read = self.read.load(Ordering::Acquire);
        let max = capacity;
        let already_inverted = write < read;
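
        // Illustrative numbers (not part of the original source): with
        // capacity == 8, write == 7, read == 3, a request of sz == 2 does
        // not fit in indices 7..8, but sz < read holds, so the writer
        // inverts and the grant starts at index 0.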

        let start = if already_inverted {
            if (write + sz) < read {
                // Inverted, room is still available
                write
            } else {
                // Inverted, no room is available
                self.write_in_progress.store(false, Ordering::Release);
                return Err(WriteGrantError::InsufficientSize);
            }
        } else {
            #[allow(clippy::collapsible_if)]
            if write + sz <= max {
                // Non inverted condition
                write
            } else {
                // Not inverted, but need to go inverted

                // NOTE: We check sz < read, NOT <=, because
                // write must never == read in an inverted condition, since
                // we will then not be able to tell if we are inverted or not
                if sz < read {
                    // Invertible situation
                    0
                } else {
                    // Not invertible, no space
                    self.write_in_progress.store(false, Ordering::Release);
                    return Err(WriteGrantError::InsufficientSize);
                }
            }
        };

        // Safe write, only viewed by this task
        self.reserve.store(start + sz, Ordering::Release);

        Ok(start)
    }

    fn read(&self) -> Result<(usize, usize), ReadGrantError> {
        if self.read_in_progress.swap(true, Ordering::AcqRel) {
            return Err(ReadGrantError::GrantInProgress);
        }

        let write = self.write.load(Ordering::Acquire);
        let last = self.last.load(Ordering::Acquire);
        let mut read = self.read.load(Ordering::Acquire);

        // Resolve the inverted case or end of read
        if (read == last) && (write < read) {
            read = 0;
            // This has some room for error, since the other thread reads these values.
            // Impact to Grant:
            //   Grant checks if read < write to see if inverted. If not inverted, but
            //     no space left, Grant will initiate an inversion, but will not trigger it
            // Impact to Commit:
            //   Commit does not check read, but if Grant has started an inversion,
            //   Commit could move Last to the prior write position
            // MOVING READ BACKWARDS!
            self.read.store(0, Ordering::Release);
        }

        let sz = if write < read {
            // Inverted, only believe `last`
            last
        } else {
            // Not inverted, only believe `write`
            write
        } - read;
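
        // Illustrative numbers (not part of the original source): if
        // inverted with read == 2, last == 6, write == 1, then
        // sz == last - read == 4, i.e. indices 2..6 are readable.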

        if sz == 0 {
            self.read_in_progress.store(false, Ordering::Release);
            return Err(ReadGrantError::Empty);
        }

        Ok((read, sz))
    }

    fn commit_inner(&self, capacity: usize, grant_len: usize, used: usize) {
        // If there is no grant in progress, return early. This
        // generally means we are dropping the grant within a
        // wrapper structure
        if !self.write_in_progress.load(Ordering::Acquire) {
            return;
        }

        // Writer component. Must never write to READ,
        // be careful writing to LAST

        // Saturate the grant commit
        let len = grant_len;
        let used = min(len, used);

        let write = self.write.load(Ordering::Acquire);
        self.reserve.fetch_sub(len - used, Ordering::AcqRel);

        let max = capacity;
        let last = self.last.load(Ordering::Acquire);
        let new_write = self.reserve.load(Ordering::Acquire);

        if (new_write < write) && (write != max) {
            // We have already wrapped, but we are skipping some bytes at the end of the ring.
            // Mark `last` where the write pointer used to be to hold the line here
            self.last.store(write, Ordering::Release);
        } else if new_write > last {
            // We're about to pass the last pointer, which was previously the artificial
            // end of the ring. Now that we've passed it, we can "unlock" the section
            // that was previously skipped.
            //
            // Since new_write is strictly larger than last, it is safe to move this as
            // the other thread will still be halted by the (about to be updated) write
            // value
            self.last.store(max, Ordering::Release);
        }
        // else: If new_write == last, one of the following holds:
        // * last == max, so there is no need to write, OR
        // * If we write in the end chunk again, we'll update last to max next time, OR
        // * If we write to the start chunk in a wrap, we'll update last when we
        //     move write backwards
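
        // Illustrative numbers for the `new_write < write` branch above
        // (not part of the original source): capacity == 8, write == 6,
        // and an inverted grant of 3 bytes at start 0, fully used, gives
        // new_write == 3; new_write < write and write != capacity, so
        // `last` is parked at the old write position, 6.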

        // Write must be updated AFTER last, otherwise read could think it was
        // time to invert early!
        self.write.store(new_write, Ordering::Release);

        // Allow subsequent grants
        self.write_in_progress.store(false, Ordering::Release);
    }

    fn release_inner(&self, used: usize) {
        // If there is no grant in progress, return early. This
        // generally means we are dropping the grant within a
        // wrapper structure
        if !self.read_in_progress.load(Ordering::Acquire) {
            return;
        }

        // // This should always be checked by the public interfaces
        // debug_assert!(used <= self.buf.len());

        // This should be fine, purely incrementing
        let _ = self.read.fetch_add(used, Ordering::Release);

        self.read_in_progress.store(false, Ordering::Release);
    }
}