pub use self::Stolen::*;
use std::sync::Arc;
use std::mem::forget;
use std::ptr;
use std::marker::PhantomData;
use std::cell::Cell;
use std::fmt;
use std::sync::atomic::{AtomicIsize, AtomicPtr, fence};
use std::sync::atomic::Ordering::{SeqCst, Acquire, Release, Relaxed};
/// Number of slots in the buffer a freshly created deque starts with.
///
/// This must be a power of two: `Buffer::mask` computes `size - 1` and
/// `Buffer::elem` uses that value as a bit mask to wrap indices into the
/// storage, and `Buffer::grow` only ever doubles the size.
const MIN_SIZE: usize = 32;
/// State shared between the `Worker` and every `Stealer` of one deque.
///
/// Live elements occupy the logical index range `top..bottom`; both indices
/// only ever move with wrapping arithmetic and are mapped onto the current
/// buffer by `Buffer::elem`.
struct Deque<T: Send> {
// Index of the slot the next `push` writes to; `pop` decrements it to claim
// the most recently pushed element.
bottom: AtomicIsize,
// Index of the oldest element; `steal` (and `pop`, for the last element)
// advances it with a compare-and-swap.
top: AtomicIsize,
// Raw pointer to the current buffer, produced by `Box::into_raw` and turned
// back into a `Box` when the deque is dropped.
array: AtomicPtr<Buffer<T>>,
}
/// The pushing/popping handle of a deque, created together with a `Stealer`
/// by `new`.
///
/// It exposes `push` and `pop`, which operate on the bottom end of the deque.
pub struct Worker<T: Send> {
// Shared deque state, also referenced by the `Stealer` handles.
deque: Arc<Deque<T>>,
// `Cell<()>` is not `Sync`, so this zero-sized marker makes `Worker` `!Sync`:
// a `&Worker` cannot be shared across threads to call `push`/`pop` concurrently.
marker: PhantomData<Cell<()>>,
}
/// The stealing handle of a deque, created together with a `Worker` by `new`.
///
/// It exposes `steal`, which takes elements from the top end, and it can be
/// cloned to obtain further handles to the same deque.
pub struct Stealer<T: Send> {
// Shared deque state, also referenced by the `Worker`.
deque: Arc<Deque<T>>,
}
impl<T: Send> Clone for Stealer<T> {
fn clone(&self) -> Self {
Stealer { deque: self.deque.clone() }
}
}
/// Outcome of a single `Stealer::steal` attempt.
#[derive(PartialEq, Debug)]
pub enum Stolen<T> {
/// The deque held no elements when it was inspected.
Empty,
/// An element was seen, but the compare-and-swap on `top` failed because
/// another operation moved `top` first; the caller may try again.
Abort,
/// The element that was successfully taken from the deque.
Data(T),
}
/// A fixed-size ring of possibly-uninitialised slots backing the deque.
///
/// The buffer never drops the `T` values stored in it; dropping a `Buffer`
/// only releases the slot storage (and the chain of previous buffers).
struct Buffer<T: Send> {
// Start of the slot storage obtained from `allocate(size)`.
storage: *mut T,
// Number of slots in `storage`; indices are wrapped with `size - 1` as a
// bit mask, so this is expected to be a power of two.
size: usize,
// The smaller buffer this one replaced in `grow`, kept alive so that its
// allocation stays valid for as long as this buffer exists.
prev: Option<Box<Buffer<T>>>,
}
/// Creates an empty deque and returns its two handles.
///
/// The `Worker` pushes and pops elements; the `Stealer` (which can be cloned)
/// steals them. Both handles share the same underlying deque state.
pub fn new<T: Send>() -> (Worker<T>, Stealer<T>) {
    let shared = Arc::new(Deque::new());
    let stealer = Stealer {
        deque: Arc::clone(&shared),
    };
    let worker = Worker {
        deque: shared,
        marker: PhantomData,
    };
    (worker, stealer)
}
impl<T: Send> Worker<T> {
pub fn push(&self, t: T) {
unsafe { self.deque.push(t) }
}
pub fn pop(&self) -> Option<T> {
unsafe { self.deque.pop() }
}
}
impl<T: Send> Stealer<T> {
pub fn steal(&self) -> Stolen<T> {
unsafe { self.deque.steal() }
}
}
impl<T: Send> Deque<T> {
    /// Creates an empty deque backed by a fresh buffer of `MIN_SIZE` slots.
    fn new() -> Deque<T> {
        let buf = Box::new(unsafe { Buffer::new(MIN_SIZE) });
        Deque {
            bottom: AtomicIsize::new(0),
            top: AtomicIsize::new(0),
            array: AtomicPtr::new(Box::into_raw(buf)),
        }
    }

