flize/queue.rs

// LICENSE NOTICE: Most of this code has been copied from the crossbeam repository, which is licensed under the MIT license.

use crate::{Backoff, CachePadded};
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::boxed::Box;

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;

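// Worked example of the encoding (an illustration derived from the constants above, not part
// of the upstream crossbeam code):
//
// An index is a `usize` whose lowest `SHIFT` (= 1) bit is metadata; in the head index that bit
// is the `HAS_NEXT` flag. The remaining bits count slots: `index >> SHIFT` is the running slot
// count, `(index >> SHIFT) % LAP` is the offset inside the current block, and an offset equal
// to `BLOCK_CAP` (= 31) is the reserved "end of block" position used while the next block is
// being installed.
//
// For example, a head index of 75 (0b100_1011) decodes as: `HAS_NEXT` set (low bit 1), slot
// count 75 >> 1 = 37, offset 37 % 32 = 5, i.e. the sixth slot of the second block.
//
// Slot states combine the bits above: `WRITE | READ` (0b011) means the value was written and
// then consumed, and a set `DESTROY` bit tells the thread still reading the slot to finish
// tearing the block down.
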
/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than a bounded, array-based queue such as crossbeam's `ArrayQueue`.
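///
/// # Example
///
/// A minimal usage sketch. The import path is an assumption about how this type is re-exported
/// from the crate root, which is why the example is marked `ignore`.
///
/// ```ignore
/// use flize::Queue; // assumed re-export path
///
/// let q = Queue::new();
/// q.push(1);
/// q.push(2);
/// assert_eq!(q.pop(), Some(1));
/// assert_eq!(q.pop(), Some(2));
/// assert_eq!(q.pop(), None);
/// ```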
pub struct Queue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `Queue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

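// SAFETY (informal sketch): `Queue<T>` owns values of type `T` and moves them across threads
// through `push`/`pop`, hence the `T: Send` bound on both impls. Sharing `&Queue<T>` between
// threads is sound because all shared state (`head`, `tail` and each slot's `state` word) is
// only ever accessed through atomic operations.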
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    /// Creates a new unbounded queue.
    pub const fn new() -> Queue<T> {
        Queue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element into the queue.
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the queue, or returns `None` if the queue is empty.
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
}
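// Note: `drop` takes `&mut self`, so no other thread can be accessing the queue concurrently,
// which is why the Relaxed loads below are sufficient.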
impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.value.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for Queue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Queue { .. }")
    }
}

impl<T> Default for Queue<T> {
    fn default() -> Queue<T> {
        Queue::new()
    }
}
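
// A minimal test sketch for the queue's public surface (`new`, `push`, `pop` and `Drop`).
// These tests are an addition on top of the copied crossbeam code and assume the standard
// `std::thread`/`std::sync::Arc` APIs and the default test harness.
#[cfg(test)]
mod tests {
    use super::Queue;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn push_pop_preserves_fifo_order() {
        let q = Queue::new();
        assert_eq!(q.pop(), None);

        // Push enough elements to cross a block boundary (BLOCK_CAP is 31).
        for i in 0..100 {
            q.push(i);
        }
        for i in 0..100 {
            assert_eq!(q.pop(), Some(i));
        }
        assert_eq!(q.pop(), None);
    }

    #[test]
    fn concurrent_pushes_are_not_lost() {
        let q = Arc::new(Queue::new());
        let handles: Vec<_> = (0..4)
            .map(|t| {
                let q = Arc::clone(&q);
                thread::spawn(move || {
                    for i in 0..250 {
                        q.push(t * 1_000 + i);
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }

        // Every pushed value must come back out exactly once.
        let mut values = Vec::new();
        while let Some(v) = q.pop() {
            values.push(v);
        }
        assert_eq!(values.len(), 1_000);
        values.sort_unstable();
        values.dedup();
        assert_eq!(values.len(), 1_000);
    }
}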