1use std::thread;
3use crossbeam_deque::{Injector, Stealer, Worker as DequeWorker};
4use rand::seq::SliceRandom;
5use std::sync::Arc;
6type Job = Box<dyn FnOnce() + Send + 'static>;
8
9pub struct ThreadPool {
13 workers: Vec<Worker>,
14 global_injector: Arc<Injector<Message>>,}
16struct Worker {
17 id: usize,
18 thread: Option<thread::JoinHandle<()>>,
19}
20enum Message {
22 NewJob(Job),
23 Terminate,
24}
25impl ThreadPool {
29
30 pub fn new(size: usize) -> ThreadPool {
31 assert!(size > 0, "ThreadPool size must be greater than zero");
32
33 let global_injector = Arc::new(Injector::new());
34
35 let mut workers = Vec::with_capacity(size);
36 let mut local_queues:Vec<DequeWorker<Message>> = (0..size).map(|_| DequeWorker::new_fifo()).collect();
37 let stealers: Vec<Stealer<Message>> = local_queues.iter().map(|w| w.stealer()).collect();
38 for id in 0..size {
39 let local_queue = local_queues.remove(0);
40 let injector_clone = Arc::clone(&global_injector);
41 let stealers_clone = stealers.clone();
42
43 workers.push(Worker::new(id, move || {
44 worker_loop(local_queue, injector_clone, stealers_clone);
45 }));
46 }
47
48 ThreadPool { workers, global_injector }
49 }
50
51 pub fn execute<F>(&self, f: F)
52 where
53 F: FnOnce() + Send + 'static,
54 {
55 let job = Box::new(f);
56
57 self.global_injector.push(Message::NewJob(job));
58 }
59}
60impl Drop for ThreadPool {
61 fn drop(&mut self) {
62 for _ in 0..self.workers.len() {
64 self.global_injector.push(Message::Terminate);
65 }
66
67 for worker in &mut self.workers {
68 if let Some(thread) = worker.thread.take() {
69 thread.join().unwrap();
70 }
71 }
72 }
73}
74
75
76impl Worker {
77 fn new<F>(id: usize, logic: F) -> Worker
79 where
80 F: FnOnce() + Send + 'static,
81 {
82 let thread = thread::spawn(logic);
83
84 Worker { id, thread: Some(thread) }
85 }
86}
87
88fn worker_loop(
89 local_queue: DequeWorker<Message>,
90 global_injector: Arc<Injector<Message>>,
91 stealers: Vec<Stealer<Message>>,
92) {
93 loop {
94 match find_task(&local_queue, &global_injector, &stealers) {
96 Some(Message::NewJob(job)) => {
97 job(); }
99 Some(Message::Terminate) => {
100 break; }
102 None => {
103 thread::yield_now();
105 }
106 }
107 }
108}
109
110fn find_task<'a>(
112 local_queue: &'a DequeWorker<Message>,
113 global_injector: &'a Arc<Injector<Message>>,
114 stealers: &'a [Stealer<Message>],
115) -> Option<Message> {
116 if let Some(job) = local_queue.pop() {
118 return Some(job);
119 }
120
121 match global_injector.steal() {
124 crossbeam_deque::Steal::Success(job) => return Some(job),
125 _ => (), }
127 let mut rng = rand::thread_rng();
131 let mut indices: Vec<usize> = (0..stealers.len()).collect();
132 indices.shuffle(&mut rng);
133
134 for &i in &indices {
135 match stealers[i].steal() {
137 crossbeam_deque::Steal::Success(job) => return Some(job),
138 _ => (), }
140 }
142
143 None
144}