use std::ptr;
use std::cell::{Cell};
use std::sync::{Arc};
use std::sync::atomic::{AtomicPtr, AtomicUsize};
use std::sync::atomic::Ordering::*;
use hazard::{BoxMemory, Memory};
use {ConsumeError, ProduceError, POINTERS};
/// Receiving handle of the queue.
///
/// Wraps a shared reference to the underlying `Queue`; the matching
/// `Producer` holds another reference to the same queue.
#[derive(Debug)]
pub struct Consumer<T>(
    /// Queue shared with the producing side.
    Arc<Queue<T>>,
);
impl<T> Consumer<T> {
    /// Attempts to take the next item from the queue.
    ///
    /// Forwards directly to `Queue::consume` and returns whatever it returns:
    /// `Ok(item)` on success, or a `ConsumeError` otherwise.
    pub fn consume(&self) -> Result<T, ConsumeError> {
        let queue = &self.0;
        queue.consume()
    }
}
impl<T> Drop for Consumer<T> {
    /// Clears the queue's `consumer` flag so that `Queue::produce` observes
    /// the disconnect (it checks this flag for zero before enqueueing).
    fn drop(&mut self) {
        let queue = &self.0;
        // Release pairs with the Acquire load performed in `Queue::produce`.
        queue.consumer.store(0, Release);
    }
}
// SAFETY: NOTE(review) - soundness presumably relies on there being exactly
// one `Consumer` per queue (no `Clone` impl is visible here), so only one
// thread at a time drives the read side. Items of type `T` cross threads,
// hence the `T: Send` bound.
unsafe impl<T: Send> Send for Consumer<T> {}
/// Sending handle of the queue.
///
/// Wraps a shared reference to the underlying `Queue`; the matching
/// `Consumer` holds another reference to the same queue.
#[derive(Debug)]
pub struct Producer<T>(
    /// Queue shared with the consuming side.
    Arc<Queue<T>>,
);
impl<T> Producer<T> {
    /// Attempts to append `item` to the queue.
    ///
    /// Forwards directly to `Queue::produce` and returns whatever it returns:
    /// `Ok(())` on success, or a `ProduceError` carrying the item otherwise.
    pub fn produce(&self, item: T) -> Result<(), ProduceError<T>> {
        let queue = &self.0;
        queue.produce(item)
    }
}
impl<T> Drop for Producer<T> {
    /// Clears the queue's `producer` flag so that `Queue::consume` can report
    /// a disconnect (it checks this flag for zero when no item is available).
    fn drop(&mut self) {
        let queue = &self.0;
        // Release pairs with the Acquire load performed in `Queue::consume`.
        queue.producer.store(0, Release);
    }
}
// SAFETY: NOTE(review) - soundness presumably relies on there being exactly
// one `Producer` per queue (no `Clone` impl is visible here), so only one
// thread at a time drives the write side. Items of type `T` cross threads,
// hence the `T: Send` bound.
unsafe impl<T: Send> Send for Producer<T> {}
/// A single link in the queue's singly linked list.
#[derive(Debug)]
struct Node<T> {
    /// Payload carried by this node; `None` for a node created without an
    /// item or once the item has been taken out.
    item: Option<T>,
    /// Pointer to the following node, or null when there is none yet.
    next: AtomicPtr<Node<T>>,
}
impl<T> Node<T> {
    /// Builds an unlinked node holding `item`; its `next` pointer starts out
    /// null.
    fn new(item: Option<T>) -> Self {
        let unlinked = AtomicPtr::new(ptr::null_mut());
        Node {
            item: item,
            next: unlinked,
        }
    }
}
/// Linked-list queue shared between one `Producer` and one `Consumer`.
///
/// `#[repr(C)]` keeps the fields in declaration order, so the field order
/// below is significant and must not be rearranged.
///
/// NOTE(review): the two padding arrays presumably exist to keep the
/// producer-side group (`write`, `consumer`) and the consumer-side group
/// (`read`, `producer`) on separate cache lines - confirm against the
/// definition of `POINTERS`.
#[derive(Debug)]
#[repr(C)]
struct Queue<T> {
    /// Most recently appended node; updated by `produce`.
    write: Cell<*mut Node<T>>,
    /// Non-zero while the consumer handle is alive; zeroed when it is dropped.
    consumer: AtomicUsize,
    /// Filler following the producer-side fields.
    _wpadding: [usize; POINTERS - 2],

