use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::thread;
use crossbeam::utils::CachePadded;
/// Intrusive doubly-linked node. `next` is written by concurrent pushers
/// (hence atomic); `prev`, `value`, and `refs` are plain fields —
/// presumably only touched by the single consumer or the owning `Entry`
/// at any one time (NOTE(review): verify, nothing enforces this).
struct Node<T> {
    // Predecessor toward the tail; null for a fresh node and after the
    // node has been popped/removed.
    prev: *mut Node<T>,
    // Successor toward the head; null until the pusher publishes it.
    next: AtomicPtr<Node<T>>,
    // Payload; `None` for the stub node and after the value is taken.
    value: Option<T>,
    // Packed field: the bit(s) above REF_COUNT_MASK form a "still linked"
    // flag, the low 28 bits are a reference count (see REF_INIT).
    refs: usize,
}
// Initial `refs` for a freshly pushed node: bit 0x1000_0000 is the
// "linked into the queue" flag, and the low bits hold a count of 2 —
// one reference owned by the queue, one by the `Entry` returned to the
// caller of `push`.
const REF_INIT: usize = 0x1000_0002;
// Mask selecting the reference-count portion of `refs` (ANDing with it
// clears the link flag).
const REF_COUNT_MASK: usize = 0x0FFF_FFFF;
impl<T> Node<T> {
    /// Heap-allocates a node holding `v` (`None` for the stub) and leaks
    /// it as a raw pointer. The node starts with `refs = REF_INIT` and no
    /// neighbors; callers are responsible for eventually reclaiming it
    /// via `Box::from_raw`.
    unsafe fn new(v: Option<T>) -> *mut Node<T> {
        let node = Box::new(Node {
            prev: ptr::null_mut(),
            next: AtomicPtr::new(ptr::null_mut()),
            value: v,
            refs: REF_INIT,
        });
        Box::into_raw(node)
    }
}
/// Owning handle to a node that was pushed onto a `Queue`; lets the caller
/// mutate the stored value in place or unlink the node from the middle of
/// the queue. Holds one share of the node's reference count.
pub struct Entry<T>(ptr::NonNull<Node<T>>);
// SAFETY: NOTE(review) — `with_mut_data` hands out `&mut T` through `&self`,
// so sharing an `Entry` across threads is only sound if callers uphold that
// method's `unsafe` contract (exclusive access); confirm this is intended.
unsafe impl<T: Sync> Sync for Entry<T> {}
impl<T> Entry<T> {
    /// Runs `f` on a mutable reference to the node's stored value.
    ///
    /// # Safety
    /// The caller must guarantee exclusive access to the node's value for
    /// the duration of the call — nothing here synchronizes, and `Entry`
    /// is `Sync`, so two threads could otherwise alias the `&mut T`.
    ///
    /// # Panics
    /// Panics if the value has already been taken (popped or removed).
    #[inline]
    pub unsafe fn with_mut_data<F>(&self, f: F)
    where
        F: FnOnce(&mut T),
    {
        let node = &mut *self.0.as_ptr();
        let data = node.value.as_mut().expect("Node value is None");
        f(data);
    }

    /// Returns `true` while the node is still linked into its queue
    /// (any bit above `REF_COUNT_MASK` — the link flag — is set).
    #[inline]
    pub fn is_link(&self) -> bool {
        // SAFETY: the node outlives this handle (we hold a refcount share).
        // Fix: this is a pure read, so take a shared reference — the
        // original created a `&mut Node` from `&self`, which is needless
        // mutable aliasing of a node other threads may also reference.
        let node = unsafe { &*self.0.as_ptr() };
        node.refs & !REF_COUNT_MASK != 0
    }

    /// Leaks this handle as a raw pointer (type-punned from the inner
    /// `Node` pointer) without releasing its reference. Pair with
    /// `from_ptr` or the node's `Entry` share is leaked.
    #[inline]
    pub fn into_ptr(self) -> *mut Self {
        let ret = self.0.as_ptr() as *mut Self;
        ::std::mem::forget(self);
        ret
    }

