use crossbeam::sync::chase_lev::{self, Steal};
use std::cell::{Cell, UnsafeCell};
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicUsize, Ordering};
use super::{Spawner, make_spawner};
use super::arena::Arena;
use super::job::Job;
use super::rand::{Rng, XorShiftRng};
const EXIT_FLAG: usize = 1 << 31;
pub type Stealer = chase_lev::Stealer<*mut Job>;
pub type Queue = chase_lev::Worker<*mut Job>;
pub struct SharedWorkerData {
queues: Vec<Stealer>,
arenas: Vec<Arena>,
num_jobs: Vec<AtomicUsize>,
with_work: Mutex<usize>,
wait_condvar: Condvar,
}
impl SharedWorkerData {
pub fn new(queues: Vec<Stealer>, arenas: Vec<Arena>) -> Self {
let len = queues.len();
SharedWorkerData {
queues: queues,
arenas: arenas,
num_jobs: (0..len).map(|_| AtomicUsize::new(0)).collect(),
with_work: Mutex::new(0),
wait_condvar: Condvar::new(),
}
}
pub fn notify_shutdown(&self) {
*self.with_work.lock().unwrap() |= EXIT_FLAG;
self.wait_condvar.notify_all();
}
fn worker_has_work(&self) {
let mut with_work = self.with_work.lock().unwrap();
*with_work += 1;
self.wait_condvar.notify_all();
}
fn out_of_work(&self) {
let mut with_work = self.with_work.lock().unwrap();
if *with_work | EXIT_FLAG == 0 {
*with_work -= 1;
}
}
fn wait(&self, idx: usize) -> bool {
assert!(idx != 0);
let mut guard = self.with_work.lock().unwrap();
let mut with_work = *guard;
while with_work == 0 {
guard = self.wait_condvar.wait(guard).unwrap();
with_work = *guard;
}
drop(guard);
if with_work & EXIT_FLAG != 0 {
false
} else {
true
}
}
}
pub struct Worker {
shared_data: Arc<SharedWorkerData>,
idx: usize, rng: UnsafeCell<XorShiftRng>,
exit_time: Cell<bool>,
queue: UnsafeCell<Queue>
}
impl Worker {
pub fn new(shared_data: Arc<SharedWorkerData>, idx: usize, rng: XorShiftRng, queue: Queue)
-> Self {
Worker {
shared_data: shared_data,
idx: idx,
rng: UnsafeCell::new(rng),
exit_time: Cell::new(false),
queue: UnsafeCell::new(queue),
}
}
unsafe fn steal(&self) -> Option<*mut Job> {
const ABORTS_BEFORE_BACKOFF: usize = 32;
let idx = (*self.rng.get()).gen::<usize>() % self.stealers().len();
if idx != self.idx {
let mut aborts = 0;
loop {
aborts += 1;
if aborts > ABORTS_BEFORE_BACKOFF {
return None;
}
match self.stealers()[idx].steal() {
Steal::Data(job) => {
self.shared_data.num_jobs[idx].fetch_sub(1, Ordering::SeqCst);
return Some(job)
}
Steal::Empty => return None,
Steal::Abort => {}
}
}
} else {
None
}
}
fn clear_pass(&self) -> bool {
let mut all_clear = true;
for (idx, queue) in self.stealers().iter().enumerate() {
if idx != self.idx {
loop {
match queue.steal() {
Steal::Data(job) => {
all_clear = false;
unsafe { (*job).call(self) }
}
Steal::Empty => break,
Steal::Abort => {
all_clear = false;
}
}
}
} else {
loop {
match unsafe { self.queue().try_pop() } {
Some(job) => {
all_clear = false;
unsafe { (*job).call(self) }
}
None => {
break;
}
}
}
}
}
all_clear
}
pub fn clear(&self) {
while !self.clear_pass() {}
}
pub unsafe fn run_next(&self, should_wait: bool) {
let num_jobs = self.shared_data.num_jobs[self.idx].load(Ordering::SeqCst);
if num_jobs != 0 {
match self.queue().try_pop() {
Some(job) => {
self.shared_data.num_jobs[self.idx].fetch_sub(1, Ordering::SeqCst);
(*job).call(self);
return;
}
None => {
self.shared_data.out_of_work();
}
}
}
if should_wait && !self.exit_time.get() {
if !self.shared_data.wait(self.idx) {
self.exit_time.set(true);
return;
}
}
if let Some(job) = self.steal() {
(*job).call(self);
}
}
pub unsafe fn submit_internal<F>(&self, counter: *const AtomicUsize, f: F)
where F: Send + FnOnce(&Worker)
{
let num_jobs = self.shared_data.num_jobs[self.idx].fetch_add(1, Ordering::SeqCst);
if num_jobs == 0 {
self.shared_data.worker_has_work();
}
let job = Job::new(counter, f);
let job_ptr = self.arenas()[self.idx].alloc(job);
self.queue().push(job_ptr);
}
pub fn scope<'pool, 'new, F, R>(&'pool self, f: F) -> R
where F: 'new + FnOnce(&Spawner<'pool, 'new>) -> R,
R: 'new,
{
let counter = AtomicUsize::new(0);
let s = make_spawner(self, &counter);
let top = unsafe { self.arenas()[self.idx].top() };
let res = f(&s);
struct PanicGuard<'a>(&'a AtomicUsize);
impl<'a> Drop for PanicGuard<'a> {
fn drop(&mut self) {
while self.0.load(Ordering::Acquire) > 0 {
::std::thread::yield_now()
}
}
}
let guard = PanicGuard(&counter);
loop {
let status = counter.load(Ordering::Acquire);
if status == 0 { break }
unsafe { self.run_next(false) }
}
::std::mem::forget(guard);
unsafe { self.arenas()[self.idx].set_top(top); }
res
}
pub fn shared_data(&self) -> &SharedWorkerData {
&self.shared_data
}
pub fn should_shutdown(&self) -> bool {
self.exit_time.get()
}
#[inline]
pub fn stealers(&self) -> &[Stealer] {
&self.shared_data.queues
}
#[inline]
pub fn arenas(&self) -> &[Arena] {
&self.shared_data.arenas
}
#[inline]
unsafe fn queue(&self) -> &mut Queue {
&mut *self.queue.get()
}
}
unsafe impl Send for Worker {}