fast_able/fast_thread_pool/
pool.rs

1use crate::fast_thread_pool::{get_core_skip, CORES};
2use crate::stock_pool::StockPool;
3use crossbeam::atomic::AtomicCell;
4use spin::Mutex;
5use std::{
6    fs,
7    sync::{Arc, LazyLock},
8};
9
10use super::TaskExecutor;
11
/// Marker file recording core-affinity state; when it contains
/// "realtime_system" (written by `init(true)`), realtime scheduling is enabled.
pub const FILE_CORE_AFFINITY: &str = "./.core_affinity";
13
14/// If it is a real-time system, write "realtime_system" to the file
15/// 如果是实时系统, 则把"realtime_system"写入文件
16pub fn init(is_realtime_system: bool) {
17    _ = fs::remove_file(FILE_CORE_AFFINITY);
18    warn!(
19        "thread_mod init; remove_file core_affinity: {}, is_realtime_system: {is_realtime_system}",
20        FILE_CORE_AFFINITY
21    );
22
23    if is_realtime_system {
24        fs::write(FILE_CORE_AFFINITY, "realtime_system,\n").unwrap();
25    }
26}
27
28
29/// Set core affinity
30/// Set real-time kernel
31/// Priority (1-99, higher is more prioritized); Input -1 to disable
32/// 设置绑核
33/// 设置实时内核
34/// 优先级(1-99,越高越优先); 输入 -1 时, 不开启
35pub fn set_core_affinity_and_realtime(core_u: usize, _realtime: i32) -> bool {
36    let ALL_CORES = CORES.clone();
37
38    let core = ALL_CORES.iter().find(|&&core| core.id == core_u);
39
40    let core = match core {
41        Some(core) => core,
42        None => match ALL_CORES.last() {
43            Some(core) => core,
44            None => return false,
45        },
46    };
47
48    let b = core_affinity::set_for_current(core.clone());
49
50    #[allow(unused_mut)]
51    let mut b2 = true;
52
53    #[allow(unused_mut)]
54    let mut realtime_msg = "";
55
56    static _IS_REALTIME_SYSTEM: LazyLock<bool> = LazyLock::new(|| {
57        std::fs::read_to_string(FILE_CORE_AFFINITY)
58            .unwrap_or_else(|_| "".to_string())
59            .contains("realtime_system")
60    });
61
62    #[cfg(target_os = "linux")]
63    if b && _realtime > 0 && *_IS_REALTIME_SYSTEM {
64        use libc::{EINVAL, EPERM, ESRCH, SCHED_RR, sched_param, sched_setscheduler};
65        let param = sched_param {
66            sched_priority: _realtime,
67        };
68        unsafe {
69            // SCHED_FIFO, SCHED_RR
70            let ret = sched_setscheduler(0, SCHED_RR, &param);
71            if ret != 0 {
72                /*
73                let errno = *libc::__errno_location();
74                let err_msg2 = match errno {
75                    EPERM => "权限不足 (EPERM) - 检查:\n1. getcap -v ./your_program 输出是否包含 cap_sys_nice\n2. 文件所有者是否正确\n3. 是否需要重新登录使组权限生效",
76                    EINVAL => "参数无效 (EINVAL) - 可能原因:\n1. 优先级超出范围(1-99)\n2. 调度策略无效",
77                    ESRCH => "指定的进程不存在 (ESRCH)",
78                    _ => "未知错误",
79                };
80                realtime_msg = format!(
81                    "set_realtime: false, 设置实时内核调度器失败: errno={errno}, UID={}, EUID={}, 设置优先级={}, 错误信息={}",
82                    libc::getuid(),
83                    libc::geteuid(),
84                    _realtime,
85                    err_msg2
86                );
87                */
88                realtime_msg = ", set_realtime: false";
89                b2 = false;
90            } else {
91                realtime_msg = ", set_realtime: true";
92            }
93        }
94    }
95
96    debug!("thread core: {core_u}, set_core_affinity: {b}{realtime_msg}");
97    b && b2
98}
99
/// Thread pool
/// Manages worker threads.
///
/// By default the pool has cpu_cores/4 shared threads, and tasks are
/// submitted to the shared pool. An API enables an exclusive high-performance
/// mode; in that mode the pool has at most cpu_cores/5 "fast" threads (e.g. a
/// 128-core CPU gets at most 25 fast threads).
/// High-performance dispatch: tasks are routed to the thread currently
/// carrying the least work (see `count_task_min`).
/// Core counts come from the `core_affinity` crate.
/// If a stock already has a task executing on some thread, further tasks for
/// that stock go to the same thread; once all of that stock's tasks finish,
/// tasks go to the thread with the fewest tasks.
pub struct ThreadPool {
    // Shared worker threads — the default submission target (`spawn`).
    pub threads_share: Vec<TaskExecutor>,
    // High-performance ("fast") worker threads (`spawn_fast`).
    pub threads_fast: crate::vec::SyncVec<TaskExecutor>,

