flize/queue.rs
// LICENSE NOTICE: Most of this code has been copied from the crossbeam repository with the MIT license.

use crate::{Backoff, CachePadded};
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::boxed::Box;

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
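
// For example, with `SHIFT = 1` and `LAP = 32` as defined above, a queue index `i`
// encodes the slot offset within its block as `(i >> SHIFT) % LAP` and the lap
// number as `(i >> SHIFT) / LAP`, while the metadata bit freed up by `SHIFT`
// carries the `HAS_NEXT` flag in the head index. E.g. a head index of 67 has
// `HAS_NEXT` set (67 & 1 == 1) and refers to slot offset (67 >> 1) % 32 == 1 of
// lap (67 >> 1) / 32 == 1.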

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //     holds a MaybeUninit.
        // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than an array-based bounded queue such as crossbeam's `ArrayQueue`.
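///
/// # Example
///
/// A minimal single-threaded sketch; the `flize::queue::Queue` import path is an
/// assumption about how the crate exposes this module.
///
/// ```ignore
/// use flize::queue::Queue;
///
/// let q = Queue::new();
/// q.push("hello");
/// q.push("world");
/// assert_eq!(q.pop(), Some("hello"));
/// assert_eq!(q.pop(), Some("world"));
/// assert_eq!(q.pop(), None);
/// ```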
pub struct Queue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `Queue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    /// Creates a new unbounded queue.
    pub const fn new() -> Queue<T> {
        Queue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element into the queue.
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_and_swap(block, new, Ordering::Release)
                    == block
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the queue.
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.value.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for Queue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Queue { .. }")
    }
}

impl<T> Default for Queue<T> {
    fn default() -> Queue<T> {
        Queue::new()
    }
}
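
// Minimal single-threaded sanity tests sketching the expected behaviour of the
// queue: FIFO ordering, `None` on an empty queue, and block cleanup on drop.
#[cfg(test)]
mod tests {
    use super::Queue;

    #[test]
    fn push_pop_preserves_fifo_order() {
        let q = Queue::new();
        q.push(1);
        q.push(2);
        q.push(3);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), Some(3));
        assert_eq!(q.pop(), None);
    }

    #[test]
    fn drop_reclaims_blocks_across_block_boundaries() {
        // BLOCK_CAP is 31, so pushing 100 values forces several blocks to be
        // allocated; the remaining values and blocks are reclaimed by the
        // `Drop` impl when `q` goes out of scope at the end of this test.
        let q = Queue::new();
        for i in 0..100 {
            q.push(i);
        }
        assert_eq!(q.pop(), Some(0));
        assert_eq!(q.pop(), Some(1));
    }
}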