1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::ptr;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6use crate::utils::CacheAligned;
7use kovan::{Atomic, RetiredNode, Shared, pin};
8
/// Number of value slots per segment.
const SEGMENT_SIZE: usize = 32;

// Per-slot state machine: EMPTY -> WRITING -> WRITTEN -> CONSUMED.
// A producer claims a slot with an EMPTY -> WRITING CAS, then publishes the
// payload with a Release store of WRITTEN; a consumer claims the payload with
// a WRITTEN -> CONSUMED CAS. States never move backwards, so slots are
// single-use until the segment is retired.
const SLOT_EMPTY: usize = 0;
const SLOT_WRITING: usize = 1;
const SLOT_WRITTEN: usize = 2;
const SLOT_CONSUMED: usize = 3;
19
/// A single value cell inside a [`Segment`].
struct Slot<T> {
    // One of SLOT_EMPTY / SLOT_WRITING / SLOT_WRITTEN / SLOT_CONSUMED.
    state: AtomicUsize,
    // The payload. Initialized only while `state == SLOT_WRITTEN`; in every
    // other state the contents are uninitialized or already moved out.
    value: UnsafeCell<MaybeUninit<T>>,
}
24
// NOTE(review): #[repr(C)] pins the field order, presumably so `retired` sits
// at offset 0 for kovan's deferred-reclamation bookkeeping — confirm against
// the kovan crate's requirements.
#[repr(C)]
struct Segment<T> {
    // Reclamation header used when the segment is passed to `kovan::retire`.
    retired: RetiredNode,
    // Fixed array of value slots; filled front-to-back by producers.
    slots: [Slot<T>; SEGMENT_SIZE],
    // Successor segment, or null while this segment is the tail.
    next: Atomic<Segment<T>>,
    // Monotonically increasing segment number (predecessor's id + 1).
    id: usize,
}
32
33impl<T> Segment<T> {
34 fn new(id: usize) -> Segment<T> {
35 let mut slots: [Slot<T>; SEGMENT_SIZE] = unsafe { MaybeUninit::zeroed().assume_init() };
36 for slot in &mut slots {
37 slot.state = AtomicUsize::new(SLOT_EMPTY);
38 }
39 Segment {
40 retired: RetiredNode::new(),
41 slots,
42 next: Atomic::null(),
43 id,
44 }
45 }
46}
47
/// An unbounded queue built from a linked list of fixed-size segments.
pub struct SegQueue<T> {
    // Segment consumers pop from. Cache-line aligned to avoid false sharing
    // between the head and tail pointers.
    head: CacheAligned<Atomic<Segment<T>>>,
    // Segment producers push into.
    tail: CacheAligned<Atomic<Segment<T>>>,
}
52
// SAFETY: values of T are handed from the pushing thread to the popping
// thread, hence the `T: Send` bound. All cross-thread access to slot payloads
// is mediated by the slot-state atomics (Release publish / Acquire consume),
// and segment lifetime is managed through kovan's pin/retire machinery.
// NOTE(review): soundness of `Sync` ultimately depends on kovan's reclamation
// guarantees — confirm against that crate's documentation.
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}
55
56impl<T: 'static> Default for SegQueue<T> {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
impl<T: 'static> SegQueue<T> {
    /// Creates an empty queue consisting of a single segment (id 0) that is
    /// simultaneously head and tail.
    pub fn new() -> SegQueue<T> {
        let segment = Box::into_raw(Box::new(Segment::new(0)));
        let head = Atomic::new(segment);
        let tail = Atomic::new(segment);

        SegQueue {
            head: CacheAligned::new(head),
            tail: CacheAligned::new(tail),
        }
    }

    /// Appends `value` to the back of the queue.
    ///
    /// Scans the tail segment front-to-back for an `EMPTY` slot, claims it
    /// with an EMPTY -> WRITING CAS, writes the payload, then publishes it
    /// with a Release store of WRITTEN. If the segment is full, a successor
    /// segment is allocated and linked, and the whole attempt is retried.
    pub fn push(&self, value: T) {
        let backoff = crossbeam_utils::Backoff::new();
        // Pin this thread so segments we hold references to are not freed
        // underneath us (presumably kovan's epoch/hazard scheme — confirm).
        let guard = pin();

        loop {
            let tail = self.tail.load(Ordering::Acquire, &guard);

            // NOTE(review): `tail` starts non-null and is only ever CAS'd to
            // freshly allocated segments, so this branch looks unreachable;
            // if it ever fired it would busy-spin with no backoff.
            if tail.is_null() {
                continue;
            }

            let t = unsafe { tail.as_ref().unwrap() };
            let next = t.next.load(Ordering::Acquire, &guard);

            // The tail pointer lags behind an already-linked successor:
            // help advance it, then retry from the top.
            if !next.is_null() {
                let _ = self.tail.compare_exchange(
                    tail,
                    next,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                );
                continue;
            }

            // Linear scan for a free slot. The scan restarts at index 0 on
            // every attempt; there is no cached "next free" index.
            for i in 0..SEGMENT_SIZE {
                let slot = &t.slots[i];
                let state = slot.state.load(Ordering::Acquire);

                if state == SLOT_EMPTY {
                    if slot
                        .state
                        .compare_exchange(
                            SLOT_EMPTY,
                            SLOT_WRITING,
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        // We own the slot: write the payload, then publish
                        // with Release so a consumer's Acquire load of
                        // WRITTEN observes the initialized value.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
                        slot.state.store(SLOT_WRITTEN, Ordering::Release);
                        return;
                    }
                } else if state == SLOT_WRITING {
                    // NOTE(review): this `continue` targets the `for` loop,
                    // so it behaves exactly like falling through to the next
                    // slot — the branch is effectively a no-op.
                    continue;
                }
            }

            // No free slot found: allocate a successor and try to link it.
            let new_segment = Box::into_raw(Box::new(Segment::new(t.id + 1)));
            let new_shared = unsafe { Shared::from_raw(new_segment) };

            let null_shared = unsafe { Shared::from_raw(ptr::null_mut()) };

            if t.next
                .compare_exchange(
                    null_shared,
                    new_shared,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                )
                .is_ok()
            {
                // Linked successfully; also try to swing the tail. Losing
                // this CAS is fine — another thread advanced it for us.
                let _ = self.tail.compare_exchange(
                    tail,
                    new_shared,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                    &guard,
                );
            } else {
                // Another producer linked its segment first. Ours was never
                // published to any other thread, so it can be freed directly.
                unsafe { drop(Box::from_raw(new_segment)) };
            }
            backoff.snooze();
        }
    }

    /// Removes and returns the value at the front of the queue, or `None`
    /// when the queue appears empty.
    ///
    /// Scans the head segment for a WRITTEN slot and claims it with a
    /// WRITTEN -> CONSUMED CAS. A fully exhausted head segment with a
    /// successor is unlinked and handed to `kovan::retire` for deferred
    /// reclamation.
    pub fn pop(&self) -> Option<T> {
        let backoff = crossbeam_utils::Backoff::new();
        let guard = pin();

        loop {
            let head = self.head.load(Ordering::Acquire, &guard);
            let h = unsafe { head.as_ref().unwrap() };

            for i in 0..SEGMENT_SIZE {
                let slot = &h.slots[i];
                let state = slot.state.load(Ordering::Acquire);

                if state == SLOT_WRITTEN {
                    if slot
                        .state
                        .compare_exchange(
                            SLOT_WRITTEN,
                            SLOT_CONSUMED,
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_ok()
                    {
                        // We won the slot: move the payload out. The Acquire
                        // load that observed WRITTEN synchronizes with the
                        // producer's Release store, so the value is
                        // initialized here.
                        let value = unsafe { slot.value.get().read().assume_init() };
                        return Some(value);
                    }
                } else if state == SLOT_EMPTY {
                    // First never-written slot and no successor segment:
                    // treat the queue as empty.
                    // NOTE(review): this returns without scanning the
                    // remaining slots, assuming no WRITTEN slot can follow an
                    // EMPTY one — confirm producers always fill slots
                    // front-to-back under concurrent interleavings.
                    let next = h.next.load(Ordering::Acquire, &guard);
                    if next.is_null() {
                        return None;
                    }
                }
            }

            // Nothing consumable in this segment: if a successor exists,
            // unlink the exhausted head and retire it.
            let next = h.next.load(Ordering::Acquire, &guard);
            if !next.is_null()
                && self
                    .head
                    .compare_exchange(head, next, Ordering::SeqCst, Ordering::Relaxed, &guard)
                    .is_ok()
            {
                // Deferred free: other threads may still hold references to
                // the old head under their own guards.
                kovan::retire(head.as_raw());
                continue;
            }

            // Head moved under us: rescan the new head immediately.
            let current_head = self.head.load(Ordering::Acquire, &guard);
            if current_head != head {
                continue;
            }

            // Stable head, no successor, nothing consumable: empty.
            if h.next.load(Ordering::Acquire, &guard).is_null() {
                return None;
            }

            backoff.snooze();
        }
    }
}
221
impl<T> Drop for SegQueue<T> {
    /// Walks the segment list, dropping any values that were pushed but never
    /// popped, then frees every segment.
    ///
    /// `&mut self` guarantees exclusive access, so no other thread can be
    /// inside the queue; that is why segments are freed with a plain
    /// `Box::from_raw` here instead of `kovan::retire`.
    fn drop(&mut self) {
        let guard = pin();
        let mut current = self.head.load(Ordering::Relaxed, &guard);

        while !current.is_null() {
            unsafe {
                let segment_ptr = current.as_raw();
                let segment = &*segment_ptr;
                // Read the successor before freeing this segment.
                let next = segment.next.load(Ordering::Relaxed, &guard);

                // Only WRITTEN slots hold a live value that needs dropping;
                // EMPTY and CONSUMED slots hold no initialized payload.
                // NOTE(review): a slot observed in WRITING here would leak
                // its value — presumably unreachable once all users have
                // quiesced, but worth confirming.
                for i in 0..SEGMENT_SIZE {
                    if segment.slots[i].state.load(Ordering::Relaxed) == SLOT_WRITTEN {
                        // SAFETY: exclusive access, and WRITTEN implies the
                        // MaybeUninit payload was fully initialized. The cast
                        // is sound because MaybeUninit<T> is layout-
                        // compatible with T.
                        ptr::drop_in_place(segment.slots[i].value.get() as *mut T);
                    }
                }

                // SAFETY: every segment was allocated via Box::into_raw and,
                // with &mut self, no other thread can still reference it.
                drop(Box::from_raw(segment_ptr));

                current = next;
            }
        }
    }
}