croner_scheduler/
threadpool.rs1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5
6type Task = Box<dyn FnOnce() + Send + 'static>;
7
8pub struct ThreadPool {
9 workers: Vec<Worker>,
10 sender: Arc<Mutex<VecDeque<Task>>>,
11 cvar: Arc<Condvar>,
12 shutdown: Arc<AtomicBool>,
13}
14
15impl ThreadPool {
16 pub fn new(size: usize) -> ThreadPool {
17 assert!(size > 0);
18
19 let sender = Arc::new(Mutex::new(VecDeque::new()));
20 let cvar = Arc::new(Condvar::new());
21 let shutdown = Arc::new(AtomicBool::new(false));
22 let mut workers = Vec::with_capacity(size);
23
24 for _ in 0..size {
25 workers.push(Worker::new(
26 Arc::clone(&sender),
27 Arc::clone(&cvar),
28 Arc::clone(&shutdown),
29 ));
30 }
31
32 ThreadPool {
33 workers,
34 sender,
35 cvar,
36 shutdown,
37 }
38 }
39
40 pub fn execute<F>(&self, f: F)
41 where
42 F: FnOnce() + Send + 'static,
43 {
44 let task = Box::new(f);
45 let mut tasks = self.sender.lock().unwrap();
46 tasks.push_back(task);
47 self.cvar.notify_one();
48 }
49
50 pub fn shutdown(&mut self) {
51 self.shutdown.store(true, Ordering::SeqCst);
52 self.cvar.notify_all();
53
54 for i in 0..self.workers.len() {
56 if let Some(thread) = self.workers[i].thread.take() {
57 thread.join().unwrap();
58 }
59 }
60 }
61
62 pub fn max_count(&self) -> usize {
63 self.workers.len()
64 }
65}
66
67struct Worker {
68 thread: Option<thread::JoinHandle<()>>,
69}
70
71impl Worker {
72 fn new(
73 sender: Arc<Mutex<VecDeque<Task>>>,
74 cvar: Arc<Condvar>,
75 shutdown: Arc<AtomicBool>,
76 ) -> Worker {
77 let thread = thread::spawn(move || loop {
78 let task = {
79 let mut tasks = sender.lock().unwrap();
80 while !shutdown.load(Ordering::SeqCst) && tasks.is_empty() {
81 tasks = cvar.wait(tasks).unwrap();
82 }
83 if shutdown.load(Ordering::SeqCst) {
84 break;
85 }
86 tasks.pop_front()
87 };
88
89 if let Some(task) = task {
90 task();
91 }
92 });
93
94 Worker {
95 thread: Some(thread),
96 }
97 }
98}
99
100impl Drop for ThreadPool {
101 fn drop(&mut self) {
102 self.shutdown();
103 }
104}