pconvert_rust/
parallelism.rs1use crate::constants;
4use crate::errors::PConvertError;
5use crate::utils::min;
6use image::{ImageBuffer, Rgba};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::{mpsc, Arc, Mutex};
9use std::thread::{spawn, JoinHandle};
10
11pub struct ThreadPool {
13 workers: Vec<Worker>,
14 work_channel_sender: mpsc::Sender<WorkMessage>,
15 work_channel_receiver_mutex: Arc<Mutex<mpsc::Receiver<WorkMessage>>>,
16 status: Arc<ThreadPoolStatus>,
17}
18
19impl ThreadPool {
20 pub fn new(size: usize) -> Result<ThreadPool, PConvertError> {
22 if size == 0 {
23 return Err(PConvertError::ArgumentError(
24 "Thread Pool size should be a positive number".to_string(),
25 ));
26 }
27
28 let (work_channel_sender, work_channel_receiver) = mpsc::channel();
29 let workers = Vec::with_capacity(size);
30 let work_channel_receiver_mutex = Arc::new(Mutex::new(work_channel_receiver));
31
32 let status = Arc::new(ThreadPoolStatus::new(size));
33
34 Ok(ThreadPool {
35 workers,
36 work_channel_sender,
37 work_channel_receiver_mutex,
38 status,
39 })
40 }
41
42 pub fn start(&mut self) {
44 for _ in 0..self.workers.capacity() {
45 self.spawn_worker();
46 }
47 }
48
49 fn stop(&mut self) {
51 for _ in &self.workers {
53 self.work_channel_sender
54 .send(WorkMessage::Terminate)
55 .unwrap_or_default();
56 }
57
58 for worker in &mut self.workers {
60 if let Some(thread) = worker.thread.take() {
61 thread.join().unwrap_or_default();
62 }
63 }
64 }
65
66 pub fn execute<F>(&self, func: F) -> mpsc::Receiver<ResultMessage>
91 where
92 F: FnOnce() -> ResultMessage + Send + 'static,
93 {
94 let (result_channel_sender, result_channel_receiver) = mpsc::channel();
95 let task = Box::new(func);
96
97 self.status.inc_queued_count();
100 self.work_channel_sender
101 .send(WorkMessage::NewTask(task, result_channel_sender))
102 .unwrap_or_default();
103
104 result_channel_receiver
105 }
106
107 pub fn expand_to(&mut self, num_threads: usize) {
110 let num_threads = min(num_threads, constants::MAX_THREAD_POOL_SIZE);
111 let to_spawn = num_threads as isize - self.status.size() as isize;
112 for _ in 0..to_spawn {
113 self.spawn_worker();
114 self.status.inc_size();
115 }
116 }
117
118 pub fn get_status(&self) -> ThreadPoolStatus {
119 (*self.status).clone()
120 }
121
122 fn spawn_worker(&mut self) {
123 self.workers.push(Worker::new(
126 self.status.clone(),
127 Arc::clone(&self.work_channel_receiver_mutex),
128 ));
129 }
130}
131
132impl Drop for ThreadPool {
133 fn drop(&mut self) {
134 self.stop();
135 }
136}
137
/// A single worker of the thread pool.
struct Worker {
    /// Join handle of the worker thread; `None` once the handle has been
    /// taken out (e.g. to join the thread).
    thread: Option<JoinHandle<()>>,
}
141
142impl Worker {
143 fn new(
144 thread_pool_status: Arc<ThreadPoolStatus>,
145 receiver: Arc<Mutex<mpsc::Receiver<WorkMessage>>>,
146 ) -> Worker {
147 let thread = spawn(move || loop {
148 let message = receiver.lock().unwrap().recv().unwrap();
149
150 match message {
151 WorkMessage::NewTask(task, result_channel_sender) => {
152 thread_pool_status.dec_queued_count();
153 thread_pool_status.inc_active_count();
154
155 let result = task();
156
157 result_channel_sender.send(result).unwrap_or_default();
158
159 thread_pool_status.dec_active_count();
160 }
161
162 WorkMessage::Terminate => {
163 thread_pool_status.dec_size();
164 break;
165 }
166 }
167 });
168
169 Worker {
170 thread: Some(thread),
171 }
172 }
173}
174
175type Task = Box<dyn FnOnce() -> ResultMessage + Send>;
176enum WorkMessage {
177 NewTask(Task, mpsc::Sender<ResultMessage>),
178 Terminate,
179}
180
181pub enum ResultMessage {
183 ImageResult(Result<ImageBuffer<Rgba<u8>, Vec<u8>>, PConvertError>),
184}
185
/// Atomic counters describing the state of a thread pool.
pub struct ThreadPoolStatus {
    /// Size of the pool (number of workers).
    size: AtomicUsize,

    /// Number of tasks counted as queued.
    queued_count: AtomicUsize,

    /// Number of tasks counted as active (currently running).
    active_count: AtomicUsize,
}
193
194impl ThreadPoolStatus {
195 pub fn new(size: usize) -> Self {
196 ThreadPoolStatus {
197 size: AtomicUsize::new(size),
198 queued_count: AtomicUsize::new(0),
199 active_count: AtomicUsize::new(0),
200 }
201 }
202
203 pub fn size(&self) -> usize {
204 self.size.load(Ordering::Acquire)
205 }
206
207 pub fn queued(&self) -> usize {
208 self.queued_count.load(Ordering::Relaxed)
209 }
210
211 pub fn active(&self) -> usize {
212 self.active_count.load(Ordering::Relaxed)
213 }
214
215 pub fn inc_queued_count(&self) {
216 self.queued_count.fetch_add(1, Ordering::Relaxed);
217 }
218
219 pub fn dec_queued_count(&self) {
220 self.queued_count.fetch_sub(1, Ordering::Relaxed);
221 }
222
223 pub fn inc_active_count(&self) {
224 self.active_count.fetch_add(1, Ordering::Relaxed);
225 }
226
227 pub fn dec_active_count(&self) {
228 self.active_count.fetch_sub(1, Ordering::Relaxed);
229 }
230
231 pub fn inc_size(&self) {
232 self.size.fetch_add(1, Ordering::Relaxed);
233 }
234
235 pub fn dec_size(&self) {
236 self.size.fetch_sub(1, Ordering::Relaxed);
237 }
238}
239
240impl Clone for ThreadPoolStatus {
241 fn clone(&self) -> Self {
242 ThreadPoolStatus {
243 size: AtomicUsize::new(self.size()),
244 queued_count: AtomicUsize::new(self.queued()),
245 active_count: AtomicUsize::new(self.active()),
246 }
247 }
248}