jlizard_simple_threadpool/
worker.rs

//! Worker model for concurrent job handling.
2use crate::common::Job;
3#[cfg(feature = "log")]
4use log::debug;
5use std::sync::mpsc::Receiver;
6use std::sync::{Arc, Mutex};
7use std::thread::JoinHandle;
8
9pub struct Worker {
10    pub(super) id: u8,
11    pub(super) thread: Option<JoinHandle<()>>,
12}
13
14impl Worker {
15    /// Creates a new worker that spawns a thread to process jobs from the shared receiver.
16    ///
17    /// The worker continuously receives jobs from the channel until the sender is dropped,
18    /// at which point it exits gracefully.
19    pub(crate) fn new(id: u8, receiver: Arc<Mutex<Receiver<Job>>>) -> Self {
20        let thread = std::thread::spawn(move || {
21            loop {
22                // FIXME modify this to handle errors and push them to the joinhandle
23                let job_msg = receiver.lock().unwrap().recv();
24
25                match job_msg {
26                    Ok(job) => {
27                        #[cfg(feature = "log")]
28                        {
29                            debug!("Worker {id} got a job; executing.");
30                        }
31                        job();
32                    }
33                    Err(_) => {
34                        #[cfg(feature = "log")]
35                        {
36                            debug!("Worker {id} disconnected; shutting down;");
37                        }
38                        break;
39                    }
40                }
41            }
42        });
43
44        Self {
45            id,
46            thread: Some(thread),
47        }
48    }
49
50    /// get id of the worker
51    #[inline]
52    pub fn get_id(&self) -> u8 {
53        self.id
54    }
55}
56
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc;

    /// Creates a job channel and wraps the receiving half the way `Worker::new` expects.
    fn job_channel() -> (mpsc::Sender<Job>, Arc<Mutex<Receiver<Job>>>) {
        let (sender, receiver) = mpsc::channel::<Job>();
        (sender, Arc::new(Mutex::new(receiver)))
    }

    #[test]
    fn test_worker_creation() {
        let (sender, receiver) = job_channel();

        let worker = Worker::new(1, receiver);

        assert_eq!(worker.id, 1);
        assert_eq!(worker.get_id(), 1);
        assert!(worker.thread.is_some());

        // Clean up: dropping the only sender disconnects the channel so the thread exits.
        drop(sender);
        worker.thread.unwrap().join().unwrap();
    }

    #[test]
    fn test_worker_executes_job() {
        let (sender, receiver) = job_channel();

        let executed = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&executed);

        let worker = Worker::new(2, receiver);

        // Send a job
        sender
            .send(Box::new(move || {
                flag.store(true, Ordering::SeqCst);
            }))
            .unwrap();

        // Instead of sleeping for an arbitrary amount of time, disconnect the channel
        // and join the worker. Messages queued before the disconnect are still
        // delivered, so the job has run by the time `join` returns.
        drop(sender);
        worker.thread.unwrap().join().unwrap();

        // Verify job was executed
        assert!(executed.load(Ordering::SeqCst));
    }

    #[test]
    fn test_worker_shutdown_on_channel_close() {
        let (sender, receiver) = job_channel();

        let worker = Worker::new(3, receiver);

        // Close channel by dropping sender
        drop(sender);

        // Worker thread should exit gracefully
        let result = worker.thread.unwrap().join();
        assert!(result.is_ok());
    }
}