fast_able/fast_thread_pool/
pool.rs

1use crate::fast_thread_pool::{get_core_skip, CORES};
2use crate::stock_pool::StockPool;
3use crossbeam::atomic::AtomicCell;
4use spin::Mutex;
5use std::{
6    fs,
7    sync::{Arc, LazyLock},
8};
9
10use super::TaskExecutor;
11
12pub const FILE_CORE_AFFINITY: &str = "./.core_affinity";
13
14/// 如果是实时系统, 则把"realtime_system"写入文件
15pub fn init(is_realtime_system: bool) {
16    _ = fs::remove_file(FILE_CORE_AFFINITY);
17    warn!(
18        "thread_mod init; remove_file core_affinity: {}, is_realtime_system: {is_realtime_system}",
19        FILE_CORE_AFFINITY
20    );
21
22    if is_realtime_system {
23        fs::write(FILE_CORE_AFFINITY, "realtime_system,\n").unwrap();
24    }
25}
26
27
28/// 设置绑核
29/// 设置实时内核
30/// 优先级(1-99,越高越优先); 输入 -1 时, 不开启
31pub fn set_core_affinity_and_realtime(core_u: usize, _realtime: i32) -> bool {
32    let ALL_CORES = CORES.clone();
33
34    let core = ALL_CORES.iter().find(|&&core| core.id == core_u);
35
36    let core = match core {
37        Some(core) => core,
38        None => match ALL_CORES.last() {
39            Some(core) => core,
40            None => return false,
41        },
42    };
43
44    let b = core_affinity::set_for_current(core.clone());
45
46    #[allow(unused_mut)]
47    let mut b2 = true;
48
49    #[allow(unused_mut)]
50    let mut realtime_msg = "";
51
52    static _IS_REALTIME_SYSTEM: LazyLock<bool> = LazyLock::new(|| {
53        std::fs::read_to_string(FILE_CORE_AFFINITY)
54            .unwrap_or_else(|_| "".to_string())
55            .contains("realtime_system")
56    });
57
58    #[cfg(target_os = "linux")]
59    if b && _realtime > 0 && *_IS_REALTIME_SYSTEM {
60        use libc::{EINVAL, EPERM, ESRCH, SCHED_RR, sched_param, sched_setscheduler};
61        let param = sched_param {
62            sched_priority: _realtime,
63        };
64        unsafe {
65            // SCHED_FIFO, SCHED_RR
66            let ret = sched_setscheduler(0, SCHED_RR, &param);
67            if ret != 0 {
68                /*
69                let errno = *libc::__errno_location();
70                let err_msg2 = match errno {
71                    EPERM => "权限不足 (EPERM) - 检查:\n1. getcap -v ./your_program 输出是否包含 cap_sys_nice\n2. 文件所有者是否正确\n3. 是否需要重新登录使组权限生效",
72                    EINVAL => "参数无效 (EINVAL) - 可能原因:\n1. 优先级超出范围(1-99)\n2. 调度策略无效",
73                    ESRCH => "指定的进程不存在 (ESRCH)",
74                    _ => "未知错误",
75                };
76                realtime_msg = format!(
77                    "set_realtime: false, 设置实时内核调度器失败: errno={errno}, UID={}, EUID={}, 设置优先级={}, 错误信息={}",
78                    libc::getuid(),
79                    libc::geteuid(),
80                    _realtime,
81                    err_msg2
82                );
83                */
84                realtime_msg = ", set_realtime: false";
85                b2 = false;
86            } else {
87                realtime_msg = ", set_realtime: true";
88            }
89        }
90    }
91
92    debug!("thread core: {core_u}, set_core_affinity: {b}{realtime_msg}");
93    b && b2
94}
95
96/// 线程池
97/// 通过线程池来管理线程
98/// 此线程池的线程个数默认为cpu核心数的4分之一; 任务默认提交在默认线程池;
99/// 通过一个api开启独享高性能模式; 独享高性能模式下,线程池的线程个数最多为cpu核心数的5分之1, 比如128核的cpu, 线程池的线程个数最多为25个;
100/// theads_高性能模式: 通过查看当前线程的任务数, 如果任务数10毫秒内任务数是其它线程中最少的, 则将任务分配给该线程;
101/// 使用 core_affinity 获得cpu核心数
102/// 如果已经有一个股票的任务在一个线程中执行, 则将任务分配给该线程; 如果该股票的任务全部执行完毕, 则将任务分配给任务数最少的线程;
103pub struct ThreadPool {
104    pub threads_share: Vec<TaskExecutor>,
105    pub threads_fast: crate::vec::SyncVec<TaskExecutor>,
106
107    pub switch_thread_index: Mutex<i32>,
108
109    // pub 高性能模式_任务数最少的线程: AtomicCell<usize>,
110    /// 记录高性能模式下, 当前股票代码在哪个线程中执行
111    pub threads_fast_idx: StockPool<AtomicCell<Option<usize>>>,
112
113    /// 记录高性能模式下, 当前股票代码正在执行的任务数
114    // 高性能模式_记录_任务数: StockPool<Arc<()>>,
115
116    /// 防止同一支票并行运行
117    pub stock_lock: StockPool<Arc<spin::Mutex<()>>>,
118    // 多少个核心
119    // core_num: Vec<core_affinity::CoreId>,
120    // current_core: AtomicCell<i32>,
121}
122
123// pub struct Pto<T: Debug + Default>(AtomicCell<T>);
124// impl<T: Debug + Default> Deref for Pto<T> {
125//     type Target = AtomicCell<T>;
126//     fn deref(&self) -> &Self::Target {
127//         &self.0
128//     }
129// }
130// impl<T: Debug + Default + Copy> Debug for Pto<T> {
131//     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132//         f.write_str(&format!("{:?}", self.load()))
133//     }
134// }
135// impl<T: Debug + Default> Default for Pto<T> {
136//     fn default() -> Self {
137//         Pto(AtomicCell::new(T::default()))
138//     }
139// }
140
141impl ThreadPool {
142    pub fn new(mut cpu_fraction_fast: usize, cpu_fraction_share: usize) -> ThreadPool {
143        let cpu_num = CORES.len();
144
145        let skip = get_core_skip();
146        let cpu_num = cpu_num / skip;
147        let mut cpu_fraction_share_num = cpu_num / cpu_fraction_share;
148
149        // 修复: 确保线程数至少为1
150        if cpu_fraction_share_num == 0 {
151            cpu_fraction_share_num = 1;
152            warn!(
153                "cpu_fraction_share_num({cpu_fraction_share_num}) = cpu_num({cpu_num}) / cpu_fraction_share({cpu_fraction_share}), set cpu_fraction_share_num to 1"
154            );
155        }
156
157        // 使用 use_last_core2 获取共享线程使用的核心
158        debug!("threads_share cpu count: {cpu_fraction_share_num}, skip: {}", get_core_skip());
159        let mut threads_共享 = Vec::with_capacity(cpu_fraction_share_num);
160        
161        // 获取共享线程使用的核心
162        let share_cores = super::use_last_core2("share_thread", cpu_fraction_share_num);
163        
164        // 创建共享线程
165        for core_id in &share_cores {
166            let core = core_affinity::CoreId { id: *core_id };
167            threads_共享.push(TaskExecutor::new(core, -1));
168        }
169        
170        // 创建快速线程
171        if cpu_fraction_fast == 0 {
172            cpu_fraction_fast = cpu_num;
173        }
174        let mut cpu_fraction_fast_num = cpu_num / cpu_fraction_fast;
175        if cpu_fraction_fast_num == 0 {
176            cpu_fraction_fast_num = 1;
177            warn!(
178                "cpu_fraction_fast_num({cpu_fraction_fast_num}) = cpu_num({cpu_num}) / cpu_fraction_fast({cpu_fraction_fast}), set cpu_fraction_fast_num to 1"
179            );
180        }
181        debug!("theads_fraction_fast cpu count: {}", cpu_fraction_fast_num);
182        let theads_高性能模式 = crate::vec::SyncVec::with_capacity(cpu_fraction_fast_num);
183
184        // 获取快速线程使用的核心
185        let fast_cores = super::use_last_core2("fast_thread", cpu_fraction_fast_num);
186        
187        // 创建快速线程
188        for core_id in &fast_cores {
189            let core = core_affinity::CoreId { id: *core_id };
190            theads_高性能模式.push(TaskExecutor::new(core, 49));
191        }
192        
193        debug!("fast_thread_pool_bind_cores: share cores: {:?}, fast cores: {:?}", share_cores, fast_cores);
194
195        let r = ThreadPool {
196            threads_share: threads_共享,
197            threads_fast: theads_高性能模式,
198            // 高性能模式_任务数最少的线程: 0.into(),
199            threads_fast_idx: StockPool::new_default(),
200            // 高性能模式_记录_任务数: StockPool::new(),
201            stock_lock: StockPool::new_default(),
202            switch_thread_index: (-1).into(),
203            // core_num,
204            // current_core: -1.into(),
205        };
206
207        // let r1 = r.clone();
208        // std::thread::spawn(move || loop {
209        //     r1.loop_任务数最少的线程();
210        //     std::thread::sleep(std::time::Duration::from_millis(10));
211        // });
212        r
213    }
214
215    /// 获取拥有最少任务的线程索引
216    ///
217    /// 本函数遍历快速线程池中的线程,寻找任务计数最少的线程。
218    /// 如果找到一个任务计数为0的线程,它将立即返回该线程的索引,
219    /// 因为这表示该线程目前没有任务。否则,函数将返回任务计数最少的线程索引。
220    /// 这个信息用于调度新任务到拥有最少任务的线程,以平衡线程间的工作负载。
221    /*
222    pub fn count_task_min(&self) -> usize {
223        // 初始化最小任务计数为第一个线程的任务计数,最小索引为0
224        let mut min_count = self.theads_fast[0].count.load();
225        let mut min_index = 0;
226
227        // 遍历快速线程池中的线程
228        for (i, thread) in self.theads_fast.iter().enumerate() {
229            let count = thread.count.load();
230
231            // 如果找到一个任务计数为0的线程,立即返回其索引
232            if count == 0 {
233                min_index = i;
234                break;
235            }
236
237            // 如果当前线程的任务计数少于最小任务计数,更新最小任务计数和索引
238            if count < min_count {
239                min_count = count;
240                min_index = i;
241            }
242        }
243
244        // 返回任务计数最少的线程索引
245        min_index
246    }
247    */
248
249    /// 获取下一个要切换到的线程的索引,以实现线程之间的任务平衡。
250    ///
251    /// 这个方法通过循环的方式选择下一个线程索引,以确保任务能够均匀地分配给每个线程。
252    /// 它使用了一个互斥锁来保证在多线程环境下对索引的访问是安全的。
253    ///
254    /// # 返回值
255    /// 根据当前线程池的状态,计算并返回下一个应该执行任务的线程索引。
256    /// 这个方法旨在平衡线程间的任务分配,避免某个线程过载而其他线程闲置的情况。
257    ///
258    /// 参数 `i7` 作为一个辅助的计算参数,用于在无法立即获得锁时决定返回哪个线程索引。
259    ///
260    /// 返回值是一个枚举 `IdxStatus`,它可以指示线程应该记住当前计算出的索引(`Remember`),
261    /// 或者由于锁的竞争失败而丢弃当前的计算并使用另一个索引(`Discard`)。
262    /// 返回当前选择的线程索引。
263    pub fn count_task_min(&self, i7: i32) -> IdxStatus {
264        // 获取线程池中线程的数量,用于后续计算下一个任务线程的索引。
265        // 获取线程池中线程的数量
266        let len = self.threads_fast.len();
267
268        // 尝试获取用于控制任务分配的索引互斥锁,如果无法立即获得锁,则根据 `i7` 返回一个备选索引。
269        // 获取用于控制线程切换的索引的互斥锁
270        let mut min_index: spin::MutexGuard<i32> = match self.switch_thread_index.try_lock() {
271            Some(mutex) => mutex,
272            None => return IdxStatus::Discard(i7 as usize % len),
273        };
274
275        // 如果当前索引为0,将其设置为最大值,否则递减索引,以实现循环分配策略。
276        // 这样做是为了实现循环访问,避免索引越界
277        if *min_index == 0 || *min_index == -1 {
278            *min_index = len as i32 - 1;
279        } else {
280            *min_index -= 1;
281        }
282
283        // 返回之前复制的索引值,指示线程应该记住这个索引以供下次使用。
284        // 返回复制的索引值
285        let r = *min_index as usize;
286        self.threads_fast_idx[i7].store(Some(r));
287        IdxStatus::Remember(r)
288    }
289
290    #[inline(always)]
291    pub fn spawn<F>(&self, i7: i32, f: F)
292    where
293        F: FnOnce(),
294        F: Send + 'static,
295    {
296        let _lock = self.stock_lock[i7].clone();
297        let index = i7 % self.threads_share.len() as i32;
298        self.threads_share[index as usize].spawn(move |_core| {
299            let _lock = _lock.lock();
300            // #[cfg(debug_assertions)]
301            // print!("高性能模式, 共享线程({i7}): {index} ");
302            f();
303            drop(_lock);
304        });
305    }
306
307    #[inline(always)]
308    pub fn spawn_fast<F>(&self, i7: i32, f: F)
309    where
310        F: FnOnce(),
311        F: Send + 'static,
312    {
313        let mut on_fast_idx = -1;
314
315        // 找最少任务数的线程
316        #[cfg(not(feature = "thread_dispatch"))]
317        let thread_idx = self.threads_fast_idx[i7].load().unwrap_or_else(|| {
318            let min = self.count_task_min(i7);
319            let idx = min.get_idx();
320            if let IdxStatus::Remember(idx) = &min {
321                on_fast_idx = *idx as i32;
322            }
323            idx
324        });
325
326        // 如果当前任务堆积小于5个, 则使用当前线程; 否则就去找最少任务数的线程
327        // 有任务调度的线程方法, 如果有任务堆积则通过找任务数最少的线程来提交任务
328        #[cfg(feature = "thread_dispatch")]
329        let thread_idx = match self.threads_fast_idx[i7].load() {
330            Some(i) if self.threads_fast[i].count.load() < 1000 => i,
331            _ => {
332                let min = self.count_task_min(i7);
333                let idx = min.get_idx();
334                if let IdxStatus::Remember(idx) = &min {
335                    on_fast_idx = *idx as i32;
336                }
337                idx
338            }
339        };
340
341        // 提交任务
342        let lock = self.stock_lock[i7].clone();
343        self.threads_fast[thread_idx].spawn(move |_core| {
344            let lock_v = lock.lock();
345            // print!(" {i7} theads_fast: {thread_idx} ");
346            f();
347            drop(lock_v);
348            if on_fast_idx != -1 {
349                // debug!("on_fast thread; i7: {i7}, cpu: {_core}");
350            }
351        });
352    }
353
354    #[inline(always)]
355    pub fn spawn_is_fast<F>(&self, i7: i32, is_fast: bool, f: F)
356    where
357        F: FnOnce(),
358        F: Send + 'static,
359    {
360        if is_fast {
361            self.spawn_fast(i7, f);
362        } else {
363            self.spawn(i7, f);
364        }
365    }
366}
367
368pub enum IdxStatus {
369    // 记住索引
370    Remember(usize),
371    // 丢弃索引
372    Discard(usize),
373}
374
375impl IdxStatus {
376    pub fn get_idx(&self) -> usize {
377        match self {
378            IdxStatus::Remember(idx) => *idx,
379            IdxStatus::Discard(idx) => *idx,
380        }
381    }
382}
383
384#[test]
385fn _test_pool() {
386    use crate::fast_thread_pool::*;
387    
388    unsafe { std::env::set_var("RUST_LOG", "debug") };
389    env_logger::init();
390
391    init(true);
392    // let _core = use_last_core2("use_last_core_多个核心", 40); 
393    let _core = use_last_core("use_last_core"); 
394    // let _core = use_last_core2("use_last_core_多个核心", 5); 
395    info!("use_last_core_多个核心: {:?}", _core);
396    // let _core2 = use_last_core();
397    // let _core = use_last_core();
398    // let _lite1 = super::ThreadPoolLite::new(); // TODO: 1
399    // let num: super::ThreadPoolConstNum<5> = const_num::ThreadPoolConstNum::new(); // TODO: 3
400    let pool = ThreadPool::new(2, 4); // TODO: 4
401    let _core = use_last_core2("use_last_core_多个核心2", 5); 
402    let _core2 = use_last_core("use_last_core2"); 
403    let _core3 = use_last_core("use_last_core3"); 
404    let _core4 = use_last_core("use_last_core4"); 
405    let _core5 = use_last_core("use_last_core5");
406    let _core6 = use_last_core("use_last_core6");
407    std::thread::sleep(std::time::Duration::from_millis(200));
408
409    let count = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
410    let elapsed_total = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
411    let elapsed_exp = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
412
413    let count_c = count.clone();
414    let elapsed_total_c = elapsed_total.clone();
415    let elapsed_exp_c = elapsed_exp.clone();
416    std::thread::spawn(move || {
417        loop {
418            std::thread::sleep(std::time::Duration::from_secs(3));
419            let count = count_c.fetch_and(0);
420            let elapsed_total = elapsed_total_c.fetch_and(0);
421            let elapsed_exp = elapsed_exp_c.fetch_and(0);
422            info!(
423                "3秒钟执行任务数: {}, 所有任务耗时(微秒): {}, 平均耗时: {}, 耗时任务数(100微秒): {}, 耗时任务数占比: {:.0}/10000",
424                count,
425                elapsed_total,
426                elapsed_total / count,
427                elapsed_exp,
428                elapsed_exp as f64 / count as f64 * 10000.0,
429            );
430        }
431    });
432
433    loop {
434        for i in 0..100 {
435            // std::thread::sleep(std::time::Duration::from_micros(i % 50));
436            let time_hs = std::time::Instant::now();
437            let count = count.clone();
438            let elapsed_total = elapsed_total.clone();
439            let elapsed_exp = elapsed_exp.clone();
440            // spin::Barrier::new(i % 10).wait();
441            // spin::relax::Loop::(Duration::from_micros(i % 50));
442            let i7 = if i % 3 == 0 { 1000001 } else { 1000002 };
443            pool.spawn_is_fast(i7, true, move || {
444                let micros = time_hs.elapsed().as_micros();
445                count.fetch_add(1);
446                elapsed_total.fetch_add(micros as i64);
447                if micros > 100 {
448                    elapsed_exp.fetch_add(1);
449                }
450            });
451        }
452        std::thread::sleep(std::time::Duration::from_micros(110));
453    }
454    std::thread::sleep(std::time::Duration::from_secs(9999));
455}