// may_queue/mpsc_list_v1.rs

use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

use crossbeam_utils::{Backoff, CachePadded};
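
// Design notes: this is an intrusive multi-producer, single-consumer list in
// the spirit of Vyukov's stub-node MPSC queue. Producers race on `head` with
// an atomic swap; the single consumer owns `tail`, which always points at an
// already-consumed stub node. On top of plain FIFO pops, every `push` hands
// back an `Entry` that can unlink its value from the middle of the list in
// O(1); the `prev` pointers and per-node reference counts exist to support
// that removal.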

struct Node<T> {
    // written while the node is still private to its producer, and
    // afterwards only on the unlink paths (`pop`/`remove`)
    prev: *mut Node<T>,
    // the cross-thread link written by producers, hence atomic
    next: AtomicPtr<Node<T>>,
    value: Option<T>,
    refs: usize,
}

// `refs` packs a "still linked" flag into the high bits and an owner count
// into the low bits: a fresh node is linked and has two owners, the list
// itself and the `Entry` returned by `push`.
const REF_INIT: usize = 0x1000_0002;
const REF_COUNT_MASK: usize = 0x0FFF_FFFF;
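
// Lifecycle walk-through (annotation): a pushed node starts at 0x1000_0002.
// `remove` clears the flag and releases the list's reference immediately,
// while `pop` retires nodes one step late: a popped node becomes the new
// stub, and its flag and list reference are released by the *next* pop (or
// by `Queue::drop`). Dropping the `Entry` releases the remaining reference,
// and whichever release brings the count to zero frees the box.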

impl<T> Node<T> {
    unsafe fn new(v: Option<T>) -> *mut Node<T> {
        Box::into_raw(Box::new(Node {
            prev: ptr::null_mut(),
            next: AtomicPtr::new(ptr::null_mut()),
            value: v,
            refs: REF_INIT,
        }))
    }
}

/// A per-push handle to a queued node. Holding an `Entry` keeps the node
/// alive and allows its value to be unlinked from the middle of the list in
/// O(1), without waiting for it to reach the tail.
pub struct Entry<T>(ptr::NonNull<Node<T>>);

unsafe impl<T: Sync> Sync for Entry<T> {}
unsafe impl<T: Send> Send for Entry<T> {}

impl<T> Entry<T> {
    /// Runs `f` with mutable access to the stored value.
    ///
    /// # Safety
    ///
    /// The caller must guarantee exclusive access: nothing else may read or
    /// write this node's value while `f` runs.
    #[inline]
    pub unsafe fn with_mut_data<F>(&self, f: F)
    where
        F: FnOnce(&mut T),
    {
        let node = &mut *self.0.as_ptr();
        let data = node.value.as_mut().expect("Node value is None");
        f(data);
    }

    /// Returns true while the node is still linked into the queue.
    #[inline]
    pub fn is_link(&self) -> bool {
        let node = unsafe { &*self.0.as_ptr() };
        node.refs & !REF_COUNT_MASK != 0
    }

    /// Converts the entry into a raw pointer without releasing its
    /// reference; pair with `from_ptr` to get the entry back.
    #[inline]
    pub fn into_ptr(self) -> *mut Self {
        let ret = self.0.as_ptr() as *mut Self;
        ::std::mem::forget(self);
        ret
    }

    /// Rebuilds an entry from a pointer produced by `into_ptr`.
    ///
    /// # Safety
    ///
    /// `ptr` must come from `into_ptr` and must be used at most once, since
    /// it carries the entry's reference.
    #[inline]
    pub unsafe fn from_ptr(ptr: *mut Self) -> Self {
        Entry(ptr::NonNull::new_unchecked(ptr as *mut Node<T>))
    }

    /// Unlinks this entry's value from the middle of the queue. Returns
    /// `None` if the value was already consumed, or if the node is currently
    /// the newest one: a producer may still be linking onto it, so it is
    /// left for `pop`.
    pub fn remove(mut self) -> Option<T> {
        unsafe {
            let node = self.0.as_mut();

            // already unlinked: the value is gone
            if node.refs & !REF_COUNT_MASK == 0 {
                return None;
            }

            // a null `prev` means the node has become the stub, so its
            // value was already consumed by `pop`
            if node.prev.is_null() {
                return None;
            }

            let next = node.next.load(Ordering::Acquire);
            let prev = &mut *node.prev;

            // only unlink a node that has a successor; the newest node may
            // still be mid-link in a concurrent `push`
            if !next.is_null() {
                // clear the "linked" flag, then stitch prev and next together
                node.refs &= REF_COUNT_MASK;

                (*next).prev = prev;
                prev.next.store(next, Ordering::Release);

                let ret = node.value.take();

                // release the list's reference; the entry's own reference is
                // released by `Drop` when `self` goes out of scope
                node.refs -= 1;
                if node.refs == 0 {
                    let _: Box<Node<T>> = Box::from_raw(node);
                }

                return ret;
            }
        }

        None
    }
}

impl<T> Drop for Entry<T> {
    fn drop(&mut self) {
        // release this handle's reference and free the node once the list
        // side has released its reference too
        let node = unsafe { self.0.as_mut() };
        node.refs -= 1;
        if node.refs == 0 {
            let _: Box<Node<T>> = unsafe { Box::from_raw(node) };
        }
    }
}

/// The queue itself: an intrusive linked list of nodes. Producers swap
/// themselves onto `head`; the single consumer advances `tail`, which always
/// points at a stub node whose value has already been taken. `tail` is read
/// and written without synchronization, so at most one consumer may be
/// active at a time.
pub struct Queue<T> {
    head: CachePadded<AtomicPtr<Node<T>>>,
    tail: UnsafeCell<*mut Node<T>>,
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    pub fn new() -> Queue<T> {
        // the stub is owned by the queue alone, so it carries a single
        // reference and no "linked" flag
        let stub = unsafe { Node::new(None) };
        unsafe { &mut *stub }.refs = 1;
        Queue {
            head: AtomicPtr::new(stub).into(),
            tail: UnsafeCell::new(stub),
        }
    }

    /// Pushes a value, returning an `Entry` for later removal and a flag
    /// that is true when the queue was empty before this push.
    pub fn push(&self, t: T) -> (Entry<T>, bool) {
        unsafe {
            let node = Node::new(Some(t));
            // publish in two steps: swap ourselves in as the new head, then
            // link the old head to us; consumers spin on `next` until the
            // second store becomes visible
            let prev = self.head.swap(node, Ordering::AcqRel);
            (*node).prev = prev;
            (*prev).next.store(node, Ordering::Release);
            let tail = *self.tail.get();
            let is_head = tail == prev;
            (Entry(ptr::NonNull::new_unchecked(node)), is_head)
        }
    }
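
    // A note on the returned flag (annotation): `is_head` compares the old
    // head against the consumer's current `tail`, so `true` means the pushed
    // node is the only element behind the stub, i.e. the queue was empty.
    // Callers can use it to decide whether a sleeping consumer needs a
    // wakeup, but it is a snapshot and may be stale by the time `push`
    // returns.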

    #[inline]
    pub fn is_empty(&self) -> bool {
        // empty exactly when the stub is also the head of the list
        let tail = unsafe { *self.tail.get() };
        self.head.load(Ordering::Acquire) == tail
    }

    /// Peeks at the oldest value without consuming it.
    ///
    /// # Safety
    ///
    /// Must be called from the single consumer thread only; the returned
    /// reference is invalidated by any subsequent pop.
    #[inline]
    pub unsafe fn peek(&self) -> Option<&T> {
        let tail = *self.tail.get();
        if self.head.load(Ordering::Acquire) == tail {
            return None;
        }
        // a producer has swapped the head but may not have published its
        // `next` link yet, so spin until the link shows up
        let mut next;
        let backoff = Backoff::new();
        loop {
            next = (*tail).next.load(Ordering::Acquire);
            if !next.is_null() {
                break;
            }
            backoff.snooze();
        }

        // the stub is always consumed and its successor never is
        assert!((*tail).value.is_none());
        assert!((*next).value.is_some());

        (*next).value.as_ref()
    }

    /// Pops the oldest value only if `f` accepts it.
    pub fn pop_if<F>(&self, f: &F) -> Option<T>
    where
        F: Fn(&T) -> bool,
    {
        unsafe {
            let tail = *self.tail.get();
            if self.head.load(Ordering::Acquire) == tail {
                return None;
            }

            // wait for the `next` link, as in `peek`
            let mut next;
            let backoff = Backoff::new();
            loop {
                next = (*tail).next.load(Ordering::Acquire);
                if !next.is_null() {
                    break;
                }
                backoff.snooze();
            }

            assert!((*tail).value.is_none());
            assert!((*next).value.is_some());

            // consult the predicate before committing to the pop
            let v = (*next).value.as_ref().unwrap();
            if !f(v) {
                return None;
            }

            // retire the old stub and make `next` the new one
            assert!((*tail).refs & REF_COUNT_MASK != 0);
            (*tail).refs &= REF_COUNT_MASK;

            (*next).prev = ptr::null_mut();
            *self.tail.get() = next;

            let ret = (*next).value.take().unwrap();

            // release the list's reference on the retired stub; an
            // outstanding `Entry` may still keep it alive
            (*tail).refs -= 1;
            if (*tail).refs == 0 {
                let _: Box<Node<T>> = Box::from_raw(tail);
            }

            Some(ret)
        }
    }

    pub fn pop(&self) -> Option<T> {
        unsafe {
            let tail = *self.tail.get();

            if self.head.load(Ordering::Acquire) == tail {
                return None;
            }

            // retire the old stub: clear its "linked" flag up front
            assert!((*tail).refs & REF_COUNT_MASK != 0);
            (*tail).refs &= REF_COUNT_MASK;

            // wait for the `next` link, as in `peek`
            let mut next;
            let backoff = Backoff::new();
            loop {
                next = (*tail).next.load(Ordering::Acquire);
                if !next.is_null() {
                    break;
                }
                backoff.snooze();
            }
            // `next` becomes the new stub
            (*next).prev = ptr::null_mut();
            *self.tail.get() = next;

            assert!((*tail).value.is_none());
            assert!((*next).value.is_some());
            let ret = (*next).value.take().unwrap();

            // release the list's reference on the retired stub; an
            // outstanding `Entry` may still keep it alive
            (*tail).refs -= 1;
            if (*tail).refs == 0 {
                let _: Box<Node<T>> = Box::from_raw(tail);
            }

            Some(ret)
        }
    }
}

impl<T> Default for Queue<T> {
    fn default() -> Self {
        Queue::new()
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // drain everything, then free the final stub node
        while self.pop().is_some() {}
        let _: Box<Node<T>> = unsafe { Box::from_raw(*self.tail.get()) };
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_queue() {
        let q: Queue<usize> = Queue::new();
        assert_eq!(q.pop(), None);
        q.push(1);
        q.push(2);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert!(q.is_empty());
        let a = q.push(3);
        let b = q.push(4);
        assert!(a.1);
        assert_eq!(a.0.remove(), Some(3));
        assert!(!b.1);
        // the newest node cannot be removed through its entry; see `remove`
        assert_eq!(b.0.remove(), None);
        assert_eq!(q.pop(), Some(4));
        assert!(q.is_empty());

        q.push(5);
        q.push(6);
        q.push(7);
        let co = |v: &usize| *v < 7;
        assert_eq!(unsafe { q.peek() }, Some(&5));
        assert_eq!(q.pop_if(&co), Some(5));
        assert_eq!(q.pop_if(&co), Some(6));
        assert_eq!(q.pop_if(&co), None);
        assert_eq!(q.pop(), Some(7));
    }
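
    // Sketch (this test is ours): exercises the `into_ptr`/`from_ptr` round
    // trip. The raw pointer carries the entry's reference, so `from_ptr`
    // must be called exactly once or the node leaks.
    #[test]
    fn test_entry_ptr_roundtrip() {
        let q: Queue<usize> = Queue::new();
        let (a, _) = q.push(3);
        q.push(4);
        // stash the entry as a raw pointer, then recover it
        let raw = a.into_ptr();
        let a = unsafe { Entry::from_ptr(raw) };
        assert_eq!(a.remove(), Some(3));
        assert_eq!(q.pop(), Some(4));
        assert!(q.is_empty());
    }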

    #[test]
    fn test_mpsc() {
        let nthreads = 8;
        let nmsgs = 1000;
        let q = Queue::new();
        assert_eq!(q.pop(), None);
        let (tx, rx) = channel();
        let q = Arc::new(q);

        // `nthreads` producers, one consumer on the test thread
        for _ in 0..nthreads {
            let tx = tx.clone();
            let q = q.clone();
            thread::spawn(move || {
                for i in 0..nmsgs {
                    q.push(i);
                }
                tx.send(()).unwrap();
            });
        }

        let mut i = 0;
        while i < nthreads * nmsgs {
            match q.pop() {
                None => {}
                Some(_) => i += 1,
            }
        }
        drop(tx);
        for _ in 0..nthreads {
            rx.recv().unwrap();
        }
    }
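
    // Sketch (this test is ours): `with_mut_data` edits a value in place
    // while it is still queued. Safe here because the test is
    // single-threaded, which satisfies the exclusive-access requirement.
    #[test]
    fn test_with_mut_data() {
        let q: Queue<usize> = Queue::new();
        let (e, _) = q.push(1);
        unsafe { e.with_mut_data(|v| *v += 10) };
        assert_eq!(q.pop(), Some(11));
    }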
}