jlizard_simple_threadpool/
worker.rs1use crate::common::Job;
3#[cfg(feature = "log")]
4use log::debug;
5use std::sync::mpsc::Receiver;
6use std::sync::{Arc, Mutex};
7use std::thread::JoinHandle;
8
/// A single pool worker: a numeric identifier paired with the handle of the
/// thread spawned for it.
pub struct Worker {
    /// Identifier assigned by the caller of `Worker::new`.
    pub(super) id: u8,
    /// Join handle of the worker's thread; `Worker::new` always stores `Some`.
    // NOTE(review): presumably an `Option` so the owner can `take()` the handle
    // and join it during shutdown — confirm against the pool's shutdown code.
    pub(super) thread: Option<JoinHandle<()>>,
}
13
14impl Worker {
15 pub(crate) fn new(id: u8, receiver: Arc<Mutex<Receiver<Job>>>) -> Self {
20 let thread = std::thread::spawn(move || {
21 loop {
22 let job_msg = receiver.lock().unwrap().recv();
24
25 match job_msg {
26 Ok(job) => {
27 #[cfg(feature = "log")]
28 {
29 debug!("Worker {id} got a job; executing.");
30 }
31 job();
32 }
33 Err(_) => {
34 #[cfg(feature = "log")]
35 {
36 debug!("Worker {id} disconnected; shutting down;");
37 }
38 break;
39 }
40 }
41 }
42 });
43
44 Self {
45 id,
46 thread: Some(thread),
47 }
48 }
49
50 #[inline]
52 pub fn get_id(&self) -> u8 {
53 self.id
54 }
55}
56
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc;

    /// Creates a job channel and wraps its receiving end in the
    /// `Arc<Mutex<..>>` form that `Worker::new` takes.
    fn job_channel() -> (mpsc::Sender<Job>, Arc<Mutex<mpsc::Receiver<Job>>>) {
        let (sender, receiver) = mpsc::channel::<Job>();
        (sender, Arc::new(Mutex::new(receiver)))
    }

    /// A new worker keeps the id it was given and holds a thread handle.
    #[test]
    fn test_worker_creation() {
        let (sender, receiver) = job_channel();

        let worker = Worker::new(1, receiver);

        assert_eq!(worker.id, 1);
        assert_eq!(worker.get_id(), 1);
        assert!(worker.thread.is_some());

        // Closing the channel ends the worker loop so the thread can be joined.
        drop(sender);
        worker.thread.unwrap().join().unwrap();
    }

    /// A job sent through the channel is run by the worker.
    #[test]
    fn test_worker_executes_job() {
        let (sender, receiver) = job_channel();

        let executed = Arc::new(AtomicBool::new(false));
        let executed_clone = Arc::clone(&executed);

        let worker = Worker::new(2, receiver);

        sender
            .send(Box::new(move || {
                executed_clone.store(true, Ordering::SeqCst);
            }))
            .unwrap();

        // Synchronise by joining instead of sleeping: messages queued before
        // the sender is dropped are still delivered, so once the thread has
        // exited the job has definitely been run.
        drop(sender);
        worker.thread.unwrap().join().unwrap();

        assert!(executed.load(Ordering::SeqCst));
    }

    /// Dropping the only sender makes the worker thread finish cleanly.
    #[test]
    fn test_worker_shutdown_on_channel_close() {
        let (sender, receiver) = job_channel();

        let worker = Worker::new(3, receiver);

        drop(sender);

        let result = worker.thread.unwrap().join();
        assert!(result.is_ok());
    }
}