thread_manager/
manager.rs

1use std::sync::Arc;
2
3use crate::assert::assert_wpc;
4use crate::channel::JobChannel;
5use crate::channel::ResultChannel;
6use crate::dispatch::DispatchCycle;
7use crate::iterator::ResultIter;
8use crate::iterator::YieldResultIter;
9use crate::status::ManagerStatus;
10use crate::worker::ThreadWorker;
11
/// Boxed, dynamically dispatched job type used by `ThreadManager`.
///
/// Any closure or function that takes no arguments, returns `T`, can be called
/// repeatedly (`Fn`) and can be moved to another thread (`Send + 'static`)
/// coerces into this type through `Box::new`.
type FnType<T> = Box<dyn Fn() -> T + Send + 'static>;
13
14/// A thread manager for executing jobs in parallel.
15/// This struct manages a pool of worker threads and distributes jobs among them.
16///
17/// # Type Parameters
18/// - `F`: The type of the function or closure that the threads will execute.
19/// - `T`: The type of the value returned by the function or closure.
20///
21/// # Fields
22/// - `wpc`: The number of Workers-Per-Channel.
23/// - `dispatch`: An instance of `DispatchCycle` to manage job distribution.
24/// - `workers`: A vector of `ThreadWorker` instances representing the worker threads.
25/// - `channels`: A vector of job channels for dispatching jobs to workers.
26/// - `result_channel`: A channel for collecting the results of the jobs.
27/// - `manager_status`: An instance of `ManagerStatus` to track the status of the manager.
28pub struct ThreadManagerCore<F, T>
29where
30    F: Fn() -> T + Send + 'static,
31    T: Send + 'static,
32{
33    wpc: usize,
34    dispatch: DispatchCycle,
35    workers: Vec<ThreadWorker<F, T>>,
36    channels: Vec<Arc<JobChannel<F>>>,
37    result_channel: Arc<ResultChannel<T>>,
38    manager_status: Arc<ManagerStatus>,
39}
40
41impl<F, T> ThreadManagerCore<F, T>
42where
43    F: Fn() -> T + Send + 'static,
44    T: Send + 'static,
45{
46    /// Creates a new instance of `ThreadManagerCore` with a specified number of worker threads.
47    ///
48    /// # Arguments
49    /// - `size`: The number of worker threads to create.
50    ///
51    /// # Returns
52    /// A new instance of `ThreadManagerCore`.
53    pub fn new(size: usize) -> Self {
54        let dispatch: DispatchCycle = DispatchCycle::new(size);
55        let workers: Vec<ThreadWorker<F, T>> = Vec::with_capacity(size);
56        let channels: Vec<Arc<JobChannel<F>>> = Vec::with_capacity(size);
57        let result_channel: Arc<ResultChannel<T>> = Arc::new(ResultChannel::new());
58        let manager_status: Arc<ManagerStatus> = Arc::new(ManagerStatus::new());
59
60        let mut manager: ThreadManagerCore<F, T> = Self {
61            wpc: 1,
62            dispatch,
63            workers,
64            channels,
65            result_channel,
66            manager_status,
67        };
68        manager.create_workers(size);
69        manager
70    }
71
72    /// Creates a new instance of `ThreadManagerCore` with a specified number of worker threads
73    /// and a specific workers-per-channel ratio.
74    ///
75    /// # Arguments
76    /// - `size`: The number of worker threads to create.
77    /// - `wpc`: The number of workers per channel.
78    ///
79    /// # Returns
80    /// A new instance of `ThreadManagerCore` with the specified configuration.
81    pub fn new_asymmetric(size: usize, wpc: usize) -> Self {
82        assert_wpc(size, wpc);
83        let dispatch: DispatchCycle = DispatchCycle::new(size);
84        let workers: Vec<ThreadWorker<F, T>> = Vec::with_capacity(size);
85        let channels: Vec<Arc<JobChannel<F>>> = Vec::with_capacity(size);
86        let result_channel: Arc<ResultChannel<T>> = Arc::new(ResultChannel::new());
87        let manager_status: Arc<ManagerStatus> = Arc::new(ManagerStatus::new());
88
89        let mut manager: ThreadManagerCore<F, T> = Self {
90            wpc,
91            dispatch,
92            workers,
93            channels,
94            result_channel,
95            manager_status,
96        };
97        manager.create_workers(size);
98        manager
99    }
100
101    /// Executes a given function by sending it to an available worker thread.
102    ///
103    /// # Arguments
104    /// - `function`: The function to be executed by the worker thread.
105    pub fn execute(&self, function: F) {
106        let id: usize = self.dispatch.fetch_and_update();
107        let worker: &ThreadWorker<F, T> = &self.workers[id];
108        worker.send(function);
109    }
110
111    /// Resizes the pool of worker threads.
112    ///
113    /// # Arguments
114    /// - `size`: The new size of the worker pool.
115    pub fn resize(&mut self, size: usize) {
116        assert_wpc(size, self.wpc);
117        let dispatch_size: usize = self.dispatch.fetch_size();
118
119        if size > self.workers.len() {
120            let additional_size: usize = size - self.workers.len();
121            self.start_workers(dispatch_size, self.workers.len());
122            self.create_workers(additional_size);
123            self.dispatch.set_size(size);
124        } else if size < dispatch_size {
125            self.send_release_workers(size, dispatch_size);
126            self.dispatch.set_size(size);
127        } else if size > dispatch_size {
128            self.start_workers(dispatch_size, size);
129            self.dispatch.set_size(size);
130        }
131    }
132}
133
134impl<F, T> ThreadManagerCore<F, T>
135where
136    F: Fn() -> T + Send + 'static,
137    T: Send + 'static,
138{
139    fn get_channel(&self, id: usize) -> Arc<JobChannel<F>> {
140        let channel_id: usize = id / self.wpc;
141        self.channels[channel_id].clone()
142    }
143
144    fn create_channels(&mut self, size: usize) {
145        for _ in 0..(size / self.wpc) {
146            let channel: JobChannel<F> = JobChannel::new();
147            let channel: Arc<JobChannel<F>> = Arc::new(channel);
148            self.channels.push(channel);
149        }
150    }
151
152    fn create_workers(&mut self, size: usize) {
153        self.create_channels(size);
154        let worker_size: usize = self.workers.len();
155
156        for idx in 0..size {
157            let id: usize = idx + worker_size;
158            let job_channel: Arc<JobChannel<F>> = self.get_channel(id);
159            let result_channel: Arc<ResultChannel<T>> = self.result_channel.clone();
160            let manager_status: Arc<ManagerStatus> = self.manager_status.clone();
161            let worker: ThreadWorker<F, T> =
162                ThreadWorker::new(id, job_channel, result_channel, manager_status);
163
164            worker.start();
165            self.workers.push(worker);
166        }
167    }
168}
169
170impl<F, T> ThreadManagerCore<F, T>
171where
172    F: Fn() -> T + Send + 'static,
173    T: Send + 'static,
174{
175    /// Joins all worker threads, effectively blocking the current thread until all worker threads have completed their execution.
176    ///
177    /// # Note
178    /// This method will block the current thread until all worker threads have finished processing their jobs.
179    pub fn join(&self) {
180        self.send_release_workers(0, self.workers.len());
181        self.join_workers(0, self.workers.len());
182        self.clear_channels(0, self.channels.len());
183    }
184
185    /// Terminates all worker threads gracefully.
186    ///
187    /// # Note
188    /// This method will block until the currently executing job among threads is completed.
189    pub fn terminate_all(&self) {
190        self.set_termination_workers(0, self.workers.len());
191        self.send_release_workers(0, self.workers.len());
192        self.join_workers(0, self.workers.len());
193        self.clear_channels(0, self.channels.len());
194    }
195
196    /// Provides the job distribution across the worker threads.
197    ///
198    /// # Returns
199    /// A vector containing the count of jobs executed by each worker thread.
200    pub fn job_distribution(&self) -> Vec<usize> {
201        let mut distribution: Vec<usize> = Vec::with_capacity(self.workers.len());
202        for worker in self.workers.iter() {
203            distribution.push(worker.status().received());
204        }
205        distribution
206    }
207
208    /// Checks if all jobs have been finished.
209    ///
210    /// # Returns
211    /// `true` if all jobs are finished, `false` otherwise.
212    pub fn has_finished(&self) -> bool {
213        for job_channel in self.channels.iter() {
214            if !job_channel.is_finished() {
215                return false;
216            }
217        }
218        true
219    }
220
221    /// Retrieves an iterator over the results of completed jobs.
222    ///
223    /// # Returns
224    /// An iterator (`ResultIter`) over the results of the jobs that have been completed.
225    pub fn results<'a>(&'a self) -> ResultIter<'a, T> {
226        ResultIter::new(&self.result_channel)
227    }
228
229    /// Retrieves an iterator that yields results as they become available.
230    ///
231    /// # Returns
232    /// An iterator (`YieldResultIter`) that yields results from worker threads.
233    /// This method blocks for each result until the job queue is complete.
234    pub fn yield_results<'a>(&'a self) -> YieldResultIter<'a, F, T> {
235        YieldResultIter::new(&self.workers, &self.result_channel)
236    }
237
238    /// Returns the number of active worker threads (both busy and waiting).
239    ///
240    /// # Returns
241    /// The total number of active worker threads.
242    pub fn active_threads(&self) -> usize {
243        self.manager_status.active_threads()
244    }
245
246    /// Returns the number of worker threads that are currently busy executing a job.
247    ///
248    /// # Returns
249    /// The number of busy worker threads.
250    pub fn busy_threads(&self) -> usize {
251        self.manager_status.busy_threads()
252    }
253
254    /// Returns the number of worker threads that are currently waiting for a job.
255    ///
256    /// # Returns
257    /// The number of waiting worker threads.
258    pub fn waiting_threads(&self) -> usize {
259        self.manager_status.waiting_threads()
260    }
261
262    /// Returns the number of jobs currently in the queue waiting to be executed.
263    ///
264    /// # Returns
265    /// The size of the job queue.
266    pub fn job_queue(&self) -> usize {
267        let mut queue: usize = 0;
268        for job_channel in self.channels.iter() {
269            queue += job_channel.status().pending();
270        }
271        queue
272    }
273
274    /// Returns the total number of jobs that have been sent to worker threads.
275    ///
276    /// # Returns
277    /// The number of sent jobs.
278    pub fn sent_jobs(&self) -> usize {
279        let mut sent: usize = 0;
280        for job_channel in self.channels.iter() {
281            sent += job_channel.status().sent();
282        }
283        sent
284    }
285
286    /// Returns the total number of jobs that have been received by worker threads.
287    ///
288    /// # Returns
289    /// The number of received jobs.
290    pub fn received_jobs(&self) -> usize {
291        let mut received: usize = 0;
292        for job_channel in self.channels.iter() {
293            received += job_channel.status().received();
294        }
295        received
296    }
297
298    /// Returns the total number of jobs that have been concluded by worker threads.
299    ///
300    /// # Returns
301    /// The number of concluded jobs.
302    pub fn concluded_jobs(&self) -> usize {
303        let mut concluded: usize = 0;
304        for job_channel in self.channels.iter() {
305            concluded += job_channel.status().concluded();
306        }
307        concluded
308    }
309}
310
311impl<F, T> ThreadManagerCore<F, T>
312where
313    F: Fn() -> T + Send + 'static,
314    T: Send + 'static,
315{
316    fn start_workers(&self, st: usize, en: usize) {
317        for worker in self.workers[st..en].iter() {
318            worker.start();
319        }
320    }
321
322    fn join_workers(&self, st: usize, en: usize) {
323        for worker in self.workers[st..en].iter() {
324            worker.join();
325        }
326    }
327
328    fn clear_channels(&self, st: usize, en: usize) {
329        for job_channel in self.channels[st..en].iter() {
330            job_channel.clear();
331        }
332    }
333
334    fn set_termination_workers(&self, st: usize, en: usize) {
335        for worker in self.workers[st..en].iter() {
336            worker.set_termination(true);
337        }
338    }
339
340    fn send_release_workers(&self, st: usize, en: usize) {
341        for worker in self.workers[st..en].iter() {
342            worker.send_release();
343        }
344    }
345}
346
347impl<F, T> Drop for ThreadManagerCore<F, T>
348where
349    F: Fn() -> T + Send + 'static,
350    T: Send + 'static,
351{
352    fn drop(&mut self) {
353        self.terminate_all();
354    }
355}
356
357/// A dynamic dispatch version of `ThreadManagerCore` for managing threads that execute functions
358/// returning a specific type `T`.
359///
360/// # Type Parameters
361/// - `T`: The type of the value returned by the functions executed by the threads.
362pub struct ThreadManager<T>
363where
364    T: Send + 'static,
365{
366    manager: ThreadManagerCore<FnType<T>, T>,
367}
368
369impl<T> ThreadManager<T>
370where
371    T: Send + 'static,
372{
373    /// Creates a new instance of `ThreadManager` with a specified number of worker threads.
374    ///
375    /// # Arguments
376    /// - `size`: The number of worker threads to create.
377    ///
378    /// # Returns
379    /// A new instance of `ThreadManager`.
380    pub fn new(size: usize) -> Self {
381        let manager: ThreadManagerCore<FnType<T>, T> = ThreadManagerCore::new(size);
382        Self { manager }
383    }
384
385    /// Creates a new instance of `ThreadManager` with a specified number of worker threads
386    /// and a specific workers-per-channel ratio.
387    ///
388    /// # Arguments
389    /// - `size`: The number of worker threads to create.
390    /// - `wpc`: The number of workers per channel.
391    ///
392    /// # Returns
393    /// A new instance of `ThreadManager` with the specified configuration.
394    pub fn new_asymmetric(size: usize, wpc: usize) -> Self {
395        let manager: ThreadManagerCore<FnType<T>, T> = ThreadManagerCore::new_asymmetric(size, wpc);
396        Self { manager }
397    }
398
399    /// Executes a given function by sending it to an available worker thread.
400    ///
401    /// # Type Parameters
402    /// - `F`: The type of the function to execute.
403    ///
404    /// # Arguments
405    /// - `function`: The function to be executed by the worker thread.
406    pub fn execute<F>(&self, function: F)
407    where
408        F: Fn() -> T + Send + 'static,
409    {
410        self.manager.execute(Box::new(function))
411    }
412
413    /// Resizes the pool of worker threads.
414    ///
415    /// # Arguments
416    /// - `size`: The new size of the worker pool.
417    pub fn resize(&mut self, size: usize) {
418        self.manager.resize(size)
419    }
420
421    /// Joins all worker threads, effectively blocking the current thread until all worker threads have completed their execution.
422    ///
423    /// # Note
424    /// This method will block the current thread until all worker threads have finished processing their jobs.
425    pub fn join(&self) {
426        self.manager.join();
427    }
428
429    /// Terminates all worker threads gracefully.
430    ///
431    /// # Note
432    /// This method will block until the currently executing job among threads is completed.
433    pub fn terminate_all(&self) {
434        self.manager.terminate_all()
435    }
436
437    /// Provides the job distribution across the worker threads.
438    ///
439    /// # Returns
440    /// A vector containing the count of jobs executed by each worker thread.
441    pub fn job_distribution(&self) -> Vec<usize> {
442        self.manager.job_distribution()
443    }
444
445    /// Checks if all jobs have been finished.
446    ///
447    /// # Returns
448    /// `true` if all jobs are finished, `false` otherwise.
449    pub fn has_finished(&self) -> bool {
450        self.manager.has_finished()
451    }
452
453    /// Retrieves an iterator over the results of completed jobs.
454    ///
455    /// # Returns
456    /// An iterator (`ResultIter`) over the results of the jobs that have been completed.
457    pub fn results<'a>(&'a self) -> ResultIter<'a, T> {
458        self.manager.results()
459    }
460
461    /// Retrieves an iterator that yields results as they become available.
462    ///
463    /// # Returns
464    /// An iterator (`YieldResultIter`) that yields results from worker threads.
465    /// This method blocks for each result until the job queue is complete.
466    pub fn yield_results<'a>(&'a self) -> YieldResultIter<'a, FnType<T>, T> {
467        self.manager.yield_results()
468    }
469
470    /// Returns the number of active worker threads (both busy and waiting).
471    ///
472    /// # Returns
473    /// The total number of active worker threads.
474    pub fn active_threads(&self) -> usize {
475        self.manager.active_threads()
476    }
477
478    /// Returns the number of worker threads that are currently busy executing a job.
479    ///
480    /// # Returns
481    /// The number of busy worker threads.
482    pub fn busy_threads(&self) -> usize {
483        self.manager.busy_threads()
484    }
485
486    /// Returns the number of worker threads that are currently waiting for a job.
487    ///
488    /// # Returns
489    /// The number of waiting worker threads.
490    pub fn waiting_threads(&self) -> usize {
491        self.manager.waiting_threads()
492    }
493
494    /// Returns the number of jobs currently in the queue waiting to be executed.
495    ///
496    /// # Returns
497    /// The size of the job queue.
498    pub fn job_queue(&self) -> usize {
499        self.manager.job_queue()
500    }
501
502    /// Returns the total number of jobs that have been sent to worker threads.
503    ///
504    /// # Returns
505    /// The number of sent jobs.
506    pub fn sent_jobs(&self) -> usize {
507        self.manager.sent_jobs()
508    }
509
510    /// Returns the total number of jobs that have been received by worker threads.
511    ///
512    /// # Returns
513    /// The number of received jobs.
514    pub fn received_jobs(&self) -> usize {
515        self.manager.received_jobs()
516    }
517
518    /// Returns the total number of jobs that have been concluded by worker threads.
519    ///
520    /// # Returns
521    /// The number of concluded jobs.
522    pub fn concluded_jobs(&self) -> usize {
523        self.manager.concluded_jobs()
524    }
525}