fast_able/fast_thread_pool/
task_executor.rs

1use std::{sync::Arc, thread};
2
3use core_affinity::CoreId;
4use crossbeam::atomic::AtomicCell;
5
6/// 基于 features 的 channel 类型定义
7/// 启用 crossbeam_channel feature 时使用 crossbeam::channel,否则使用 std::sync::mpsc
8
9#[cfg(feature = "crossbeam_channel")]
10pub use crossbeam::channel::{bounded, unbounded, Receiver, Sender, TryRecvError};
11
12#[cfg(not(feature = "crossbeam_channel"))]
13pub use std::sync::mpsc::{
14    channel as unbounded, sync_channel as bounded, Receiver, TryRecvError,
15};
16
17// 为 std::sync::mpsc 定义统一的 Sender 类型
18#[cfg(all(not(feature = "crossbeam_channel"), not(feature = "thread_task_bounded")))]
19pub use std::sync::mpsc::Sender;
20
21#[cfg(all(not(feature = "crossbeam_channel"), feature = "thread_task_bounded"))]
22pub use std::sync::mpsc::SyncSender as Sender;
23
24/// 获取默认的有界 channel 容量
25/// 返回 CPU 核心数的 4 倍,至少为 64,最多为 1024
26fn get_default_bounded_capacity() -> usize {
27    let cpu_count = num_cpus::get();
28    let capacity = (cpu_count * 100).max(128).min(4096);
29    capacity
30}
31
32/// 统一的线程池执行器,支持通过 features 切换 channel 实现
33pub struct TaskExecutor {
34    jobs: Sender<Box<dyn FnOnce(&usize) + Send + 'static>>,
35    _handle: thread::JoinHandle<()>,
36    pub count: Arc<AtomicCell<i64>>,
37    core: usize,
38}
39
40impl std::fmt::Debug for TaskExecutor {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("TaskExecutor")
43            .field("_handle", &self._handle)
44            .field("count", &self.count)
45            .field("core", &self.core)
46            .finish()
47    }
48}
49
50impl TaskExecutor {
51    /// 创建新的任务执行器
52    /// realtime: 实时内核优先级(1-99,越高越优先),输入 -1 时不开启
53    pub fn new(core: CoreId, realtime: i32) -> TaskExecutor {
54        // 根据 thread_task_bounded feature 选择使用有界或无界 channel
55        #[cfg(feature = "thread_task_bounded")]
56        let (tx, rx) = {
57            let capacity = get_default_bounded_capacity();
58            bounded::<Box<dyn FnOnce(&usize) + Send + 'static>>(capacity)
59        };
60
61        #[cfg(not(feature = "thread_task_bounded"))]
62        let (tx, rx) = unbounded::<Box<dyn FnOnce(&usize) + Send + 'static>>();
63
64        let count = Arc::new(AtomicCell::new(0_i64));
65        let task_count = count.clone();
66
67        let _handle = thread::spawn(move || {
68            // 绑核和开启实时内核
69            super::set_core_affinity_and_realtime(core.id, realtime);
70            let core_id = core.id;
71
72            // 在worker线程启动前设置线程级别的panic hook
73            let old_hook = std::panic::take_hook();
74            std::panic::set_hook(Box::new(move |panic_info| {
75                let thread = std::thread::current();
76                let thread_name = thread.name().unwrap_or("unnamed");
77
78                // 获取panic消息
79                let panic_message = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
80                    s.to_string()
81                } else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
82                    s.clone()
83                } else {
84                    format!(
85                        "Unknown panic payload type: {:?}",
86                        panic_info.payload().type_id()
87                    )
88                };
89
90                // 获取panic位置信息
91                let location_info = if let Some(location) = panic_info.location() {
92                    format!(
93                        "file: '{}', line: {}, column: {}",
94                        location.file(),
95                        location.line(),
96                        location.column()
97                    )
98                } else {
99                    "unknown location".to_string()
100                };
101
102                // 输出详细的panic信息
103                error!(
104                    "PANIC in TaskExecutor worker thread!\n\
105                     ┌─ Thread Info ─────────────────────────────────────┐\n\
106                     │ Thread Name: {}\n\
107                     │ Core ID: {}\n\
108                     │ Thread ID: {:?}\n\
109                     ├─ Panic Details ──────────────────────────────────┤\n\
110                     │ Message: {}\n\
111                     │ Location: {}\n\
112                     └──────────────────────────────────────────────────┘",
113                    thread_name,
114                    core_id,
115                    thread.id(),
116                    panic_message,
117                    location_info
118                );
119
120                // 调用原来的hook以保持默认行为
121                old_hook(panic_info);
122            }));
123
124            Self::run_worker_loop(rx, task_count, core_id);
125        });
126
127        TaskExecutor {
128            jobs: tx,
129            _handle,
130            count,
131            core: core.id,
132        }
133    }
134
135    /// 创建带自定义容量的任务执行器(仅在启用 thread_task_bounded feature 时有效)
136    /// capacity: 有界 channel 的容量
137    /// realtime: 实时内核优先级(1-99,越高越优先),输入 -1 时不开启
138    #[cfg(feature = "thread_task_bounded")]
139    pub fn new_with_capacity(core: CoreId, capacity: usize, realtime: i32) -> TaskExecutor {
140        let (tx, rx) = bounded::<Box<dyn FnOnce(&usize) + Send + 'static>>(capacity);
141        let count = Arc::new(AtomicCell::new(0_i64));
142        let task_count = count.clone();
143
144        let _handle = thread::spawn(move || {
145            // 绑核和开启实时内核
146            super::set_core_affinity_and_realtime(core.id, realtime);
147            let core_id = core.id;
148
149            // 在worker线程启动前设置线程级别的panic hook
150            let old_hook = std::panic::take_hook();
151            std::panic::set_hook(Box::new(move |panic_info| {
152                let thread = std::thread::current();
153                let thread_name = thread.name().unwrap_or("unnamed");
154
155                // 获取panic消息
156                let panic_message = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
157                    s.to_string()
158                } else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
159                    s.clone()
160                } else {
161                    format!(
162                        "Unknown panic payload type: {:?}",
163                        panic_info.payload().type_id()
164                    )
165                };
166
167                // 获取panic位置信息
168                let location_info = if let Some(location) = panic_info.location() {
169                    format!(
170                        "file: '{}', line: {}, column: {}",
171                        location.file(),
172                        location.line(),
173                        location.column()
174                    )
175                } else {
176                    "unknown location".to_string()
177                };
178
179                // 输出详细的panic信息
180                error!(
181                    "PANIC in TaskExecutor worker thread!\n\
182                     ┌─ Thread Info ─────────────────────────────────────┐\n\
183                     │ Thread Name: {}\n\
184                     │ Core ID: {}\n\
185                     │ Thread ID: {:?}\n\
186                     ├─ Panic Details ──────────────────────────────────┤\n\
187                     │ Message: {}\n\
188                     │ Location: {}\n\
189                     └──────────────────────────────────────────────────┘",
190                    thread_name,
191                    core_id,
192                    thread.id(),
193                    panic_message,
194                    location_info
195                );
196
197                // 调用原来的hook以保持默认行为
198                old_hook(panic_info);
199            }));
200
201            Self::run_worker_loop(rx, task_count, core_id);
202        });
203
204        TaskExecutor {
205            jobs: tx,
206            _handle,
207            count,
208            core: core.id,
209        }
210    }
211
212    /// 工作线程主循环
213    fn run_worker_loop(
214        rx: Receiver<Box<dyn FnOnce(&usize) + Send + 'static>>,
215        task_count: Arc<AtomicCell<i64>>,
216        core_id: usize,
217    ) {
218        #[cfg(feature = "thread_dispatch")]
219        {
220            let mut empty_count = 0;
221            loop {
222                match rx.try_recv() {
223                    Ok(job) => {
224                        job(&core_id);
225                        task_count.fetch_sub(1);
226                        empty_count = 0;
227                    }
228                    Err(TryRecvError::Empty) => {
229                        empty_count += 1;
230                        if empty_count > 1000 {
231                            empty_count = 0;
232                            // 空闲次数过多时,阻塞等待任务
233                            if let Ok(job) = rx.recv() {
234                                job(&core_id);
235                                task_count.fetch_sub(1);
236                            }
237                        }
238                    }
239                    Err(TryRecvError::Disconnected) => {
240                        error!("TaskExecutor disconnected: {}", core_id);
241                        break;
242                    }
243                }
244            }
245        }
246
247        #[cfg(not(feature = "thread_dispatch"))]
248        loop {
249            if let Ok(job) = rx.try_recv() {
250                job(&core_id);
251                task_count.fetch_sub(1);
252            }
253        }
254    }
255
256    /// 提交任务到线程池
257    #[inline(always)]
258    pub fn spawn<F>(&self, f: F)
259    where
260        F: FnOnce(&usize) + Send + 'static,
261    {
262        self.count.fetch_add(1);
263        
264        if let Err(e) = self.jobs.send(Box::new(f)) {
265            error!("TaskExecutor send error: {:?}", e);
266            // 如果发送失败,直接在当前线程执行
267            e.0(&0);
268            self.count.fetch_sub(1);
269        }
270    }
271
272    /// 尝试提交任务到线程池(非阻塞)
273    /// 仅在启用 thread_task_bounded feature 时提供此方法
274    /// 返回 true 表示成功提交,false 表示队列已满
275    #[cfg(all(feature = "thread_task_bounded", feature = "crossbeam_channel"))]
276    #[inline(always)]
277    pub fn try_spawn<F>(&self, f: F) -> bool
278    where
279        F: FnOnce(&usize) + Send + 'static,
280    {
281        match self.jobs.try_send(Box::new(f)) {
282            Ok(_) => {
283                self.count.fetch_add(1);
284                true
285            }
286            Err(_) => false, // 队列已满或通道已关闭
287        }
288    }
289
290    /// 尝试提交任务到线程池(非阻塞)- std::sync::mpsc 版本
291    /// 仅在启用 thread_task_bounded feature 且使用 std mpsc 时提供此方法
292    /// 返回 true 表示成功提交,false 表示队列已满
293    #[cfg(all(feature = "thread_task_bounded", not(feature = "crossbeam_channel")))]
294    #[inline(always)]
295    pub fn try_spawn<F>(&self, f: F) -> bool
296    where
297        F: FnOnce(&usize) + Send + 'static,
298    {
299        match self.jobs.try_send(Box::new(f)) {
300            Ok(_) => {
301                self.count.fetch_add(1);
302                true
303            }
304            Err(_) => false, // 队列已满或通道已关闭
305        }
306    }
307}