node_workers/
worker_pool_inner.rs1use anyhow::{bail, Result};
2
3use crate::{print_debug, worker::Worker, worker_thread::WorkerThread, AsPayload};
4use std::sync::{
5 atomic::{AtomicUsize, Ordering},
6 Arc, Mutex,
7};
8
9pub struct WorkerPoolInner {
12 pub worker_path: Arc<str>,
13 pub binary_args: Arc<Vec<String>>,
14 pub workers: Vec<Arc<Mutex<Worker>>>,
15 pub max_workers: usize,
16 pub busy_counter: Arc<AtomicUsize>,
17 pub debug: bool,
18}
19
20impl WorkerPoolInner {
21 pub fn setup(worker_path: &str, max_workers: usize) -> Self {
23 WorkerPoolInner {
24 worker_path: worker_path.into(),
25 binary_args: Arc::new(vec!["node".into()]),
26 workers: Vec::new(),
27 max_workers,
28 busy_counter: Arc::new(AtomicUsize::new(0)),
29 debug: false,
30 }
31 }
32
33 pub fn set_binary(&mut self, binary: &str) {
35 self.binary_args = Arc::new(shell_words::split(binary).expect("couldn't parse binary"));
36 }
37
38 pub fn with_debug(&mut self, debug: bool) {
40 self.debug = debug;
41 }
42
43 pub fn run_worker<P: AsPayload>(&mut self, cmd: String, payload: P) -> WorkerThread {
46 let worker = self.get_available_worker();
47 self.busy_counter.fetch_add(1, Ordering::SeqCst);
48
49 print_debug!(
50 self.debug,
51 "[pool] got worker {}",
52 worker.lock().unwrap().id
53 );
54 let waiting = self.busy_counter.clone();
55 let debug = self.debug;
56 let binary_args = self.binary_args.clone();
57 let payload = payload.to_payload();
58 let file_path = self.worker_path.clone();
59
60 let handle = std::thread::spawn(move || {
61 let worker = worker.clone();
62 let mut worker = worker.lock().unwrap();
63 worker.init(binary_args, file_path).unwrap();
64 let res = worker.perform_task(cmd, payload).expect("perform task");
65 print_debug!(debug, "[pool] performed task on worker {}", worker.id);
66 drop(worker);
67
68 waiting.fetch_sub(1, Ordering::SeqCst);
69 res
70 });
71 WorkerThread::from_handle(handle)
72 }
73
74 pub fn get_available_worker(&mut self) -> Arc<Mutex<Worker>> {
78 let idle_worker = self.workers.iter().find(|w| {
79 if let Ok(w) = w.try_lock() {
80 return w.idle;
81 }
82 false
83 });
84 if let Some(idle_worker) = idle_worker {
85 idle_worker.lock().unwrap().idle = false;
86 print_debug!(self.debug, "[pool] found idle worker");
87 return idle_worker.clone();
88 }
89 if self.workers.len() < self.max_workers {
90 let mut worker = Worker::new(self.workers.len() + 1, self.debug);
91 worker.idle = false;
92 self.workers.push(Arc::new(Mutex::new(worker)));
93 print_debug!(self.debug, "[pool] created new worker");
94 return self.workers.last().unwrap().clone();
95 }
96 print_debug!(self.debug, "[pool] waiting for worker to be free");
97 loop {
98 if self.busy_counter.load(Ordering::SeqCst) == 0 {
99 print_debug!(self.debug, "[pool] pool is free");
100 break;
101 }
102 }
103 self.get_available_worker()
104 }
105
106 pub fn warmup(&mut self, nbr_workers: usize) -> Result<()> {
107 let n = nbr_workers.clamp(0, self.max_workers - self.workers.len());
108 let debug = self.debug;
109 let ln = self.workers.len();
110 let mut handles = Vec::new();
111 for n in 0..n {
112 let id = ln + n + 1;
113 let worker = Worker::new(id, debug);
114 let mutex = Arc::new(Mutex::new(worker));
115 self.workers.push(mutex.clone());
116 print_debug!(debug, "[pool] (warmup) created new worker");
117
118 let binary_args = self.binary_args.clone();
119 let file_path = self.worker_path.clone();
120 let handle = std::thread::spawn(move || {
121 let worker = mutex.clone();
122 let mut worker = worker.lock().unwrap();
123 worker.init(binary_args, file_path).unwrap();
124 worker.wait_for_ready().unwrap();
125 print_debug!(debug, "[pool] (warmup) worker {} initialized", id);
126 });
127 handles.push(handle);
128 }
129 for handle in handles {
130 if handle.join().is_err() {
131 bail!("thread panicked")
132 }
133 }
134 Ok(())
135 }
136}