recoverable_thread_pool/thread_pool/impl.rs
1use crate::*;
2
3/// Sync implementation of thread pool operations.
4impl ThreadPool {
5 /// Creates a new thread pool with the specified number of workers.
6 ///
7 /// # Arguments
8 ///
9 /// - `usize` - The number of worker threads to create.
10 ///
11 /// # Returns
12 ///
13 /// - `ThreadPool` - The new thread pool instance.
14 pub fn new(size: usize) -> ThreadPool {
15 let (sender, receiver) = mpsc::channel();
16 let receiver: Arc<Mutex<Receiver<ThreadPoolJob>>> = Arc::new(Mutex::new(receiver));
17 let mut workers: Vec<Worker> = Vec::with_capacity(size);
18 let mut id: usize = 0;
19 loop {
20 if id >= size {
21 break;
22 }
23 let worker: Option<Worker> = Worker::new(id, Arc::clone(&receiver));
24 if worker.is_some() {
25 workers.push(worker.unwrap_or_default());
26 id += 1;
27 }
28 }
29 ThreadPool { workers, sender }
30 }
31
32 /// Executes a synchronous job in the thread pool.
33 ///
34 /// # Arguments
35 ///
36 /// - `F` - The synchronous function to execute.
37 ///
38 /// # Returns
39 ///
40 /// - `SendResult` - Result of the job submission.
41 pub fn execute<F>(&self, job: F) -> SendResult
42 where
43 F: RecoverableFunction,
44 {
45 let job_with_handler: ThreadPoolJob = Box::new(move || {
46 let _ = run_function(job);
47 });
48 self.sender.send(job_with_handler)
49 }
50
51 /// Executes a synchronous job with error handling in the thread pool.
52 ///
53 /// # Arguments
54 ///
55 /// - `F` - The synchronous function to execute.
56 /// - `E` - The error handler function.
57 ///
58 /// # Returns
59 ///
60 /// - `SendResult` - Result of the job submission.
61 pub fn execute_with_catch<F, E>(&self, job: F, handle_error: E) -> SendResult
62 where
63 F: RecoverableFunction,
64 E: ErrorHandlerFunction,
65 {
66 let job_with_handler: ThreadPoolJob = Box::new(move || {
67 if let Err(err) = run_function(job) {
68 let err_string: String = spawn_error_to_string(&err);
69 let _ = run_error_handle_function(handle_error, &err_string);
70 }
71 });
72 self.sender.send(job_with_handler)
73 }
74
75 /// Executes a synchronous job with error handling and finalization in the thread pool.
76 ///
77 /// # Arguments
78 ///
79 /// - `F` - The synchronous function to execute.
80 /// - `E` - The error handler function.
81 /// - `L` - The finally handler function.
82 ///
83 /// # Returns
84 ///
85 /// - `SendResult` - Result of the job submission.
86 pub fn execute_with_catch_finally<F, E, L>(
87 &self,
88 job: F,
89 handle_error: E,
90 finally: L,
91 ) -> SendResult
92 where
93 F: RecoverableFunction,
94 E: ErrorHandlerFunction,
95 L: RecoverableFunction,
96 {
97 let job_with_handler: ThreadPoolJob = Box::new(move || {
98 if let Err(err) = run_function(job) {
99 let err_string: String = spawn_error_to_string(&err);
100 let _ = run_error_handle_function(handle_error, &err_string);
101 }
102 let _ = run_function(finally);
103 });
104 self.sender.send(job_with_handler)
105 }
106
107 /// Executes an async job in the thread pool.
108 ///
109 /// # Arguments
110 ///
111 /// - `F` - The async function to execute.
112 ///
113 /// # Returns
114 ///
115 /// - `SendResult` - Result of the job submission.
116 pub fn async_execute<F>(&self, job: F) -> SendResult
117 where
118 F: AsyncRecoverableFunction,
119 {
120 let job_with_handler = Box::new(move || {
121 Builder::new_current_thread()
122 .enable_all()
123 .build()
124 .unwrap()
125 .block_on(async move {
126 let _ = async_run_function(move || async {
127 job.call().await;
128 })
129 .await;
130 });
131 });
132 self.sender.send(job_with_handler)
133 }
134
135 /// Executes an async job with error handling in the thread pool.
136 ///
137 /// # Arguments
138 ///
139 /// - `F` - The async function to execute.
140 /// - `E` - The async error handler function.
141 ///
142 /// # Returns
143 ///
144 /// - `SendResult` - Result of the job submission.
145 pub fn async_execute_with_catch<F, E>(&self, job: F, handle_error: E) -> SendResult
146 where
147 F: AsyncRecoverableFunction,
148 E: AsyncErrorHandlerFunction,
149 {
150 let job_with_handler = Box::new(move || {
151 Builder::new_current_thread()
152 .enable_all()
153 .build()
154 .unwrap()
155 .block_on(async move {
156 let run_result: AsyncSpawnResult = async_run_function(move || async {
157 job.call().await;
158 })
159 .await;
160 if let Err(err) = run_result {
161 let err_string: String = tokio_error_to_string(&err);
162 let _: AsyncSpawnResult = async_run_error_handle_function(
163 move |err_str| async move {
164 handle_error.call(err_str).await;
165 },
166 Arc::new(err_string),
167 )
168 .await;
169 }
170 });
171 });
172 self.sender.send(job_with_handler)
173 }
174
175 /// Executes an async job with error handling and finalization in the thread pool.
176 ///
177 /// # Arguments
178 ///
179 /// - `F` - The async function to execute.
180 /// - `E` - The async error handler function.
181 /// - `L` - The async finally handler function.
182 ///
183 /// # Returns
184 ///
185 /// - `SendResult` - Result of the job submission.
186 pub fn async_execute_with_catch_finally<F, E, L>(
187 &self,
188 job: F,
189 handle_error: E,
190 finally: L,
191 ) -> SendResult
192 where
193 F: AsyncRecoverableFunction,
194 E: AsyncErrorHandlerFunction,
195 L: AsyncRecoverableFunction,
196 {
197 let job_with_handler = Box::new(move || {
198 Builder::new_current_thread()
199 .enable_all()
200 .build()
201 .unwrap()
202 .block_on(async move {
203 let run_result: AsyncSpawnResult = async_run_function(move || async {
204 job.call().await;
205 })
206 .await;
207 if let Err(err) = run_result {
208 let err_string: String = tokio_error_to_string(&err);
209 let _: AsyncSpawnResult = async_run_error_handle_function(
210 move |err_str| async move {
211 handle_error.call(err_str).await;
212 },
213 Arc::new(err_string),
214 )
215 .await;
216 }
217 let _: AsyncSpawnResult = async_run_function(move || async {
218 finally.call().await;
219 })
220 .await;
221 });
222 });
223 self.sender.send(job_with_handler)
224 }
225}