    /// Reconstructs an `Entry` from a pointer produced by `into_ptr`.
    ///
    /// # Safety
    /// `ptr` must originate from `into_ptr` and be converted back at most
    /// once, or the node's reference count will be over-released.
    #[inline]
    pub unsafe fn from_ptr(ptr: *mut Self) -> Self {
        Entry(ptr::NonNull::new_unchecked(ptr as *mut Node<T>))
    }

    /// Unlinks the node from the queue and returns its value.
    ///
    /// Returns `None` when the node is no longer linked (already popped or
    /// removed), has no predecessor, or has no published successor yet
    /// (the newest node is left for `pop` to consume).
    pub fn remove(mut self) -> Option<T> {
        unsafe {
            let node = self.0.as_mut();
            // Link flag clear: the node already left the queue.
            if node.refs & !REF_COUNT_MASK == 0 {
                return None;
            }
            // No predecessor to splice out of.
            if node.prev.is_null() {
                return None;
            }
            let next = node.next.load(Ordering::Acquire);
            let prev = &mut *node.prev;
            // Only a node with a published successor can be spliced out.
            if !next.is_null() {
                // Mark unlinked, then bridge prev <-> next around us.
                node.refs &= REF_COUNT_MASK;
                (*next).prev = prev;
                prev.next.store(next, Ordering::Release);
                let ret = node.value.take();
                // Release the queue's reference: the list no longer
                // reaches this node.
                node.refs -= 1;
                if node.refs == 0 {
                    // NOTE(review): if this branch were ever taken, the
                    // `Drop` of `self` below would touch freed memory. It
                    // appears unreachable (this Entry still holds a count),
                    // but confirm before relying on it.
                    let _: Box<Node<T>> = Box::from_raw(node);
                }
                return ret;
            }
        }
        None
    }
}
impl<T> Drop for Entry<T> {
    /// Releases this handle's share of the node's reference count and
    /// frees the node once the count reaches zero (i.e. the queue has
    /// also released its reference).
    fn drop(&mut self) {
        let node = unsafe { self.0.as_mut() };
        // NOTE(review): non-atomic decrement — sound only if the queue's
        // matching decrement can never race with this one; verify.
        node.refs -= 1;
        if node.refs == 0 {
            // SAFETY: last reference gone; reclaim the allocation made in
            // `Node::new`.
            let _: Box<Node<T>> = unsafe { Box::from_raw(node) };
        }
    }
}
// SAFETY: moving an `Entry` between threads only transfers its refcount
// share; `T: Send` is required because `remove` can hand the value to
// another thread.
unsafe impl<T: Send> Send for Entry<T> {}
/// Intrusive multi-producer queue: any thread may `push`; `pop`,
/// `pop_if`, `peek`, and `is_empty` read the unsynchronized `tail` and
/// so are consumer-side operations.
pub struct Queue<T> {
    // Most recently pushed node; contended by producers, hence the
    // cache padding to avoid false sharing with `tail`.
    head: CachePadded<AtomicPtr<Node<T>>>,
    // Stub / last-popped node. Plain cell: NOTE(review) — nothing
    // synchronizes this, so at most one consumer thread may use it at a
    // time even though `Queue` is `Sync`; confirm callers guarantee that.
    tail: UnsafeCell<*mut Node<T>>,
}
// SAFETY: raw pointers suppress the auto impls; the queue moves `T`
// across threads, hence the `T: Send` bound on both.
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}
impl<T> Queue<T> {
    /// Creates an empty queue: a single stub node (value `None`) that
    /// both `head` and `tail` point at.
    pub fn new() -> Queue<T> {
        let stub = unsafe { Node::new(None) };
        // The stub is owned by the queue alone: count 1, link flag clear
        // (overriding the REF_INIT that `Node::new` installed).
        unsafe { &mut *stub }.refs = 1;
        Queue {
            head: AtomicPtr::new(stub).into(),
            tail: UnsafeCell::new(stub),
        }
    }

