croner_scheduler/
threadpool.rs

1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5
/// A unit of work for the pool: a boxed closure that is invoked exactly once
/// and may be moved to another thread.
type Task = Box<dyn FnOnce() + Send + 'static>;
7
/// A fixed-size pool of worker threads that run queued closures.
///
/// Tasks are handed to the workers through a shared queue protected by a
/// mutex; a condition variable wakes workers when there is something to do.
pub struct ThreadPool {
    /// One entry per spawned worker thread.
    workers: Vec<Worker>,
    /// Shared FIFO queue of pending tasks. Despite the name this is the queue
    /// itself, not a channel sender.
    sender: Arc<Mutex<VecDeque<Task>>>,
    /// Notified when a task is queued or when shutdown is requested.
    cvar: Arc<Condvar>,
    /// Set to `true` to tell the workers to exit.
    shutdown: Arc<AtomicBool>,
}
14
15impl ThreadPool {
16    pub fn new(size: usize) -> ThreadPool {
17        assert!(size > 0);
18
19        let sender = Arc::new(Mutex::new(VecDeque::new()));
20        let cvar = Arc::new(Condvar::new());
21        let shutdown = Arc::new(AtomicBool::new(false));
22        let mut workers = Vec::with_capacity(size);
23
24        for _ in 0..size {
25            workers.push(Worker::new(
26                Arc::clone(&sender),
27                Arc::clone(&cvar),
28                Arc::clone(&shutdown),
29            ));
30        }
31
32        ThreadPool {
33            workers,
34            sender,
35            cvar,
36            shutdown,
37        }
38    }
39
40    pub fn execute<F>(&self, f: F)
41    where
42        F: FnOnce() + Send + 'static,
43    {
44        let task = Box::new(f);
45        let mut tasks = self.sender.lock().unwrap();
46        tasks.push_back(task);
47        self.cvar.notify_one();
48    }
49
50    pub fn shutdown(&mut self) {
51        self.shutdown.store(true, Ordering::SeqCst);
52        self.cvar.notify_all();
53
54        // We need to use indices to borrow each worker mutably
55        for i in 0..self.workers.len() {
56            if let Some(thread) = self.workers[i].thread.take() {
57                thread.join().unwrap();
58            }
59        }
60    }
61
62    pub fn max_count(&self) -> usize {
63        self.workers.len()
64    }
65}
66
/// Handle to a single worker thread owned by the pool.
struct Worker {
    /// Join handle of the worker's thread. It is `take`n, leaving `None`,
    /// when the pool joins the thread during shutdown.
    thread: Option<thread::JoinHandle<()>>,
}
70
71impl Worker {
72    fn new(
73        sender: Arc<Mutex<VecDeque<Task>>>,
74        cvar: Arc<Condvar>,
75        shutdown: Arc<AtomicBool>,
76    ) -> Worker {
77        let thread = thread::spawn(move || loop {
78            let task = {
79                let mut tasks = sender.lock().unwrap();
80                while !shutdown.load(Ordering::SeqCst) && tasks.is_empty() {
81                    tasks = cvar.wait(tasks).unwrap();
82                }
83                if shutdown.load(Ordering::SeqCst) {
84                    break;
85                }
86                tasks.pop_front()
87            };
88
89            if let Some(task) = task {
90                task();
91            }
92        });
93
94        Worker {
95            thread: Some(thread),
96        }
97    }
98}
99
100impl Drop for ThreadPool {
101    fn drop(&mut self) {
102        self.shutdown();
103    }
104}