    /// Current head node; its successor holds the next item to hand out.
    read: Cell<*mut Node<T>>,
    /// Non-zero while the producer handle is alive; zeroed when it is dropped.
    producer: AtomicUsize,
    /// Filler following the consumer-side fields.
    _rpadding: [usize; POINTERS - 2],
}
impl<T> Queue<T> {
    /// Creates an empty queue behind an `Arc`.
    ///
    /// A single item-less sentinel node is allocated and both `read` and
    /// `write` start out pointing at it. Both liveness flags start at 1.
    fn new() -> Arc<Self> {
        let sentinel = BoxMemory.allocate(Node::new(None));
        Arc::new(Queue {
            write: Cell::new(sentinel),
            consumer: AtomicUsize::new(1),
            _wpadding: [0; POINTERS - 2],
            read: Cell::new(sentinel),
            producer: AtomicUsize::new(1),
            _rpadding: [0; POINTERS - 2],
        })
    }

    /// Appends `item` to the tail of the queue.
    ///
    /// # Errors
    ///
    /// Returns `ProduceError::Disconnected(item)`, handing the item back, if
    /// the consumer flag has already been cleared.
    fn produce(&self, item: T) -> Result<(), ProduceError<T>> {
        if self.consumer.load(Acquire) == 0 {
            return Err(ProduceError::Disconnected(item));
        }
        let node = BoxMemory.allocate(Node::new(Some(item)));
        // Publish the fully initialised node; the Release store pairs with the
        // Acquire load of `next` in `consume`. This is the producer's last
        // access to the old tail node.
        deref!(self.write.get()).next.store(node, Release);
        self.write.set(node);
        Ok(())
    }

    /// Removes and returns the item at the head of the queue.
    ///
    /// # Errors
    ///
    /// * `ConsumeError::Empty` - no item is available but the producer flag is
    ///   still set.
    /// * `ConsumeError::Disconnected` - the producer flag has been cleared and
    ///   no item remains in the queue.
    fn consume(&self) -> Result<T, ConsumeError> {
        let mut next = deref!(self.read.get()).next.load(Acquire);
        if next.is_null() {
            if self.producer.load(Acquire) != 0 {
                return Err(ConsumeError::Empty);
            }
            // The producer may have enqueued an item and then been dropped
            // between the load of `next` above and the load of `producer`.
            // Having observed the Release store of the cleared flag, every
            // earlier store by the producer is visible, so look once more
            // before reporting a disconnect; otherwise a pending item would
            // be hidden behind a terminal-looking error.
            next = deref!(self.read.get()).next.load(Acquire);
            if next.is_null() {
                return Err(ConsumeError::Disconnected);
            }
        }
        // Nodes created by `produce` always carry `Some(item)`, and each node
        // is visited here exactly once because `read` advances past it.
        let item = deref_mut!(next)
            .item
            .take()
            .expect("queued node must carry an item");
        // SAFETY: the old head has a successor, so `produce` has already moved
        // on from it (its store to `next` is its last access to that node),
        // and `read` is replaced immediately below.
        unsafe { BoxMemory.deallocate(self.read.get()); }
        // `next` now plays the role of the item-less sentinel.
        self.read.set(next);
        Ok(item)
    }
}
impl<T> Drop for Queue<T> {
    /// Drains every remaining item and then frees the last node.
    fn drop(&mut self) {
        // Each successful `consume` frees the previous head node and yields
        // the item, which is dropped right here. The loop ends on the first
        // error, i.e. once the head node has no successor.
        loop {
            if self.consume().is_err() {
                break;
            }
        }
        let last = self.write.get();
        // SAFETY: `&mut self` means no handle can touch the queue any more;
        // after the drain the node in `write` is the only one left allocated.
        unsafe { BoxMemory.deallocate(last); }
    }
}
// SAFETY: NOTE(review) - the `Cell` fields are not synchronised by
// themselves; soundness presumably relies on `write` being touched only
// through the single `Producer` and `read` only through the single
// `Consumer`, with the atomics carrying all cross-thread communication.
unsafe impl<T: Send> Sync for Queue<T> {}
/// Creates a new queue and returns its two connected endpoints.
///
/// Both handles share the same underlying `Queue`: items passed to
/// `Producer::produce` are the ones handed out by `Consumer::consume`.
pub fn channel<T>() -> (Producer<T>, Consumer<T>) {
    let shared = Queue::new();
    let producer = Producer(shared.clone());
    let consumer = Consumer(shared);
    (producer, consumer)
}