Skip to main content

kovan_queue/
array_queue.rs

1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::sync::atomic::Ordering;
4
5use crate::utils::CacheAligned;
6
7// vertexia: head/tail/stamp are this queue's entire concurrency surface
8// (push/pop is a closed CAS protocol over just these three, no kovan
9// dependency). Swapping them for shuttle's `AtomicUsize` under the
10// `shuttle` feature gives the scheduler a yield point at every stamp check
11// and every head/tail CAS -- where a lost-update or torn-slot bug would
12// show up.
13#[cfg(feature = "shuttle")]
14use shuttle::sync::atomic::AtomicUsize;
15#[cfg(not(feature = "shuttle"))]
16use std::sync::atomic::AtomicUsize;
17
18/// Contention backoff for `push`/`pop`'s retry loops. Under a normal build,
19/// `crossbeam_utils::Backoff`'s usual exponential spin-then-yield. Under
20/// `shuttle`, `crossbeam_utils::Backoff::snooze`'s eventual
21/// `std::thread::yield_now` is a no-op for shuttle's cooperative scheduler
22/// (every other task is genuinely parked, not merely de-prioritized) and,
23/// worse, doesn't tell the scheduler this thread yielded -- a plain
24/// instrumented `.load()` in the loop is a valid scheduling point but not a
25/// *fair* one, so an unlucky priority draw can re-select the same spinning
26/// thread until shuttle's step budget is exhausted ("exceeded max_steps
27/// bound", an unfair schedule, not a real bug -- see the identical failure
28/// mode and full reasoning on `kovan-map`'s `resize_spin_hint`).
29/// `shuttle::hint::spin_loop` (which calls `shuttle::thread::yield_now`) is
30/// the explicit signal PCT/random need to guarantee the other side a turn.
31#[inline(always)]
32fn backoff_hint(backoff: &crossbeam_utils::Backoff) {
33    #[cfg(feature = "shuttle")]
34    {
35        let _ = backoff;
36        shuttle::hint::spin_loop();
37    }
38    #[cfg(not(feature = "shuttle"))]
39    {
40        backoff.snooze();
41    }
42}
43
44/// A slot in a queue.
45struct Slot<T> {
46    /// The current stamp.
47    stamp: AtomicUsize,
48
49    /// The value in this slot.
50    value: UnsafeCell<MaybeUninit<T>>,
51}
52
53/// A bounded multi-producer multi-consumer queue.
54pub struct ArrayQueue<T> {
55    /// The head of the queue.
56    head: CacheAligned<AtomicUsize>,
57
58    /// The tail of the queue.
59    tail: CacheAligned<AtomicUsize>,
60
61    /// The buffer holding slots.
62    buffer: Box<[Slot<T>]>,
63
64    /// A mask for indices.
65    mask: usize,
66}
67
68unsafe impl<T: Send> Send for ArrayQueue<T> {}
69unsafe impl<T: Send> Sync for ArrayQueue<T> {}
70
71impl<T> ArrayQueue<T> {
72    /// Creates a new bounded queue with the given capacity.
73    ///
74    /// The capacity will be rounded up to the next power of two.
75    pub fn new(cap: usize) -> ArrayQueue<T> {
76        let capacity = if cap < 1 { 1 } else { cap.next_power_of_two() };
77        let mut buffer = Vec::with_capacity(capacity);
78
79        for i in 0..capacity {
80            buffer.push(Slot {
81                stamp: AtomicUsize::new(i),
82                value: UnsafeCell::new(MaybeUninit::uninit()),
83            });
84        }
85
86        ArrayQueue {
87            buffer: buffer.into_boxed_slice(),
88            mask: capacity - 1,
89            head: CacheAligned::new(AtomicUsize::new(0)),
90            tail: CacheAligned::new(AtomicUsize::new(0)),
91        }
92    }
93
94    /// Pushes an element into the queue.
95    pub fn push(&self, value: T) -> Result<(), T> {
96        let backoff = crossbeam_utils::Backoff::new();
97        let mut tail = self.tail.load(Ordering::Relaxed);
98
99        loop {
100            let index = tail & self.mask;
101            let slot = &self.buffer[index];
102            let stamp = slot.stamp.load(Ordering::Acquire);
103
104            if tail == stamp {
105                let next = tail + 1;
106                if self
107                    .tail
108                    .compare_exchange_weak(tail, next, Ordering::SeqCst, Ordering::Relaxed)
109                    .is_ok()
110                {
111                    unsafe {
112                        slot.value.get().write(MaybeUninit::new(value));
113                    }
114                    slot.stamp.store(tail + 1, Ordering::Release);
115                    return Ok(());
116                }
117            } else if tail + 1 > stamp {
118                let head = self.head.load(Ordering::Relaxed);
119                if tail >= head + self.buffer.len() {
120                    return Err(value);
121                }
122                backoff_hint(&backoff);
123            } else {
124                backoff_hint(&backoff);
125            }
126            tail = self.tail.load(Ordering::Relaxed);
127        }
128    }
129
130    /// Pops an element from the queue.
131    pub fn pop(&self) -> Option<T> {
132        let backoff = crossbeam_utils::Backoff::new();
133        let mut head = self.head.load(Ordering::Relaxed);
134
135        loop {
136            let index = head & self.mask;
137            let slot = &self.buffer[index];
138            let stamp = slot.stamp.load(Ordering::Acquire);
139
140            if head + 1 == stamp {
141                let next = head + 1;
142                if self
143                    .head
144                    .compare_exchange_weak(head, next, Ordering::SeqCst, Ordering::Relaxed)
145                    .is_ok()
146                {
147                    let value = unsafe { slot.value.get().read().assume_init() };
148                    slot.stamp
149                        .store(head + self.buffer.len(), Ordering::Release);
150                    return Some(value);
151                }
152            } else if head == stamp {
153                let tail = self.tail.load(Ordering::Relaxed);
154                if tail == head {
155                    return None;
156                }
157                backoff_hint(&backoff);
158            } else {
159                backoff_hint(&backoff);
160            }
161            head = self.head.load(Ordering::Relaxed);
162        }
163    }
164
165    /// Returns the capacity of the queue.
166    pub fn capacity(&self) -> usize {
167        self.buffer.len()
168    }
169
170    /// Returns `true` if the queue is empty.
171    pub fn is_empty(&self) -> bool {
172        let head = self.head.load(Ordering::SeqCst);
173        let tail = self.tail.load(Ordering::SeqCst);
174        head == tail
175    }
176
177    /// Returns `true` if the queue is full.
178    pub fn is_full(&self) -> bool {
179        let head = self.head.load(Ordering::SeqCst);
180        let tail = self.tail.load(Ordering::SeqCst);
181        tail == head + self.buffer.len()
182    }
183}
184
185/// Drop all remaining values in the queue when it is dropped.
186///
187/// Without this, `T` values sitting in initialized slots would have their
188/// memory freed (the `Box<[Slot<T>]>` backing is released) without ever
189/// calling `T::drop()`, leaking resources such as file handles or locks.
190impl<T> Drop for ArrayQueue<T> {
191    fn drop(&mut self) {
192        // Drain all remaining elements.  Each `pop()` call moves the value out
193        // of its `MaybeUninit` slot and returns it as `Some(T)`; when that
194        // `Some(T)` goes out of scope at the end of the loop body, `T::drop()`
195        // runs automatically.
196        while self.pop().is_some() {}
197    }
198}