node_workers/
worker_pool_inner.rs

1use anyhow::{bail, Result};
2
3use crate::{print_debug, worker::Worker, worker_thread::WorkerThread, AsPayload};
4use std::sync::{
5  atomic::{AtomicUsize, Ordering},
6  Arc, Mutex,
7};
8
9/// Struct responsible of the inner working of the pool
10/// Needs to be wrapped in a Arc<Mutex<T>> for manipulations within different threads
11pub struct WorkerPoolInner {
12  pub worker_path: Arc<str>,
13  pub binary_args: Arc<Vec<String>>,
14  pub workers: Vec<Arc<Mutex<Worker>>>,
15  pub max_workers: usize,
16  pub busy_counter: Arc<AtomicUsize>,
17  pub debug: bool,
18}
19
20impl WorkerPoolInner {
21  /// Create a new pool with some parameters
22  pub fn setup(worker_path: &str, max_workers: usize) -> Self {
23    WorkerPoolInner {
24      worker_path: worker_path.into(),
25      binary_args: Arc::new(vec!["node".into()]),
26      workers: Vec::new(),
27      max_workers,
28      busy_counter: Arc::new(AtomicUsize::new(0)),
29      debug: false,
30    }
31  }
32
33  /// Refers to `WorkerPool::set_binary` for documentation
34  pub fn set_binary(&mut self, binary: &str) {
35    self.binary_args = Arc::new(shell_words::split(binary).expect("couldn't parse binary"));
36  }
37
38  /// Refers to `WorkerPool::with_debug` for documentation
39  pub fn with_debug(&mut self, debug: bool) {
40    self.debug = debug;
41  }
42
43  /// Run a worker in a new thread. However, `get_available_worker` is executed on the main thread
44  /// and therefor can block if the pool is waiting for an idle worker.
45  pub fn run_worker<P: AsPayload>(&mut self, cmd: String, payload: P) -> WorkerThread {
46    let worker = self.get_available_worker();
47    self.busy_counter.fetch_add(1, Ordering::SeqCst);
48
49    print_debug!(
50      self.debug,
51      "[pool] got worker {}",
52      worker.lock().unwrap().id
53    );
54    let waiting = self.busy_counter.clone();
55    let debug = self.debug;
56    let binary_args = self.binary_args.clone();
57    let payload = payload.to_payload();
58    let file_path = self.worker_path.clone();
59
60    let handle = std::thread::spawn(move || {
61      let worker = worker.clone();
62      let mut worker = worker.lock().unwrap();
63      worker.init(binary_args, file_path).unwrap();
64      let res = worker.perform_task(cmd, payload).expect("perform task");
65      print_debug!(debug, "[pool] performed task on worker {}", worker.id);
66      drop(worker);
67
68      waiting.fetch_sub(1, Ordering::SeqCst);
69      res
70    });
71    WorkerThread::from_handle(handle)
72  }
73
74  /// Find an idle worker that can take on a task.
75  /// If no worker is free, and the capacity of the pool is not reached yet, a new worker is created.
76  /// However, if the capacity is reached, this method will wait (and block) until a worker is idle.
77  pub fn get_available_worker(&mut self) -> Arc<Mutex<Worker>> {
78    let idle_worker = self.workers.iter().find(|w| {
79      if let Ok(w) = w.try_lock() {
80        return w.idle;
81      }
82      false
83    });
84    if let Some(idle_worker) = idle_worker {
85      idle_worker.lock().unwrap().idle = false;
86      print_debug!(self.debug, "[pool] found idle worker");
87      return idle_worker.clone();
88    }
89    if self.workers.len() < self.max_workers {
90      let mut worker = Worker::new(self.workers.len() + 1, self.debug);
91      worker.idle = false;
92      self.workers.push(Arc::new(Mutex::new(worker)));
93      print_debug!(self.debug, "[pool] created new worker");
94      return self.workers.last().unwrap().clone();
95    }
96    print_debug!(self.debug, "[pool] waiting for worker to be free");
97    loop {
98      if self.busy_counter.load(Ordering::SeqCst) == 0 {
99        print_debug!(self.debug, "[pool] pool is free");
100        break;
101      }
102    }
103    self.get_available_worker()
104  }
105
106  pub fn warmup(&mut self, nbr_workers: usize) -> Result<()> {
107    let n = nbr_workers.clamp(0, self.max_workers - self.workers.len());
108    let debug = self.debug;
109    let ln = self.workers.len();
110    let mut handles = Vec::new();
111    for n in 0..n {
112      let id = ln + n + 1;
113      let worker = Worker::new(id, debug);
114      let mutex = Arc::new(Mutex::new(worker));
115      self.workers.push(mutex.clone());
116      print_debug!(debug, "[pool] (warmup) created new worker");
117
118      let binary_args = self.binary_args.clone();
119      let file_path = self.worker_path.clone();
120      let handle = std::thread::spawn(move || {
121        let worker = mutex.clone();
122        let mut worker = worker.lock().unwrap();
123        worker.init(binary_args, file_path).unwrap();
124        worker.wait_for_ready().unwrap();
125        print_debug!(debug, "[pool] (warmup) worker {} initialized", id);
126      });
127      handles.push(handle);
128    }
129    for handle in handles {
130      if handle.join().is_err() {
131        bail!("thread panicked")
132      }
133    }
134    Ok(())
135  }
136}