    /// Pushes `t` (multi-producer safe). Returns an `Entry` handle to the
    /// new node plus `true` when the node went onto an otherwise empty
    /// queue (its predecessor was the current tail).
    pub fn push(&self, t: T) -> (Entry<T>, bool) {
        unsafe {
            let node = Node::new(Some(t));
            // Claim the head slot first; until the `next` store below
            // publishes the link, consumers spin on the null `next`.
            let prev = self.head.swap(node, Ordering::AcqRel);
            (*node).prev = prev;
            (*prev).next.store(node, Ordering::Release);
            // NOTE(review): `tail` is read without synchronization, so
            // `is_head` is reliable only when called on the consumer
            // thread — confirm that is the intended contract.
            let tail = *self.tail.get();
            let is_head = tail == prev;
            (Entry(ptr::NonNull::new_unchecked(node)), is_head)
        }
    }

    /// Returns `true` when no values remain (head == tail means only the
    /// stub / last-popped node is left). Consumer-side call.
    #[inline]
    pub fn is_empty(&self) -> bool {
        let tail = unsafe { *self.tail.get() };
        self.head.load(Ordering::Acquire) == tail
    }

    /// Returns a reference to the front value, or `None` when empty.
    /// Consumer-side call.
    #[inline]
    pub fn peek(&self) -> Option<&T> {
        unsafe {
            let tail = *self.tail.get();
            if self.head.load(Ordering::Acquire) == tail {
                return None;
            }
            // head != tail, so a successor exists logically; spin until
            // the pusher publishes `tail.next`, yielding the CPU every
            // 500 spins.
            let mut next;
            let mut i = 0;
            loop {
                next = (*tail).next.load(Ordering::Acquire);
                if !next.is_null() {
                    break;
                }
                i += 1;
                if i > 500 {
                    {
                        thread::yield_now();
                        i = 0;
                    }
                } else {
                    std::hint::spin_loop()
                }
            }
            // The tail node's value is always consumed; the front node's
            // value must still be present.
            assert!((*tail).value.is_none());
            assert!((*next).value.is_some());
            (*next).value.as_ref()
        }
    }

    /// Pops the front value only if predicate `f` accepts it; returns
    /// `None` when the queue is empty or `f` rejects the value.
    /// Consumer-side call.
    pub fn pop_if<F>(&self, f: &F) -> Option<T>
    where
        F: Fn(&T) -> bool,
    {
        unsafe {
            let tail = *self.tail.get();
            if self.head.load(Ordering::Acquire) == tail {
                return None;
            }
            // Spin until the successor is linked AND still carries a
            // value (unlike `pop`, this also waits out in-progress state
            // where the front node's value is absent).
            let mut next;
            let mut i = 0;
            loop {
                next = (*tail).next.load(Ordering::Acquire);
                if !next.is_null() && (*next).value.is_some() {
                    break;
                }
                i += 1;
                if i > 100 {
                    thread::yield_now();
                    i = 0;
                }
            }
            assert!((*tail).value.is_none());
            assert!((*next).value.is_some());
            let v = (*next).value.as_ref().unwrap();
            // Predicate rejected the front value: leave the queue intact.
            if !f(v) {
                return None;
            }
            // Commit point: mark the old tail unlinked, detach the new
            // front from it, and advance `tail`.
            assert!((*tail).refs & REF_COUNT_MASK != 0);
            (*tail).refs &= REF_COUNT_MASK;
            (*next).prev = ptr::null_mut();
            *self.tail.get() = next;
            let ret = (*next).value.take().unwrap();
            // Release the queue's reference to the old tail; free it
            // unless an `Entry` still holds it.
            (*tail).refs -= 1;
            if (*tail).refs == 0 {
                let _: Box<Node<T>> = Box::from_raw(tail);
            }
            Some(ret)
        }
    }

