kovan_queue/array_queue.rs
1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::sync::atomic::Ordering;
4
5use crate::utils::CacheAligned;
6
7// vertexia: head/tail/stamp are this queue's entire concurrency surface
8// (push/pop is a closed CAS protocol over just these three, no kovan
9// dependency). Swapping them for shuttle's `AtomicUsize` under the
10// `shuttle` feature gives the scheduler a yield point at every stamp check
11// and every head/tail CAS -- where a lost-update or torn-slot bug would
12// show up.
13#[cfg(feature = "shuttle")]
14use shuttle::sync::atomic::AtomicUsize;
15#[cfg(not(feature = "shuttle"))]
16use std::sync::atomic::AtomicUsize;
17
18/// Contention backoff for `push`/`pop`'s retry loops. Under a normal build,
19/// `crossbeam_utils::Backoff`'s usual exponential spin-then-yield. Under
20/// `shuttle`, `crossbeam_utils::Backoff::snooze`'s eventual
21/// `std::thread::yield_now` is a no-op for shuttle's cooperative scheduler
22/// (every other task is genuinely parked, not merely de-prioritized) and,
23/// worse, doesn't tell the scheduler this thread yielded -- a plain
24/// instrumented `.load()` in the loop is a valid scheduling point but not a
25/// *fair* one, so an unlucky priority draw can re-select the same spinning
26/// thread until shuttle's step budget is exhausted ("exceeded max_steps
27/// bound", an unfair schedule, not a real bug -- see the identical failure
28/// mode and full reasoning on `kovan-map`'s `resize_spin_hint`).
29/// `shuttle::hint::spin_loop` (which calls `shuttle::thread::yield_now`) is
30/// the explicit signal PCT/random need to guarantee the other side a turn.
31#[inline(always)]
32fn backoff_hint(backoff: &crossbeam_utils::Backoff) {
33 #[cfg(feature = "shuttle")]
34 {
35 let _ = backoff;
36 shuttle::hint::spin_loop();
37 }
38 #[cfg(not(feature = "shuttle"))]
39 {
40 backoff.snooze();
41 }
42}
43
/// Issues the `SeqCst` fence that `push`'s full test and `pop`'s empty test
/// rely on.
///
/// The fence orders the head/tail re-load after the preceding stamp load, so
/// a stale counter cannot misreport full/empty.
///
/// With the `shuttle` feature enabled this compiles to nothing. Shuttle
/// explores schedules over sequentially consistent atomics, so the fence
/// would add no behavior there.
#[inline(always)]
fn full_fence() {
    if cfg!(not(feature = "shuttle")) {
        std::sync::atomic::fence(Ordering::SeqCst);
    }
}
54
/// One cell of the queue's ring buffer.
struct Slot<T> {
    /// Sequence stamp saying which position this slot is ready for.
    ///
    /// It equals a tail position while the slot is free for that push, and
    /// that position `+ 1` once the push has written its value.
    stamp: AtomicUsize,

