fast_able/fast_thread_pool/
task_executor.rs1use std::{sync::Arc, thread};
2
3use core_affinity::CoreId;
4use crossbeam::atomic::AtomicCell;
5
6#[cfg(feature = "crossbeam_channel")]
10pub use crossbeam::channel::{bounded, unbounded, Receiver, Sender, TryRecvError};
11
12#[cfg(not(feature = "crossbeam_channel"))]
13pub use std::sync::mpsc::{
14 channel as unbounded, sync_channel as bounded, Receiver, TryRecvError,
15};
16
17#[cfg(all(not(feature = "crossbeam_channel"), not(feature = "thread_task_bounded")))]
19pub use std::sync::mpsc::Sender;
20
21#[cfg(all(not(feature = "crossbeam_channel"), feature = "thread_task_bounded"))]
22pub use std::sync::mpsc::SyncSender as Sender;
23
24fn get_default_bounded_capacity() -> usize {
27 let cpu_count = num_cpus::get();
28 let capacity = (cpu_count * 100).max(128).min(4096);
29 capacity
30}
31
32pub struct TaskExecutor {
34 jobs: Sender<Box<dyn FnOnce(&usize) + Send + 'static>>,
35 _handle: thread::JoinHandle<()>,
36 pub count: Arc<AtomicCell<i64>>,
37 core: usize,
38}
39
40impl std::fmt::Debug for TaskExecutor {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("TaskExecutor")
43 .field("_handle", &self._handle)
44 .field("count", &self.count)
45 .field("core", &self.core)
46 .finish()
47 }
48}
49
50impl TaskExecutor {
51 pub fn new(core: CoreId, realtime: i32) -> TaskExecutor {
54 #[cfg(feature = "thread_task_bounded")]
56 let (tx, rx) = {
57 let capacity = get_default_bounded_capacity();
58 bounded::<Box<dyn FnOnce(&usize) + Send + 'static>>(capacity)
59 };
60
61 #[cfg(not(feature = "thread_task_bounded"))]
62 let (tx, rx) = unbounded::<Box<dyn FnOnce(&usize) + Send + 'static>>();
63
64 let count = Arc::new(AtomicCell::new(0_i64));
65 let task_count = count.clone();
66
67 let _handle = thread::spawn(move || {
68 super::set_core_affinity_and_realtime(core.id, realtime);
70 let core_id = core.id;
71
72 let old_hook = std::panic::take_hook();
74 std::panic::set_hook(Box::new(move |panic_info| {
75 let thread = std::thread::current();
76 let thread_name = thread.name().unwrap_or("unnamed");
77
78 let panic_message = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
80 s.to_string()
81 } else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
82 s.clone()
83 } else {
84 format!(
85 "Unknown panic payload type: {:?}",
86 panic_info.payload().type_id()
87 )
88 };
89
90 let location_info = if let Some(location) = panic_info.location() {
92 format!(
93 "file: '{}', line: {}, column: {}",
94 location.file(),
95 location.line(),
96 location.column()
97 )
98 } else {
99 "unknown location".to_string()
100 };
101
102 error!(
104 "PANIC in TaskExecutor worker thread!\n\
105 ┌─ Thread Info ─────────────────────────────────────┐\n\
106 │ Thread Name: {}\n\
107 │ Core ID: {}\n\
108 │ Thread ID: {:?}\n\
109 ├─ Panic Details ──────────────────────────────────┤\n\
110 │ Message: {}\n\
111 │ Location: {}\n\
112 └──────────────────────────────────────────────────┘",
113 thread_name,
114 core_id,
115 thread.id(),
116 panic_message,
117 location_info
118 );
119
120 old_hook(panic_info);
122 }));
123
124 Self::run_worker_loop(rx, task_count, core_id);
125 });
126
127 TaskExecutor {
128 jobs: tx,
129 _handle,
130 count,
131 core: core.id,
132 }
133 }
134
135 #[cfg(feature = "thread_task_bounded")]
139 pub fn new_with_capacity(core: CoreId, capacity: usize, realtime: i32) -> TaskExecutor {
140 let (tx, rx) = bounded::<Box<dyn FnOnce(&usize) + Send + 'static>>(capacity);
141 let count = Arc::new(AtomicCell::new(0_i64));
142 let task_count = count.clone();
143
144 let _handle = thread::spawn(move || {
145 super::set_core_affinity_and_realtime(core.id, realtime);
147 let core_id = core.id;
148
149 let old_hook = std::panic::take_hook();
151 std::panic::set_hook(Box::new(move |panic_info| {
152 let thread = std::thread::current();
153 let thread_name = thread.name().unwrap_or("unnamed");
154
155 let panic_message = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
157 s.to_string()
158 } else if let Some(s) = panic_info.payload().downcast_ref::<String>() {
159 s.clone()
160 } else {
161 format!(
162 "Unknown panic payload type: {:?}",
163 panic_info.payload().type_id()
164 )
165 };
166
167 let location_info = if let Some(location) = panic_info.location() {
169 format!(
170 "file: '{}', line: {}, column: {}",
171 location.file(),
172 location.line(),
173 location.column()
174 )
175 } else {
176 "unknown location".to_string()
177 };
178
179 error!(
181 "PANIC in TaskExecutor worker thread!\n\
182 ┌─ Thread Info ─────────────────────────────────────┐\n\
183 │ Thread Name: {}\n\
184 │ Core ID: {}\n\
185 │ Thread ID: {:?}\n\
186 ├─ Panic Details ──────────────────────────────────┤\n\
187 │ Message: {}\n\
188 │ Location: {}\n\
189 └──────────────────────────────────────────────────┘",
190 thread_name,
191 core_id,
192 thread.id(),
193 panic_message,
194 location_info
195 );
196
197 old_hook(panic_info);
199 }));
200
201 Self::run_worker_loop(rx, task_count, core_id);
202 });
203
204 TaskExecutor {
205 jobs: tx,
206 _handle,
207 count,
208 core: core.id,
209 }
210 }
211
212 fn run_worker_loop(
214 rx: Receiver<Box<dyn FnOnce(&usize) + Send + 'static>>,
215 task_count: Arc<AtomicCell<i64>>,
216 core_id: usize,
217 ) {
218 #[cfg(feature = "thread_dispatch")]
219 {
220 let mut empty_count = 0;
221 loop {
222 match rx.try_recv() {
223 Ok(job) => {
224 job(&core_id);
225 task_count.fetch_sub(1);
226 empty_count = 0;
227 }
228 Err(TryRecvError::Empty) => {
229 empty_count += 1;
230 if empty_count > 1000 {
231 empty_count = 0;
232 if let Ok(job) = rx.recv() {
234 job(&core_id);
235 task_count.fetch_sub(1);
236 }
237 }
238 }
239 Err(TryRecvError::Disconnected) => {
240 error!("TaskExecutor disconnected: {}", core_id);
241 break;
242 }
243 }
244 }
245 }
246
247 #[cfg(not(feature = "thread_dispatch"))]
248 loop {
249 if let Ok(job) = rx.try_recv() {
250 job(&core_id);
251 task_count.fetch_sub(1);
252 }
253 }
254 }
255
256 #[inline(always)]
258 pub fn spawn<F>(&self, f: F)
259 where
260 F: FnOnce(&usize) + Send + 'static,
261 {
262 self.count.fetch_add(1);
263
264 if let Err(e) = self.jobs.send(Box::new(f)) {
265 error!("TaskExecutor send error: {:?}", e);
266 e.0(&0);
268 self.count.fetch_sub(1);
269 }
270 }
271
272 #[cfg(all(feature = "thread_task_bounded", feature = "crossbeam_channel"))]
276 #[inline(always)]
277 pub fn try_spawn<F>(&self, f: F) -> bool
278 where
279 F: FnOnce(&usize) + Send + 'static,
280 {
281 match self.jobs.try_send(Box::new(f)) {
282 Ok(_) => {
283 self.count.fetch_add(1);
284 true
285 }
286 Err(_) => false, }
288 }
289
290 #[cfg(all(feature = "thread_task_bounded", not(feature = "crossbeam_channel")))]
294 #[inline(always)]
295 pub fn try_spawn<F>(&self, f: F) -> bool
296 where
297 F: FnOnce(&usize) + Send + 'static,
298 {
299 match self.jobs.try_send(Box::new(f)) {
300 Ok(_) => {
301 self.count.fetch_add(1);
302 true
303 }
304 Err(_) => false, }
306 }
307}