fast_able/fast_thread_pool/
task_executor_crossbeam.rs1use std::{
2 sync::{mpsc::Sender, Arc, LazyLock},
3 thread,
4};
5
6use core_affinity::CoreId;
7use crossbeam::{atomic::AtomicCell, queue};
8
9pub struct TaskExecutor {
11 jobs: crossbeam::channel::Sender<Box<dyn FnOnce(&usize) + Send + 'static>>,
12 _handle: thread::JoinHandle<()>,
13 pub count: Arc<AtomicCell<i64>>,
14 core: usize,
15}
16
17impl std::fmt::Debug for TaskExecutor {
18 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19 f.debug_struct("TaskExecutor")
20 .field("_handle", &self._handle)
21 .field("count", &self.count)
22 .finish()
23 }
24}
25
26impl TaskExecutor {
27 pub fn new(core: CoreId, realtime: i32) -> TaskExecutor {
29 let (tx, rx) = crossbeam::channel::unbounded::<Box<dyn FnOnce(&usize) + Send + 'static>>();
30
31 let count = Arc::new(AtomicCell::new(0_i64));
32
33 let count_c = count.clone();
34 let _handle = thread::spawn(move || {
35 super::set_core_affinity_and_realtime(core.id, realtime);
37 let ref core = core.id;
38
39 if cfg!(feature = "thread_dispatch") {
40 let mut n = 0;
41 loop {
42 if let Ok(job) = rx.try_recv() {
43 job(core);
44 count_c.fetch_sub(1);
45 n = 0;
46 } else {
47 n += 1;
48
49 if n > 1000 {
50 n = 0;
51 if let Ok(job) = rx.recv() {
53 job(core);
54 count_c.fetch_sub(1);
55 }
56 }
57 }
58 }
59 } else {
60 loop {
61 if let Ok(job) = rx.try_recv() {
62 job(core);
63 count_c.fetch_sub(1);
64 }
65 }
66 }
67 });
68
69 TaskExecutor {
70 jobs: tx,
71 _handle,
72 count,
73 core: core.id,
74 }
75 }
76
77 #[inline(always)]
78 pub fn spawn<F>(&self, f: F)
79 where
80 F: FnOnce(&usize),
81 F: Send + 'static,
82 {
83 self.count.fetch_add(1);
84 if let Err(e) = self.jobs.send(Box::new(f)) {
85 error!("TaskExecutor send error: {:?}", e);
86 }
87 }
88}