use std::ptr;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicPtr, AtomicUsize};
use std::sync::atomic::Ordering::*;
use hazard::{BoxMemory, Memory, Pointers};
use {ConsumeError, ProduceError, POINTERS};
/// Receiving handle of the queue.
///
/// Field `0` is the slot index this handle passes as `thread` to the queue's
/// operations; field `1` is the queue itself, shared through an `Arc`.
#[derive(Debug)]
pub struct Consumer<T>(usize, Arc<Queue<T>>);
impl<T> Consumer<T> {
    /// Tries to take the next item out of the queue.
    ///
    /// # Errors
    ///
    /// Forwards whatever `Queue::consume` reports for this handle's slot:
    /// `ConsumeError::Empty` when nothing is available, or
    /// `ConsumeError::Disconnected` when the queue is empty and no producer
    /// handle is left.
    pub fn consume(&self) -> Result<T, ConsumeError> {
        let slot = self.0;
        self.1.consume(slot)
    }

    /// Creates another consumer handle for the same queue.
    ///
    /// Returns `None` when the pool of free slot indices is exhausted.
    pub fn try_clone(&self) -> Option<Self> {
        // The lock guard is a temporary of this tail expression, so the pool
        // stays locked while the closure registers the new consumer.
        self.1.threads.lock().unwrap().pop().map(|slot| {
            self.1.consumers.fetch_add(1, Release);
            Consumer(slot, self.1.clone())
        })
    }
}
impl<T> Clone for Consumer<T> {
    /// Clones the handle through [`Consumer::try_clone`].
    ///
    /// # Panics
    ///
    /// Panics when `try_clone` returns `None`, i.e. no free slot index is left.
    fn clone(&self) -> Self {
        match self.try_clone() {
            Some(handle) => handle,
            None => panic!("too many producer and consumer clones"),
        }
    }
}
impl<T> Drop for Consumer<T> {
    /// Gives the slot index back to the queue's pool and then lowers the
    /// consumer count.
    fn drop(&mut self) {
        let slot = self.0;
        let queue = &self.1;
        // The lock guard lives only for this statement.
        queue.threads.lock().unwrap().push(slot);
        queue.consumers.fetch_sub(1, Release);
    }
}
// SAFETY (reviewer note, TODO confirm): the handle only holds a slot index and
// an `Arc` to the queue; items of type `T` leave the queue through `consume`,
// hence the `T: Send` bound.
// NOTE(review): `consume` takes `&self` and identifies itself to the queue by
// this handle's slot index; presumably a slot must not be used by two threads
// at once, so verify that this type does not end up `Sync` via auto traits.
unsafe impl<T> Send for Consumer<T> where T: Send { }
/// Sending handle of the queue.
///
/// Field `0` is the slot index this handle passes as `thread` to the queue's
/// operations; field `1` is the queue itself, shared through an `Arc`.
#[derive(Debug)]
pub struct Producer<T>(usize, Arc<Queue<T>>);
impl<T> Producer<T> {
    /// Appends `item` to the queue.
    ///
    /// # Errors
    ///
    /// Forwards the result of `Queue::produce` for this handle's slot, which
    /// hands the item back inside `ProduceError::Disconnected` when no consumer
    /// handle is left.
    pub fn produce(&self, item: T) -> Result<(), ProduceError<T>> {
        let slot = self.0;
        self.1.produce(slot, item)
    }

    /// Creates another producer handle for the same queue.
    ///
    /// Returns `None` when the pool of free slot indices is exhausted.
    pub fn try_clone(&self) -> Option<Self> {
        // The lock guard is a temporary of this tail expression, so the pool
        // stays locked while the closure registers the new producer.
        self.1.threads.lock().unwrap().pop().map(|slot| {
            self.1.producers.fetch_add(1, Release);
            Producer(slot, self.1.clone())
        })
    }
}
impl<T> Clone for Producer<T> {
    /// Clones the handle through [`Producer::try_clone`].
    ///
    /// # Panics
    ///
    /// Panics when `try_clone` returns `None`, i.e. no free slot index is left.
    fn clone(&self) -> Self {
        match self.try_clone() {
            Some(handle) => handle,
            None => panic!("too many producer and consumer clones"),
        }
    }
}
impl<T> Drop for Producer<T> {
    /// Gives the slot index back to the queue's pool and then lowers the
    /// producer count.
    fn drop(&mut self) {
        let slot = self.0;
        let queue = &self.1;
        // The lock guard lives only for this statement.
        queue.threads.lock().unwrap().push(slot);
        queue.producers.fetch_sub(1, Release);
    }
}
// SAFETY (reviewer note, TODO confirm): the handle only holds a slot index and
// an `Arc` to the queue; items of type `T` enter the queue through `produce`,
// hence the `T: Send` bound.
// NOTE(review): `produce` takes `&self` and identifies itself to the queue by
// this handle's slot index; presumably a slot must not be used by two threads
// at once, so verify that this type does not end up `Sync` via auto traits.
unsafe impl<T> Send for Producer<T> where T: Send { }
/// One link of the queue's singly linked list.
#[derive(Debug)]
struct Node<T> {
    /// Payload carried by this link; `None` when the node holds no value.
    item: Option<T>,
    /// Pointer to the following link, null while there is none.
    next: AtomicPtr<Node<T>>,
}

