use std::sync::atomic::{
AtomicBool,
Ordering,
};
use crossbeam_deque::{
Injector,
Stealer,
Worker,
};
use super::queue_steal_source::{
QueueStealSource,
steal_batch_and_pop,
steal_one,
};
use super::thread_pool::PoolJob;
/// Per-worker queue state: an inbox other code can push jobs into, a stealer
/// handle for taking jobs, and an atomic "active" flag.
///
/// Every method takes `&self`; mutation goes through the `Injector`, the
/// `Stealer`, and the `AtomicBool`.
pub(crate) struct WorkerQueue {
/// Index passed to `new` and reported back by `worker_index()`.
worker_index: usize,
/// Queue that `push_back` pushes into; read by `pop_inbox_into`,
/// `steal_into` (as a fallback) and `drain`.
inbox: Injector<PoolJob>,
/// Stealer handle supplied to `new`; `steal_into` and `drain` consult it
/// before the inbox.
// NOTE(review): presumably the stealer side of this worker's local deque — confirm at the call site of `new`.
stealer: Stealer<PoolJob>,
/// Flag flipped by `activate` / `deactivate`; initialised to `false` in `new`.
active: AtomicBool,
}
impl WorkerQueue {
    /// Builds the queue for the worker at `worker_index`.
    ///
    /// The inbox starts as a freshly created `Injector`, `stealer` is stored
    /// as given, and the queue begins with its active flag cleared.
    pub(crate) fn new(worker_index: usize, stealer: Stealer<PoolJob>) -> Self {
        let inbox = Injector::new();
        let active = AtomicBool::new(false);
        Self {
            worker_index,
            inbox,
            stealer,
            active,
        }
    }

    /// Returns the index this queue was constructed with.
    #[inline]
    pub(crate) fn worker_index(&self) -> usize {
        self.worker_index
    }

    /// Reads the active flag with `Acquire` ordering.
    #[inline]
    pub(crate) fn is_active(&self) -> bool {
        let flag = &self.active;
        flag.load(Ordering::Acquire)
    }

    /// Attempts the `false -> true` transition of the active flag.
    ///
    /// Returns `true` only when this call performed the transition; `false`
    /// means the flag was already set.
    pub(crate) fn activate(&self) -> bool {
        self.transition_active(false, true)
    }

    /// Attempts the `true -> false` transition of the active flag.
    ///
    /// Returns `true` only when this call performed the transition; `false`
    /// means the flag was already clear.
    pub(crate) fn deactivate(&self) -> bool {
        self.transition_active(true, false)
    }

    /// Appends `job` to the inbox.
    pub(crate) fn push_back(&self, job: PoolJob) {
        self.inbox.push(job);
    }

    /// Runs `steal_batch_and_pop` against the inbox with `local` as the
    /// destination and returns whatever job it yields.
    // NOTE(review): presumably moves a batch of inbox jobs into `local` and
    // hands one back — confirm against `queue_steal_source`.
    pub(crate) fn pop_inbox_into(&self, local: &Worker<PoolJob>) -> Option<PoolJob> {
        let inbox = &self.inbox;
        steal_batch_and_pop(inbox, local)
    }

    /// Tries to obtain a job for `dest`, consulting the stealer first.
    ///
    /// The inbox is only consulted when the stealer attempt returns `None`.
    pub(crate) fn steal_into(&self, dest: &Worker<PoolJob>) -> Option<PoolJob> {
        if let Some(job) = steal_batch_and_pop(&self.stealer, dest) {
            return Some(job);
        }
        steal_batch_and_pop(&self.inbox, dest)
    }

    /// Collects the jobs `drain_source` can pull out of this queue.
    ///
    /// Jobs obtained through the stealer come first in the returned vector,
    /// followed by jobs taken from the inbox.
    pub(crate) fn drain(&self) -> Vec<PoolJob> {
        let mut collected = Vec::new();
        drain_source(&self.stealer, &mut collected);
        drain_source(&self.inbox, &mut collected);
        collected
    }

    /// Compare-and-swap on the active flag: stores `to` only if the flag
    /// currently equals `from`, reporting whether the store happened.
    ///
    /// Uses `AcqRel` on success and `Acquire` on failure.
    fn transition_active(&self, from: bool, to: bool) -> bool {
        let outcome = self
            .active
            .compare_exchange(from, to, Ordering::AcqRel, Ordering::Acquire);
        matches!(outcome, Ok(_))
    }
}
/// Appends to `jobs` every job that `steal_one` yields from `source`.
///
/// `steal_one` is called repeatedly, and the first `None` it returns ends the
/// drain; jobs are appended in the order they were obtained.
fn drain_source<S>(source: &S, jobs: &mut Vec<PoolJob>)
where
    S: QueueStealSource,
{
    // `extend` pulls from the iterator until its first `None`, which matches
    // a `while let Some(..)` loop that pushes each job.
    jobs.extend(std::iter::from_fn(|| steal_one(source)));
}