use crossbeam::deque::{Injector, Worker};
use log::trace;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use crate::entities::WorkerPool;
type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct Workers {
injector: Arc<Injector<Job>>, handles: Vec<thread::JoinHandle<()>>, current_epoch: Arc<AtomicU64>, shutdown: Arc<AtomicBool>, }
impl Workers {
pub fn new(num_threads: usize, epoch: Arc<AtomicU64>) -> Self {
let injector: Arc<Injector<Job>> = Arc::new(Injector::new());
let shutdown = Arc::new(AtomicBool::new(false));
let mut workers_local: Vec<Worker<Job>> = Vec::new();
let mut stealers = Vec::new();
let mut handles = Vec::new();
for _ in 0..num_threads {
let worker: Worker<Job> = Worker::new_fifo();
stealers.push(worker.stealer());
workers_local.push(worker);
}
for (worker_id, worker) in workers_local.into_iter().enumerate() {
let injector = Arc::clone(&injector);
let shutdown = Arc::clone(&shutdown);
let stealers = stealers.clone();
let handle = thread::Builder::new()
.name(format!("playa-worker-{}", worker_id))
.spawn(move || {
trace!("Worker {} started", worker_id);
loop {
if let Some(job) = worker.pop() {
job();
continue;
}
if let Some(job) = injector.steal().success() {
job();
continue;
}
let mut found_work = false;
for stealer in &stealers {
if let Some(job) = stealer.steal().success() {
job();
found_work = true;
break;
}
}
if found_work {
continue;
}
if shutdown.load(Ordering::Relaxed) {
break;
}
thread::sleep(std::time::Duration::from_millis(1));
}
trace!("Worker {} stopped", worker_id);
})
.expect("Failed to spawn worker thread");
handles.push(handle);
}
trace!("Workers initialized: {} threads (work-stealing)", num_threads);
Self {
injector,
handles,
current_epoch: epoch,
shutdown,
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
self.injector.push(Box::new(f));
}
pub fn current_epoch(&self) -> u64 {
self.current_epoch.load(Ordering::Relaxed)
}
pub fn execute_with_epoch<F>(&self, epoch: u64, f: F)
where
F: FnOnce() + Send + 'static,
{
let current_epoch = Arc::clone(&self.current_epoch);
let wrapped = move || {
if current_epoch.load(Ordering::Relaxed) == epoch {
f(); }
};
self.injector.push(Box::new(wrapped));
}
}
impl Drop for Workers {
fn drop(&mut self) {
use std::time::{Duration, Instant};
let num_threads = self.handles.len();
trace!("Workers shutting down ({} threads)...", num_threads);
self.shutdown.store(true, Ordering::SeqCst);
let deadline = Instant::now() + Duration::from_millis(500);
let handles = std::mem::take(&mut self.handles);
for handle in handles {
while !handle.is_finished() {
if Instant::now() >= deadline {
trace!("Shutdown timeout reached, exiting anyway");
return;
}
thread::sleep(Duration::from_millis(1));
}
let _ = handle.join();
}
trace!("All {} workers stopped gracefully", num_threads);
}
}
impl WorkerPool for Workers {
fn execute_with_epoch(&self, epoch: u64, f: Box<dyn FnOnce() + Send + 'static>) {
Workers::execute_with_epoch(self, epoch, f)
}
}