#![allow(missing_docs, dead_code)]
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::Arc;
use crossbeam_utils::CachePadded;
/// One slot of the queue's ring buffer.
///
/// The slot's state is encoded in `sequence` relative to the position a
/// producer or consumer is trying to claim: a producer at position `pos` may
/// fill the slot when `sequence == pos`, and a consumer at position `pos` may
/// empty it when `sequence == pos + 1`.
struct Node<T> {
/// Ticket for this slot. Initialised to the slot index, set to `pos + 1` by
/// the producer that filled the slot, and to `pos + mask + 1` by the consumer
/// that emptied it.
sequence: AtomicUsize,
/// The stored item; `None` when the slot is created and after a `pop` has
/// taken the item out.
value: Option<T>,
}
// NOTE(review): `Node` holds only an `AtomicUsize` and an `Option<T>`, so these
// bounds look identical to what the auto traits would already derive; the
// explicit impls appear to be redundant rather than load-bearing — confirm
// before removing.
unsafe impl<T: Send> Send for Node<T> {}
unsafe impl<T: Sync> Sync for Node<T> {}
/// Shared state of the bounded queue: the slot buffer plus the producer and
/// consumer position counters.
struct State<T> {
/// Next position a producer will try to claim (advanced with a CAS in `push`).
/// Wrapped in `CachePadded`, presumably to keep it on a different cache line
/// from `dequeue_pos`.
enqueue_pos: CachePadded<AtomicUsize>,
/// Ring of slots; a position maps to a slot through `pos & mask`. Each slot
/// sits in an `UnsafeCell` because it is written through a shared `&State`.
buffer: Vec<UnsafeCell<Node<T>>>,
/// Next position a consumer will try to claim (advanced with a CAS in `pop`).
dequeue_pos: CachePadded<AtomicUsize>,
/// `capacity - 1`, where the capacity is a power of two.
mask: usize,
}
// SAFETY: sending the whole `State` to another thread sends every stored `T`
// with it, which is sound exactly when `T: Send`.
unsafe impl<T: Send> Send for State<T> {}
// SAFETY: a shared `&State<T>` only ever lets a thread move a `T` into a slot
// (`push`) or move one out (`pop`); it never exposes a `&T` to an item owned
// by the queue. Sharing the queue therefore transfers values between threads
// but never shares them, so the required bound is `T: Send`, not `T: Sync`.
// Access to each slot's payload is serialised by the slot's atomic `sequence`
// (acquire load before the access, release store after it).
unsafe impl<T: Send> Sync for State<T> {}
/// Handle to a bounded queue.
///
/// The queue state lives behind an `Arc`, so every handle holding the same
/// `Arc` operates on the same underlying buffer.
pub struct Queue<T> {
/// Reference-counted pointer to the shared queue state.
state: Arc<State<T>>,
}
impl<T: Send> State<T> {
fn with_capacity(capacity: usize) -> State<T> {
let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
if capacity < 2 {
2
} else {
capacity.next_power_of_two()
}
} else {
capacity
};
let buffer = (0..capacity)
.map(|i| {
UnsafeCell::new(Node {
sequence: AtomicUsize::new(i),
value: None,
})
})
.collect::<Vec<_>>();
State {
buffer,
mask: capacity - 1,
enqueue_pos: AtomicUsize::new(0).into(),
dequeue_pos: AtomicUsize::new(0).into(),
}
}
fn push(&self, value: T) -> Result<(), T> {
let mask = self.mask;
let mut pos = self.enqueue_pos.load(Relaxed);
loop {
let node = unsafe { &mut *((self.buffer[pos & mask]).get()) };
let seq = node.sequence.load(Acquire);
match seq.cmp(&pos) {
std::cmp::Ordering::Equal => {
match self
.enqueue_pos
.compare_exchange(pos, pos + 1, Relaxed, Relaxed)
{
Ok(_) => {
node.value = Some(value);
node.sequence.store(pos + 1, Release);
break;
}
Err(enqueue_pos) => pos = enqueue_pos,
}
}
std::cmp::Ordering::Less => return Err(value),
std::cmp::Ordering::Greater => pos = self.enqueue_pos.load(Relaxed),
}
}
Ok(())
}
fn pop(&self) -> Option<T> {
let mask = self.mask;
let mut pos = self.dequeue_pos.load(Relaxed);
loop {
let node = unsafe { &mut *((self.buffer[pos & mask]).get()) };
let seq = node.sequence.load(Acquire);
match seq.cmp(&(pos + 1)) {
std::cmp::Ordering::Equal => {
match self
.dequeue_pos
.compare_exchange(pos, pos + 1, Relaxed, Relaxed)
{
Ok(_) => {
let value = node.value.take();
node.sequence.store(pos + mask + 1, Release);
return value;
}
Err(dequeue_pos) => pos = dequeue_pos,
}
}
std::cmp::Ordering::Less => return None,
std::cmp::Ordering::Greater => pos = self.dequeue_pos.load(Relaxed),
}
}
}
}
impl<T: Send> Queue<T> {
    /// Creates a queue able to hold at least `capacity` items.
    ///
    /// The capacity actually allocated is chosen by `State::with_capacity`.
    pub fn with_capacity(capacity: usize) -> Queue<T> {
        let state = Arc::new(State::with_capacity(capacity));
        Queue { state }
    }

    /// Attempts to enqueue `value` without blocking.
    ///
    /// On failure the value is handed back to the caller as `Err(value)`.
    pub fn push(&self, value: T) -> Result<(), T> {
        State::push(&self.state, value)
    }

    /// Attempts to dequeue an item without blocking, returning `None` when
    /// nothing is available.
    pub fn pop(&self) -> Option<T> {
        State::pop(&self.state)
    }
}
impl<T: Send> Clone for Queue<T> {
fn clone(&self) -> Queue<T> {
Queue {
state: self.state.clone(),
}
}
}
#[cfg(all(nightly, test))]
mod bench {
    extern crate test;
    use self::test::Bencher;
    use super::Queue;
    use std::thread;

    /// Pushes `total_work` integers through the queue using `nthreads`
    /// producer threads and `nthreads` consumer threads, then checks that the
    /// consumers' partial sums add up to the sum of everything produced.
    #[bench]
    fn bounded_mpmc(b: &mut Bencher) {
        let total_work = 1_000_000;
        let nthreads = 2;
        let nmsgs = total_work / nthreads;
        b.iter(|| {
            let q = Queue::with_capacity(nthreads * nmsgs);

            // Producers: each pushes its own contiguous range of integers. The
            // join handles are dropped, so these threads run detached.
            for producer in 0..nthreads {
                let tx = q.clone();
                let first = producer * nmsgs;
                thread::spawn(move || {
                    for msg in first..first + nmsgs {
                        assert!(tx.push(msg).is_ok());
                    }
                });
            }

            // Consumers: each spins until it has received `nmsgs` items and
            // reports the sum of what it saw.
            let consumers: Vec<_> = (0..nthreads)
                .map(|_| {
                    let rx = q.clone();
                    thread::spawn(move || {
                        let mut received = 0;
                        let mut sum = 0;
                        loop {
                            if let Some(v) = rx.pop() {
                                sum += v;
                                received += 1;
                                if received == nmsgs {
                                    return sum;
                                }
                            } else {
                                std::hint::spin_loop();
                            }
                        }
                    })
                })
                .collect();

            let total: usize = consumers
                .into_iter()
                .map(|handle| handle.join().unwrap())
                .sum();
            assert_eq!(total, (0..total_work).sum())
        });
    }
}