impl<T> Node<T> {
    /// Creates an unlinked node (null `next`) holding `item`.
    fn new(item: Option<T>) -> Self {
        let unlinked = AtomicPtr::new(ptr::null_mut());
        Node {
            item: item,
            next: unlinked,
        }
    }
}
// Per-thread slot indices passed to `Pointers::mark` / `mark_ptr` / `clear`.
/// Slot used by `Queue::consume` for the node loaded from `Queue::read`.
const READ: usize = 0;
/// Slot used by `Queue::produce` for the node loaded from `Queue::write`.
const WRITE: usize = 1;
/// Slot used by `Queue::consume` for the successor of the `READ` node.
const NEXT: usize = 2;
/// Shared state behind every `Producer` and `Consumer` handle: a linked list
/// of `Node`s with atomically updated end pointers.
#[derive(Debug)]
#[repr(C)]
struct Queue<T> {
    /// Node that producers append after.
    write: AtomicPtr<Node<T>>,
    /// Number of live `Producer` handles.
    producers: AtomicUsize,
    // NOTE(review): presumably pads the producer-side fields so they do not
    // share a cache line with the consumer-side fields; confirm against the
    // definition of `POINTERS`.
    _wpadding: [usize; POINTERS - 2],
    /// Node that consumers advance past; its successor holds the next item.
    read: AtomicPtr<Node<T>>,
    /// Number of live `Consumer` handles.
    consumers: AtomicUsize,
    // NOTE(review): presumably padding with the same purpose as `_wpadding`.
    _rpadding: [usize; POINTERS - 2],
    /// Table from the `hazard` crate used to mark nodes in use and to retire
    /// nodes that were unlinked.
    pointers: Pointers<Node<T>, BoxMemory>,
    /// Pool of slot indices not currently owned by any handle.
    threads: Mutex<Vec<usize>>,
}
impl<T> Queue<T> {
fn new(threads: usize) -> Arc<Self> {
let sentinel = BoxMemory.allocate(Node::new(None));
Arc::new(Queue {
write: AtomicPtr::new(sentinel),
producers: AtomicUsize::new(1),
_wpadding: [0; POINTERS - 2],
read: AtomicPtr::new(sentinel),
consumers: AtomicUsize::new(1),
_rpadding: [0; POINTERS - 2],
pointers: Pointers::new(BoxMemory, threads, 3, 512),
threads: Mutex::new((2..threads).collect()),
})
}
fn produce(&self, thread: usize, item: T) -> Result<(), ProduceError<T>> {
if self.consumers.load(Acquire) == 0 {
return Err(ProduceError::Disconnected(item));
}
let node = BoxMemory.allocate(Node::new(Some(item)));
loop {
let write = self.pointers.mark_ptr(thread, WRITE, self.write.load(Acquire));
if write == self.write.load(Acquire) {
let next = deref!(write).next.load(Acquire);
if next.is_null() {
if exchange(&deref!(write).next, ptr::null_mut(), node) {
exchange(&self.write, write, node);
self.pointers.clear(thread, WRITE);
return Ok(());
}
} else {
exchange(&self.write, write, next);
}
}
}
}
fn consume(&self, thread: usize) -> Result<T, ConsumeError> {
loop {
let read = self.pointers.mark(thread, READ, &self.read);
if read == self.write.load(Acquire) {
if self.producers.load(Acquire) == 0 {
return Err(ConsumeError::Disconnected);
} else {
return Err(ConsumeError::Empty);
}
}
let next = self.pointers.mark(thread, NEXT, &deref!(read).next);
if exchange(&self.read, read, next) {
let item = deref_mut!(next).item.take().unwrap();
self.pointers.clear(thread, READ);
self.pointers.clear(thread, NEXT);
self.pointers.retire(thread, read);
return Ok(item);
}
}
}
}
impl<T> Drop for Queue<T> {
    /// Drains every remaining item using slot 0, then frees the node that
    /// `write` points to.
    fn drop(&mut self) {
        // `consume` only fails once `read` has caught up with `write`.
        while let Ok(item) = self.consume(0) {
            drop(item);
        }
        let last = self.write.load(Relaxed);
        // SAFETY (TODO confirm against `hazard::Memory`): `&mut self` rules
        // out concurrent users, and `consume` never retires the node `write`
        // points to, so it is freed exactly once here.
        unsafe { BoxMemory.deallocate(last); }
    }
}
// SAFETY (reviewer note, TODO confirm against `hazard::Pointers`): the queue's
// shared state consists of atomics, the `Pointers` table and a `Mutex`; items
// of type `T` are handed from producing to consuming threads, hence `T: Send`.
unsafe impl<T> Sync for Queue<T> where T: Send { }
/// Compare-and-swap on a node pointer: stores `new` into `atomic` if it
/// currently holds `current`.
///
/// Returns `true` when the swap happened and `false` when `atomic` held a
/// different pointer. Uses `AcqRel` ordering on success and `Acquire` on
/// failure.
fn exchange<T>(atomic: &AtomicPtr<Node<T>>, current: *mut Node<T>, new: *mut Node<T>) -> bool {
    match atomic.compare_exchange(current, new, AcqRel, Acquire) {
        Ok(_) => true,
        Err(_) => false,
    }
}
/// Creates a queue and returns its first producer and consumer handles.
///
/// `clones` is the number of additional handles (producers and consumers
/// combined) that can exist at the same time: the queue is created with
/// `clones + 2` slot indices, of which 0 goes to the returned producer and 1
/// to the returned consumer.
pub fn channel<T>(clones: usize) -> (Producer<T>, Consumer<T>) {
    let shared = Queue::new(clones + 2);
    let producer = Producer(0, shared.clone());
    let consumer = Consumer(1, shared);
    (producer, consumer)
}