Skip to main content

kovan_queue/
array_queue.rs

1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::sync::atomic::Ordering;
4
5use crate::utils::CacheAligned;
6
7// vertexia: head/tail/stamp are this queue's entire concurrency surface
8// (push/pop is a closed CAS protocol over just these three, no kovan
9// dependency). Swapping them for shuttle's `AtomicUsize` under the
10// `shuttle` feature gives the scheduler a yield point at every stamp check
11// and every head/tail CAS -- where a lost-update or torn-slot bug would
12// show up.
13#[cfg(feature = "shuttle")]
14use shuttle::sync::atomic::AtomicUsize;
15#[cfg(not(feature = "shuttle"))]
16use std::sync::atomic::AtomicUsize;
17
18/// Contention backoff for `push`/`pop`'s retry loops. Under a normal build,
19/// `crossbeam_utils::Backoff`'s usual exponential spin-then-yield. Under
20/// `shuttle`, `crossbeam_utils::Backoff::snooze`'s eventual
21/// `std::thread::yield_now` is a no-op for shuttle's cooperative scheduler
22/// (every other task is genuinely parked, not merely de-prioritized) and,
23/// worse, doesn't tell the scheduler this thread yielded -- a plain
24/// instrumented `.load()` in the loop is a valid scheduling point but not a
25/// *fair* one, so an unlucky priority draw can re-select the same spinning
26/// thread until shuttle's step budget is exhausted ("exceeded max_steps
27/// bound", an unfair schedule, not a real bug -- see the identical failure
28/// mode and full reasoning on `kovan-map`'s `resize_spin_hint`).
29/// `shuttle::hint::spin_loop` (which calls `shuttle::thread::yield_now`) is
30/// the explicit signal PCT/random need to guarantee the other side a turn.
31#[inline(always)]
32fn backoff_hint(backoff: &crossbeam_utils::Backoff) {
33    #[cfg(feature = "shuttle")]
34    {
35        let _ = backoff;
36        shuttle::hint::spin_loop();
37    }
38    #[cfg(not(feature = "shuttle"))]
39    {
40        backoff.snooze();
41    }
42}
43
/// Issues the `SeqCst` fence that sits between a slot's stamp load and the
/// follow-up head/tail re-check in `push`'s full test and `pop`'s empty
/// test, so that a stale counter cannot misreport full/empty.
///
/// Compiles to nothing under the `shuttle` feature: shuttle explores
/// schedules over sequentially consistent atomics, so the fence would add no
/// behavior there.
#[inline(always)]
fn full_fence() {
    #[cfg(not(feature = "shuttle"))]
    {
        use std::sync::atomic::fence;
        fence(Ordering::SeqCst);
    }
}
54
/// A single cell of the queue's ring buffer: a stamp plus storage for one
/// value.
struct Slot<T> {
    /// Sequence stamp recording which operation may touch the slot next.
    ///
    /// `new` seeds it with the slot's index. `push` claims the slot when the
    /// stamp equals its `tail` and afterwards publishes `tail + 1`; `pop`
    /// takes the value when the stamp equals `head + 1` and afterwards
    /// publishes `head + one_lap`, freeing the slot for the push one lap
    /// later.
    stamp: AtomicUsize,

    /// Storage for the value in this slot. It is written by `push` and read
    /// out by `pop`; outside that window the contents are uninitialized.
    value: UnsafeCell<MaybeUninit<T>>,
}
63
/// A bounded multi-producer multi-consumer queue.
///
/// Values live in a fixed ring of stamped slots. `push` and `pop` coordinate
/// purely through compare-and-swap on `tail`/`head` plus each slot's stamp.
pub struct ArrayQueue<T> {
    /// The head of the queue: the position the next `pop` reads from.
    /// `head & mask` selects the slot; the bits at and above `one_lap`
    /// identify the lap.
    head: CacheAligned<AtomicUsize>,

    /// The tail of the queue: the position the next `push` writes to, encoded
    /// the same way as `head`.
    tail: CacheAligned<AtomicUsize>,

    /// The buffer holding slots. Its length is the queue's capacity.
    buffer: Box<[Slot<T>]>,

    /// A mask for indices: `capacity - 1`, so `pos & mask` is the slot index
    /// of a head/tail position.
    mask: usize,

