hyper_thread_pool/
lib.rs

//! A simple work-stealing thread pool implementation in Rust.
2use std::thread;
3use crossbeam_deque::{Injector, Stealer, Worker as DequeWorker}; 
4use rand::seq::SliceRandom; 
5use std::sync::Arc;
/// A boxed, run-once closure that can be handed to a worker thread.
///
/// `Send + 'static` is required because the closure is moved across a thread
/// boundary and may outlive the caller's stack frame.
type Job = Box<dyn FnOnce() + Send + 'static>;
8
/// A pool of worker threads fed from a shared, global job queue.
///
/// Jobs submitted through `execute` are pushed onto `global_injector`; the
/// worker threads take messages from it. Dropping the pool pushes one
/// `Message::Terminate` per worker and joins every thread.
pub struct ThreadPool {
    /// Handles of the spawned worker threads, joined when the pool is dropped.
    workers: Vec<Worker>,
    /// Global injector queue shared (via `Arc`) with every worker thread.
    global_injector: Arc<Injector<Message>>,
}
/// Bookkeeping for one worker thread of the pool.
struct Worker {
    /// Index given to the worker at construction (`0..size` in `ThreadPool::new`).
    /// NOTE(review): not read anywhere in the visible code.
    id: usize,
    /// Join handle of the worker's thread. Wrapped in `Option` so that `Drop`
    /// can `take()` the handle out of a `&mut Worker` and join it by value.
    thread: Option<thread::JoinHandle<()>>,
}
/// Messages travelling through the queues: either a job to run or a request
/// for the receiving worker to shut down.
enum Message {
    /// A closure the receiving worker should execute.
    NewJob(Job),
    /// Tells the receiving worker to leave its loop and exit its thread.
    Terminate,
}
/// Pool construction and job submission.
26
27
28impl ThreadPool {
29
30    pub fn new(size: usize) -> ThreadPool {
31        assert!(size > 0, "ThreadPool size must be greater than zero");
32
33        let global_injector = Arc::new(Injector::new());
34
35        let mut workers = Vec::with_capacity(size);
36        let mut local_queues:Vec<DequeWorker<Message>>  = (0..size).map(|_| DequeWorker::new_fifo()).collect();
37        let stealers: Vec<Stealer<Message>> = local_queues.iter().map(|w| w.stealer()).collect();
38        for id in 0..size {
39            let local_queue = local_queues.remove(0);
40            let injector_clone = Arc::clone(&global_injector); 
41            let stealers_clone = stealers.clone();
42            
43            workers.push(Worker::new(id, move || {
44                worker_loop(local_queue, injector_clone, stealers_clone);
45            }));
46        }
47
48        ThreadPool { workers, global_injector }
49    }
50
51    pub fn execute<F>(&self, f: F)
52    where
53        F: FnOnce() + Send + 'static,
54    {
55        let job = Box::new(f);
56
57        self.global_injector.push(Message::NewJob(job));
58    }
59}
60impl Drop for ThreadPool {
61    fn drop(&mut self) {
62        // 发送关闭信号给所有工作线程
63       for _ in 0..self.workers.len() {
64            self.global_injector.push(Message::Terminate);
65        }
66
67        for worker in &mut self.workers {
68            if let Some(thread) = worker.thread.take() {
69                thread.join().unwrap();
70            }
71        }
72    }
73}
74
75
76impl Worker {
77    /// receives a closure representing thread's logic
78    fn new<F>(id: usize, logic: F) -> Worker
79    where
80        F: FnOnce() + Send + 'static,
81    {
82        let thread = thread::spawn(logic);
83
84        Worker { id, thread: Some(thread) }
85    }
86}
87
88fn worker_loop(
89    local_queue: DequeWorker<Message>,
90    global_injector: Arc<Injector<Message>>,
91    stealers: Vec<Stealer<Message>>,
92) {
93    loop {
94        // 按照优先级寻找任务
95        match find_task(&local_queue, &global_injector, &stealers) {
96            Some(Message::NewJob(job)) => {
97                job(); // 执行任务
98            }
99            Some(Message::Terminate) => {
100                break; // 收到关闭信号,退出循环
101            }
102            None => {
103                // 如果在所有地方都找不到任务,就让出CPU时间片,避免空转
104                thread::yield_now();
105            }
106        }
107    }
108}
109
110// 核心的“寻找任务”算法
111fn find_task<'a>(
112    local_queue: &'a DequeWorker<Message>,
113    global_injector: &'a Arc<Injector<Message>>,
114    stealers: &'a [Stealer<Message>],
115) -> Option<Message> {
116    // 优先级1:处理自己的本地任务 (LIFO)
117    if let Some(job) = local_queue.pop() {
118        return Some(job);
119    }
120
121    // 优先级2:从全局注入器获取任务 (FIFO)
122    // --- 修改开始 ---
123    match global_injector.steal() {
124        crossbeam_deque::Steal::Success(job) => return Some(job),
125        _ => (), // 对 Empty 和 Retry 情况什么都不做
126    }
127    // --- 修改结束 ---
128
129    // 优先级3:从其他随机一个 worker 窃取任务 (FIFO)
130    let mut rng = rand::thread_rng();
131    let mut indices: Vec<usize> = (0..stealers.len()).collect();
132    indices.shuffle(&mut rng);
133
134    for &i in &indices {
135        // --- 修改开始 ---
136        match stealers[i].steal() {
137            crossbeam_deque::Steal::Success(job) => return Some(job),
138            _ => (), // 对 Empty 和 Retry 情况什么都不做
139        }
140        // --- 修改结束 ---
141    }
142
143    None
144}