Skip to main content

ultra_utils/
thread_pool.rs

1use std::thread;
2use crossbeam_channel::{bounded, Sender};
3
4pub struct ThreadPool {
5    threads: Vec<thread::JoinHandle<()>>,
6    task_sender: Sender<Box<dyn FnOnce() + Send>>,
7}
8
9/// 一个线程池实现.
10impl ThreadPool {
11    /// 创建线程池的同时会创建相应数量的线程.
12    pub fn new(size: usize) -> ThreadPool {
13        assert!(size > 0, "The parameter size must be greater than zero");
14
15        let mut threads: Vec<thread::JoinHandle<()>> = Vec::with_capacity(size);
16
17        let (task_sender, task_receiver) = bounded::<Box<dyn FnOnce() + Send>>(size);
18
19        for _ in 0..size {
20            let task_receiver = task_receiver.clone();
21
22            let handle = thread::spawn(move || {
23                for task in task_receiver {
24                    task();
25                }
26            });
27
28            threads.push(handle);
29        }
30
31        ThreadPool {
32            task_sender,
33            threads,
34        }
35    }
36
37    /// 调用时, 如果有空闲线程, 提交任务后立即返回, 反之阻塞直到有空闲线程.
38    pub fn spawn<T: FnOnce() + Send + 'static>(&self, task: T) {
39        let task = Box::new(task);
40
41        self.task_sender
42            .send(task)
43            .unwrap();
44    }
45
46    /// 等待线程池内所有线程的任务执行完毕, 并销毁线程池内的所有线程.
47    pub fn join(self) {
48        drop(self.task_sender);
49
50        self.threads
51            .into_iter()
52            .for_each(|thread| thread.join().unwrap());
53    }
54}
55
56#[cfg(test)]
57mod tests {
58    mod thread_pool {
59        use std::sync::{Arc, Mutex};
60        use crate::thread_pool::ThreadPool;
61
62        #[test]
63        fn test_thread_pool_general() {
64            let pool = ThreadPool::new(1);
65            let result = Arc::new(Mutex::new(false));
66
67            let result_clone = result.clone();
68            pool.spawn(move || {
69                *result_clone.lock().unwrap() = true;
70            });
71            pool.join();
72
73            assert_eq!(
74                *result.lock().unwrap(),
75                true
76            );
77        }
78    }
79}