ultra_utils/
thread_pool.rs1use std::thread;
2use crossbeam_channel::{bounded, Sender};
3
4pub struct ThreadPool {
5 threads: Vec<thread::JoinHandle<()>>,
6 task_sender: Sender<Box<dyn FnOnce() + Send>>,
7}
8
9impl ThreadPool {
11 pub fn new(size: usize) -> ThreadPool {
13 assert!(size > 0, "The parameter size must be greater than zero");
14
15 let mut threads: Vec<thread::JoinHandle<()>> = Vec::with_capacity(size);
16
17 let (task_sender, task_receiver) = bounded::<Box<dyn FnOnce() + Send>>(size);
18
19 for _ in 0..size {
20 let task_receiver = task_receiver.clone();
21
22 let handle = thread::spawn(move || {
23 for task in task_receiver {
24 task();
25 }
26 });
27
28 threads.push(handle);
29 }
30
31 ThreadPool {
32 task_sender,
33 threads,
34 }
35 }
36
37 pub fn spawn<T: FnOnce() + Send + 'static>(&self, task: T) {
39 let task = Box::new(task);
40
41 self.task_sender
42 .send(task)
43 .unwrap();
44 }
45
46 pub fn join(self) {
48 drop(self.task_sender);
49
50 self.threads
51 .into_iter()
52 .for_each(|thread| thread.join().unwrap());
53 }
54}
55
56#[cfg(test)]
57mod tests {
58 mod thread_pool {
59 use std::sync::{Arc, Mutex};
60 use crate::thread_pool::ThreadPool;
61
62 #[test]
63 fn test_thread_pool_general() {
64 let pool = ThreadPool::new(1);
65 let result = Arc::new(Mutex::new(false));
66
67 let result_clone = result.clone();
68 pool.spawn(move || {
69 *result_clone.lock().unwrap() = true;
70 });
71 pool.join();
72
73 assert_eq!(
74 *result.lock().unwrap(),
75 true
76 );
77 }
78 }
79}