use park::{BoxPark, BoxUnpark};
use task::Task;
use worker::state::{State, PUSHED_MASK};
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crossbeam_deque::{Steal, Stealer, Worker};
use crossbeam_queue::SegQueue;
use crossbeam_utils::CachePadded;
use slab::Slab;
pub(crate) struct WorkerEntry {
pub state: CachePadded<AtomicUsize>,
next_sleeper: UnsafeCell<usize>,
pub worker: Worker<Arc<Task>>,
stealer: Stealer<Arc<Task>>,
park: UnsafeCell<Option<BoxPark>>,
unpark: UnsafeCell<Option<BoxUnpark>>,
running_tasks: UnsafeCell<Slab<Arc<Task>>>,
remotely_completed_tasks: SegQueue<Arc<Task>>,
needs_drain: AtomicBool,
}
impl WorkerEntry {
pub fn new(park: BoxPark, unpark: BoxUnpark) -> Self {
let w = Worker::new_fifo();
let s = w.stealer();
WorkerEntry {
state: CachePadded::new(AtomicUsize::new(State::default().into())),
next_sleeper: UnsafeCell::new(0),
worker: w,
stealer: s,
park: UnsafeCell::new(Some(park)),
unpark: UnsafeCell::new(Some(unpark)),
running_tasks: UnsafeCell::new(Slab::new()),
remotely_completed_tasks: SegQueue::new(),
needs_drain: AtomicBool::new(false),
}
}
pub fn fetch_unset_pushed(&self, ordering: Ordering) -> State {
self.state.fetch_and(!PUSHED_MASK, ordering).into()
}
#[inline]
pub fn submit_internal(&self, task: Arc<Task>) {
self.push_internal(task);
}
#[inline]
pub fn notify(&self, mut state: State) -> bool {
use worker::Lifecycle::*;
loop {
let mut next = state;
next.notify();
let actual = self
.state
.compare_and_swap(state.into(), next.into(), AcqRel)
.into();
if state == actual {
break;
}
state = actual;
}
match state.lifecycle() {
Sleeping => {
self.unpark();
true
}
Shutdown => false,
Running | Notified | Signaled => {
true
}
}
}
pub fn signal_stop(&self, mut state: State) {
use worker::Lifecycle::*;
loop {
let mut next = state;
match state.lifecycle() {
Shutdown => {
return;
}
Running | Sleeping => {}
Notified | Signaled => {
return;
}
}
next.set_lifecycle(Signaled);
let actual = self
.state
.compare_and_swap(state.into(), next.into(), AcqRel)
.into();
if actual == state {
break;
}
state = actual;
}
self.unpark();
}
#[inline]
pub fn pop_task(&self) -> Option<Arc<Task>> {
self.worker.pop()
}
pub fn steal_tasks(&self, dest: &Self) -> Steal<Arc<Task>> {
self.stealer.steal_batch_and_pop(&dest.worker)
}
pub fn drain_tasks(&self) {
while self.worker.pop().is_some() {}
}
pub fn park(&self) {
if let Some(park) = unsafe { (*self.park.get()).as_mut() } {
park.park().unwrap();
}
}
pub fn park_timeout(&self, duration: Duration) {
if let Some(park) = unsafe { (*self.park.get()).as_mut() } {
park.park_timeout(duration).unwrap();
}
}
#[inline]
pub fn unpark(&self) {
if let Some(park) = unsafe { (*self.unpark.get()).as_ref() } {
park.unpark();
}
}
#[inline]
pub fn register_task(&self, task: &Arc<Task>) {
let running_tasks = unsafe { &mut *self.running_tasks.get() };
let key = running_tasks.insert(task.clone());
task.reg_index.set(key);
}
#[inline]
pub fn unregister_task(&self, task: Arc<Task>) {
let running_tasks = unsafe { &mut *self.running_tasks.get() };
running_tasks.remove(task.reg_index.get());
self.drain_remotely_completed_tasks();
}
#[inline]
pub fn remotely_complete_task(&self, task: Arc<Task>) {
self.remotely_completed_tasks.push(task);
self.needs_drain.store(true, Release);
}
pub fn shutdown(&self) {
self.drain_remotely_completed_tasks();
let running_tasks = unsafe { &mut *self.running_tasks.get() };
for (_, task) in running_tasks.iter() {
task.abort();
}
running_tasks.clear();
unsafe {
*self.park.get() = None;
*self.unpark.get() = None;
}
}
#[inline]
fn drain_remotely_completed_tasks(&self) {
if self.needs_drain.compare_and_swap(true, false, Acquire) {
let running_tasks = unsafe { &mut *self.running_tasks.get() };
while let Ok(task) = self.remotely_completed_tasks.pop() {
running_tasks.remove(task.reg_index.get());
}
}
}
#[inline]
pub fn push_internal(&self, task: Arc<Task>) {
self.worker.push(task);
}
#[inline]
pub fn next_sleeper(&self) -> usize {
unsafe { *self.next_sleeper.get() }
}
#[inline]
pub fn set_next_sleeper(&self, val: usize) {
unsafe {
*self.next_sleeper.get() = val;
}
}
}
impl fmt::Debug for WorkerEntry {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("WorkerEntry")
.field("state", &self.state.load(Relaxed))
.field("next_sleeper", &"UnsafeCell<usize>")
.field("worker", &self.worker)
.field("stealer", &self.stealer)
.field("park", &"UnsafeCell<BoxPark>")
.field("unpark", &"BoxUnpark")
.finish()
}
}