1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::ptr;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6use crate::utils::CacheAligned;
7use kovan::{Atomic, RetiredNode, Shared, pin};
8
/// Number of slots in each segment of the queue's linked list.
const SEGMENT_SIZE: usize = 32;

// Per-slot state machine: EMPTY -> WRITING -> WRITTEN -> CONSUMED.
// States only ever advance; a slot is never reused.

/// Slot holds no value yet and may be claimed by a producer.
const SLOT_EMPTY: usize = 0;
/// A producer won the claiming CAS and is writing the value.
const SLOT_WRITING: usize = 1;
/// The value is fully written and visible to consumers.
const SLOT_WRITTEN: usize = 2;
/// A consumer has moved the value out of the slot.
const SLOT_CONSUMED: usize = 3;
19
/// One queue cell: an atomic state word plus storage for a single `T`.
struct Slot<T> {
    // One of SLOT_EMPTY / SLOT_WRITING / SLOT_WRITTEN / SLOT_CONSUMED.
    state: AtomicUsize,
    // Value storage; only valid to read after `state` has been observed
    // as SLOT_WRITTEN (published with a Release store in push()).
    value: UnsafeCell<MaybeUninit<T>>,
}
24
/// A fixed-size block of slots forming one link in the queue's
/// singly-linked list of segments.
///
/// NOTE(review): `#[repr(C)]` pins `retired` as the first field —
/// presumably kovan's reclamation relies on that layout when a retired
/// segment pointer is handed to `kovan::retire`; confirm against the
/// kovan crate's requirements.
#[repr(C)]
struct Segment<T> {
    // Intrusive node used by kovan's deferred reclamation.
    retired: RetiredNode,
    slots: [Slot<T>; SEGMENT_SIZE],
    // Next segment in the list; null while this segment is the tail.
    next: Atomic<Segment<T>>,
    // Monotonically increasing segment number (first segment is 0).
    id: usize,
}
32
33impl<T> Segment<T> {
34 fn new(id: usize) -> Segment<T> {
35 let slots = core::array::from_fn(|_| Slot {
37 state: AtomicUsize::new(SLOT_EMPTY),
38 value: UnsafeCell::new(MaybeUninit::uninit()),
39 });
40 Segment {
41 retired: RetiredNode::new(),
42 slots,
43 next: Atomic::null(),
44 id,
45 }
46 }
47}
48
/// An unbounded concurrent queue built from a linked list of
/// fixed-size segments, with retired segments reclaimed through kovan.
pub struct SegQueue<T> {
    // Segment consumers pop from; cache-aligned so head and tail do not
    // share a cache line (producers and consumers touch different ends).
    head: CacheAligned<Atomic<Segment<T>>>,
    // Segment producers push into.
    tail: CacheAligned<Atomic<Segment<T>>>,
    // Element count, maintained with Relaxed ops — approximate under
    // concurrent access.
    len: AtomicUsize,
}
54
// SAFETY: values of `T` are moved in by push() and handed out by pop(),
// so they can cross threads — hence the `T: Send` bound. The slot
// UnsafeCell is only written by the producer that won the
// EMPTY -> WRITING CAS and only read by the consumer that won the
// WRITTEN -> CONSUMED CAS, so shared `&SegQueue<T>` access never
// produces unsynchronized aliasing of a value.
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}
57
58impl<T: 'static> Default for SegQueue<T> {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
impl<T: 'static> SegQueue<T> {
    /// Creates an empty queue with one pre-allocated segment.
    ///
    /// `head` and `tail` both start at the same segment (id 0); they
    /// diverge as producers append segments and consumers retire them.
    pub fn new() -> SegQueue<T> {
        let segment = Box::into_raw(Box::new(Segment::new(0)));
        let head = Atomic::new(segment);
        let tail = Atomic::new(segment);

        SegQueue {
            head: CacheAligned::new(head),
            tail: CacheAligned::new(tail),
            len: AtomicUsize::new(0),
        }
    }

    /// Appends `value` to the back of the queue (lock-free).
    ///
    /// Scans the tail segment for an EMPTY slot, claims it with a CAS
    /// to WRITING, writes the value, then publishes it with a Release
    /// store of WRITTEN. If the segment has no claimable slot, a fresh
    /// segment is appended (Michael–Scott style: link `next` first,
    /// then swing `tail`) and the scan retries.
    pub fn push(&self, value: T) {
        let backoff = crossbeam_utils::Backoff::new();
        // Pin this thread so segments we hold references into cannot be
        // reclaimed underneath us.
        let guard = pin();

        loop {
            let tail = self.tail.load(Ordering::Acquire, &guard);

            // Defensive: tail is initialized non-null in new() and only
            // ever CAS'd to newly allocated segments, so this is not
            // expected to trigger.
            if tail.is_null() {
                continue;
            }

            let t = unsafe { tail.as_ref().unwrap() };
            let next = t.next.load(Ordering::Acquire, &guard);

            // Tail is lagging (another producer linked a segment but has
            // not swung tail yet): help advance it, then retry.
            if !next.is_null() {
                let _ = self.tail.compare_exchange(
                    tail,
                    next,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                );
                continue;
            }

            // Scan for a claimable slot in the current tail segment.
            for i in 0..SEGMENT_SIZE {
                let slot = &t.slots[i];
                let state = slot.state.load(Ordering::Acquire);

                if state == SLOT_EMPTY {
                    if slot
                        .state
                        .compare_exchange(
                            SLOT_EMPTY,
                            SLOT_WRITING,
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        // We own the slot exclusively (we won the CAS);
                        // write the value, then publish with Release so
                        // a consumer that observes WRITTEN also sees the
                        // value.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
                        slot.state.store(SLOT_WRITTEN, Ordering::Release);
                        self.len.fetch_add(1, Ordering::Relaxed);
                        return;
                    }
                    // CAS lost: another producer took slot i; fall
                    // through and try the next slot.
                } else if state == SLOT_WRITING {
                    // Slot owned by an in-flight producer; `continue`
                    // targets the for loop, i.e. move on to slot i + 1.
                    continue;
                }
            }

            // No claimable slot: try to append a fresh segment.
            let new_segment = Box::into_raw(Box::new(Segment::new(t.id + 1)));
            let new_shared = unsafe { Shared::from_raw(new_segment) };

            // Null expected-value for the CAS on `next`.
            let null_shared = unsafe { Shared::from_raw(ptr::null_mut()) };

            if t.next
                .compare_exchange(
                    null_shared,
                    new_shared,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                )
                .is_ok()
            {
                // Linked successfully; best-effort swing of tail (a
                // failure just means someone else advanced it).
                let _ = self.tail.compare_exchange(
                    tail,
                    new_shared,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                );
            } else {
                // Another producer linked a segment first; our segment
                // was never published, so we still uniquely own it and
                // can free it directly.
                unsafe { drop(Box::from_raw(new_segment)) };
            }
            backoff.snooze();
        }
    }

    /// Removes and returns the value at the front of the queue, or
    /// `None` if the queue is observed empty (lock-free).
    ///
    /// Scans the head segment for a WRITTEN slot and claims it with a
    /// CAS to CONSUMED; once every slot in the head segment is CONSUMED
    /// and a successor exists, the head is advanced and the old segment
    /// handed to kovan for deferred reclamation.
    pub fn pop(&self) -> Option<T> {
        let backoff = crossbeam_utils::Backoff::new();
        let guard = pin();

        loop {
            let head = self.head.load(Ordering::Acquire, &guard);
            // Head is never null: initialized in new() and only CAS'd to
            // a non-null `next`.
            let h = unsafe { head.as_ref().unwrap() };

            // Tracks whether this whole segment is drained and can be
            // unlinked/retired.
            let mut all_consumed = true;

            for i in 0..SEGMENT_SIZE {
                let slot = &h.slots[i];
                let state = slot.state.load(Ordering::Acquire);

                if state == SLOT_WRITTEN {
                    if slot
                        .state
                        .compare_exchange(
                            SLOT_WRITTEN,
                            SLOT_CONSUMED,
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        // We won the claim: bitwise-copy the value out.
                        // The slot stays CONSUMED, so Drop will not
                        // drop this value a second time.
                        let value = unsafe { slot.value.get().read().assume_init() };
                        self.len.fetch_sub(1, Ordering::Relaxed);
                        return Some(value);
                    }
                } else if state == SLOT_EMPTY {
                    // Producers claim slots in index order, so an EMPTY
                    // slot with no successor segment means nothing is
                    // published beyond this point.
                    let next = h.next.load(Ordering::Acquire, &guard);
                    if next.is_null() {
                        return None;
                    }
                }

                // Re-read: a lost WRITTEN claim or a non-CONSUMED state
                // means the segment is not fully drained.
                if slot.state.load(Ordering::Acquire) != SLOT_CONSUMED {
                    all_consumed = false;
                }
            }

            // Segment fully drained and a successor exists: advance head
            // and retire the old segment. kovan defers the actual free
            // until no pinned thread can still reference it.
            let next = h.next.load(Ordering::Acquire, &guard);
            if all_consumed
                && !next.is_null()
                && self
                    .head
                    .compare_exchange(head, next, Ordering::SeqCst, Ordering::Relaxed, &guard)
                    .is_ok()
            {
                unsafe { kovan::retire(head.as_raw()) };
                continue;
            }

            // Head moved under us — rescan from the new head.
            let current_head = self.head.load(Ordering::Acquire, &guard);
            if current_head != head {
                continue;
            }

            // Nothing claimable in this segment and no successor: any
            // remaining slots are WRITING (an in-flight push that has
            // not published yet), so report empty.
            if h.next.load(Ordering::Acquire, &guard).is_null() {
                return None;
            }

            backoff.snooze();
        }
    }

    /// Returns the number of elements in the queue.
    ///
    /// Maintained with Relaxed counters, so the value is approximate
    /// while producers/consumers are running concurrently.
    #[inline]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    /// Returns `true` if [`len`](Self::len) reads zero (same
    /// approximation caveat applies).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
