use anyhow::{bail, Result};
use crate::{print_debug, worker::Worker, worker_thread::WorkerThread, AsPayload};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
};
/// State of a pool of [`Worker`]s, each guarded by its own mutex so that a
/// task thread can own one worker for the duration of a task.
pub struct WorkerPoolInner {
/// Worker path handed to `Worker::init` every time a worker is initialised.
pub worker_path: Arc<str>,
/// Command line handed to `Worker::init`; `setup` defaults it to `["node"]`
/// and `set_binary` replaces it with the shell-split form of its argument.
pub binary_args: Arc<Vec<String>>,
/// Every worker created so far, by `get_available_worker` or `warmup`.
pub workers: Vec<Arc<Mutex<Worker>>>,
/// Upper bound on `workers.len()` honoured when new workers are created.
pub max_workers: usize,
/// Number of tasks started by `run_worker` that have not finished yet.
pub busy_counter: Arc<AtomicUsize>,
/// Enables the `print_debug!` traces and is forwarded to `Worker::new`.
pub debug: bool,
}
impl WorkerPoolInner {
pub fn setup(worker_path: &str, max_workers: usize) -> Self {
WorkerPoolInner {
worker_path: worker_path.into(),
binary_args: Arc::new(vec!["node".into()]),
workers: Vec::new(),
max_workers,
busy_counter: Arc::new(AtomicUsize::new(0)),
debug: false,
}
}
pub fn set_binary(&mut self, binary: &str) {
self.binary_args = Arc::new(shell_words::split(binary).expect("couldn't parse binary"));
}
pub fn with_debug(&mut self, debug: bool) {
self.debug = debug;
}
pub fn run_worker<P: AsPayload>(&mut self, cmd: String, payload: P) -> WorkerThread {
let worker = self.get_available_worker();
self.busy_counter.fetch_add(1, Ordering::SeqCst);
print_debug!(
self.debug,
"[pool] got worker {}",
worker.lock().unwrap().id
);
let waiting = self.busy_counter.clone();
let debug = self.debug;
let binary_args = self.binary_args.clone();
let payload = payload.to_payload();
let file_path = self.worker_path.clone();
let handle = std::thread::spawn(move || {
let worker = worker.clone();
let mut worker = worker.lock().unwrap();
worker.init(binary_args, file_path).unwrap();
let res = worker.perform_task(cmd, payload).expect("perform task");
print_debug!(debug, "[pool] performed task on worker {}", worker.id);
drop(worker);
waiting.fetch_sub(1, Ordering::SeqCst);
res
});
WorkerThread::from_handle(handle)
}
pub fn get_available_worker(&mut self) -> Arc<Mutex<Worker>> {
let idle_worker = self.workers.iter().find(|w| {
if let Ok(w) = w.try_lock() {
return w.idle;
}
false
});
if let Some(idle_worker) = idle_worker {
idle_worker.lock().unwrap().idle = false;
print_debug!(self.debug, "[pool] found idle worker");
return idle_worker.clone();
}
if self.workers.len() < self.max_workers {
let mut worker = Worker::new(self.workers.len() + 1, self.debug);
worker.idle = false;
self.workers.push(Arc::new(Mutex::new(worker)));
print_debug!(self.debug, "[pool] created new worker");
return self.workers.last().unwrap().clone();
}
print_debug!(self.debug, "[pool] waiting for worker to be free");
loop {
if self.busy_counter.load(Ordering::SeqCst) == 0 {
print_debug!(self.debug, "[pool] pool is free");
break;
}
}
self.get_available_worker()
}
pub fn warmup(&mut self, nbr_workers: usize) -> Result<()> {
let n = nbr_workers.clamp(0, self.max_workers - self.workers.len());
let debug = self.debug;
let ln = self.workers.len();
let mut handles = Vec::new();
for n in 0..n {
let id = ln + n + 1;
let worker = Worker::new(id, debug);
let mutex = Arc::new(Mutex::new(worker));
self.workers.push(mutex.clone());
print_debug!(debug, "[pool] (warmup) created new worker");
let binary_args = self.binary_args.clone();
let file_path = self.worker_path.clone();
let handle = std::thread::spawn(move || {
let worker = mutex.clone();
let mut worker = worker.lock().unwrap();
worker.init(binary_args, file_path).unwrap();
worker.wait_for_ready().unwrap();
print_debug!(debug, "[pool] (warmup) worker {} initialized", id);
});
handles.push(handle);
}
for handle in handles {
if handle.join().is_err() {
bail!("thread panicked")
}
}
Ok(())
}
}