use std::sync::atomic::Ordering;
use circ::{AtomicRc, Guard, Rc, RcObject, Snapshot, Weak};
use crossbeam_utils::CachePadded;
/// Handle to an element removed from a `DLQueue`, as returned by `DLQueue::dequeue`.
///
/// The lifetime `'g` is the lifetime of the guard reference passed to `dequeue`,
/// so the handle cannot outlive that guard.
pub struct Output<'g, T> {
/// Snapshot of the node whose `item` is exposed through `output()`.
found: Snapshot<'g, Node<T>>,
}
impl<'g, T> Output<'g, T> {
    /// Borrows the value carried by the dequeued node.
    ///
    /// # Panics
    ///
    /// Panics if the stored snapshot is null or if the node holds no item.
    /// `DLQueue::dequeue` only builds an `Output` from a non-null successor node.
    pub fn output(&self) -> &T {
        let node = self.found.as_ref().unwrap();
        node.item.as_ref().unwrap()
    }
}
/// A queue node, linked forwards through a strong `AtomicRc` and backwards
/// through a `Weak` pointer.
struct Node<T> {
/// Payload; `Node::sentinel` builds a node with `None`, `Node::new` with `Some(item)`.
item: Option<T>,
/// Weak back-pointer; `DLQueue::enqueue` sets it to the tail it observed just
/// before trying to append this node. Null on freshly constructed nodes.
prev: Weak<Node<T>>,
/// Strong forward pointer to the following node, wrapped in `CachePadded`.
/// Null until a successor has been linked.
next: CachePadded<AtomicRc<Node<T>>>,
}
// SAFETY: NOTE(review): this assumes the `RcObject` contract is to surrender every
// strong `Rc` edge owned by the object. `next` is the only strong edge of a `Node`;
// `prev` is a `Weak` and is deliberately left alone. Confirm against the circ docs.
unsafe impl<T> RcObject for Node<T> {
    /// Moves this node's forward link out of `next` and appends it to `out`.
    fn pop_edges(&mut self, out: &mut Vec<Rc<Self>>) {
        let successor = self.next.take();
        out.push(successor);
    }
}
impl<T> Node<T> {
    /// Builds an unlinked node (null `prev` and `next`) around the given payload slot.
    fn with_item(item: Option<T>) -> Self {
        Self {
            item,
            prev: Weak::null(),
            next: CachePadded::new(AtomicRc::null()),
        }
    }

    /// Builds the payload-free node that a new queue starts with.
    fn sentinel() -> Self {
        Self::with_item(None)
    }

    /// Builds an unlinked node carrying `item`.
    fn new(item: T) -> Self {
        Self::with_item(Some(item))
    }
}
/// A FIFO queue whose nodes are linked in both directions.
///
/// `enqueue` and `dequeue` take `&self` and coordinate through atomic loads,
/// stores and compare-exchange operations on `head` and `tail`.
pub struct DLQueue<T: Sync + Send> {
/// Front node; `dequeue` hands out the item stored in this node's successor
/// and then advances `head` to that successor.
head: CachePadded<AtomicRc<Node<T>>>,
/// Most recently appended node (the sentinel while the queue is new).
tail: CachePadded<AtomicRc<Node<T>>>,
}
impl<T: Sync + Send> DLQueue<T> {
    /// Creates an empty queue in which `head` and `tail` share a single sentinel node.
    #[inline]
    pub fn new() -> Self {
        let dummy = Rc::new(Node::sentinel());
        let head = CachePadded::new(AtomicRc::from(dummy.clone()));
        let tail = CachePadded::new(AtomicRc::from(dummy));
        Self { head, tail }
    }

