extern crate crossbeam;
extern crate num_cpus;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicIsize, Ordering, fence};
use bag::{WeakBag, SharedWeakBag, RevocableWeakBag, Revocable, PopResult, PopStatus};
use crossbeam::mem::CachePadded;
use std::mem;
pub mod queue;
pub mod bag;
#[cfg(feature="prime_schedules")]
mod primes;
/// Magnitude that a handle's locally buffered size delta must reach (in either
/// direction) before `propagate_diff` flushes it to a shared counter.
const THRESHOLD_DIFF: isize = 4;
/// Number of shared size counters in `BagPipeState`; a handle picks one with
/// `offset % N_COUNTERS` when it flushes its delta.
const N_COUNTERS: usize = 4;
/// A handle onto a collection of bags ("pipes") shared through an `Arc`.
///
/// Every handle keeps its own probe position, stride, failure counts and a
/// locally buffered size delta. Cloning a handle yields a new handle over the
/// same shared state with a different stride.
pub struct BagPipe<B: SharedWeakBag> {
/// Shared state: the underlying bags plus the approximate-size counters.
pipes: Arc<BagPipeState<B>>,
/// Position at which the next push/pop attempt starts probing; it is masked
/// into range by the shared state and advanced by `stride` after a failure.
offset: usize,
/// Step between successive bags probed by this handle.
stride: usize,
/// Incremented on a failed push and halved on a successful one; the push
/// attempt budget is `push_failures + 1`.
push_failures: usize,
/// Incremented on a transient pop failure and halved otherwise; the pop
/// attempt budget is `pop_failures * 2 + 1`.
pop_failures: usize,
/// Net pushes minus pops performed through this handle that have not yet
/// been added to a shared counter.
cur_diff: isize,
}
impl<B: SharedWeakBag> Drop for BagPipe<B> {
    /// Publishes any size delta still buffered in this handle before it goes away.
    fn drop(&mut self) {
        // A zero delta would leave the shared counter unchanged, so skip the atomic op.
        if self.cur_diff == 0 {
            return;
        }
        self.push_diff();
    }
}
impl<B: SharedWeakBag> Clone for BagPipe<B> {
fn clone(&self) -> Self {
#[cfg(feature="prime_schedules")]
let offset = {
primes::get(self.pipes.all_refs.fetch_add(1, Ordering::Relaxed) + 1)
};
#[cfg(not(feature="prime_schedules"))]
let offset = {
let seed = self.pipes.all_refs.fetch_add(1, Ordering::Relaxed) + 1;
seed * 2 + 1
};
BagPipe {
pipes: self.pipes.clone(),
offset: offset & (self.pipes.pipes.len() - 1),
stride: offset,
push_failures: 0,
pop_failures: 0,
cur_diff: 0,
}
}
}
impl<B: SharedWeakBag> BagPipe<B> {
    /// Adds this handle's buffered size delta to one of the shared counters.
    ///
    /// The counter is selected by `offset % N_COUNTERS`. The buffered delta is
    /// NOT reset here; callers that keep using the handle must zero `cur_diff`
    /// themselves (see `propagate_diff`).
    fn push_diff(&mut self) {
        // `offset % N_COUNTERS` is always below `N_COUNTERS`, the length of the
        // counter array, so plain indexing is in bounds and needs no `unsafe`.
        self.pipes.counters[self.offset % N_COUNTERS].fetch_add(self.cur_diff, Ordering::Release);
    }

    /// Creates a handle over freshly allocated shared state sized from `size`.
    ///
    /// `BagPipeState::new_size` rounds the number of pipes up to the next power
    /// of two. `size` is expected to be non-zero (checked in debug builds only).
    pub fn new_size(size: usize) -> Self {
        #[cfg(feature="prime_schedules")]
        let offset = primes::get(1);
        #[cfg(not(feature="prime_schedules"))]
        let offset = 1;
        debug_assert!(size > 0);
        BagPipe {
            pipes: Arc::new(BagPipeState::new_size(size)),
            offset: offset,
            stride: offset,
            push_failures: 0,
            pop_failures: 0,
            cur_diff: 0,
        }
    }

    /// Buffers a size change of `d` and flushes the buffer to a shared counter
    /// once its magnitude reaches `THRESHOLD_DIFF`.
    fn propagate_diff(&mut self, d: isize) {
        self.cur_diff += d;
        let thresh = THRESHOLD_DIFF;
        // Two-sided comparison rather than `abs()` so that extreme negative
        // values cannot overflow.
        if self.cur_diff >= thresh || self.cur_diff <= -thresh {
            self.push_diff();
            self.cur_diff = 0;
        }
    }

    /// Returns the sum of the shared counters, clamped to be non-negative.
    ///
    /// Deltas still buffered inside individual handles are not included, so
    /// the result is only an estimate.
    pub fn size_guess(&self) -> isize {
        use std::cmp;
        let total = self.pipes
            .counters
            .iter()
            .fold(0isize, |sum, ctr| sum + ctr.load(Ordering::Acquire));
        cmp::max(0, total)
    }
}
impl<B: SharedWeakBag> WeakBag for BagPipe<B> {
    type Item = B::Item;

