thread_manager/manager.rs
1use std::sync::Arc;
2
3use crate::assert::assert_wpc;
4use crate::channel::JobChannel;
5use crate::channel::ResultChannel;
6use crate::dispatch::DispatchCycle;
7use crate::iterator::ResultIter;
8use crate::iterator::YieldResultIter;
9use crate::status::ManagerStatus;
10use crate::worker::ThreadWorker;
11
12type FnType<T> = Box<dyn Fn() -> T + Send + 'static>;
13
14/// A thread manager for executing jobs in parallel.
15/// This struct manages a pool of worker threads and distributes jobs among them.
16///
17/// # Type Parameters
18/// - `F`: The type of the function or closure that the threads will execute.
19/// - `T`: The type of the value returned by the function or closure.
20///
21/// # Fields
22/// - `wpc`: The number of Workers-Per-Channel.
23/// - `dispatch`: An instance of `DispatchCycle` to manage job distribution.
24/// - `workers`: A vector of `ThreadWorker` instances representing the worker threads.
25/// - `channels`: A vector of job channels for dispatching jobs to workers.
26/// - `result_channel`: A channel for collecting the results of the jobs.
27/// - `manager_status`: An instance of `ManagerStatus` to track the status of the manager.
28pub struct ThreadManagerCore<F, T>
29where
30 F: Fn() -> T + Send + 'static,
31 T: Send + 'static,
32{
33 wpc: usize,
34 dispatch: DispatchCycle,
35 workers: Vec<ThreadWorker<F, T>>,
36 channels: Vec<Arc<JobChannel<F>>>,
37 result_channel: Arc<ResultChannel<T>>,
38 manager_status: Arc<ManagerStatus>,
39}
40
41impl<F, T> ThreadManagerCore<F, T>
42where
43 F: Fn() -> T + Send + 'static,
44 T: Send + 'static,
45{
46 /// Creates a new instance of `ThreadManagerCore` with a specified number of worker threads.
47 ///
48 /// # Arguments
49 /// - `size`: The number of worker threads to create.
50 ///
51 /// # Returns
52 /// A new instance of `ThreadManagerCore`.
53 pub fn new(size: usize) -> Self {
54 let dispatch: DispatchCycle = DispatchCycle::new(size);
55 let workers: Vec<ThreadWorker<F, T>> = Vec::with_capacity(size);
56 let channels: Vec<Arc<JobChannel<F>>> = Vec::with_capacity(size);
57 let result_channel: Arc<ResultChannel<T>> = Arc::new(ResultChannel::new());
58 let manager_status: Arc<ManagerStatus> = Arc::new(ManagerStatus::new());
59
60 let mut manager: ThreadManagerCore<F, T> = Self {
61 wpc: 1,
62 dispatch,
63 workers,
64 channels,
65 result_channel,
66 manager_status,
67 };
68 manager.create_workers(size);
69 manager
70 }
71
72 /// Creates a new instance of `ThreadManagerCore` with a specified number of worker threads
73 /// and a specific workers-per-channel ratio.
74 ///
75 /// # Arguments
76 /// - `size`: The number of worker threads to create.
77 /// - `wpc`: The number of workers per channel.
78 ///
79 /// # Returns
80 /// A new instance of `ThreadManagerCore` with the specified configuration.
81 pub fn new_asymmetric(size: usize, wpc: usize) -> Self {
82 assert_wpc(size, wpc);
83 let dispatch: DispatchCycle = DispatchCycle::new(size);
84 let workers: Vec<ThreadWorker<F, T>> = Vec::with_capacity(size);
85 let channels: Vec<Arc<JobChannel<F>>> = Vec::with_capacity(size);
86 let result_channel: Arc<ResultChannel<T>> = Arc::new(ResultChannel::new());
87 let manager_status: Arc<ManagerStatus> = Arc::new(ManagerStatus::new());
88
89 let mut manager: ThreadManagerCore<F, T> = Self {
90 wpc,
91 dispatch,
92 workers,
93 channels,
94 result_channel,
95 manager_status,
96 };
97 manager.create_workers(size);
98 manager
99 }
100
101 /// Executes a given function by sending it to an available worker thread.
102 ///
103 /// # Arguments
104 /// - `function`: The function to be executed by the worker thread.
105 pub fn execute(&self, function: F) {
106 let id: usize = self.dispatch.fetch_and_update();
107 let worker: &ThreadWorker<F, T> = &self.workers[id];
108 worker.send(function);
109 }
110
111 /// Resizes the pool of worker threads.
112 ///
113 /// # Arguments
114 /// - `size`: The new size of the worker pool.
115 pub fn resize(&mut self, size: usize) {
116 assert_wpc(size, self.wpc);
117 let dispatch_size: usize = self.dispatch.fetch_size();
118
119 if size > self.workers.len() {
120 let additional_size: usize = size - self.workers.len();
121 self.start_workers(dispatch_size, self.workers.len());
122 self.create_workers(additional_size);
123 self.dispatch.set_size(size);
124 } else if size < dispatch_size {
125 self.send_release_workers(size, dispatch_size);
126 self.dispatch.set_size(size);
127 } else if size > dispatch_size {
128 self.start_workers(dispatch_size, size);
129 self.dispatch.set_size(size);
130 }
131 }
132}
133
134impl<F, T> ThreadManagerCore<F, T>
135where
136 F: Fn() -> T + Send + 'static,
137 T: Send + 'static,
138{
139 fn get_channel(&self, id: usize) -> Arc<JobChannel<F>> {
140 let channel_id: usize = id / self.wpc;
141 self.channels[channel_id].clone()
142 }
143
144 fn create_channels(&mut self, size: usize) {
145 for _ in 0..(size / self.wpc) {
146 let channel: JobChannel<F> = JobChannel::new();
147 let channel: Arc<JobChannel<F>> = Arc::new(channel);
148 self.channels.push(channel);
149 }
150 }
151
152 fn create_workers(&mut self, size: usize) {
153 self.create_channels(size);
154 let worker_size: usize = self.workers.len();
155
156 for idx in 0..size {
157 let id: usize = idx + worker_size;
158 let job_channel: Arc<JobChannel<F>> = self.get_channel(id);
159 let result_channel: Arc<ResultChannel<T>> = self.result_channel.clone();
160 let manager_status: Arc<ManagerStatus> = self.manager_status.clone();
161 let worker: ThreadWorker<F, T> =
162 ThreadWorker::new(id, job_channel, result_channel, manager_status);
163
164 worker.start();
165 self.workers.push(worker);
166 }
167 }
168}
169
170impl<F, T> ThreadManagerCore<F, T>
171where
172 F: Fn() -> T + Send + 'static,
173 T: Send + 'static,
174{
175 /// Joins all worker threads, effectively blocking the current thread until all worker threads have completed their execution.
176 ///
177 /// # Note
178 /// This method will block the current thread until all worker threads have finished processing their jobs.
179 pub fn join(&self) {
180 self.send_release_workers(0, self.workers.len());
181 self.join_workers(0, self.workers.len());
182 self.clear_channels(0, self.channels.len());
183 }
184
185 /// Terminates all worker threads gracefully.
186 ///
187 /// # Note
188 /// This method will block until the currently executing job among threads is completed.
189 pub fn terminate_all(&self) {
190 self.set_termination_workers(0, self.workers.len());
191 self.send_release_workers(0, self.workers.len());
192 self.join_workers(0, self.workers.len());
193 self.clear_channels(0, self.channels.len());
194 }
195
196 /// Provides the job distribution across the worker threads.
197 ///
198 /// # Returns
199 /// A vector containing the count of jobs executed by each worker thread.
200 pub fn job_distribution(&self) -> Vec<usize> {
201 let mut distribution: Vec<usize> = Vec::with_capacity(self.workers.len());
202 for worker in self.workers.iter() {
203 distribution.push(worker.status().received());
204 }
205 distribution
206 }
207
208 /// Checks if all jobs have been finished.
209 ///
210 /// # Returns
211 /// `true` if all jobs are finished, `false` otherwise.
212 pub fn has_finished(&self) -> bool {
213 for job_channel in self.channels.iter() {
214 if !job_channel.is_finished() {
215 return false;
216 }
217 }
218 true
219 }
220
221 /// Retrieves an iterator over the results of completed jobs.
222 ///
223 /// # Returns
224 /// An iterator (`ResultIter`) over the results of the jobs that have been completed.
225 pub fn results<'a>(&'a self) -> ResultIter<'a, T> {
226 ResultIter::new(&self.result_channel)
227 }
228
229 /// Retrieves an iterator that yields results as they become available.
230 ///
231 /// # Returns
232 /// An iterator (`YieldResultIter`) that yields results from worker threads.
233 /// This method blocks for each result until the job queue is complete.
234 pub fn yield_results<'a>(&'a self) -> YieldResultIter<'a, F, T> {
235 YieldResultIter::new(&self.workers, &self.result_channel)
236 }
237
238 /// Returns the number of active worker threads (both busy and waiting).
239 ///
240 /// # Returns
241 /// The total number of active worker threads.
242 pub fn active_threads(&self) -> usize {
243 self.manager_status.active_threads()
244 }
245
246 /// Returns the number of worker threads that are currently busy executing a job.
247 ///
248 /// # Returns
249 /// The number of busy worker threads.
250 pub fn busy_threads(&self) -> usize {
251 self.manager_status.busy_threads()
252 }
253
254 /// Returns the number of worker threads that are currently waiting for a job.
255 ///
256 /// # Returns
257 /// The number of waiting worker threads.
258 pub fn waiting_threads(&self) -> usize {
259 self.manager_status.waiting_threads()
260 }
261
262 /// Returns the number of jobs currently in the queue waiting to be executed.
263 ///
264 /// # Returns
265 /// The size of the job queue.
266 pub fn job_queue(&self) -> usize {
267 let mut queue: usize = 0;
268 for job_channel in self.channels.iter() {
269 queue += job_channel.status().pending();
270 }
271 queue
272 }
273
274 /// Returns the total number of jobs that have been sent to worker threads.
275 ///
276 /// # Returns
277 /// The number of sent jobs.
278 pub fn sent_jobs(&self) -> usize {
279 let mut sent: usize = 0;
280 for job_channel in self.channels.iter() {
281 sent += job_channel.status().sent();
282 }
283 sent
284 }
285
286 /// Returns the total number of jobs that have been received by worker threads.
287 ///
288 /// # Returns
289 /// The number of received jobs.
290 pub fn received_jobs(&self) -> usize {
291 let mut received: usize = 0;
292 for job_channel in self.channels.iter() {
293 received += job_channel.status().received();
294 }
295 received
296 }
297
298 /// Returns the total number of jobs that have been concluded by worker threads.
299 ///
300 /// # Returns
301 /// The number of concluded jobs.
302 pub fn concluded_jobs(&self) -> usize {
303 let mut concluded: usize = 0;
304 for job_channel in self.channels.iter() {
305 concluded += job_channel.status().concluded();
306 }
307 concluded
308 }
309}
310
311impl<F, T> ThreadManagerCore<F, T>
312where
313 F: Fn() -> T + Send + 'static,
314 T: Send + 'static,
315{
316 fn start_workers(&self, st: usize, en: usize) {
317 for worker in self.workers[st..en].iter() {
318 worker.start();
319 }
320 }
321
322 fn join_workers(&self, st: usize, en: usize) {
323 for worker in self.workers[st..en].iter() {
324 worker.join();
325 }
326 }
327
328 fn clear_channels(&self, st: usize, en: usize) {
329 for job_channel in self.channels[st..en].iter() {
330 job_channel.clear();
331 }
332 }
333
334 fn set_termination_workers(&self, st: usize, en: usize) {
335 for worker in self.workers[st..en].iter() {
336 worker.set_termination(true);
337 }
338 }
339
340 fn send_release_workers(&self, st: usize, en: usize) {
341 for worker in self.workers[st..en].iter() {
342 worker.send_release();
343 }
344 }
345}
346
347impl<F, T> Drop for ThreadManagerCore<F, T>
348where
349 F: Fn() -> T + Send + 'static,
350 T: Send + 'static,
351{
352 fn drop(&mut self) {
353 self.terminate_all();
354 }
355}
356
357/// A dynamic dispatch version of `ThreadManagerCore` for managing threads that execute functions
358/// returning a specific type `T`.
359///
360/// # Type Parameters
361/// - `T`: The type of the value returned by the functions executed by the threads.
362pub struct ThreadManager<T>
363where
364 T: Send + 'static,
365{
366 manager: ThreadManagerCore<FnType<T>, T>,
367}
368
369impl<T> ThreadManager<T>
370where
371 T: Send + 'static,
372{
373 /// Creates a new instance of `ThreadManager` with a specified number of worker threads.
374 ///
375 /// # Arguments
376 /// - `size`: The number of worker threads to create.
377 ///
378 /// # Returns
379 /// A new instance of `ThreadManager`.
380 pub fn new(size: usize) -> Self {
381 let manager: ThreadManagerCore<FnType<T>, T> = ThreadManagerCore::new(size);
382 Self { manager }
383 }
384
385 /// Creates a new instance of `ThreadManager` with a specified number of worker threads
386 /// and a specific workers-per-channel ratio.
387 ///
388 /// # Arguments
389 /// - `size`: The number of worker threads to create.
390 /// - `wpc`: The number of workers per channel.
391 ///
392 /// # Returns
393 /// A new instance of `ThreadManager` with the specified configuration.
394 pub fn new_asymmetric(size: usize, wpc: usize) -> Self {
395 let manager: ThreadManagerCore<FnType<T>, T> = ThreadManagerCore::new_asymmetric(size, wpc);
396 Self { manager }
397 }
398
399 /// Executes a given function by sending it to an available worker thread.
400 ///
401 /// # Type Parameters
402 /// - `F`: The type of the function to execute.
403 ///
404 /// # Arguments
405 /// - `function`: The function to be executed by the worker thread.
406 pub fn execute<F>(&self, function: F)
407 where
408 F: Fn() -> T + Send + 'static,
409 {
410 self.manager.execute(Box::new(function))
411 }
412
413 /// Resizes the pool of worker threads.
414 ///
415 /// # Arguments
416 /// - `size`: The new size of the worker pool.
417 pub fn resize(&mut self, size: usize) {
418 self.manager.resize(size)
419 }
420
421 /// Joins all worker threads, effectively blocking the current thread until all worker threads have completed their execution.
422 ///
423 /// # Note
424 /// This method will block the current thread until all worker threads have finished processing their jobs.
425 pub fn join(&self) {
426 self.manager.join();
427 }
428
429 /// Terminates all worker threads gracefully.
430 ///
431 /// # Note
432 /// This method will block until the currently executing job among threads is completed.
433 pub fn terminate_all(&self) {
434 self.manager.terminate_all()
435 }
436
437 /// Provides the job distribution across the worker threads.
438 ///
439 /// # Returns
440 /// A vector containing the count of jobs executed by each worker thread.
441 pub fn job_distribution(&self) -> Vec<usize> {
442 self.manager.job_distribution()
443 }
444
445 /// Checks if all jobs have been finished.
446 ///
447 /// # Returns
448 /// `true` if all jobs are finished, `false` otherwise.
449 pub fn has_finished(&self) -> bool {
450 self.manager.has_finished()
451 }
452
453 /// Retrieves an iterator over the results of completed jobs.
454 ///
455 /// # Returns
456 /// An iterator (`ResultIter`) over the results of the jobs that have been completed.
457 pub fn results<'a>(&'a self) -> ResultIter<'a, T> {
458 self.manager.results()
459 }
460
461 /// Retrieves an iterator that yields results as they become available.
462 ///
463 /// # Returns
464 /// An iterator (`YieldResultIter`) that yields results from worker threads.
465 /// This method blocks for each result until the job queue is complete.
466 pub fn yield_results<'a>(&'a self) -> YieldResultIter<'a, FnType<T>, T> {
467 self.manager.yield_results()
468 }
469
470 /// Returns the number of active worker threads (both busy and waiting).
471 ///
472 /// # Returns
473 /// The total number of active worker threads.
474 pub fn active_threads(&self) -> usize {
475 self.manager.active_threads()
476 }
477
478 /// Returns the number of worker threads that are currently busy executing a job.
479 ///
480 /// # Returns
481 /// The number of busy worker threads.
482 pub fn busy_threads(&self) -> usize {
483 self.manager.busy_threads()
484 }
485
486 /// Returns the number of worker threads that are currently waiting for a job.
487 ///
488 /// # Returns
489 /// The number of waiting worker threads.
490 pub fn waiting_threads(&self) -> usize {
491 self.manager.waiting_threads()
492 }
493
494 /// Returns the number of jobs currently in the queue waiting to be executed.
495 ///
496 /// # Returns
497 /// The size of the job queue.
498 pub fn job_queue(&self) -> usize {
499 self.manager.job_queue()
500 }
501
502 /// Returns the total number of jobs that have been sent to worker threads.
503 ///
504 /// # Returns
505 /// The number of sent jobs.
506 pub fn sent_jobs(&self) -> usize {
507 self.manager.sent_jobs()
508 }
509
510 /// Returns the total number of jobs that have been received by worker threads.
511 ///
512 /// # Returns
513 /// The number of received jobs.
514 pub fn received_jobs(&self) -> usize {
515 self.manager.received_jobs()
516 }
517
518 /// Returns the total number of jobs that have been concluded by worker threads.
519 ///
520 /// # Returns
521 /// The number of concluded jobs.
522 pub fn concluded_jobs(&self) -> usize {
523 self.manager.concluded_jobs()
524 }
525}