node_workers/
worker_pool.rs1use crate::{
2 as_payload::AsPayload, print_debug, worker_pool_inner::WorkerPoolInner,
3 worker_thread::WorkerThread,
4};
5use anyhow::{bail, Result};
6use serde::de::DeserializeOwned;
7use std::{
8 sync::{Arc, Mutex},
9 thread::JoinHandle,
10};
11
12pub struct WorkerPool {
16 inner: Arc<Mutex<WorkerPoolInner>>,
17}
18
19impl WorkerPool {
20 pub fn setup(worker_path: &str, max_workers: usize) -> Self {
28 WorkerPool {
29 inner: Arc::new(Mutex::new(WorkerPoolInner::setup(worker_path, max_workers))),
30 }
31 }
32
33 pub fn set_binary(&mut self, binary: &str) {
47 self.inner.lock().unwrap().set_binary(binary);
48 }
49
50 pub fn with_debug(&mut self, debug: bool) {
52 self.inner.lock().unwrap().with_debug(debug);
53 }
54
55 pub fn run_worker<P: AsPayload>(&mut self, cmd: &str, payload: P) -> WorkerThread {
82 let payload = payload.to_payload();
83 let cmd = cmd.to_string();
84 let inner = self.inner.clone();
85
86 let handle = std::thread::spawn(move || {
88 let inner = inner.clone();
89 let mut pool = inner.lock().unwrap();
90 let res = pool.run_worker(cmd, payload);
91 drop(pool);
92 res.join().unwrap()
93 });
94 WorkerThread::from_handle(handle)
95 }
96
97 pub fn perform<T: DeserializeOwned, P: AsPayload>(
117 &mut self,
118 cmd: &str,
119 payloads: Vec<P>,
120 ) -> Result<Vec<Option<T>>> {
121 let debug = self.inner.lock().unwrap().debug;
122 print_debug!(debug, "[pool] running tasks");
123 let mut handles = Vec::new();
124 for (n, payload) in payloads.into_iter().map(|x| x.to_payload()).enumerate() {
125 print_debug!(debug, "[pool] (task {}) start of iteration", n);
126 let handle = self
127 .inner
128 .lock()
129 .unwrap()
130 .run_worker(cmd.to_string(), payload);
131 handles.push(handle);
132 print_debug!(debug, "[pool] (task {}) end of iteration", n);
133 }
134
135 handles
136 .into_iter()
137 .enumerate()
138 .map(|(n, x)| {
139 print_debug!(debug, "[pool] (thread {}) joined", n);
140 let res = x.get_result::<T>();
141 if let Ok(res) = res {
142 Ok(res)
143 } else {
144 bail!("failed to join thread")
145 }
146 })
147 .collect::<Result<Vec<_>, _>>()
148 }
149
150 pub fn warmup(&self, nbr_workers: usize) -> JoinHandle<()> {
163 let inner = self.inner.clone();
164 std::thread::spawn(move || inner.lock().unwrap().warmup(nbr_workers).unwrap())
165 }
166}
167
#[cfg(test)]
mod tests {
    use crate::worker_pool::WorkerPool;

    // A fresh pool holds no workers; requesting one creates it.
    #[test]
    fn create_worker_when_needed() {
        let pool = WorkerPool::setup("", 1);
        assert_eq!(pool.inner.lock().unwrap().workers.len(), 0);

        pool.inner.lock().unwrap().get_available_worker();
        assert_eq!(pool.inner.lock().unwrap().workers.len(), 1);
    }

    // A worker marked idle is handed out again instead of creating a new one.
    #[test]
    fn same_idle_worker() {
        let pool = WorkerPool::setup("", 1);
        let worker = pool.inner.lock().unwrap().get_available_worker();
        worker.lock().unwrap().idle = true;
        let worker_id = worker.lock().unwrap().id;
        let other_worker_id = pool
            .inner
            .lock()
            .unwrap()
            .get_available_worker()
            .lock()
            .unwrap()
            .id;
        assert_eq!(worker_id, other_worker_id);
    }

    // With room for two workers, a busy first worker leads to a second one.
    #[test]
    fn create_new_worker_when_busy() {
        let pool = WorkerPool::setup("examples/worker", 2);
        pool.inner.lock().unwrap().run_worker("fib2".into(), 40);

        let worker_id = pool
            .inner
            .lock()
            .unwrap()
            .get_available_worker()
            .lock()
            .unwrap()
            .id;
        println!("got worker_id");
        assert_eq!(worker_id, 2);
    }

    // With room for a single worker, the busy worker is the one returned.
    #[test]
    fn reuse_worker_when_full() {
        let pool = WorkerPool::setup("examples/worker", 1);
        pool.inner.lock().unwrap().run_worker("fib2".into(), 40);

        let worker_id = pool
            .inner
            .lock()
            .unwrap()
            .get_available_worker()
            .lock()
            .unwrap()
            .id;
        assert_eq!(worker_id, 1);
    }

    // After warmup completes, every worker in the pool reports ready.
    #[test]
    fn warmup() {
        let mut pool = WorkerPool::setup("examples/worker", 2);
        pool.with_debug(true);
        pool.warmup(2).join().unwrap();

        let workers = pool.inner.lock().unwrap().workers.clone();
        for worker in workers {
            assert!(worker.lock().unwrap().ready);
        }
    }

    // An invalid worker path must surface as an error from every entry point.
    #[test]
    fn error_invalid_command() {
        {
            let mut pool = WorkerPool::setup("foo", 1);
            let res = pool.run_worker("fib2", 40).join();
            println!("{:?}", res);
            assert!(res.is_err());
        }

        {
            let mut pool = WorkerPool::setup("foo", 1);
            let res = pool.perform::<(), _>("fib2", vec![40]);
            assert!(res.is_err());
        }

        {
            let pool = WorkerPool::setup("foo", 1);
            let res = pool.warmup(1).join();
            assert!(res.is_err());
        }
    }

    // The "error" command must be reported as an error to the caller.
    #[test]
    fn error_task_throws() {
        {
            let mut pool = WorkerPool::setup("examples/worker", 1);
            let res = pool.run_worker("error", 40).join();
            assert!(res.is_err());
        }

        {
            let mut pool = WorkerPool::setup("examples/worker", 1);
            let res = pool.perform::<(), _>("error", vec![40]);
            assert!(res.is_err());
        }
    }

    // The "no" command (which the test name treats as unknown) must be
    // reported as an error.
    #[test]
    fn error_task_not_found() {
        {
            let mut pool = WorkerPool::setup("examples/worker", 1);
            let res = pool.run_worker("no", 40).join();
            assert!(res.is_err());
        }

        {
            let mut pool = WorkerPool::setup("examples/worker", 1);
            let res = pool.perform::<(), _>("no", vec![40]);
            assert!(res.is_err());
        }
    }
}