    // Round-robin cursor over `threads_fast`; initialized to -1 ("unset").
    pub switch_thread_index: Mutex<i32>,

    // pub 高性能模式_任务数最少的线程: AtomicCell<usize>, (retired field)
    /// Records which fast thread each stock code is currently executing on.
    pub threads_fast_idx: StockPool<AtomicCell<Option<usize>>>,

    // Retired field that tracked per-stock in-flight task counts:
    // 高性能模式_记录_任务数: StockPool<Arc<()>>,

    /// Per-stock lock preventing tasks for the same stock from running in parallel.
    pub stock_lock: StockPool<Arc<spin::Mutex<()>>>,
    // How many cores (retired fields):
    // core_num: Vec<core_affinity::CoreId>,
    // current_core: AtomicCell<i32>,
}
131
132// pub struct Pto<T: Debug + Default>(AtomicCell<T>);
133// impl<T: Debug + Default> Deref for Pto<T> {
134//     type Target = AtomicCell<T>;
135//     fn deref(&self) -> &Self::Target {
136//         &self.0
137//     }
138// }
139// impl<T: Debug + Default + Copy> Debug for Pto<T> {
140//     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141//         f.write_str(&format!("{:?}", self.load()))
142//     }
143// }
144// impl<T: Debug + Default> Default for Pto<T> {
145//     fn default() -> Self {
146//         Pto(AtomicCell::new(T::default()))
147//     }
148// }
149
150impl ThreadPool {
151    pub fn new(mut cpu_fraction_fast: usize, cpu_fraction_share: usize) -> ThreadPool {
152        let cpu_num = CORES.len();
153
154        let skip = get_core_skip();
155        let cpu_num = cpu_num / skip;
156        let mut cpu_fraction_share_num = cpu_num / cpu_fraction_share;
157
158        // 修复: 确保线程数至少为1
159        if cpu_fraction_share_num == 0 {
160            cpu_fraction_share_num = 1;
161            warn!(
162                "cpu_fraction_share_num({cpu_fraction_share_num}) = cpu_num({cpu_num}) / cpu_fraction_share({cpu_fraction_share}), set cpu_fraction_share_num to 1"
163            );
164        }
165
166        // 使用 use_last_core2 获取共享线程使用的核心
167        debug!("threads_share cpu count: {cpu_fraction_share_num}, skip: {}", get_core_skip());
168        let mut threads_共享 = Vec::with_capacity(cpu_fraction_share_num);
169        
170        // 获取共享线程使用的核心
171        let share_cores = super::use_last_core2("share_thread", cpu_fraction_share_num);
172        
173        // 创建共享线程
174        for core_id in &share_cores {
175            let core = core_affinity::CoreId { id: *core_id };
176            threads_共享.push(TaskExecutor::new(core, -1));
177        }
178        
179        // 创建快速线程
180        if cpu_fraction_fast == 0 {
181            cpu_fraction_fast = cpu_num;
182        }
183        let mut cpu_fraction_fast_num = cpu_num / cpu_fraction_fast;
184        if cpu_fraction_fast_num == 0 {
185            cpu_fraction_fast_num = 1;
186            warn!(
187                "cpu_fraction_fast_num({cpu_fraction_fast_num}) = cpu_num({cpu_num}) / cpu_fraction_fast({cpu_fraction_fast}), set cpu_fraction_fast_num to 1"
188            );
189        }
190        debug!("theads_fraction_fast cpu count: {}", cpu_fraction_fast_num);
191        let theads_高性能模式 = crate::vec::SyncVec::with_capacity(cpu_fraction_fast_num);
192
193        // 获取快速线程使用的核心
194        let fast_cores = super::use_last_core2("fast_thread", cpu_fraction_fast_num);
195        
196        // 创建快速线程
197        for core_id in &fast_cores {
198            let core = core_affinity::CoreId { id: *core_id };
199            theads_高性能模式.push(TaskExecutor::new(core, 49));
200        }
201        
202        debug!("fast_thread_pool_bind_cores: share cores: {:?}, fast cores: {:?}", share_cores, fast_cores);
203
204        let r = ThreadPool {
205            threads_share: threads_共享,
206            threads_fast: theads_高性能模式,
207            // 高性能模式_任务数最少的线程: 0.into(),
208            threads_fast_idx: StockPool::new_default(),
209            // 高性能模式_记录_任务数: StockPool::new(),
210            stock_lock: StockPool::new_default(),
211            switch_thread_index: (-1).into(),
212            // core_num,
213            // current_core: -1.into(),
214        };
215
216        // let r1 = r.clone();
217        // std::thread::spawn(move || loop {
218        //     r1.loop_任务数最少的线程();
219        //     std::thread::sleep(std::time::Duration::from_millis(10));
220        // });
221        r
222    }
223
224    /// 获取拥有最少任务的线程索引
225    ///
226    /// 本函数遍历快速线程池中的线程,寻找任务计数最少的线程。
227    /// 如果找到一个任务计数为0的线程,它将立即返回该线程的索引,
228    /// 因为这表示该线程目前没有任务。否则,函数将返回任务计数最少的线程索引。
229    /// 这个信息用于调度新任务到拥有最少任务的线程,以平衡线程间的工作负载。
230    /*
231    pub fn count_task_min(&self) -> usize {
232        // 初始化最小任务计数为第一个线程的任务计数,最小索引为0
233        let mut min_count = self.theads_fast[0].count.load();
234        let mut min_index = 0;
235
236        // 遍历快速线程池中的线程
237        for (i, thread) in self.theads_fast.iter().enumerate() {
238            let count = thread.count.load();
239
240            // 如果找到一个任务计数为0的线程,立即返回其索引
241            if count == 0 {
242                min_index = i;
243                break;
244            }
245
246            // 如果当前线程的任务计数少于最小任务计数,更新最小任务计数和索引
247            if count < min_count {
248                min_count = count;
249                min_index = i;
250            }
251        }
252
253        // 返回任务计数最少的线程索引
254        min_index
255    }
256    */
257
258    /// 获取下一个要切换到的线程的索引,以实现线程之间的任务平衡。
259    ///
260    /// 这个方法通过循环的方式选择下一个线程索引,以确保任务能够均匀地分配给每个线程。
261    /// 它使用了一个互斥锁来保证在多线程环境下对索引的访问是安全的。
262    ///
263    /// # 返回值
264    /// 根据当前线程池的状态,计算并返回下一个应该执行任务的线程索引。
265    /// 这个方法旨在平衡线程间的任务分配,避免某个线程过载而其他线程闲置的情况。
266    ///
267    /// 参数 `i7` 作为一个辅助的计算参数,用于在无法立即获得锁时决定返回哪个线程索引。
268    ///
269    /// 返回值是一个枚举 `IdxStatus`,它可以指示线程应该记住当前计算出的索引(`Remember`),
270    /// 或者由于锁的竞争失败而丢弃当前的计算并使用另一个索引(`Discard`)。
271    /// 返回当前选择的线程索引。
272    pub fn count_task_min(&self, i7: i32) -> IdxStatus {
273        // 获取线程池中线程的数量,用于后续计算下一个任务线程的索引。
274        // 获取线程池中线程的数量
275        let len = self.threads_fast.len();
276
277        // 尝试获取用于控制任务分配的索引互斥锁,如果无法立即获得锁,则根据 `i7` 返回一个备选索引。
278        // 获取用于控制线程切换的索引的互斥锁
279        let mut min_index: spin::MutexGuard<i32> = match self.switch_thread_index.try_lock() {
280            Some(mutex) => mutex,
281            None => return IdxStatus::Discard(i7 as usize % len),
282        };
283
284        // 如果当前索引为0,将其设置为最大值,否则递减索引,以实现循环分配策略。
285        // 这样做是为了实现循环访问,避免索引越界
286        if *min_index == 0 || *min_index == -1 {
287            *min_index = len as i32 - 1;
288        } else {
289            *min_index -= 1;
290        }
291
292        // 返回之前复制的索引值,指示线程应该记住这个索引以供下次使用。
293        // 返回复制的索引值
294        let r = *min_index as usize;
295        self.threads_fast_idx[i7].store(Some(r));
296        IdxStatus::Remember(r)
297    }
298
299    #[inline(always)]
300    pub fn spawn<F>(&self, i7: i32, f: F)
301    where
302        F: FnOnce(),
303        F: Send + 'static,
304    {
305        let _lock = self.stock_lock[i7].clone();
306        let index = i7 % self.threads_share.len() as i32;
307        self.threads_share[index as usize].spawn(move |_core| {
308            let _lock = _lock.lock();
309            // #[cfg(debug_assertions)]
310            // print!("高性能模式, 共享线程({i7}): {index} ");
311            f();
312            drop(_lock);
313        });
314    }
315
    /// Submit `f` to a fast (high-performance) thread for stock code `i7`.
    ///
    /// Tasks for the same stock are serialized through `stock_lock`.
    #[inline(always)]
    pub fn spawn_fast<F>(&self, i7: i32, f: F)
    where
        F: FnOnce(),
        F: Send + 'static,
    {
        // Set to the chosen index when a fresh selection was made and cached.
        let mut on_fast_idx = -1;

        // Without the `thread_dispatch` feature: reuse the thread cached for
        // this stock, or pick one via the round-robin selector.
        #[cfg(not(feature = "thread_dispatch"))]
        let thread_idx = self.threads_fast_idx[i7].load().unwrap_or_else(|| {
            let min = self.count_task_min(i7);
            let idx = min.get_idx();
            if let IdxStatus::Remember(idx) = &min {
                on_fast_idx = *idx as i32;
            }
            idx
        });

        // With the `thread_dispatch` feature: stick to the cached thread while
        // its backlog stays below 1000 tasks; otherwise re-select the thread
        // via the round-robin selector.
        #[cfg(feature = "thread_dispatch")]
        let thread_idx = match self.threads_fast_idx[i7].load() {
            Some(i) if self.threads_fast.get_uncheck(i).count.load() < 1000 => i,
            _ => {
                let min = self.count_task_min(i7);
                let idx = min.get_idx();
                if let IdxStatus::Remember(idx) = &min {
                    on_fast_idx = *idx as i32;
                }
                idx
            }
        };

        // Submit the task; the per-stock lock is held while `f` runs.
        let lock = self.stock_lock[i7].clone();
        self.threads_fast.get_uncheck(thread_idx).spawn(move |_core| {
            let lock_v = lock.lock();
            // print!(" {i7} theads_fast: {thread_idx} ");
            f();
            drop(lock_v);
            if on_fast_idx != -1 {
                // debug!("on_fast thread; i7: {i7}, cpu: {_core}");
            }
        });
    }
362
363    #[inline(always)]
364    pub fn spawn_is_fast<F>(&self, i7: i32, is_fast: bool, f: F)
365    where
366        F: FnOnce(),
367        F: Send + 'static,
368    {
369        if is_fast {
370            self.spawn_fast(i7, f);
371        } else {
372            self.spawn(i7, f);
373        }
374    }
375}
376
/// Outcome of a fast-thread index selection (`ThreadPool::count_task_min`).
pub enum IdxStatus {
    /// Index chosen by advancing the cursor; the caller should remember it.
    Remember(usize),
    /// Fallback index chosen under lock contention; the caller should discard it.
    Discard(usize),
}

