1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::channel::AtomicChannel;
use crate::status::ManagerStatus;
use crate::types::Job;
use crate::worker::ThreadWorker;

/// Owns a set of [`ThreadWorker`]s together with the channel, status tracker
/// and round-robin dispatcher they are driven by.
pub struct ThreadManager {
    /// Job channel; a clone of this `Arc` is handed to every worker on creation.
    channel: Arc<AtomicChannel<Job>>,
    /// Status tracker; a clone of this `Arc` is handed to every worker on creation.
    status: Arc<ManagerStatus>,
    /// The workers managed by this instance, in creation order.
    workers: Vec<ThreadWorker>,
    /// Picks the index into `workers` that the next `execute` call targets.
    dispatch: RoundRobinDispatch,
}

impl ThreadManager {
    /// Builds a manager with `size` workers, each started before `new` returns.
    ///
    /// The round-robin dispatcher is sized to `size` as well, so `execute`
    /// cycles over exactly the workers created here.
    pub fn new(size: usize) -> Self {
        let mut manager = Self {
            channel: Arc::new(AtomicChannel::new()),
            status: Arc::new(ManagerStatus::new()),
            dispatch: RoundRobinDispatch::new(size),
            workers: Vec::with_capacity(size),
        };
        manager.create_workers(size);
        manager
    }

    /// Hands `function` to the next worker in round-robin order.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher yields an index that is out of range for the
    /// worker list, which is always the case when the manager has no workers.
    pub fn execute<F>(&self, function: F)
    where
        F: Fn() + Send + 'static,
    {
        let index = self.dispatch.fetch_and_update();
        self.workers[index].send(function);
    }

    /// Signals every worker to join, releases the channel for every worker,
    /// then joins each worker.
    ///
    /// The three passes are deliberately separate loops: every worker receives
    /// its join signal before any channel release is sent, and every release is
    /// sent before the first `join` call.
    pub fn join(&self) {
        for worker in &self.workers {
            worker.send_join_signal();
        }

        for worker in &self.workers {
            worker.send_channel_release();
        }

        for worker in &self.workers {
            worker.join();
        }
    }

    /// Sends the termination signal to every worker, joins each of them, and
    /// finally clears the channel's receiver.
    pub fn terminate_all(&self) {
        // Signal all workers first so none is joined before the others are told to stop.
        for worker in &self.workers {
            worker.send_termination_signal();
        }

        for worker in &self.workers {
            worker.join();
        }

        self.channel.clear_receiver();
    }

    /// Grows or shrinks the worker list to exactly `size` workers.
    ///
    /// Growing creates and starts the missing workers. Shrinking removes the
    /// trailing workers and sends each of them the termination signal. A call
    /// with the current size is a no-op.
    ///
    /// Whenever the size changes, the dispatcher is rebuilt for the new size
    /// and restarts at index 0. Without this, `execute` would keep cycling
    /// over the old range: it would index past the end of the list after a
    /// shrink and never reach the new workers after a grow.
    pub fn set_thread_size(&mut self, size: usize) {
        let current = self.workers.len();
        if size == current {
            return;
        }

        if size > current {
            self.create_workers(size - current);
        } else {
            let retired = self.workers.split_off(size);
            // NOTE(review): retired workers are signalled but not joined here;
            // whether dropping a `ThreadWorker` waits for it is not visible from
            // this file — confirm.
            for worker in &retired {
                worker.send_termination_signal();
            }
        }

        // Keep the dispatch range in lockstep with the worker list.
        self.dispatch = RoundRobinDispatch::new(size);
    }

    /// Returns `true` when the channel's concluded count equals its sent count.
    pub fn has_finished(&self) -> bool {
        let sent = self.get_sent_jobs();
        let completed = self.get_completed_jobs();
        completed == sent
    }

    /// Returns the active-thread count reported by the shared status tracker.
    pub fn get_active_threads(&self) -> usize {
        self.status.get_active_threads()
    }

    /// Returns the busy-thread count reported by the shared status tracker.
    pub fn get_busy_threads(&self) -> usize {
        self.status.get_busy_threads()
    }

    /// Returns the waiting-thread count reported by the shared status tracker.
    pub fn get_waiting_threads(&self) -> usize {
        self.status.get_waiting_threads()
    }

    /// Returns each worker's received-job count, in worker order.
    pub fn get_job_distribution(&self) -> Vec<usize> {
        self.workers
            .iter()
            .map(|worker| worker.get_received_jobs())
            .collect()
    }

    /// Returns the channel's pending count.
    pub fn get_job_queue(&self) -> usize {
        self.channel.get_pending_count()
    }

    /// Returns the channel's received count.
    pub fn get_received_jobs(&self) -> usize {
        self.channel.get_received_count()
    }

    /// Returns the channel's sent count.
    pub fn get_sent_jobs(&self) -> usize {
        self.channel.get_sent_count()
    }

    /// Returns the channel's concluded count.
    pub fn get_completed_jobs(&self) -> usize {
        self.channel.get_concluded_count()
    }
}

impl ThreadManager {
    /// Creates `size` additional workers, starts each one, and appends it to
    /// the worker list.
    ///
    /// Worker ids continue from the current length of the list, so the first
    /// new worker gets id `self.workers.len()` as it was on entry. Every worker
    /// receives its own clone of the shared channel and status handles.
    fn create_workers(&mut self, size: usize) {
        let first_id = self.workers.len();

        for id in (first_id..).take(size) {
            let worker = ThreadWorker::new(
                id,
                Arc::clone(&self.channel),
                Arc::clone(&self.status),
            );

            worker.start();
            self.workers.push(worker);
        }
    }
}

/// Atomic cursor that hands out the indices `0, 1, …, max - 1` in a repeating
/// cycle.
pub struct RoundRobinDispatch {
    /// Length of the cycle; the cursor wraps to 0 once it would reach this value.
    max: usize,
    /// Index that the next call to `fetch_and_update` returns.
    value: AtomicUsize,
}

impl RoundRobinDispatch {
    /// Creates a dispatcher that cycles over `0..max`, starting at 0.
    ///
    /// A `max` of 0 or 1 yields a dispatcher that always returns 0.
    pub fn new(max: usize) -> Self {
        Self {
            max,
            value: AtomicUsize::new(0),
        }
    }

    /// Returns the current index and advances the cursor, wrapping to 0 after
    /// `max - 1`.
    ///
    /// The read and the advance happen as one atomic update, so concurrent
    /// callers never observe the same cursor position twice in a row and no
    /// advance is lost.
    pub fn fetch_and_update(&self) -> usize {
        let advanced = self
            .value
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                // `current + 1 >= max` instead of `current >= max - 1`, so that
                // `max == 0` cannot underflow.
                let next = current + 1;
                Some(if next >= self.max { 0 } else { next })
            });

        // The closure always returns `Some`, so `Err` cannot occur; both arms
        // carry the previous value.
        match advanced {
            Ok(previous) | Err(previous) => previous,
        }
    }
}