fast_able/fast_thread_pool/
task_executor_crossbeam.rs

1use std::{
2    sync::{mpsc::Sender, Arc, LazyLock},
3    thread,
4};
5
6use core_affinity::CoreId;
7use crossbeam::{atomic::AtomicCell, queue};
8
9/// 简易线程池
10pub struct TaskExecutor {
11    jobs: crossbeam::channel::Sender<Box<dyn FnOnce(&usize) + Send + 'static>>,
12    _handle: thread::JoinHandle<()>,
13    pub count: Arc<AtomicCell<i64>>,
14    core: usize,
15}
16
17impl std::fmt::Debug for TaskExecutor {
18    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19        f.debug_struct("TaskExecutor")
20            .field("_handle", &self._handle)
21            .field("count", &self.count)
22            .finish()
23    }
24}
25
26impl TaskExecutor {
27    /// realtime 开启实时内核 // 优先级(1-99,越高越优先); 输入 -1 时, 不开启
28    pub fn new(core: CoreId, realtime: i32) -> TaskExecutor {
29        let (tx, rx) = crossbeam::channel::unbounded::<Box<dyn FnOnce(&usize) + Send + 'static>>();
30
31        let count = Arc::new(AtomicCell::new(0_i64));
32
33        let count_c = count.clone();
34        let _handle = thread::spawn(move || {
35            // 绑核 and 开启实时内核
36            super::set_core_affinity_and_realtime(core.id, realtime);
37            let ref core = core.id;
38
39            if cfg!(feature = "thread_dispatch") {
40                let mut n = 0;
41                loop {
42                    if let Ok(job) = rx.try_recv() {
43                        job(core);
44                        count_c.fetch_sub(1);
45                        n = 0;
46                    } else {
47                        n += 1;
48
49                        if n > 1000 {
50                            n = 0;
51                            // if let Ok(job) = rx.recv_timeout(std::time::Duration::from_micros(500))
52                            if let Ok(job) = rx.recv() {
53                                job(core);
54                                count_c.fetch_sub(1);
55                            }
56                        }
57                    }
58                }
59            } else {
60                loop {
61                    if let Ok(job) = rx.try_recv() {
62                        job(core);
63                        count_c.fetch_sub(1);
64                    }
65                }
66            }
67        });
68
69        TaskExecutor {
70            jobs: tx,
71            _handle,
72            count,
73            core: core.id,
74        }
75    }
76
77    #[inline(always)]
78    pub fn spawn<F>(&self, f: F)
79    where
80        F: FnOnce(&usize),
81        F: Send + 'static,
82    {
83        self.count.fetch_add(1);
84        if let Err(e) = self.jobs.send(Box::new(f)) {
85            error!("TaskExecutor send error: {:?}", e);
86        }
87    }
88}