pconvert_rust/
parallelism.rs

1//! Thread pool, thread pool status and workers implementation.
2
3use crate::constants;
4use crate::errors::PConvertError;
5use crate::utils::min;
6use image::{ImageBuffer, Rgba};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::{mpsc, Arc, Mutex};
9use std::thread::{spawn, JoinHandle};
10
11/// Thread pool used in multi-threaded pconvert calls.
12pub struct ThreadPool {
13    workers: Vec<Worker>,
14    work_channel_sender: mpsc::Sender<WorkMessage>,
15    work_channel_receiver_mutex: Arc<Mutex<mpsc::Receiver<WorkMessage>>>,
16    status: Arc<ThreadPoolStatus>,
17}
18
19impl ThreadPool {
20    /// Creates a thread pool with `size` worker threads.
21    pub fn new(size: usize) -> Result<ThreadPool, PConvertError> {
22        if size == 0 {
23            return Err(PConvertError::ArgumentError(
24                "Thread Pool size should be a positive number".to_string(),
25            ));
26        }
27
28        let (work_channel_sender, work_channel_receiver) = mpsc::channel();
29        let workers = Vec::with_capacity(size);
30        let work_channel_receiver_mutex = Arc::new(Mutex::new(work_channel_receiver));
31
32        let status = Arc::new(ThreadPoolStatus::new(size));
33
34        Ok(ThreadPool {
35            workers,
36            work_channel_sender,
37            work_channel_receiver_mutex,
38            status,
39        })
40    }
41
42    /// Begin execution of worker threads.
43    pub fn start(&mut self) {
44        for _ in 0..self.workers.capacity() {
45            self.spawn_worker();
46        }
47    }
48
49    /// Stops worker threads and joins them with the calling thread.
50    fn stop(&mut self) {
51        // sends a Terminate message to all Workers
52        for _ in &self.workers {
53            self.work_channel_sender
54                .send(WorkMessage::Terminate)
55                .unwrap_or_default();
56        }
57
58        // joins main thread with Worker threads
59        for worker in &mut self.workers {
60            if let Some(thread) = worker.thread.take() {
61                thread.join().unwrap_or_default();
62            }
63        }
64    }
65
66    /// Enqueues a task for execution by any of the worker threads.
67    ///
68    /// # Arguments
69    ///
70    /// * `func` - The task to execute.
71    ///
72    /// # Return
73    ///
74    /// Returns the receiver end of a channel where the result will be placed.
75    ///
76    /// # Examples
77    ///
78    /// ```no_run
79    /// use pconvert_rust::parallelism::{ResultMessage, ThreadPool};
80    /// use pconvert_rust::utils::read_png_from_file;
81    ///
82    /// let mut thread_pool = ThreadPool::new(10).unwrap();
83    /// let path = "path/to/file.png".to_owned();
84    /// let demultiply = false;
85    /// let result_channel = thread_pool.execute(move || ResultMessage::ImageResult(read_png_from_file(path, demultiply)));
86    /// let top = match result_channel.recv().unwrap() {
87    ///     ResultMessage::ImageResult(result) => result,
88    /// }.unwrap();
89    /// ```
90    pub fn execute<F>(&self, func: F) -> mpsc::Receiver<ResultMessage>
91    where
92        F: FnOnce() -> ResultMessage + Send + 'static,
93    {
94        let (result_channel_sender, result_channel_receiver) = mpsc::channel();
95        let task = Box::new(func);
96
97        // sends task to task queue and attaches the sender end of the result channel
98        // so that the Worker can send the task result
99        self.status.inc_queued_count();
100        self.work_channel_sender
101            .send(WorkMessage::NewTask(task, result_channel_sender))
102            .unwrap_or_default();
103
104        result_channel_receiver
105    }
106
107    /// Expands the thread pool to `num_threads`.
108    /// Creates `n` workers, where `n = num_threads - thread_pool_size`.
109    pub fn expand_to(&mut self, num_threads: usize) {
110        let num_threads = min(num_threads, constants::MAX_THREAD_POOL_SIZE);
111        let to_spawn = num_threads as isize - self.status.size() as isize;
112        for _ in 0..to_spawn {
113            self.spawn_worker();
114            self.status.inc_size();
115        }
116    }
117
118    pub fn get_status(&self) -> ThreadPoolStatus {
119        (*self.status).clone()
120    }
121
122    fn spawn_worker(&mut self) {
123        // creates Worker instances that receive the receiver end
124        // of the channel where jobs/tasks are submitted
125        self.workers.push(Worker::new(
126            self.status.clone(),
127            Arc::clone(&self.work_channel_receiver_mutex),
128        ));
129    }
130}
131
132impl Drop for ThreadPool {
133    fn drop(&mut self) {
134        self.stop();
135    }
136}
137
138struct Worker {
139    thread: Option<JoinHandle<()>>,
140}
141
142impl Worker {
143    fn new(
144        thread_pool_status: Arc<ThreadPoolStatus>,
145        receiver: Arc<Mutex<mpsc::Receiver<WorkMessage>>>,
146    ) -> Worker {
147        let thread = spawn(move || loop {
148            let message = receiver.lock().unwrap().recv().unwrap();
149
150            match message {
151                WorkMessage::NewTask(task, result_channel_sender) => {
152                    thread_pool_status.dec_queued_count();
153                    thread_pool_status.inc_active_count();
154
155                    let result = task();
156
157                    result_channel_sender.send(result).unwrap_or_default();
158
159                    thread_pool_status.dec_active_count();
160                }
161
162                WorkMessage::Terminate => {
163                    thread_pool_status.dec_size();
164                    break;
165                }
166            }
167        });
168
169        Worker {
170            thread: Some(thread),
171        }
172    }
173}
174
175type Task = Box<dyn FnOnce() -> ResultMessage + Send>;
176enum WorkMessage {
177    NewTask(Task, mpsc::Sender<ResultMessage>),
178    Terminate,
179}
180
181/// Result message types for `self.execute()`.
182pub enum ResultMessage {
183    ImageResult(Result<ImageBuffer<Rgba<u8>, Vec<u8>>, PConvertError>),
184}
185
186/// Represents the status of the thread pool (e.g. size, queued jobs, active jobs).
187/// Status counts use `Atomic*` data types in order to be safely shared across workers.
188pub struct ThreadPoolStatus {
189    size: AtomicUsize,
190    queued_count: AtomicUsize,
191    active_count: AtomicUsize,
192}
193
194impl ThreadPoolStatus {
195    pub fn new(size: usize) -> Self {
196        ThreadPoolStatus {
197            size: AtomicUsize::new(size),
198            queued_count: AtomicUsize::new(0),
199            active_count: AtomicUsize::new(0),
200        }
201    }
202
203    pub fn size(&self) -> usize {
204        self.size.load(Ordering::Acquire)
205    }
206
207    pub fn queued(&self) -> usize {
208        self.queued_count.load(Ordering::Relaxed)
209    }
210
211    pub fn active(&self) -> usize {
212        self.active_count.load(Ordering::Relaxed)
213    }
214
215    pub fn inc_queued_count(&self) {
216        self.queued_count.fetch_add(1, Ordering::Relaxed);
217    }
218
219    pub fn dec_queued_count(&self) {
220        self.queued_count.fetch_sub(1, Ordering::Relaxed);
221    }
222
223    pub fn inc_active_count(&self) {
224        self.active_count.fetch_add(1, Ordering::Relaxed);
225    }
226
227    pub fn dec_active_count(&self) {
228        self.active_count.fetch_sub(1, Ordering::Relaxed);
229    }
230
231    pub fn inc_size(&self) {
232        self.size.fetch_add(1, Ordering::Relaxed);
233    }
234
235    pub fn dec_size(&self) {
236        self.size.fetch_sub(1, Ordering::Relaxed);
237    }
238}
239
240impl Clone for ThreadPoolStatus {
241    fn clone(&self) -> Self {
242        ThreadPoolStatus {
243            size: AtomicUsize::new(self.size()),
244            queued_count: AtomicUsize::new(self.queued()),
245            active_count: AtomicUsize::new(self.active()),
246        }
247    }
248}