recoverable_thread_pool/thread_pool/
sync.rs

1use super::r#type::ThreadPool;
2use crate::{SendResult, ThreadPoolJob, worker::r#type::Worker};
3use recoverable_spawn::*;
4use std::sync::{
5    Arc, Mutex,
6    mpsc::{self, Receiver},
7};
8
9impl ThreadPool {
10    pub fn new(size: usize) -> ThreadPool {
11        let (sender, receiver) = mpsc::channel();
12        let receiver: Arc<Mutex<Receiver<ThreadPoolJob>>> = Arc::new(Mutex::new(receiver));
13        let mut workers: Vec<Worker> = Vec::with_capacity(size);
14        let mut id: usize = 0;
15        loop {
16            if id >= size {
17                break;
18            }
19            let worker: Option<Worker> = Worker::new(id, Arc::clone(&receiver));
20            if worker.is_some() {
21                workers.push(worker.unwrap_or_default());
22                id += 1;
23            }
24        }
25        ThreadPool { workers, sender }
26    }
27
28    pub fn execute<F>(&self, job: F) -> SendResult
29    where
30        F: RecoverableFunction,
31    {
32        let job_with_handler: ThreadPoolJob = Box::new(move || {
33            let _ = sync::run_function(job);
34        });
35        self.sender.send(job_with_handler)
36    }
37
38    pub fn execute_with_catch<F, E>(&self, job: F, handle_error: E) -> SendResult
39    where
40        F: RecoverableFunction,
41        E: ErrorHandlerFunction,
42    {
43        let job_with_handler: ThreadPoolJob = Box::new(move || {
44            if let Err(err) = sync::run_function(job) {
45                let err_string: String = sync::spawn_error_to_string(&err);
46                let _ = sync::run_error_handle_function(handle_error, &err_string);
47            }
48        });
49        self.sender.send(job_with_handler)
50    }
51
52    pub fn execute_with_catch_finally<F, E, L>(
53        &self,
54        job: F,
55        handle_error: E,
56        finally: L,
57    ) -> SendResult
58    where
59        F: RecoverableFunction,
60        E: ErrorHandlerFunction,
61        L: RecoverableFunction,
62    {
63        let job_with_handler: ThreadPoolJob = Box::new(move || {
64            if let Err(err) = sync::run_function(job) {
65                let err_string: String = sync::spawn_error_to_string(&err);
66                let _ = sync::run_error_handle_function(handle_error, &err_string);
67            }
68            let _ = sync::run_function(finally);
69        });
70        self.sender.send(job_with_handler)
71    }
72}