    /// Creates a handle over shared state built by `BagPipeState::new`.
    fn new() -> Self {
        #[cfg(feature="prime_schedules")]
        let first = primes::get(1);
        #[cfg(not(feature="prime_schedules"))]
        let first = 1;
        BagPipe {
            pipes: Arc::new(BagPipeState::new()),
            offset: first,
            stride: first,
            push_failures: 0,
            pop_failures: 0,
            cur_diff: 0,
        }
    }

    /// Attempts to push `it`, probing up to `push_failures + 1` pipes.
    ///
    /// On success the failure count is halved and the size delta is bumped.
    /// On failure the handle moves on by one stride, the failure count grows,
    /// and the item is handed back to the caller.
    fn try_push_mut(&mut self, it: Self::Item) -> Result<(), Self::Item> {
        let budget = self.push_failures + 1;
        let outcome = self.pipes
            .try_push_internal(it, self.offset, self.stride, budget, false);
        if let Err(rejected) = outcome {
            self.offset += self.stride;
            self.push_failures += 1;
            return Err(rejected);
        }
        self.push_failures >>= 1;
        self.propagate_diff(1);
        Ok(())
    }

    /// Attempts to pop an item, probing up to `pop_failures * 2 + 1` pipes.
    ///
    /// A transient failure advances the handle by one stride and grows the
    /// failure count; both success and `Empty` halve it. A successful pop also
    /// decrements the buffered size delta.
    fn try_pop_mut(&mut self) -> PopResult<Self::Item> {
        let budget = (self.pop_failures * 2) + 1;
        match self.pipes.try_pop_internal(self.offset, self.stride, budget) {
            Ok(popped) => {
                self.pop_failures >>= 1;
                self.propagate_diff(-1);
                Ok(popped)
            }
            Err(PopStatus::Empty) => {
                self.pop_failures >>= 1;
                Err(PopStatus::Empty)
            }
            Err(PopStatus::TransientFailure) => {
                self.offset += self.stride;
                self.pop_failures += 1;
                Err(PopStatus::TransientFailure)
            }
        }
    }

    /// Pushes `it` unconditionally.
    ///
    /// A regular `try_push_mut` is attempted first. If that fails, a second
    /// round is made whose last attempt uses the pipe's blocking `push`, so the
    /// item always ends up stored.
    fn push_mut(&mut self, it: Self::Item) {
        let leftover = match self.try_push_mut(it) {
            // `try_push_mut` already did the bookkeeping for the successful case.
            Ok(()) => return,
            Err(rejected) => rejected,
        };
        let budget = self.push_failures + 1;
        let forced = self.pipes
            .try_push_internal(leftover, self.offset, self.stride, budget, true);
        match forced {
            // Stored by a non-blocking attempt.
            Ok(true) => self.push_failures >>= 1,
            // Stored by the final, forced attempt.
            Ok(false) => {
                self.offset += self.stride;
                self.push_failures += 1;
            }
            Err(_) => unreachable!(),
        }
        self.propagate_diff(1);
    }

    /// Pushes every item produced by `iter`, retrying each one on successive
    /// pipes until some pipe accepts it, then records the number of items
    /// added in a single size-delta update.
    fn bulk_add<I: Iterator<Item = Self::Item>>(&mut self, iter: I) {
        let n_pipes = self.pipes.pipes.len();
        let mut slot = self.offset;
        let mut added = 0;
        for element in iter {
            let mut pending = element;
            loop {
                slot &= n_pipes - 1;
                let attempt = unsafe { self.pipes.pipes.get_unchecked(slot).try_push(pending) };
                slot += self.stride;
                match attempt {
                    Ok(_) => break,
                    Err(returned) => pending = returned,
                }
            }
            // The inner loop only exits once the item has been stored.
            added += 1;
        }
        self.propagate_diff(added)
    }
}
impl<B: RevocableWeakBag> BagPipe<B>
where B::Item: Revocable
{
/// Forwards to `B::revoke` for the given item and returns its result.
///
/// # Safety
///
/// This function adds no checks of its own; callers must uphold whatever
/// contract `B::revoke` imposes (not visible in this file — TODO confirm
/// against the `bag` module).
pub unsafe fn revoke(it: &B::Item) -> bool {
B::revoke(it)
}
}
/// State shared by every `BagPipe` handle cloned from the same origin.
struct BagPipeState<B: SharedWeakBag> {
/// Starts at 1 and is incremented each time a handle is cloned; the value is
/// used to derive the new handle's stride.
all_refs: AtomicUsize,
/// Cache-padded counters whose sum approximates the number of stored items.
counters: [CachePadded<AtomicIsize>; N_COUNTERS],
/// The underlying bags; the length is always a power of two so indices can
/// be reduced with a bit mask.
pipes: Vec<B>,
}
impl<B: SharedWeakBag> BagPipeState<B> {
    /// Builds shared state with `sz` rounded up to the next power of two pipes,
    /// all counters at zero and the clone counter at 1.
    pub fn new_size(sz: usize) -> Self {
        let len = sz.next_power_of_two();
        let mut res = BagPipeState {
            all_refs: AtomicUsize::new(1),
            // SAFETY: `transmute` only compiles if source and target have the
            // same size, so this relies on each `CachePadded<AtomicIsize>`
            // occupying exactly 32 `usize`s. It further assumes the all-zero
            // bit pattern is a valid padded `AtomicIsize` holding 0
            // (NOTE(review): depends on the crossbeam version in use — confirm
            // when upgrading).
            counters: unsafe { mem::transmute([[0 as usize; 32]; N_COUNTERS]) },
            pipes: Vec::with_capacity(len),
        };
        for _ in 0..len {
            res.pipes.push(B::new())
        }
        // NOTE(review): the purpose of this fence is not evident from this
        // file; kept as-is to avoid changing memory-ordering behaviour.
        fence(Ordering::Acquire);
        res
    }