    /// Pops and returns the front value, or `None` when empty.
    /// Consumer-side call.
    pub fn pop(&self) -> Option<T> {
        unsafe {
            let tail = *self.tail.get();
            if self.head.load(Ordering::Acquire) == tail {
                return None;
            }
            // Unconditionally clear the old tail's link flag up front
            // (unlike `pop_if`, which only commits after the predicate).
            assert!((*tail).refs & REF_COUNT_MASK != 0);
            (*tail).refs &= REF_COUNT_MASK;
            // Spin until the pusher publishes the successor, yielding
            // every 100 spins.
            let mut next;
            let mut i = 0;
            loop {
                next = (*tail).next.load(Ordering::Acquire);
                if !next.is_null() {
                    break;
                }
                i += 1;
                if i > 100 {
                    {
                        thread::yield_now();
                        i = 0;
                    }
                } else {
                    std::hint::spin_loop()
                }
            }
            // Detach the new front and advance `tail` past the old node.
            (*next).prev = ptr::null_mut();
            *self.tail.get() = next;
            assert!((*tail).value.is_none());
            assert!((*next).value.is_some());
            let ret = (*next).value.take().unwrap();
            // Release the queue's reference to the old tail; free it
            // unless an `Entry` still holds it.
            (*tail).refs -= 1;
            if (*tail).refs == 0 {
                let _: Box<Node<T>> = Box::from_raw(tail);
            }
            Some(ret)
        }
    }
}
impl<T> Default for Queue<T> {
fn default() -> Self {
Queue::new()
}
}
impl<T> Drop for Queue<T> {
    /// Drains all remaining values (running their destructors), then
    /// frees the final node that `tail` still points at.
    fn drop(&mut self) {
        while self.pop().is_some() {}
        // SAFETY: after draining, `tail` points at the stub / last-popped
        // node. NOTE(review): this frees it unconditionally — if an
        // `Entry` to that node still exists, its `Drop` would double-free;
        // confirm callers cannot outlive the queue with a live Entry.
        let _: Box<Node<T>> = unsafe { Box::from_raw(*self.tail.get()) };
    }
}
#[cfg(test)]
mod tests {
    // Fix: removed `#![feature(test)]` — a crate-level attribute is not
    // permitted inside a module (and it breaks the build on stable), and
    // nothing in this module uses the unstable `test` crate anyway.
    use super::*;
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::thread;

    /// Single-threaded exercise of push/pop/peek/pop_if and Entry::remove.
    #[test]
    fn test_queue() {
        let q: Queue<usize> = Queue::new();
        assert_eq!(q.pop(), None);
        q.push(1);
        q.push(2);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.is_empty(), true);
        let a = q.push(3);
        let b = q.push(4);
        // `3` was pushed onto an empty queue, so it reports is_head.
        assert_eq!(a.1, true);
        assert_eq!(a.0.remove(), Some(3));
        assert_eq!(b.1, false);
        // `4` is the newest node (no published successor), so remove()
        // declines and leaves it for pop().
        assert_eq!(b.0.remove(), None);
        assert_eq!(q.pop(), Some(4));
        assert_eq!(q.is_empty(), true);
        q.push(5);
        q.push(6);
        q.push(7);
        let co = |v: &usize| *v < 7;
        assert_eq!(q.peek(), Some(&5));
        assert_eq!(q.pop_if(&co), Some(5));
        assert_eq!(q.pop_if(&co), Some(6));
        // Front value 7 fails the predicate: pop_if leaves it queued.
        assert_eq!(q.pop_if(&co), None);
        assert_eq!(q.pop(), Some(7));
    }

    /// Multi-producer stress test: 8 threads push 1000 items each while
    /// the main thread pops until every message is accounted for.
    #[test]
    fn test() {
        let nthreads = 8;
        let nmsgs = 1000;
        let q = Queue::new();
        match q.pop() {
            None => {}
            Some(..) => panic!(),
        }
        let (tx, rx) = channel();
        let q = Arc::new(q);
        for _ in 0..nthreads {
            let tx = tx.clone();
            let q = q.clone();
            thread::spawn(move || {
                for i in 0..nmsgs {
                    q.push(i);
                }
                tx.send(()).unwrap();
            });
        }
        let mut i = 0;
        while i < nthreads * nmsgs {
            match q.pop() {
                None => {}
                Some(_) => i += 1,
            }
        }
        drop(tx);
        // Join all producers via the completion channel.
        for _ in 0..nthreads {
            rx.recv().unwrap();
        }
    }
}