impl IdxStatus {
    /// The selected thread index, regardless of variant.
    pub fn get_idx(&self) -> usize {
        match self {
            IdxStatus::Remember(idx) | IdxStatus::Discard(idx) => *idx,
        }
    }
}
392
/// Manual stress test: floods the pool with small tasks and logs throughput
/// and latency stats every 3 seconds. Runs forever — intended for manual
/// invocation, not CI.
#[test]
fn _test_pool() {
    use crate::fast_thread_pool::*;

    unsafe { std::env::set_var("RUST_LOG", "debug") };
    env_logger::init();

    init(true);
    let _core = use_last_core("use_last_core");
    info!("use_last_core_多个核心: {:?}", _core);
    let pool = ThreadPool::new(2, 4);
    let _core = use_last_core2("use_last_core_多个核心2", 5);
    let _core2 = use_last_core("use_last_core2");
    let _core3 = use_last_core("use_last_core3");
    let _core4 = use_last_core("use_last_core4");
    let _core5 = use_last_core("use_last_core5");
    let _core6 = use_last_core("use_last_core6");
    std::thread::sleep(std::time::Duration::from_millis(200));

    // Shared counters: completed tasks, summed latency (us), slow (>100us) tasks.
    let count = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_total = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_exp = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));

    let count_c = count.clone();
    let elapsed_total_c = elapsed_total.clone();
    let elapsed_exp_c = elapsed_exp.clone();
    std::thread::spawn(move || {
        loop {
            std::thread::sleep(std::time::Duration::from_secs(3));
            // fetch_and(0) atomically reads and resets each counter.
            let count = count_c.fetch_and(0);
            let elapsed_total = elapsed_total_c.fetch_and(0);
            let elapsed_exp = elapsed_exp_c.fetch_and(0);
            // Guard against division by zero when no task finished in this
            // window (the original panicked on `elapsed_total / count`).
            let denom = count.max(1);
            info!(
                "3秒钟执行任务数: {}, 所有任务耗时(微秒): {}, 平均耗时: {}, 耗时任务数(100微秒): {}, 耗时任务数占比: {:.0}/10000",
                count,
                elapsed_total,
                elapsed_total / denom,
                elapsed_exp,
                elapsed_exp as f64 / denom as f64 * 10000.0,
            );
        }
    });

    // Submit batches of 100 tasks forever, alternating two stock codes so
    // both the cached-thread and round-robin paths are exercised.
    // (The loop never exits; the unreachable trailing sleep was removed.)
    loop {
        for i in 0..100 {
            let time_hs = std::time::Instant::now();
            let count = count.clone();
            let elapsed_total = elapsed_total.clone();
            let elapsed_exp = elapsed_exp.clone();
            let i7 = if i % 3 == 0 { 1000001 } else { 1000002 };
            pool.spawn_is_fast(i7, true, move || {
                let micros = time_hs.elapsed().as_micros();
                count.fetch_add(1);
                elapsed_total.fetch_add(micros as i64);
                if micros > 100 {
                    elapsed_exp.fetch_add(1);
                }
            });
        }
        std::thread::sleep(std::time::Duration::from_micros(110));
    }
}