    /// Stores `data` in the slot at `bottom` and then publishes it by
    /// advancing `bottom`, doubling the buffer first when it is full.
    ///
    /// # Safety
    ///
    /// Must not run concurrently with another `push` or `pop` on the same
    /// deque: `bottom` is updated with plain load/store pairs and the buffer
    /// pointer may be replaced.
    unsafe fn push(&self, data: T) {
        let b = self.bottom.load(Relaxed);
        let t = self.top.load(Acquire);
        let mut a = self.array.load(Relaxed);
        let size = b.wrapping_sub(t);
        if size == (*a).size() {
            // Full: move the live range into a buffer twice as large. The old
            // buffer stays reachable through the new buffer's `prev` link.
            a = Box::into_raw(Box::from_raw(a).grow(b, t));
            self.array.store(a, Release);
        }
        (*a).put(b, data);
        // The slot write must be visible before the new `bottom` is.
        fence(Release);
        self.bottom.store(b.wrapping_add(1), Relaxed);
    }

    /// Removes and returns the element at the bottom end, if any.
    ///
    /// Returns `None` when the deque is empty or when the last remaining
    /// element was claimed by a concurrent `steal`.
    ///
    /// # Safety
    ///
    /// Must not run concurrently with another `pop` or `push` on the same
    /// deque.
    unsafe fn pop(&self) -> Option<T> {
        // Cheap emptiness check before touching `bottom`.
        let b = self.bottom.load(Relaxed);
        let t = self.top.load(Relaxed);
        if b.wrapping_sub(t) <= 0 {
            return None;
        }

        // Reserve the bottom slot, then re-read `top` after a full fence.
        let b = b.wrapping_sub(1);
        self.bottom.store(b, Relaxed);
        fence(SeqCst);
        let t = self.top.load(Relaxed);
        let size = b.wrapping_sub(t);
        if size < 0 {
            // The deque became empty in the meantime; undo the reservation.
            self.bottom.store(b.wrapping_add(1), Relaxed);
            return None;
        }

        let a = self.array.load(Relaxed);
        let data = (*a).get(b);
        if size != 0 {
            // More than one element was left, so the bottom one is ours.
            return Some(data);
        }

        // Exactly one element left: race against stealers for it by advancing
        // `top`. `compare_exchange` with `SeqCst` for both orderings is the
        // documented replacement for the deprecated
        // `compare_and_swap(.., SeqCst)`.
        let won = self
            .top
            .compare_exchange(t, t.wrapping_add(1), SeqCst, SeqCst)
            .is_ok();
        // Either way the deque is now empty with `bottom == top == t + 1`.
        self.bottom.store(t.wrapping_add(1), Relaxed);
        if won {
            Some(data)
        } else {
            // The CAS on `top` failed, so the value was not claimed here; do
            // not drop the bitwise copy read above.
            forget(data);
            None
        }
    }