    /// Appends `item` to the back of the queue.
    ///
    /// `guard` is the critical-section guard under which every pointer is loaded.
    /// The call retries until its compare-exchange on `tail` succeeds.
    #[inline]
    pub fn enqueue(&self, item: T, guard: &Guard) {
        // Two strong references to the new node: one is installed into `tail`,
        // the other into the old tail's `next`.
        let [mut fresh, fresh_link] = Rc::new_many(Node::new(item));
        loop {
            let observed_tail = self.tail.load(Ordering::Acquire, guard);

            // SAFETY: NOTE(review): the node has only been handed to `compare_exchange`,
            // which returns it on failure, so it appears to be unpublished and private
            // to this call whenever we get here. Confirm against `Rc::deref_mut`'s contract.
            unsafe { fresh.deref_mut() }.prev = observed_tail.downgrade().counted();

            let tail_node = observed_tail.as_ref().unwrap();

            // The thread that swings `tail` writes the old tail's `next` only afterwards,
            // so the node before the observed tail may not point forward yet. Fill in that
            // link on its behalf before appending behind the observed tail.
            let before_tail = tail_node
                .prev
                .snapshot(guard)
                .upgrade()
                .and_then(Snapshot::as_ref);
            if let Some(pred) = before_tail {
                if pred.next.load(Ordering::SeqCst, guard).is_null() {
                    pred.next
                        .store(observed_tail.counted(), Ordering::Relaxed, guard);
                }
            }

            let swing = self.tail.compare_exchange(
                observed_tail,
                fresh,
                Ordering::SeqCst,
                Ordering::SeqCst,
                guard,
            );
            match swing {
                Ok(_) => {
                    // We own the append: complete the forward link from the old tail.
                    tail_node.next.store(fresh_link, Ordering::Release, guard);
                    return;
                }
                // Lost the race; take the node back and retry against the new tail.
                Err(failed) => fresh = failed.desired,
            }
        }
    }

    /// Removes the element at the front of the queue.
    ///
    /// Returns `None` when the current head has no successor linked. Otherwise
    /// returns an [`Output`] wrapping the successor node, valid for the guard's lifetime.
    #[inline]
    pub fn dequeue<'g>(&self, guard: &'g Guard) -> Option<Output<'g, T>> {
        loop {
            let front = self.head.load(Ordering::Acquire, guard);
            let successor = front.as_ref().unwrap().next.load(Ordering::Acquire, guard);
            if successor.is_null() {
                return None;
            }

            // Advance `head` to the successor; only the winning thread returns its item.
            let advanced = self
                .head
                .compare_exchange(
                    front,
                    successor.counted(),
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                    guard,
                )
                .is_ok();
            if advanced {
                return Some(Output { found: successor });
            }
        }
    }
}
#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicU32, Ordering};

    use super::DLQueue;
    use circ::cs;
    use crossbeam_utils::thread::scope;

    /// Single-threaded check: items come out in insertion order and the queue
    /// reports empty before the first enqueue and after the last dequeue.
    #[test]
    fn simple() {
        let queue = DLQueue::new();
        let guard = &cs();
        assert!(queue.dequeue(guard).is_none());
        for value in 1..=3 {
            queue.enqueue(value, guard);
        }
        for expected in 1..=3 {
            assert_eq!(*queue.dequeue(guard).unwrap().output(), expected);
        }
        assert!(queue.dequeue(guard).is_none());
    }

    /// Multi-threaded check: every enqueued value is dequeued exactly once.
    #[test]
    fn smoke() {
        const THREADS: usize = 100;
        const ELEMENTS_PER_THREAD: usize = 10000;

        let queue = DLQueue::new();
        // One counter per distinct value; a dequeued value bumps its counter.
        let found: Vec<AtomicU32> = (0..THREADS * ELEMENTS_PER_THREAD)
            .map(|_| AtomicU32::new(0))
            .collect();

        // Phase 1: each thread enqueues its own disjoint range of values as strings.
        scope(|s| {
            for t in 0..THREADS {
                let queue = &queue;
                s.spawn(move |_| {
                    let base = t * ELEMENTS_PER_THREAD;
                    for value in base..base + ELEMENTS_PER_THREAD {
                        queue.enqueue(value.to_string(), &cs());
                    }
                });
            }
        })
        .unwrap();

        // Phase 2: each thread dequeues its share and checks the value was not seen before.
        scope(|s| {
            for _ in 0..THREADS {
                let (queue, found) = (&queue, &found);
                s.spawn(move |_| {
                    for _ in 0..ELEMENTS_PER_THREAD {
                        let guard = cs();
                        let output = queue.dequeue(&guard).unwrap();
                        let index = output.output().parse::<usize>().unwrap();
                        assert_eq!(found[index].fetch_add(1, Ordering::Relaxed), 0);
                    }
                });
            }
        })
        .unwrap();

        // No value may be left unseen.
        assert!(
            found
                .iter()
                .filter(|v| v.load(Ordering::Relaxed) == 0)
                .count()
                == 0
        );
    }
}