    /// Builds shared state with twice as many pipes as reported CPUs
    /// (rounded up to a power of two by `new_size`).
    pub fn new() -> Self {
        Self::new_size(num_cpus::get() * 2)
    }

    /// Tries to store `it` in one of the pipes.
    ///
    /// Probing starts at `offset` and advances by `stride`, making at most
    /// `max_failures` attempts. When `succeed_final` is set, the last attempt
    /// uses the pipe's `push` instead of `try_push`.
    ///
    /// Returns `Ok(true)` if a `try_push` succeeded, `Ok(false)` if the item
    /// was stored by the final forced `push`, and `Err(item)` if every attempt
    /// failed (or `max_failures` was zero).
    pub fn try_push_internal(&self,
                             it: B::Item,
                             offset: usize,
                             stride: usize,
                             max_failures: usize,
                             succeed_final: bool)
                             -> Result<bool, B::Item> {
        let mut ix = offset;
        let mut remaining = max_failures;
        let len = self.pipes.len();
        debug_assert!(len.is_power_of_two());
        let mut cur_item = it;
        while remaining > 0 {
            ix &= len - 1;
            // SAFETY: `ix` was just masked with `len - 1` and `len` is a
            // non-zero power of two (`new_size` uses `next_power_of_two`), so
            // `ix < len`.
            unsafe {
                let pipe = self.pipes.get_unchecked(ix);
                if succeed_final && remaining == 1 {
                    pipe.push(cur_item);
                    return Ok(false);
                }
                match pipe.try_push(cur_item) {
                    Ok(()) => return Ok(true),
                    Err(item) => cur_item = item,
                }
            }
            ix += stride;
            remaining -= 1;
        }
        Err(cur_item)
    }

    /// Tries to take an item from one of the pipes.
    ///
    /// Probing starts at `offset` and advances by `stride`, making at most
    /// `max_failures` attempts (which must be non-zero).
    ///
    /// Returns the item on success, `Empty` once `len` consecutive attempts
    /// have each reported an empty pipe, and `TransientFailure` if the attempt
    /// budget runs out first.
    pub fn try_pop_internal(&self,
                            offset: usize,
                            stride: usize,
                            max_failures: usize)
                            -> PopResult<B::Item> {
        let mut ix = offset;
        let mut remaining = max_failures;
        let mut empties = 0;
        let len = self.pipes.len();
        debug_assert!(len.is_power_of_two());
        debug_assert!(max_failures > 0);
        // Debug-only record of which indices were observed empty.
        #[cfg(debug_assertions)]
        let mut seen = Vec::new();
        loop {
            ix &= len - 1;
            // SAFETY: `ix` was just masked with `len - 1` and `len` is a
            // non-zero power of two, so `ix < len`.
            let attempt = unsafe { self.pipes.get_unchecked(ix).try_pop() };
            match attempt {
                Ok(it) => return Ok(it),
                Err(PopStatus::Empty) => {
                    #[cfg(debug_assertions)]
                    seen.push(ix);
                    empties += 1;
                }
                // A contended pipe may still hold items, so restart the
                // consecutive-empty count.
                Err(PopStatus::TransientFailure) => empties = 0,
            }
            // Check for emptiness BEFORE the budget: if the final permitted
            // attempt is also the one that completes a full sweep of empty
            // pipes, the more informative `Empty` must win over
            // `TransientFailure`.
            if empties == len {
                #[cfg(debug_assertions)]
                {
                    // Sanity check that the probe sequence really covered
                    // every pipe index.
                    seen.sort();
                    seen.dedup();
                    let expected: Vec<usize> = (0..len).collect();
                    assert_eq!(seen,
                               expected,
                               "got {:?} but expected {:?}, with offset={} and stride={}",
                               seen,
                               expected,
                               offset,
                               stride);
                }
                return Err(PopStatus::Empty);
            }
            ix += stride;
            remaining -= 1;
            if remaining == 0 {
                return Err(PopStatus::TransientFailure);
            }
        }
    }
}