    /// Attempts to remove the element at the top end.
    ///
    /// Returns `Empty` when no element is visible, `Data(value)` when the
    /// compare-and-swap on `top` succeeds, and `Abort` when it fails.
    ///
    /// # Safety
    ///
    /// The slot is read speculatively before it is claimed; the caller must
    /// only use the value carried by `Data`.
    unsafe fn steal(&self) -> Stolen<T> {
        let t = self.top.load(Acquire);
        fence(SeqCst);
        let b = self.bottom.load(Acquire);
        let size = b.wrapping_sub(t);
        if size <= 0 {
            return Empty;
        }
        let a = self.array.load(Acquire);
        let data = (*a).get(t);
        // Same migration as in `pop`: `compare_exchange(.., SeqCst, SeqCst)`
        // replaces the deprecated `compare_and_swap(.., SeqCst)`.
        match self
            .top
            .compare_exchange(t, t.wrapping_add(1), SeqCst, SeqCst)
        {
            Ok(_) => Data(data),
            Err(_) => {
                // Lost the race: the speculative copy must not be dropped.
                forget(data);
                Abort
            }
        }
    }
}
impl<T: Send> Drop for Deque<T> {
    /// Drops every element still stored in `top..bottom`, then frees the
    /// current buffer (which in turn frees the chain of previous buffers).
    fn drop(&mut self) {
        let top = self.top.load(Relaxed);
        let bottom = self.bottom.load(Relaxed);
        let current = self.array.load(Relaxed);

        let mut index = top;
        while index != bottom {
            // Read the value out of its slot and drop it right away; the
            // buffer itself never drops its contents.
            unsafe { drop((*current).get(index)) };
            index = index.wrapping_add(1);
        }

        // Reclaim the allocation handed out by `Box::into_raw`.
        unsafe { drop(Box::from_raw(current)) };
    }
}
/// Leaks `buf` and returns the pointer to its heap storage.
///
/// The vector's destructor never runs, so neither its elements nor its
/// allocation are released here; the caller becomes responsible for both.
#[inline]
unsafe fn take_ptr_from_vec<T>(buf: Vec<T>) -> *mut T {
    // `ManuallyDrop` suppresses the destructor, just like `forget` would.
    let mut leaked = std::mem::ManuallyDrop::new(buf);
    leaked.as_mut_ptr()
}
/// Allocates uninitialised storage for exactly `number` values of type `T`
/// from the global allocator and returns a pointer to it.
///
/// The allocation uses the layout `Layout::array::<T>(number)`, so it can be
/// released by rebuilding a `Vec<T>` with capacity `number` via
/// `Vec::from_raw_parts`. `Vec::with_capacity` is not used because it is
/// allowed to allocate more than requested, which would make the capacity
/// passed on release disagree with the real allocation.
///
/// When the layout is zero-sized (`number == 0` or `T` is a zero-sized type)
/// nothing is allocated and a dangling, well-aligned pointer is returned.
///
/// # Panics
///
/// Panics with "capacity overflow" if the total size does not form a valid
/// layout. Allocation failure is reported through `handle_alloc_error`.
///
/// # Safety
///
/// The returned memory is uninitialised; the caller must initialise a slot
/// before reading it and must release the storage exactly once.
#[inline]
unsafe fn allocate<T>(number: usize) -> *mut T {
    let layout = match std::alloc::Layout::array::<T>(number) {
        Ok(layout) => layout,
        Err(_) => panic!("capacity overflow"),
    };
    if layout.size() == 0 {
        // `alloc` must not be called with a zero-sized layout.
        return std::ptr::NonNull::<T>::dangling().as_ptr();
    }
    let raw = std::alloc::alloc(layout) as *mut T;
    if raw.is_null() {
        std::alloc::handle_alloc_error(layout);
    }
    raw
}
/// Releases storage for `number` values of `T` starting at `ptr`.
///
/// The memory is handed to an empty `Vec<T>` with capacity `number`, which is
/// dropped immediately: the allocation is freed, but no element destructors
/// run because the vector's length is zero.
///
/// # Safety
///
/// `ptr` and `number` must satisfy the requirements of
/// `Vec::from_raw_parts` for a vector of length zero and that capacity.
#[inline]
unsafe fn deallocate<T>(ptr: *mut T, number: usize) {
    let empty_owner: Vec<T> = Vec::from_raw_parts(ptr, 0, number);
    drop(empty_owner);
}
impl<T: Send> Buffer<T> {
    /// Creates a buffer with `size` uninitialised slots and no predecessor.
    ///
    /// # Safety
    ///
    /// `size` is expected to be a power of two, because indices are wrapped
    /// with `size - 1` as a bit mask (see `mask` and `elem`).
    unsafe fn new(size: usize) -> Buffer<T> {
        Buffer {
            storage: allocate(size),
            size: size,
            prev: None,
        }
    }

