use std::mem;
use std::ptr;
use std::cell::{UnsafeCell};
use std::sync::{Arc};
use std::sync::atomic::{AtomicUsize};
use std::sync::atomic::Ordering::*;
use {ConsumeError, ProduceError, POINTERS};
use buffer::{Buffer};
#[derive(Debug)]
pub struct Consumer<T>(Arc<Queue<T>>);
impl<T> Consumer<T> {
pub fn consume(&self) -> Result<T, ConsumeError> {
self.0.consume()
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn capacity(&self) -> usize {
self.0.capacity()
}
}
impl<T> Clone for Consumer<T> {
fn clone(&self) -> Self {
self.0.consumer.fetch_add(1, Release);
Consumer(self.0.clone())
}
}
impl<T> Drop for Consumer<T> {
fn drop(&mut self) {
self.0.consumer.fetch_sub(1, Release);
}
}
unsafe impl<T> Send for Consumer<T> where T: Send { }
#[derive(Debug)]
pub struct Producer<T>(Arc<Queue<T>>);
impl<T> Producer<T> {
pub fn produce(&self, item: T) -> Result<(), ProduceError<T>> {
self.0.produce(item)
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn capacity(&self) -> usize {
self.0.capacity()
}
}
impl<T> Clone for Producer<T> {
fn clone(&self) -> Self {
self.0.producer.fetch_add(1, Release);
Producer(self.0.clone())
}
}
impl<T> Drop for Producer<T> {
fn drop(&mut self) {
self.0.producer.fetch_sub(1, Release);
}
}
unsafe impl<T> Send for Producer<T> where T: Send { }
#[derive(Debug)]
struct Slot<T> {
item: UnsafeCell<T>,
sequence: AtomicUsize,
}
impl<T> Slot<T> {
fn new(index: usize) -> Self {
let item = unsafe { mem::uninitialized() };
Slot { item: UnsafeCell::new(item), sequence: AtomicUsize::new(index) }
}
unsafe fn get(&self) -> T {
let mut item = mem::uninitialized();
ptr::swap(self.item.get(), &mut item);
item
}
unsafe fn set(&self, item: T) {
ptr::write(self.item.get(), item);
}
}
#[derive(Debug)]
#[repr(C)]
struct Queue<T> {
write: AtomicUsize,
consumer: AtomicUsize,
_wpadding: [usize; POINTERS - 2],
read: AtomicUsize,
producer: AtomicUsize,
_rpadding: [usize; POINTERS - 2],
buffer: Buffer<Slot<T>>,
}
impl<T> Queue<T> {
fn new(size: usize) -> Arc<Self> {
let buffer = Buffer::new(size);
for index in 0..size {
unsafe { buffer.set(index, Slot::new(index)); }
}
Arc::new(Queue {
write: AtomicUsize::new(0),
consumer: AtomicUsize::new(1),
_wpadding: [0; POINTERS - 2],
read: AtomicUsize::new(0),
producer: AtomicUsize::new(1),
_rpadding: [0; POINTERS - 2],
buffer: buffer,
})
}
fn len(&self) -> usize {
self.write.load(Acquire).wrapping_sub(self.read.load(Acquire))
}
fn capacity(&self) -> usize {
self.buffer.size()
}
fn produce(&self, item: T) -> Result<(), ProduceError<T>> {
if self.consumer.load(Acquire) == 0 {
return Err(ProduceError::Disconnected(item));
}
loop {
let write = self.write.load(Relaxed);
let slot = unsafe { self.buffer.wrapping_get_ref(write) };
let sequence = slot.sequence.load(Acquire);
let difference = (sequence as isize).wrapping_sub(write as isize);
if difference < 0 {
return Err(ProduceError::Full(item));
}
let next = write.wrapping_add(1);
if difference == 0 && exchange(&self.write, write, next) {
unsafe { slot.set(item); }
slot.sequence.store(next, Release);
return Ok(());
}
}
}
fn consume(&self) -> Result<T, ConsumeError> {
loop {
let read = self.read.load(Relaxed);
let slot = unsafe { self.buffer.wrapping_get_ref(read) };
let sequence = slot.sequence.load(Acquire);
let difference = (sequence as isize).wrapping_sub(read.wrapping_add(1) as isize);
if difference < 0 {
if self.producer.load(Acquire) == 0 {
return Err(ConsumeError::Disconnected);
} else {
return Err(ConsumeError::Empty);
}
}
let next = read.wrapping_add(1);
if difference == 0 && exchange(&self.read, read, next) {
let item = unsafe { slot.get() };
slot.sequence.store(next.wrapping_add(self.buffer.size() - 1), Release);
return Ok(item);
}
}
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
while self.consume().is_ok() { }
}
}
unsafe impl<T> Sync for Queue<T> where T: Send { }
fn exchange(atomic: &AtomicUsize, current: usize, new: usize) -> bool {
atomic.compare_exchange_weak(current, new, Relaxed, Relaxed).is_ok()
}
pub fn channel<T>(size: usize) -> (Producer<T>, Consumer<T>) {
assert!(size.is_power_of_two(), "`size` is not a power of two");
let queue = Queue::new(size);
(Producer(queue.clone()), Consumer(queue))
}