node_workers/
worker_pool.rs

1use crate::{
2  as_payload::AsPayload, print_debug, worker_pool_inner::WorkerPoolInner,
3  worker_thread::WorkerThread,
4};
5use anyhow::{bail, Result};
6use serde::de::DeserializeOwned;
7use std::{
8  sync::{Arc, Mutex},
9  thread::JoinHandle,
10};
11
12/// A pool of nodejs workers.
13/// Wraps a inner struct inside `Arc<Mutex<T>>` to be able to invoke it's method within a spawned thread.
14/// This is important so that indefinitely blocking methods such as `get_available_workers` can be offloaded.
15pub struct WorkerPool {
16  inner: Arc<Mutex<WorkerPoolInner>>,
17}
18
19impl WorkerPool {
20  /// Create a new workers pool with the maximum numbers of workers that can be spawned for the duration of the program
21  /// ```
22  /// use node_workers::{WorkerPool};
23  ///
24  /// let nbr_max_workers = 4;
25  /// let mut pool = WorkerPool::setup("worker.js", nbr_max_workers);
26  /// ```
27  pub fn setup(worker_path: &str, max_workers: usize) -> Self {
28    WorkerPool {
29      inner: Arc::new(Mutex::new(WorkerPoolInner::setup(worker_path, max_workers))),
30    }
31  }
32
33  /// Configure the binary that's used to run JS workers
34  /// This can be usefull configure node or to run JS via another runtime
35  /// ```rust
36  /// use node_workers::{EmptyPayload, WorkerPool};
37  /// # use std::error::Error;
38  ///
39  /// # fn main() -> Result<(), Box<dyn Error>> {
40  /// let mut pool = WorkerPool::setup("examples/worker.ts", 4);
41  /// pool.set_binary("node -r esbuild-register");
42  /// pool.perform::<(), _>("ping", EmptyPayload::bulk(1))?;
43  /// # Ok(())
44  /// # }
45  /// ```
46  pub fn set_binary(&mut self, binary: &str) {
47    self.inner.lock().unwrap().set_binary(binary);
48  }
49
50  /// Enable or disable logging
51  pub fn with_debug(&mut self, debug: bool) {
52    self.inner.lock().unwrap().with_debug(debug);
53  }
54
55  /// Run a single worker in a thread. This method returns the created thread, not the result of the worker.
56  /// Use this if you need more control on the pool.
57  /// ```
58  /// use node_workers::{WorkerPool};
59  ///
60  /// let mut pool = WorkerPool::setup("examples/worker", 2);
61  /// for n in 1..=4 {
62  ///   pool.run_worker("fib", n * 10);
63  /// }
64  /// println!("not blocking");
65  /// ```
66  ///
67  /// The returned thread optionally holds the serialized result from the worker. This can be deserialized using serde_json in order to
68  /// get a proper result. This is done under the hood for you.
69  /// ```
70  /// use node_workers::{WorkerPool};
71  /// # use std::error::Error;
72  ///
73  /// # fn main() -> Result<(), Box<dyn Error>> {
74  /// let mut pool = WorkerPool::setup("examples/worker", 2);
75  /// let thread = pool.run_worker("fib2", 40u32);
76  /// let result = thread.get_result::<u32>()?;
77  /// println!("run_worker result: {:#?}", result);
78  /// # Ok(())
79  /// # }
80  /// ```
81  pub fn run_worker<P: AsPayload>(&mut self, cmd: &str, payload: P) -> WorkerThread {
82    let payload = payload.to_payload();
83    let cmd = cmd.to_string();
84    let inner = self.inner.clone();
85
86    // spawn a thread so that inner.get_available_worker() doesn't block
87    let handle = std::thread::spawn(move || {
88      let inner = inner.clone();
89      let mut pool = inner.lock().unwrap();
90      let res = pool.run_worker(cmd, payload);
91      drop(pool);
92      res.join().unwrap()
93    });
94    WorkerThread::from_handle(handle)
95  }
96
97  /// Dispatch a task between available workers with a set of payloads.
98  /// This mobilize a worker for each payload. As soon as a worker is free, it'll be assigned right away a new task until all payloads have been processed.
99  /// Contrarily to `run_worker`, this method is blocking and directly return the result from all workers.
100  /// ```
101  /// use node_workers::{WorkerPool};
102  /// # use std::error::Error;
103  ///
104  /// # fn main() -> Result<(), Box<dyn Error>> {
105  /// let mut pool = WorkerPool::setup("examples/worker", 2);
106  /// pool.with_debug(true);
107  /// let payloads = vec![10, 20, 30, 40];
108  /// let result = pool.perform::<u64, _>("fib2", payloads).unwrap();
109  /// println!("result: {:#?}", result);
110  /// # Ok(())
111  /// # }
112  /// ```
113  /// ## Errors
114  ///
115  /// Each worker is run in a thread, and `perform()` will return an error variant if one of them panick.
116  pub fn perform<T: DeserializeOwned, P: AsPayload>(
117    &mut self,
118    cmd: &str,
119    payloads: Vec<P>,
120  ) -> Result<Vec<Option<T>>> {
121    let debug = self.inner.lock().unwrap().debug;
122    print_debug!(debug, "[pool] running tasks");
123    let mut handles = Vec::new();
124    for (n, payload) in payloads.into_iter().map(|x| x.to_payload()).enumerate() {
125      print_debug!(debug, "[pool] (task {}) start of iteration", n);
126      let handle = self
127        .inner
128        .lock()
129        .unwrap()
130        .run_worker(cmd.to_string(), payload);
131      handles.push(handle);
132      print_debug!(debug, "[pool] (task {}) end of iteration", n);
133    }
134
135    handles
136      .into_iter()
137      .enumerate()
138      .map(|(n, x)| {
139        print_debug!(debug, "[pool] (thread {}) joined", n);
140        let res = x.get_result::<T>();
141        if let Ok(res) = res {
142          Ok(res)
143        } else {
144          bail!("failed to join thread")
145        }
146      })
147      .collect::<Result<Vec<_>, _>>()
148  }
149
150  /// Boot a maximum of *n* workers, making them ready to take on a task right away.
151  /// ```rust
152  /// use node_workers::{WorkerPool};
153  ///
154  /// let mut pool = WorkerPool::setup("examples/worker", 2);
155  /// let handle = pool.warmup(2);
156  ///
157  /// //... some intensive task on the main thread
158  ///
159  /// handle.join().expect("Couldn't warmup workers");
160  /// //... task workers
161  /// ```
162  pub fn warmup(&self, nbr_workers: usize) -> JoinHandle<()> {
163    let inner = self.inner.clone();
164    std::thread::spawn(move || inner.lock().unwrap().warmup(nbr_workers).unwrap())
165  }
166}
167
168#[cfg(test)]
169mod tests {
170  use crate::worker_pool::WorkerPool;
171
172  #[test]
173  pub fn create_worker_when_needed() {
174    let pool = WorkerPool::setup("", 1);
175    assert_eq!(pool.inner.lock().unwrap().workers.len(), 0);
176
177    pool.inner.lock().unwrap().get_available_worker();
178    assert_eq!(pool.inner.lock().unwrap().workers.len(), 1);
179  }
180
181  #[test]
182  pub fn same_idle_worker() {
183    let pool = WorkerPool::setup("", 1);
184    let worker = pool.inner.lock().unwrap().get_available_worker();
185    worker.lock().unwrap().idle = true;
186    let worker_id = worker.lock().unwrap().id;
187    let other_worker_id = pool
188      .inner
189      .lock()
190      .unwrap()
191      .get_available_worker()
192      .lock()
193      .unwrap()
194      .id;
195    assert_eq!(worker_id, other_worker_id);
196  }
197
198  #[test]
199  pub fn create_new_worker_when_busy() {
200    let pool = WorkerPool::setup("examples/worker", 2);
201    pool.inner.lock().unwrap().run_worker("fib2".into(), 40);
202
203    let worker_id = pool
204      .inner
205      .lock()
206      .unwrap()
207      .get_available_worker()
208      .lock()
209      .unwrap()
210      .id;
211    println!("got worker_id");
212    assert_eq!(worker_id, 2);
213  }
214
215  #[test]
216  pub fn reuse_worker_when_full() {
217    let pool = WorkerPool::setup("examples/worker", 1);
218    pool.inner.lock().unwrap().run_worker("fib2".into(), 40);
219
220    let worker_id = pool
221      .inner
222      .lock()
223      .unwrap()
224      .get_available_worker()
225      .lock()
226      .unwrap()
227      .id;
228    assert_eq!(worker_id, 1);
229  }
230
231  #[test]
232  pub fn warmup() {
233    let mut pool = WorkerPool::setup("examples/worker", 2);
234    pool.with_debug(true);
235    pool.warmup(2).join().unwrap();
236
237    let workers = pool.inner.lock().unwrap().workers.clone();
238    for worker in workers {
239      assert_eq!(worker.lock().unwrap().ready, true);
240    }
241  }
242
243  #[test]
244  pub fn error_invalid_command() {
245    {
246      let mut pool = WorkerPool::setup("foo", 1);
247      let res = pool.run_worker("fib2", 40).join();
248      println!("{:?}", res);
249      assert_eq!(true, matches!(res, Err(_)));
250    }
251
252    {
253      let mut pool = WorkerPool::setup("foo", 1);
254      let res = pool.perform::<(), _>("fib2", vec![40]);
255      assert_eq!(true, matches!(res, Err(_)));
256    }
257
258    {
259      let pool = WorkerPool::setup("foo", 1);
260      let res = pool.warmup(1).join();
261      assert_eq!(true, matches!(res, Err(_)));
262    }
263  }
264
265  #[test]
266  pub fn error_task_throws() {
267    {
268      let mut pool = WorkerPool::setup("examples/worker", 1);
269      let res = pool.run_worker("error", 40).join();
270      assert_eq!(true, matches!(res, Err(_)));
271    }
272
273    {
274      let mut pool = WorkerPool::setup("examples/worker", 1);
275      let res = pool.perform::<(), _>("error", vec![40]);
276      assert_eq!(true, matches!(res, Err(_)));
277    }
278  }
279
280  #[test]
281  pub fn error_task_not_found() {
282    {
283      let mut pool = WorkerPool::setup("examples/worker", 1);
284      let res = pool.run_worker("no", 40).join();
285      assert_eq!(true, matches!(res, Err(_)));
286    }
287
288    {
289      let mut pool = WorkerPool::setup("examples/worker", 1);
290      let res = pool.perform::<(), _>("no", vec![40]);
291      assert_eq!(true, matches!(res, Err(_)));
292    }
293  }
294}