    /// Number of slots, as a signed value for comparison with index deltas.
    fn size(&self) -> isize {
        self.size as isize
    }

    /// Bit mask that wraps a logical index into `0..size`.
    fn mask(&self) -> isize {
        self.size as isize - 1
    }

    /// Pointer to the slot that logical index `i` maps to.
    unsafe fn elem(&self, i: isize) -> *mut T {
        self.storage.offset(i & self.mask())
    }

    /// Reads the value at logical index `i` as a bitwise copy.
    ///
    /// The slot is left untouched, so the caller must make sure the value is
    /// not used (or dropped) twice.
    unsafe fn get(&self, i: isize) -> T {
        ptr::read(self.elem(i))
    }

    /// Writes `t` into the slot at logical index `i` without dropping
    /// whatever the slot held before.
    unsafe fn put(&self, i: isize, t: T) {
        ptr::write(self.elem(i), t);
    }

    /// Returns a buffer twice as large holding the elements at logical
    /// indices `t..b`, with the old buffer linked as its `prev`.
    ///
    /// The old buffer is kept rather than freed, so pointers to it that were
    /// read earlier stay valid for as long as the returned buffer lives.
    ///
    /// # Panics
    ///
    /// Panics if the doubled size overflows `usize` or the new storage cannot
    /// be described by a valid layout. In that case the old buffer is
    /// deliberately *not* freed.
    unsafe fn grow(self: Box<Buffer<T>>, b: isize, t: isize) -> Box<Buffer<T>> {
        // `Deque::push` builds `self` from the raw pointer that is still
        // stored in `Deque::array`. If this function unwound while owning the
        // box, the old buffer would be freed behind the deque's back (dangling
        // pointer, later double free). Release ownership until the new buffer
        // exists.
        let old = Box::into_raw(self);

        let doubled = (*old)
            .size
            .checked_mul(2)
            .expect("deque buffer capacity overflow");
        let mut buf = Box::new(Buffer::new(doubled));

        // Bitwise-copy the live range; the old slots keep their bits but are
        // never dropped through the old buffer.
        let mut i = t;
        while i != b {
            buf.put(i, (*old).get(i));
            i = i.wrapping_add(1);
        }

        // Nothing below can panic, so it is safe to take ownership back.
        buf.prev = Some(Box::from_raw(old));
        buf
    }
}
impl<T: Send> Drop for Buffer<T> {
    /// Frees the slot storage without dropping any values held in it.
    ///
    /// The `prev` link is dropped afterwards like any other field, which
    /// releases the chain of older buffers.
    fn drop(&mut self) {
        let (slots, capacity) = (self.storage, self.size);
        // SAFETY: `storage` was obtained from `allocate(size)` in
        // `Buffer::new` and is released only here.
        unsafe { deallocate(slots, capacity) }
    }
}
impl<T: Send> fmt::Debug for Deque<T> {
    /// Formats the deque as a struct showing its two indices and the buffer
    /// pointer; the stored elements are not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Deque");
        out.field("bottom", &self.bottom);
        out.field("top", &self.top);
        out.field("array", &self.array);
        out.finish()
    }
}
impl<T: Send> fmt::Debug for Worker<T> {
    /// Formats the worker as a struct showing its shared deque state and its
    /// marker field.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Worker");
        out.field("deque", &self.deque);
        out.field("marker", &self.marker);
        out.finish()
    }
}
impl<T: Send> fmt::Debug for Stealer<T> {
    /// Formats the stealer as a struct showing its shared deque state.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Stealer");
        out.field("deque", &self.deque);
        out.finish()
    }
}