use std::{
cell::Cell,
sync::{
Arc,
atomic::{
AtomicBool,
Ordering,
},
},
};
use crossbeam_deque::{
Injector,
Steal,
Stealer,
Worker,
};
use super::thread_pool::PoolJob;
pub(crate) struct WorkerQueue {
worker_index: usize,
inbox: Injector<PoolJob>,
stealer: Stealer<PoolJob>,
active: AtomicBool,
}
impl WorkerQueue {
fn new(worker_index: usize, stealer: Stealer<PoolJob>) -> Self {
Self {
worker_index,
inbox: Injector::new(),
stealer,
active: AtomicBool::new(false),
}
}
#[inline]
pub(crate) fn worker_index(&self) -> usize {
self.worker_index
}
#[inline]
pub(crate) fn is_active(&self) -> bool {
self.active.load(Ordering::Acquire)
}
pub(crate) fn activate(&self) -> bool {
self.active
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
pub(crate) fn deactivate(&self) -> bool {
self.active
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
pub(crate) fn push_back(&self, job: PoolJob) {
self.inbox.push(job);
}
pub(crate) fn pop_inbox_into(&self, local: &Worker<PoolJob>) -> Option<PoolJob> {
steal_batch_and_pop(&self.inbox, local)
}
pub(crate) fn steal_into(&self, dest: &Worker<PoolJob>) -> Option<PoolJob> {
steal_batch_and_pop(&self.stealer, dest).or_else(|| steal_batch_and_pop(&self.inbox, dest))
}
pub(crate) fn drain(&self) -> Vec<PoolJob> {
let mut jobs = Vec::new();
while let Some(job) = steal_one(&self.stealer) {
jobs.push(job);
}
while let Some(job) = steal_one(&self.inbox) {
jobs.push(job);
}
jobs
}
}
pub(crate) struct WorkerRuntime {
pub(crate) queue: Arc<WorkerQueue>,
pub(crate) local: Worker<PoolJob>,
steal_cursor: Cell<usize>,
}
impl WorkerRuntime {
pub(crate) fn new(worker_index: usize) -> Self {
let local = Worker::new_fifo();
let queue = Arc::new(WorkerQueue::new(worker_index, local.stealer()));
Self {
queue,
local,
steal_cursor: Cell::new(worker_index.wrapping_add(1)),
}
}
#[inline]
pub(crate) fn worker_index(&self) -> usize {
self.queue.worker_index()
}
pub(crate) fn next_steal_start(&self, queue_count: usize) -> usize {
let current = self.steal_cursor.get();
self.steal_cursor.set(current.wrapping_add(1));
current % queue_count
}
}
pub(crate) fn steal_one<S>(source: &S) -> Option<PoolJob>
where
S: QueueStealSource,
{
loop {
match source.steal_one() {
Steal::Success(job) => return Some(job),
Steal::Empty => return None,
Steal::Retry => continue,
}
}
}
pub(crate) fn steal_batch_and_pop<S>(source: &S, dest: &Worker<PoolJob>) -> Option<PoolJob>
where
S: QueueStealSource,
{
loop {
match source.steal_batch_and_pop(dest) {
Steal::Success(job) => return Some(job),
Steal::Empty => return None,
Steal::Retry => continue,
}
}
}
pub(crate) trait QueueStealSource {
fn steal_one(&self) -> Steal<PoolJob>;
fn steal_batch_and_pop(&self, dest: &Worker<PoolJob>) -> Steal<PoolJob>;
}
impl QueueStealSource for Injector<PoolJob> {
#[inline]
fn steal_one(&self) -> Steal<PoolJob> {
self.steal()
}
#[inline]
fn steal_batch_and_pop(&self, dest: &Worker<PoolJob>) -> Steal<PoolJob> {
Injector::steal_batch_and_pop(self, dest)
}
}
impl QueueStealSource for Stealer<PoolJob> {
#[inline]
fn steal_one(&self) -> Steal<PoolJob> {
self.steal()
}
#[inline]
fn steal_batch_and_pop(&self, dest: &Worker<PoolJob>) -> Steal<PoolJob> {
Stealer::steal_batch_and_pop(self, dest)
}
}