    /// One full lap of head/tail sequence space: `2 * capacity`.
    ///
    /// Head and tail advance `+1` within a lap and jump to the next lap
    /// boundary at the ring's end, so the stamp `pop` leaves for the next
    /// lap (`head + one_lap`) can never equal the stamp `push` stores when
    /// occupying a slot (`tail + 1`). With the previous `+1`-everywhere
    /// scheme those collided when `capacity == 1` ("occupied" `x + 1` ==
    /// next lap's "free" `x + capacity`), so a second push saw a full slot
    /// as free, overwrote the unpopped value, and desynced the stamps -
    /// after which `pop` (and `Drop`'s drain loop) spun forever.
    one_lap: usize,
}
90
// SAFETY: the queue owns the `T` values stored in its slots, so moving the
// queue to another thread moves those values with it; `T: Send` is exactly
// the bound that permits this.
unsafe impl<T: Send> Send for ArrayQueue<T> {}
// SAFETY: through a shared `&ArrayQueue<T>` the methods visible here only
// transfer values by value (`push` takes a `T`, `pop` returns an owned `T`)
// and never hand out a `&T`, so sharing the queue requires `T: Send` but not
// `T: Sync`. NOTE(review): this relies on the head/tail/stamp protocol giving
// each slot a single writer or reader at a time -- re-confirm if that
// protocol changes.
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
93
94impl<T> ArrayQueue<T> {
95    /// Creates a new bounded queue with the given capacity.
96    ///
97    /// The capacity will be rounded up to the next power of two.
98    pub fn new(cap: usize) -> ArrayQueue<T> {
99        let capacity = if cap < 1 { 1 } else { cap.next_power_of_two() };
100        let mut buffer = Vec::with_capacity(capacity);
101
102        for i in 0..capacity {
103            buffer.push(Slot {
104                stamp: AtomicUsize::new(i),
105                value: UnsafeCell::new(MaybeUninit::uninit()),
106            });
107        }
108
109        ArrayQueue {
110            buffer: buffer.into_boxed_slice(),
111            mask: capacity - 1,
112            one_lap: capacity * 2,
113            head: CacheAligned::new(AtomicUsize::new(0)),
114            tail: CacheAligned::new(AtomicUsize::new(0)),
115        }
116    }
117
118    /// The head/tail value that starts the lap after the one containing
119    /// `pos`: advance `+1` within a lap, jump here at the ring's end.
120    #[inline(always)]
121    fn next_lap(&self, pos: usize) -> usize {
122        (pos & !(self.one_lap - 1)).wrapping_add(self.one_lap)
123    }
124
125    /// Pushes an element into the queue.
126    pub fn push(&self, value: T) -> Result<(), T> {
127        let backoff = crossbeam_utils::Backoff::new();
128        let mut tail = self.tail.load(Ordering::Relaxed);
129
130        loop {
131            let index = tail & self.mask;
132            let slot = &self.buffer[index];
133            let stamp = slot.stamp.load(Ordering::Acquire);
134
135            if tail == stamp {
136                let next = if index + 1 < self.buffer.len() {
137                    tail + 1
138                } else {
139                    self.next_lap(tail)
140                };
141                if self
142                    .tail
143                    .compare_exchange_weak(tail, next, Ordering::SeqCst, Ordering::Relaxed)
144                    .is_ok()
145                {
146                    unsafe {
147                        slot.value.get().write(MaybeUninit::new(value));
148                    }
149                    slot.stamp.store(tail + 1, Ordering::Release);
150                    return Ok(());
151                }
152            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
153                // The slot still holds the value pushed one lap back. Full
154                // unless a pop has advanced head since; the fence orders the
155                // head load after the stamp load so a stale head cannot
156                // report full when the slot was already drained.
157                full_fence();
158                let head = self.head.load(Ordering::Relaxed);
159                if head.wrapping_add(self.one_lap) == tail {
160                    return Err(value);
161                }
162                backoff_hint(&backoff);
163            } else {
164                backoff_hint(&backoff);
165            }
166            tail = self.tail.load(Ordering::Relaxed);
167        }
168    }
169
170    /// Pops an element from the queue.
171    pub fn pop(&self) -> Option<T> {
172        let backoff = crossbeam_utils::Backoff::new();
173        let mut head = self.head.load(Ordering::Relaxed);
174
175        loop {
176            let index = head & self.mask;
177            let slot = &self.buffer[index];
178            let stamp = slot.stamp.load(Ordering::Acquire);
179
180            if head + 1 == stamp {
181                let next = if index + 1 < self.buffer.len() {
182                    head + 1
183                } else {
184                    self.next_lap(head)
185                };
186                if self
187                    .head
188                    .compare_exchange_weak(head, next, Ordering::SeqCst, Ordering::Relaxed)
189                    .is_ok()
190                {
191                    let value = unsafe { slot.value.get().read().assume_init() };
192                    // Free the slot for the push one lap ahead, whose tail
193                    // will equal exactly `head + one_lap`.
194                    slot.stamp
195                        .store(head.wrapping_add(self.one_lap), Ordering::Release);
196                    return Some(value);
197                }
198            } else if head == stamp {
199                // Slot not yet pushed this lap: empty unless a push has
200                // advanced tail since; fence for the same reason as `push`.
201                full_fence();
202                let tail = self.tail.load(Ordering::Relaxed);
203                if tail == head {
204                    return None;
205                }
206                backoff_hint(&backoff);
207            } else {
208                backoff_hint(&backoff);
209            }
210            head = self.head.load(Ordering::Relaxed);
211        }
212    }
213
214    /// Returns the capacity of the queue.
215    pub fn capacity(&self) -> usize {
216        self.buffer.len()
217    }
218
219    /// Returns `true` if the queue is empty.
220    pub fn is_empty(&self) -> bool {
221        let head = self.head.load(Ordering::SeqCst);
222        let tail = self.tail.load(Ordering::SeqCst);
223        head == tail
224    }
225
226    /// Returns `true` if the queue is full.
227    pub fn is_full(&self) -> bool {
228        let head = self.head.load(Ordering::SeqCst);
229        let tail = self.tail.load(Ordering::SeqCst);
230        // Full when tail is exactly one lap ahead of head.
231        tail == head.wrapping_add(self.one_lap)
232    }
233}
234
235/// Drop all remaining values in the queue when it is dropped.
236///
237/// Without this, `T` values sitting in initialized slots would have their
238/// memory freed (the `Box<[Slot<T>]>` backing is released) without ever
239/// calling `T::drop()`, leaking resources such as file handles or locks.
240impl<T> Drop for ArrayQueue<T> {
241    fn drop(&mut self) {
242        // Drain all remaining elements.  Each `pop()` call moves the value out
243        // of its `MaybeUninit` slot and returns it as `Some(T)`; when that
244        // `Some(T)` goes out of scope at the end of the loop body, `T::drop()`
245        // runs automatically.
246        while self.pop().is_some() {}
247    }
248}