1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::ptr;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6use crate::utils::CacheAligned;
7use kovan::{Atomic, RetiredNode, Shared, pin};
8
/// Number of value slots per segment; a larger segment amortizes the
/// cost of allocating and linking a new segment over more pushes.
const SEGMENT_SIZE: usize = 32;

// Per-slot state machine. Transitions are monotonic and one-way:
// EMPTY -> WRITING -> WRITTEN -> CONSUMED. A slot is never reused.
const SLOT_EMPTY: usize = 0; // no producer has claimed this slot yet
const SLOT_WRITING: usize = 1; // claimed by a producer, value not yet published
const SLOT_WRITTEN: usize = 2; // value stored and visible to consumers
const SLOT_CONSUMED: usize = 3; // a consumer has taken the value
19
/// One queue cell: a state word plus (possibly uninitialized) storage
/// for one element. `state` (see the SLOT_* constants) gates access to
/// `value`: the value is only guaranteed initialized while the state
/// is SLOT_WRITTEN, and ownership transfers via CAS on `state`.
struct Slot<T> {
    state: AtomicUsize,
    value: UnsafeCell<MaybeUninit<T>>,
}
24
/// A fixed-size block of slots linked into a singly linked list of
/// segments. `#[repr(C)]` pins the field order — presumably so the
/// kovan reclamation scheme can treat a `Segment` pointer as a
/// `RetiredNode` pointer (retired is the first field); TODO confirm
/// against kovan's documented layout requirements.
#[repr(C)]
struct Segment<T> {
    retired: RetiredNode,
    slots: [Slot<T>; SEGMENT_SIZE],
    next: Atomic<Segment<T>>,
    id: usize,
}
32
33impl<T> Segment<T> {
34 fn new(id: usize) -> Segment<T> {
35 let slots = core::array::from_fn(|_| Slot {
37 state: AtomicUsize::new(SLOT_EMPTY),
38 value: UnsafeCell::new(MaybeUninit::uninit()),
39 });
40 Segment {
41 retired: RetiredNode::new(),
42 slots,
43 next: Atomic::null(),
44 id,
45 }
46 }
47}
48
/// An unbounded queue built from a linked list of fixed-size segments.
/// `head` points at the segment consumers pop from and `tail` at the
/// segment producers push into; each pointer is cache-aligned to avoid
/// false sharing between the producer and consumer sides.
pub struct SegQueue<T> {
    head: CacheAligned<Atomic<Segment<T>>>,
    tail: CacheAligned<Atomic<Segment<T>>>,
}
53
// SAFETY: values of T cross threads through the queue (pushed on one
// thread, popped on another), so `T: Send` is required. The queue's
// own shared state is accessed only through atomics and the slot
// state machine, which is why no `T: Sync` bound is needed.
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}
56
57impl<T: 'static> Default for SegQueue<T> {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl<T: 'static> SegQueue<T> {
64 pub fn new() -> SegQueue<T> {
66 let segment = Box::into_raw(Box::new(Segment::new(0)));
67 let head = Atomic::new(segment);
68 let tail = Atomic::new(segment);
69
70 SegQueue {
71 head: CacheAligned::new(head),
72 tail: CacheAligned::new(tail),
73 }
74 }
75
    /// Append `value` to the queue (lock-free, safe for concurrent
    /// producers).
    ///
    /// Strategy: scan the tail segment for the first EMPTY slot and
    /// claim it with a CAS (EMPTY -> WRITING), store the value, then
    /// publish it (WRITING -> WRITTEN, Release). If the segment has no
    /// claimable slot, link a fresh segment after it and retry.
    pub fn push(&self, value: T) {
        let backoff = crossbeam_utils::Backoff::new();
        // Pin this thread so segments we hold references into cannot
        // be reclaimed underneath us.
        let guard = pin();

        loop {
            let tail = self.tail.load(Ordering::Acquire, &guard);

            // NOTE(review): tail is initialized non-null and only ever
            // CAS'd to other non-null segments, so this branch looks
            // unreachable; if it could fire, it spins without backoff.
            if tail.is_null() {
                continue;
            }

            let t = unsafe { tail.as_ref().unwrap() };
            let next = t.next.load(Ordering::Acquire, &guard);

            // Tail is lagging (another producer already linked a new
            // segment): help swing it forward, then retry from the top.
            // Losing this CAS just means someone else helped first.
            if !next.is_null() {
                let _ = self.tail.compare_exchange(
                    tail,
                    next,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                );
                continue;
            }

            // Scan for the first slot we can claim.
            for i in 0..SEGMENT_SIZE {
                let slot = &t.slots[i];
                let state = slot.state.load(Ordering::Acquire);

                if state == SLOT_EMPTY {
                    if slot
                        .state
                        .compare_exchange(
                            SLOT_EMPTY,
                            SLOT_WRITING,
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        // Slot claimed exclusively: store the value,
                        // then publish with a Release store so a
                        // consumer's Acquire load of WRITTEN sees the
                        // fully written value.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
                        slot.state.store(SLOT_WRITTEN, Ordering::Release);
                        return;
                    }
                    // CAS lost to another producer: try the next slot.
                } else if state == SLOT_WRITING {
                    // NOTE(review): this `continue` targets the inner
                    // `for` loop, so it is a no-op — the loop would
                    // advance to the next slot anyway.
                    continue;
                }
            }

            // Segment is full: allocate a successor and try to link it.
            let new_segment = Box::into_raw(Box::new(Segment::new(t.id + 1)));
            let new_shared = unsafe { Shared::from_raw(new_segment) };

            let null_shared = unsafe { Shared::from_raw(ptr::null_mut()) };

            if t.next
                .compare_exchange(
                    null_shared,
                    new_shared,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                )
                .is_ok()
            {
                // We linked the segment; try to advance tail to it.
                // Losing this CAS is fine — another thread helped.
                let _ = self.tail.compare_exchange(
                    tail,
                    new_shared,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                );
            } else {
                // Another producer linked a segment first: free the
                // allocation we lost the race with (never published,
                // so a plain Box drop is sound).
                unsafe { drop(Box::from_raw(new_segment)) };
            }
            backoff.snooze();
        }
    }
160
    /// Remove and return the oldest available value, or `None` if the
    /// queue appears empty (lock-free, safe for concurrent consumers).
    ///
    /// Scans the head segment for a WRITTEN slot and claims it with a
    /// CAS (WRITTEN -> CONSUMED). When every slot of the head segment
    /// is consumed and a successor exists, head is advanced and the
    /// old segment handed to deferred reclamation.
    pub fn pop(&self) -> Option<T> {
        let backoff = crossbeam_utils::Backoff::new();
        let guard = pin();

        loop {
            let head = self.head.load(Ordering::Acquire, &guard);
            let h = unsafe { head.as_ref().unwrap() };

            // True iff every slot in this segment is CONSUMED, in
            // which case the segment is eligible for retirement.
            let mut all_consumed = true;

            for i in 0..SEGMENT_SIZE {
                let slot = &h.slots[i];
                let state = slot.state.load(Ordering::Acquire);

                if state == SLOT_WRITTEN {
                    // Claim the value. Losing the CAS means another
                    // consumer took it; keep scanning.
                    if slot
                        .state
                        .compare_exchange(
                            SLOT_WRITTEN,
                            SLOT_CONSUMED,
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        // SAFETY: WRITTEN was published with Release
                        // after the value was stored, and our CAS gave
                        // us exclusive ownership of this slot's value.
                        let value = unsafe { slot.value.get().read().assume_init() };
                        return Some(value);
                    }
                } else if state == SLOT_EMPTY {
                    // Producers claim slots in index order, so an
                    // EMPTY slot with no successor segment means there
                    // is nothing left to pop right now.
                    let next = h.next.load(Ordering::Acquire, &guard);
                    if next.is_null() {
                        return None;
                    }
                }

                // Re-read: any slot not yet CONSUMED keeps the segment
                // alive (it may be WRITING or freshly WRITTEN).
                if slot.state.load(Ordering::Acquire) != SLOT_CONSUMED {
                    all_consumed = false;
                }
            }

            // Fully consumed segment with a successor: advance head.
            let next = h.next.load(Ordering::Acquire, &guard);
            if all_consumed
                && !next.is_null()
                && self
                    .head
                    .compare_exchange(head, next, Ordering::SeqCst, Ordering::Relaxed, &guard)
                    .is_ok()
            {
                // SAFETY: our successful CAS unlinked this segment, so
                // no new references to it can be created; kovan frees
                // it once all existing pins are released.
                unsafe { kovan::retire(head.as_raw()) };
                continue;
            }

            // If head moved under us, rescan the new head immediately
            // instead of backing off.
            let current_head = self.head.load(Ordering::Acquire, &guard);
            if current_head != head {
                continue;
            }

            if h.next.load(Ordering::Acquire, &guard).is_null() {
                return None;
            }

            backoff.snooze();
        }
    }
228}
229
230impl<T> Drop for SegQueue<T> {
231 fn drop(&mut self) {
232 let guard = pin();
244 let mut current = self.head.load(Ordering::Relaxed, &guard);
245
246 while !current.is_null() {
247 unsafe {
248 let segment_ptr = current.as_raw();
249 let segment = &*segment_ptr;
250 let next = segment.next.load(Ordering::Relaxed, &guard);
251
252 for i in 0..SEGMENT_SIZE {
253 if segment.slots[i].state.load(Ordering::Relaxed) == SLOT_WRITTEN {
254 ptr::drop_in_place(segment.slots[i].value.get() as *mut T);
258 }
259 }
260
261 drop(Box::from_raw(segment_ptr));
264
265 current = next;
266 }
267 }
268 }
269}