//! `kovan_queue/array_queue.rs` — a bounded multi-producer multi-consumer
//! (MPMC) queue backed by a fixed ring buffer of stamped slots.

1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::sync::atomic::{AtomicUsize, Ordering};
4
5use crate::utils::CacheAligned;
6
/// A slot in a queue.
struct Slot<T> {
    /// The current stamp.
    ///
    /// Initialized to the slot's own index `i` (see `ArrayQueue::new`).
    /// `push` at position `p` stores `p + 1` after writing the value, and
    /// `pop` at position `p` stores `p + capacity` after taking it, so the
    /// stamp advances by one full buffer length per lap and encodes whether
    /// the slot is empty or occupied for a given position.
    stamp: AtomicUsize,

    /// The value in this slot.
    ///
    /// Only initialized while the stamp marks the slot as occupied;
    /// otherwise this is uninitialized memory and must not be read.
    value: UnsafeCell<MaybeUninit<T>>,
}
15
/// A bounded multi-producer multi-consumer queue.
pub struct ArrayQueue<T> {
    /// The head of the queue: the monotonically increasing position of the
    /// next element to pop. `CacheAligned` presumably pads this onto its own
    /// cache line to avoid false sharing with `tail` — see `crate::utils`.
    head: CacheAligned<AtomicUsize>,

    /// The tail of the queue: the monotonically increasing position of the
    /// next element to push.
    tail: CacheAligned<AtomicUsize>,

    /// The buffer holding slots. Its length is always a power of two
    /// (enforced by `new`), which makes `mask` a valid index mask.
    buffer: Box<[Slot<T>]>,

    /// A mask for indices, equal to `buffer.len() - 1`; `pos & mask` maps a
    /// position onto a buffer index without a modulo.
    mask: usize,
}
30
// SAFETY: moving the queue to another thread moves the `T` values it owns,
// which is sound whenever `T: Send`.
unsafe impl<T: Send> Send for ArrayQueue<T> {}
// SAFETY: all shared state is mutated through atomics, and the stamp protocol
// in `push`/`pop` gives each slot exactly one owner at a time; values are
// moved in and out rather than shared, so `T: Send` (not `T: Sync`) suffices.
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
33
impl<T> ArrayQueue<T> {
    /// Creates a new bounded queue with the given capacity.
    ///
    /// The capacity will be rounded up to the next power of two, and a
    /// requested capacity of `0` is bumped to `1`, so the queue can always
    /// hold at least one element. Callers should consult `capacity()` rather
    /// than assume the queue holds exactly `cap` elements.
    pub fn new(cap: usize) -> ArrayQueue<T> {
        // A power-of-two capacity lets `mask` replace `% capacity` when
        // mapping positions to buffer indices.
        let capacity = if cap < 1 { 1 } else { cap.next_power_of_two() };
        let mut buffer = Vec::with_capacity(capacity);

        for i in 0..capacity {
            // A stamp equal to the slot's own index marks it as empty and
            // ready for the first lap of pushes.
            buffer.push(Slot {
                stamp: AtomicUsize::new(i),
                value: UnsafeCell::new(MaybeUninit::uninit()),
            });
        }

        ArrayQueue {
            buffer: buffer.into_boxed_slice(),
            mask: capacity - 1,
            head: CacheAligned::new(AtomicUsize::new(0)),
            tail: CacheAligned::new(AtomicUsize::new(0)),
        }
    }

