kovan_queue/array_queue.rs
1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::sync::atomic::Ordering;
4
5use crate::utils::CacheAligned;
6
7// vertexia: head/tail/stamp are this queue's entire concurrency surface
8// (push/pop is a closed CAS protocol over just these three, no kovan
9// dependency). Swapping them for shuttle's `AtomicUsize` under the
10// `shuttle` feature gives the scheduler a yield point at every stamp check
11// and every head/tail CAS -- where a lost-update or torn-slot bug would
12// show up.
13#[cfg(feature = "shuttle")]
14use shuttle::sync::atomic::AtomicUsize;
15#[cfg(not(feature = "shuttle"))]
16use std::sync::atomic::AtomicUsize;
17
18/// Contention backoff for `push`/`pop`'s retry loops. Under a normal build,
19/// `crossbeam_utils::Backoff`'s usual exponential spin-then-yield. Under
20/// `shuttle`, `crossbeam_utils::Backoff::snooze`'s eventual
21/// `std::thread::yield_now` is a no-op for shuttle's cooperative scheduler
22/// (every other task is genuinely parked, not merely de-prioritized) and,
23/// worse, doesn't tell the scheduler this thread yielded -- a plain
24/// instrumented `.load()` in the loop is a valid scheduling point but not a
25/// *fair* one, so an unlucky priority draw can re-select the same spinning
26/// thread until shuttle's step budget is exhausted ("exceeded max_steps
27/// bound", an unfair schedule, not a real bug -- see the identical failure
28/// mode and full reasoning on `kovan-map`'s `resize_spin_hint`).
29/// `shuttle::hint::spin_loop` (which calls `shuttle::thread::yield_now`) is
30/// the explicit signal PCT/random need to guarantee the other side a turn.
31#[inline(always)]
32fn backoff_hint(backoff: &crossbeam_utils::Backoff) {
33 #[cfg(feature = "shuttle")]
34 {
35 let _ = backoff;
36 shuttle::hint::spin_loop();
37 }
38 #[cfg(not(feature = "shuttle"))]
39 {
40 backoff.snooze();
41 }
42}
43
/// One cell of the queue's ring buffer: a sequence stamp plus storage for a
/// single element.
struct Slot<T> {
    /// Sequence stamp that `push` compares against `tail` and `pop` compares
    /// against `head + 1` to decide whether the cell may be written or read.
    stamp: AtomicUsize,

    /// Storage for the element. It is only initialized between the `push`
    /// that writes it and the `pop` that moves it back out.
    value: UnsafeCell<MaybeUninit<T>>,
}
52
53/// A bounded multi-producer multi-consumer queue.
54pub struct ArrayQueue<T> {
55 /// The head of the queue.
56 head: CacheAligned<AtomicUsize>,
57
58 /// The tail of the queue.
59 tail: CacheAligned<AtomicUsize>,
60
61 /// The buffer holding slots.
62 buffer: Box<[Slot<T>]>,
63
64 /// A mask for indices.
65 mask: usize,
66}
67
68unsafe impl<T: Send> Send for ArrayQueue<T> {}
69unsafe impl<T: Send> Sync for ArrayQueue<T> {}
70
71impl<T> ArrayQueue<T> {
72 /// Creates a new bounded queue with the given capacity.
73 ///
74 /// The capacity will be rounded up to the next power of two.
75 pub fn new(cap: usize) -> ArrayQueue<T> {
76 let capacity = if cap < 1 { 1 } else { cap.next_power_of_two() };
77 let mut buffer = Vec::with_capacity(capacity);
78
79 for i in 0..capacity {
80 buffer.push(Slot {
81 stamp: AtomicUsize::new(i),
82 value: UnsafeCell::new(MaybeUninit::uninit()),
83 });
84 }
85
86 ArrayQueue {
87 buffer: buffer.into_boxed_slice(),
88 mask: capacity - 1,
89 head: CacheAligned::new(AtomicUsize::new(0)),
90 tail: CacheAligned::new(AtomicUsize::new(0)),
91 }
92 }
93
94 /// Pushes an element into the queue.
95 pub fn push(&self, value: T) -> Result<(), T> {
96 let backoff = crossbeam_utils::Backoff::new();
97 let mut tail = self.tail.load(Ordering::Relaxed);
98
99 loop {
100 let index = tail & self.mask;
101 let slot = &self.buffer[index];
102 let stamp = slot.stamp.load(Ordering::Acquire);
103
104 if tail == stamp {
105 let next = tail + 1;
106 if self
107 .tail
108 .compare_exchange_weak(tail, next, Ordering::SeqCst, Ordering::Relaxed)
109 .is_ok()
110 {
111 unsafe {
112 slot.value.get().write(MaybeUninit::new(value));
113 }
114 slot.stamp.store(tail + 1, Ordering::Release);
115 return Ok(());
116 }
117 } else if tail + 1 > stamp {
118 let head = self.head.load(Ordering::Relaxed);
119 if tail >= head + self.buffer.len() {
120 return Err(value);
121 }
122 backoff_hint(&backoff);
123 } else {
124 backoff_hint(&backoff);
125 }
126 tail = self.tail.load(Ordering::Relaxed);
127 }
128 }
129
130 /// Pops an element from the queue.
131 pub fn pop(&self) -> Option<T> {
132 let backoff = crossbeam_utils::Backoff::new();
133 let mut head = self.head.load(Ordering::Relaxed);
134
135 loop {
136 let index = head & self.mask;
137 let slot = &self.buffer[index];
138 let stamp = slot.stamp.load(Ordering::Acquire);
139
140 if head + 1 == stamp {
141 let next = head + 1;
142 if self
143 .head
144 .compare_exchange_weak(head, next, Ordering::SeqCst, Ordering::Relaxed)
145 .is_ok()
146 {
147 let value = unsafe { slot.value.get().read().assume_init() };
148 slot.stamp
149 .store(head + self.buffer.len(), Ordering::Release);
150 return Some(value);
151 }
152 } else if head == stamp {
153 let tail = self.tail.load(Ordering::Relaxed);
154 if tail == head {
155 return None;
156 }
157 backoff_hint(&backoff);
158 } else {
159 backoff_hint(&backoff);
160 }
161 head = self.head.load(Ordering::Relaxed);
162 }
163 }
164
165 /// Returns the capacity of the queue.
166 pub fn capacity(&self) -> usize {
167 self.buffer.len()
168 }
169
170 /// Returns `true` if the queue is empty.
171 pub fn is_empty(&self) -> bool {
172 let head = self.head.load(Ordering::SeqCst);
173 let tail = self.tail.load(Ordering::SeqCst);
174 head == tail
175 }
176
177 /// Returns `true` if the queue is full.
178 pub fn is_full(&self) -> bool {
179 let head = self.head.load(Ordering::SeqCst);
180 let tail = self.tail.load(Ordering::SeqCst);
181 tail == head + self.buffer.len()
182 }
183}
184
185/// Drop all remaining values in the queue when it is dropped.
186///
187/// Without this, `T` values sitting in initialized slots would have their
188/// memory freed (the `Box<[Slot<T>]>` backing is released) without ever
189/// calling `T::drop()`, leaking resources such as file handles or locks.
190impl<T> Drop for ArrayQueue<T> {
191 fn drop(&mut self) {
192 // Drain all remaining elements. Each `pop()` call moves the value out
193 // of its `MaybeUninit` slot and returns it as `Some(T)`; when that
194 // `Some(T)` goes out of scope at the end of the loop body, `T::drop()`
195 // runs automatically.
196 while self.pop().is_some() {}
197 }
198}