// kovan_queue/array_queue.rs
use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::sync::atomic::{AtomicUsize, Ordering};
4
5use crate::utils::CacheAligned;
6
/// One cell of the ring buffer.
struct Slot<T> {
    // Sequence stamp encoding the slot's state relative to a queue position
    // `p` (see `push`/`pop`): `stamp == p` means free for the push at `p`,
    // `stamp == p + 1` means a value was stored at `p`, and the stamp is
    // advanced a full lap (`buffer.len()`) when that value is popped.
    stamp: AtomicUsize,

    // The stored element. Only initialized between the push that writes it
    // and the pop that reads it; validity is guarded by `stamp`.
    value: UnsafeCell<MaybeUninit<T>>,
}
15
/// A bounded multi-producer multi-consumer queue backed by a fixed-size
/// ring buffer, using a per-slot sequence stamp to coordinate producers
/// and consumers without locks.
pub struct ArrayQueue<T> {
    // Position of the next element to pop (monotonically increasing; the
    // buffer index is `head & mask`). Wrapped in CacheAligned — presumably
    // to pad it onto its own cache line and avoid false sharing with
    // `tail`; see `crate::utils` to confirm.
    head: CacheAligned<AtomicUsize>,

    // Position of the next slot to push into (monotonically increasing;
    // buffer index is `tail & mask`).
    tail: CacheAligned<AtomicUsize>,

    // The ring buffer of slots. Its length is always a power of two
    // (enforced in `new`).
    buffer: Box<[Slot<T>]>,

    // Always `buffer.len() - 1`; lets positions be mapped to indices with
    // a cheap `&` instead of `%`.
    mask: usize,
}
30
// SAFETY: each stored value is written by exactly one producer (the CAS
// winner in `push`) and read by exactly one consumer (the CAS winner in
// `pop`), so ownership of a `T` is transferred across threads at most once.
// That makes the queue Send and Sync whenever `T: Send`; no `T: Sync` bound
// is needed because no two threads ever access the same value concurrently.
unsafe impl<T: Send> Send for ArrayQueue<T> {}
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
33
34impl<T> ArrayQueue<T> {
35 pub fn new(cap: usize) -> ArrayQueue<T> {
39 let capacity = if cap < 1 { 1 } else { cap.next_power_of_two() };
40 let mut buffer = Vec::with_capacity(capacity);
41
42 for i in 0..capacity {
43 buffer.push(Slot {
44 stamp: AtomicUsize::new(i),
45 value: UnsafeCell::new(MaybeUninit::uninit()),
46 });
47 }
48
49 ArrayQueue {
50 buffer: buffer.into_boxed_slice(),
51 mask: capacity - 1,
52 head: CacheAligned::new(AtomicUsize::new(0)),
53 tail: CacheAligned::new(AtomicUsize::new(0)),
54 }
55 }
56
57 pub fn push(&self, value: T) -> Result<(), T> {
59 let backoff = crossbeam_utils::Backoff::new();
60 let mut tail = self.tail.load(Ordering::Relaxed);
61
62 loop {
63 let index = tail & self.mask;
64 let slot = &self.buffer[index];
65 let stamp = slot.stamp.load(Ordering::Acquire);
66
67 if tail == stamp {
68 let next = tail + 1;
69 if self
70 .tail
71 .compare_exchange(tail, next, Ordering::SeqCst, Ordering::Relaxed)
72 .is_ok()
73 {
74 unsafe {
75 slot.value.get().write(MaybeUninit::new(value));
76 }
77 slot.stamp.store(tail + 1, Ordering::Release);
78 return Ok(());
79 }
80 } else if tail + 1 > stamp {
81 let head = self.head.load(Ordering::Relaxed);
82 if tail >= head + self.buffer.len() {
83 return Err(value);
84 }
85 backoff.snooze();
86 } else {
87 backoff.snooze();
88 }
89 tail = self.tail.load(Ordering::Relaxed);
90 }
91 }
92
93 pub fn pop(&self) -> Option<T> {
95 let backoff = crossbeam_utils::Backoff::new();
96 let mut head = self.head.load(Ordering::Relaxed);
97
98 loop {
99 let index = head & self.mask;
100 let slot = &self.buffer[index];
101 let stamp = slot.stamp.load(Ordering::Acquire);
102
103 if head + 1 == stamp {
104 let next = head + 1;
105 if self
106 .head
107 .compare_exchange(head, next, Ordering::SeqCst, Ordering::Relaxed)
108 .is_ok()
109 {
110 let value = unsafe { slot.value.get().read().assume_init() };
111 slot.stamp
112 .store(head + self.buffer.len(), Ordering::Release);
113 return Some(value);
114 }
115 } else if head == stamp {
116 let tail = self.tail.load(Ordering::Relaxed);
117 if tail == head {
118 return None;
119 }
120 backoff.snooze();
121 } else {
122 backoff.snooze();
123 }
124 head = self.head.load(Ordering::Relaxed);
125 }
126 }
127
128 pub fn capacity(&self) -> usize {
130 self.buffer.len()
131 }
132
133 pub fn is_empty(&self) -> bool {
135 let head = self.head.load(Ordering::SeqCst);
136 let tail = self.tail.load(Ordering::SeqCst);
137 head == tail
138 }
139
140 pub fn is_full(&self) -> bool {
142 let head = self.head.load(Ordering::SeqCst);
143 let tail = self.tail.load(Ordering::SeqCst);
144 tail == head + self.buffer.len()
145 }
146}
147
148impl<T> Drop for ArrayQueue<T> {
154 fn drop(&mut self) {
155 while self.pop().is_some() {}
160 }
161}