249
impl<T> Drop for SegQueue<T> {
    /// Frees every remaining segment and drops any value that was
    /// published (WRITTEN) but never consumed.
    ///
    /// `&mut self` guarantees exclusive access — no concurrent push or
    /// pop can be running — so Relaxed loads and direct deallocation
    /// are sufficient here. (A WRITING state cannot be observed for the
    /// same reason: an in-flight push would require a live `&self`.)
    fn drop(&mut self) {
        let guard = pin();
        let mut current = self.head.load(Ordering::Relaxed, &guard);

        // Walk the segment list from head to tail.
        while !current.is_null() {
            unsafe {
                let segment_ptr = current.as_raw();
                let segment = &*segment_ptr;
                // Read `next` before freeing the segment that holds it.
                let next = segment.next.load(Ordering::Relaxed, &guard);

                for i in 0..SEGMENT_SIZE {
                    if segment.slots[i].state.load(Ordering::Relaxed) == SLOT_WRITTEN {
                        // WRITTEN guarantees the value is initialized;
                        // the cast peels off the MaybeUninit wrapper so
                        // drop_in_place runs T's destructor.
                        ptr::drop_in_place(segment.slots[i].value.get() as *mut T);
                    }
                }

                // Reclaim the segment allocation made via Box::into_raw.
                drop(Box::from_raw(segment_ptr));

                current = next;
            }
        }
    }
}