    /// Storage for the element.
    ///
    /// It holds an initialized `T` only while `stamp` marks the slot as
    /// occupied.
    value: UnsafeCell<MaybeUninit<T>>,
}
63
64/// A bounded multi-producer multi-consumer queue.
65pub struct ArrayQueue<T> {
66 /// The head of the queue.
67 head: CacheAligned<AtomicUsize>,
68
69 /// The tail of the queue.
70 tail: CacheAligned<AtomicUsize>,
71
72 /// The buffer holding slots.
73 buffer: Box<[Slot<T>]>,
74
75 /// A mask for indices.
76 mask: usize,
77
78 /// One full lap of head/tail sequence space: `2 * capacity`.
79 ///
80 /// Head and tail advance `+1` within a lap and jump to the next lap
81 /// boundary at the ring's end, so the stamp `pop` leaves for the next
82 /// lap (`head + one_lap`) can never equal the stamp `push` stores when
83 /// occupying a slot (`tail + 1`). With the previous `+1`-everywhere
84 /// scheme those collided when `capacity == 1` ("occupied" `x + 1` ==
85 /// next lap's "free" `x + capacity`), so a second push saw a full slot
86 /// as free, overwrote the unpopped value, and desynced the stamps -
87 /// after which `pop` (and `Drop`'s drain loop) spun forever.
88 one_lap: usize,
89}
90
91unsafe impl<T: Send> Send for ArrayQueue<T> {}
92unsafe impl<T: Send> Sync for ArrayQueue<T> {}
93
94impl<T> ArrayQueue<T> {
95 /// Creates a new bounded queue with the given capacity.
96 ///
97 /// The capacity will be rounded up to the next power of two.
98 pub fn new(cap: usize) -> ArrayQueue<T> {
99 let capacity = if cap < 1 { 1 } else { cap.next_power_of_two() };
100 let mut buffer = Vec::with_capacity(capacity);
101
102 for i in 0..capacity {
103 buffer.push(Slot {
104 stamp: AtomicUsize::new(i),
105 value: UnsafeCell::new(MaybeUninit::uninit()),
106 });
107 }
108
109 ArrayQueue {
110 buffer: buffer.into_boxed_slice(),
111 mask: capacity - 1,
112 one_lap: capacity * 2,
113 head: CacheAligned::new(AtomicUsize::new(0)),
114 tail: CacheAligned::new(AtomicUsize::new(0)),
115 }
116 }
117
118 /// The head/tail value that starts the lap after the one containing
119 /// `pos`: advance `+1` within a lap, jump here at the ring's end.
120 #[inline(always)]
121 fn next_lap(&self, pos: usize) -> usize {
122 (pos & !(self.one_lap - 1)).wrapping_add(self.one_lap)
123 }
124
125 /// Pushes an element into the queue.
126 pub fn push(&self, value: T) -> Result<(), T> {
127 let backoff = crossbeam_utils::Backoff::new();
128 let mut tail = self.tail.load(Ordering::Relaxed);
129
130 loop {
131 let index = tail & self.mask;
132 let slot = &self.buffer[index];
133 let stamp = slot.stamp.load(Ordering::Acquire);
134
135 if tail == stamp {
136 let next = if index + 1 < self.buffer.len() {
137 tail + 1
138 } else {
139 self.next_lap(tail)
140 };
141 if self
142 .tail
143 .compare_exchange_weak(tail, next, Ordering::SeqCst, Ordering::Relaxed)
144 .is_ok()
145 {
146 unsafe {
147 slot.value.get().write(MaybeUninit::new(value));
148 }
149 slot.stamp.store(tail + 1, Ordering::Release);
150 return Ok(());
151 }
152 } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
153 // The slot still holds the value pushed one lap back. Full
154 // unless a pop has advanced head since; the fence orders the
155 // head load after the stamp load so a stale head cannot
156 // report full when the slot was already drained.
157 full_fence();
158 let head = self.head.load(Ordering::Relaxed);
159 if head.wrapping_add(self.one_lap) == tail {
160 return Err(value);
161 }
162 backoff_hint(&backoff);
163 } else {
164 backoff_hint(&backoff);
165 }
166 tail = self.tail.load(Ordering::Relaxed);
167 }
168 }
169
170 /// Pops an element from the queue.
171 pub fn pop(&self) -> Option<T> {
172 let backoff = crossbeam_utils::Backoff::new();
173 let mut head = self.head.load(Ordering::Relaxed);
174
175 loop {
176 let index = head & self.mask;
177 let slot = &self.buffer[index];
178 let stamp = slot.stamp.load(Ordering::Acquire);
179
180 if head + 1 == stamp {
181 let next = if index + 1 < self.buffer.len() {
182 head + 1
183 } else {
184 self.next_lap(head)
185 };
186 if self
187 .head
188 .compare_exchange_weak(head, next, Ordering::SeqCst, Ordering::Relaxed)
189 .is_ok()
190 {
191 let value = unsafe { slot.value.get().read().assume_init() };
192 // Free the slot for the push one lap ahead, whose tail
193 // will equal exactly `head + one_lap`.
194 slot.stamp
195 .store(head.wrapping_add(self.one_lap), Ordering::Release);
196 return Some(value);
197 }
198 } else if head == stamp {
199 // Slot not yet pushed this lap: empty unless a push has
200 // advanced tail since; fence for the same reason as `push`.
201 full_fence();
202 let tail = self.tail.load(Ordering::Relaxed);
203 if tail == head {
204 return None;
205 }
206 backoff_hint(&backoff);
207 } else {
208 backoff_hint(&backoff);
209 }
210 head = self.head.load(Ordering::Relaxed);
211 }
212 }
213
214 /// Returns the capacity of the queue.
215 pub fn capacity(&self) -> usize {
216 self.buffer.len()
217 }
218
219 /// Returns `true` if the queue is empty.
220 pub fn is_empty(&self) -> bool {
221 let head = self.head.load(Ordering::SeqCst);
222 let tail = self.tail.load(Ordering::SeqCst);
223 head == tail
224 }
225
226 /// Returns `true` if the queue is full.
227 pub fn is_full(&self) -> bool {
228 let head = self.head.load(Ordering::SeqCst);
229 let tail = self.tail.load(Ordering::SeqCst);
230 // Full when tail is exactly one lap ahead of head.
231 tail == head.wrapping_add(self.one_lap)
232 }
233}
234
235/// Drop all remaining values in the queue when it is dropped.
236///
237/// Without this, `T` values sitting in initialized slots would have their
238/// memory freed (the `Box<[Slot<T>]>` backing is released) without ever
239/// calling `T::drop()`, leaking resources such as file handles or locks.
240impl<T> Drop for ArrayQueue<T> {
241 fn drop(&mut self) {
242 // Drain all remaining elements. Each `pop()` call moves the value out
243 // of its `MaybeUninit` slot and returns it as `Some(T)`; when that
244 // `Some(T)` goes out of scope at the end of the loop body, `T::drop()`
245 // runs automatically.
246 while self.pop().is_some() {}
247 }
248}