    /// Pushes an element into the queue.
    ///
    /// Returns `Err(value)` — handing the value back — if the queue is full.
    ///
    /// Protocol (Vyukov-style stamped slots): the slot for position `p` is
    /// free to push exactly when its stamp equals `p`; after the value is
    /// written, the stamp is set to `p + 1` so a popper at position `p`
    /// knows the value is ready.
    pub fn push(&self, value: T) -> Result<(), T> {
        let backoff = crossbeam_utils::Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            let index = tail & self.mask;
            let slot = &self.buffer[index];
            let stamp = slot.stamp.load(Ordering::Acquire);

            if tail == stamp {
                // Slot is empty on our lap: try to claim position `tail`.
                let next = tail + 1;
                if self
                    .tail
                    .compare_exchange(tail, next, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok()
                {
                    // Winning the CAS gives us exclusive ownership of this
                    // slot until we publish the new stamp below.
                    unsafe {
                        slot.value.get().write(MaybeUninit::new(value));
                    }
                    // Release-publish: a popper that Acquire-loads the stamp
                    // `tail + 1` is guaranteed to see the value write above.
                    slot.stamp.store(tail + 1, Ordering::Release);
                    return Ok(());
                }
            } else if tail + 1 > stamp {
                // Stamp is behind `tail`: the slot still holds a value from
                // the previous lap, so the queue may be full.
                let head = self.head.load(Ordering::Relaxed);
                // NOTE(review): crossbeam's ArrayQueue issues a SeqCst fence
                // before this relaxed head load; confirm a stale `head`
                // cannot make this report "full" spuriously here.
                if tail >= head + self.buffer.len() {
                    // Tail is a full lap ahead of head: genuinely full.
                    return Err(value);
                }
                backoff.snooze();
            } else {
                // Stamp is ahead of our snapshot of `tail`: our snapshot is
                // stale (the slot was already filled this lap). Back off and
                // re-read the tail.
                backoff.snooze();
            }
            tail = self.tail.load(Ordering::Relaxed);
        }
    }

    /// Pops an element from the queue.
    ///
    /// Returns `None` if the queue is empty. The slot for position `p` holds
    /// a ready value exactly when its stamp equals `p + 1`; after the value
    /// is taken, the stamp becomes `p + capacity`, freeing the slot for the
    /// pusher one lap ahead.
    pub fn pop(&self) -> Option<T> {
        let backoff = crossbeam_utils::Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            let index = head & self.mask;
            let slot = &self.buffer[index];
            let stamp = slot.stamp.load(Ordering::Acquire);

            if head + 1 == stamp {
                // Slot holds a value published for position `head`: claim it.
                let next = head + 1;
                if self
                    .head
                    .compare_exchange(head, next, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok()
                {
                    // SAFETY: winning the CAS makes us the sole consumer of
                    // this slot, and the Acquire stamp load above
                    // synchronized with the pusher's Release store, so the
                    // value is fully written and initialized.
                    let value = unsafe { slot.value.get().read().assume_init() };
                    // Advance the stamp a full lap so the slot is free for
                    // the push at position `head + capacity`.
                    slot.stamp
                        .store(head + self.buffer.len(), Ordering::Release);
                    return Some(value);
                }
            } else if head == stamp {
                // Slot is empty on this lap: the queue may be empty.
                let tail = self.tail.load(Ordering::Relaxed);
                // NOTE(review): as in `push`, crossbeam precedes this relaxed
                // load with a SeqCst fence; confirm "empty" cannot be
                // reported spuriously without it.
                if tail == head {
                    return None;
                }
                // Tail is ahead: a producer is mid-push here; wait for it.
                backoff.snooze();
            } else {
                // Our snapshot of `head` is stale; back off and re-read.
                backoff.snooze();
            }
            head = self.head.load(Ordering::Relaxed);
        }
    }

    /// Returns the capacity of the queue — the power of two chosen in
    /// `new`, which may be larger than the `cap` that was requested.
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Returns `true` if the queue is empty.
    ///
    /// Under concurrent pushes/pops this is only a snapshot: the answer may
    /// be stale by the time the caller acts on it.
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);
        head == tail
    }

    /// Returns `true` if the queue is full.
    ///
    /// Like `is_empty`, this is a racy snapshot under concurrency: `head`
    /// and `tail` are read in two separate loads.
    pub fn is_full(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);
        tail == head + self.buffer.len()
    }
}
147
148/// Drop all remaining values in the queue when it is dropped.
149///
150/// Without this, `T` values sitting in initialized slots would have their
151/// memory freed (the `Box<[Slot<T>]>` backing is released) without ever
152/// calling `T::drop()`, leaking resources such as file handles or locks.
153impl<T> Drop for ArrayQueue<T> {
154    fn drop(&mut self) {
155        // Drain all remaining elements.  Each `pop()` call moves the value out
156        // of its `MaybeUninit` slot and returns it as `Some(T)`; when that
157        // `Some(T)` goes out of scope at the end of the loop body, `T::drop()`
158        // runs automatically.
159        while self.pop().is_some() {}
160    }
161}