// fast_able/fast_thread_pool/pool.rs

1use crate::{fast_thread_pool::use_last_core, stock_pool::StockPool};
2use core_affinity::CoreId;
3use crossbeam::{atomic::AtomicCell, queue};
4use spin::Mutex;
5use std::{fmt::Debug, fs, io::Write, sync::{Arc, LazyLock}, thread};
6
7use super::{const_num, TaskExecutor};
8
/// File recording which cores were bound by previous pool instances, so
/// successive pools in the same process rotate onward instead of re-binding
/// the same cores.
pub const FILE_CORE_AFFINITY: &str = "./.core_affinity";
10
11pub fn init() {
12    _ = fs::remove_file(FILE_CORE_AFFINITY);
13    warn!(
14        "thread_mod init; remove_file core_affinity: {:?}",
15        FILE_CORE_AFFINITY
16    );
17}
18
/// Step width used when walking core ids.
///
/// When hyper-threading doubles the logical core count relative to the
/// physical one, skip every other logical core (returns 2) so each worker
/// lands on its own physical core; otherwise use every core (returns 1).
#[cfg(feature = "deal_physical_cpu")]
pub fn get_core_skip() -> usize {
    let logical = num_cpus::get();
    let physical = num_cpus::get_physical();
    if logical / physical != 2 {
        return 1;
    }
    warn!("core_ids: {logical}, core_physical: {physical}; skip 2");
    2
}
30
/// Step width used when walking core ids.
/// Without the `deal_physical_cpu` feature every logical core is used (skip = 1).
#[cfg(not(feature = "deal_physical_cpu"))]
pub fn get_core_skip() -> usize {
    1
}
35
36
37/// 设置绑核
38/// 设置实时内核
39/// 优先级(1-99,越高越优先); 输入 -1 时, 不开启
40pub fn set_core_affinity_and_realtime(core_u: usize, _realtime: i32) -> bool {
41    static ALL_CORES: LazyLock<Vec<CoreId>> =
42        LazyLock::new(|| core_affinity::get_core_ids().unwrap_or_else(|| Vec::with_capacity(0)));
43
44    let core = ALL_CORES.iter().find(|&&core| core.id == core_u);
45
46    let core = match core {
47        Some(core) => core,
48        None => match ALL_CORES.last() {
49            Some(core) => core,
50            None => return false,
51        },
52    };
53
54    let mut b = core_affinity::set_for_current(core.clone());
55    debug!("core: {core_u}, bind: {b}");
56
57    b = false;
58
59    #[cfg(target_os = "linux")]
60    if b && _realtime > 0 {
61        use libc::{sched_param, sched_setscheduler, SCHED_RR};
62        let param = sched_param {
63            sched_priority: _realtime, // 优先级(1-99,越高越优先)
64        };
65        unsafe {
66            b = sched_setscheduler(
67                0, // 当前线程
68                SCHED_RR, // SCHED_RR, SCHED_FIFO; 同优先级 + SCHED_FIFO 先到先得,不释放则独占 CPU; 同优先级 + SCHED_RR 按时间片轮转(默认约 100ms)
69                &param as *const sched_param,
70            ) != -1;
71        }
72    }
73
74    // 修改时间片轮转
75    /*
76    deepseek 提示:
77    2. 注意事项
78    (1)内核的最小时间片限制
79    实际生效的时间片长度可能不精确,内核会将其转换为调度器的时间粒度(通常为 1ms)。
80
81    写入 0 时,内核会自动使用最小粒度(如 1ms)
82
83    (2)性能影响
84    时间片过短(如 <1ms)会导致频繁上下文切换,增加系统开销。
85
86    实时性场景:若需严格的微秒级切换,应优先考虑 SCHED_FIFO + 手动让出 CPU,而非依赖时间片。
87    # 临时修改为 10ms
88    sudo echo 10 > /proc/sys/kernel/sched_rr_timeslice_ms
89
90    # 永久修改(添加到 /etc/sysctl.conf)
91    echo "kernel.sched_rr_timeslice_ms = 10" >> /etc/sysctl.conf
92    sudo sysctl -p
93
94     */
95
96    debug!("core: {core_u}, set_realtime: {b}");
97
98    b
99}
100
101
/// Thread pool.
/// All worker threads are managed through this pool.
/// By default the pool holds cpu_count/4 threads and tasks are submitted to the
/// default (shared) pool; a dedicated high-performance mode can be enabled via an
/// API, capped at cpu_count/5 threads (e.g. at most 25 on a 128-core CPU).
/// High-performance dispatch: the task goes to whichever thread has handled the
/// fewest tasks within the last 10ms.
/// `core_affinity` is used to obtain the CPU core list.
/// If a stock already has a task running on some thread, further tasks for that
/// stock go to the same thread; once all of its tasks finish, new tasks go to
/// the thread with the fewest tasks.
pub struct ThreadPool {
    pub threads_share: Vec<TaskExecutor>,
    pub threads_fast: crate::vec::SyncVec<TaskExecutor>,

    pub switch_thread_index: Mutex<i32>,

    // pub least_loaded_fast_thread: AtomicCell<usize>,
    /// In high-performance mode, records which fast thread the current stock
    /// code is executing on.
    pub threads_fast_idx: StockPool<AtomicCell<Option<usize>>>,

    // In high-performance mode, would record the number of tasks currently
    // executing for the stock code:
    // task_count_in_fast_mode: StockPool<Arc<()>>,

    /// Per-stock lock preventing tasks of the same stock from running in parallel.
    pub stock_lock: StockPool<Arc<spin::Mutex<()>>>,
    // number of cores
    // core_num: Vec<core_affinity::CoreId>,
    // current_core: AtomicCell<i32>,
}
128
129// pub struct Pto<T: Debug + Default>(AtomicCell<T>);
130// impl<T: Debug + Default> Deref for Pto<T> {
131//     type Target = AtomicCell<T>;
132//     fn deref(&self) -> &Self::Target {
133//         &self.0
134//     }
135// }
136// impl<T: Debug + Default + Copy> Debug for Pto<T> {
137//     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138//         f.write_str(&format!("{:?}", self.load()))
139//     }
140// }
141// impl<T: Debug + Default> Default for Pto<T> {
142//     fn default() -> Self {
143//         Pto(AtomicCell::new(T::default()))
144//     }
145// }
146
147impl ThreadPool {
148    pub fn new(mut cpu_fraction_fast: usize, cpu_fraction_share: usize) -> ThreadPool {
149        let cpu_num = core_affinity::get_core_ids()
150            .unwrap_or_else(|| vec![])
151            .len();
152
153        let skip = get_core_skip();
154        let cpu_num = cpu_num / skip;
155        let num = cpu_num / cpu_fraction_share;
156
157        let core_num = core_affinity::get_core_ids().unwrap_or_else(|| {
158            warn!("获取cpu核心数失败");
159            vec![]
160        });
161        let mut current_core: i32 = core_num.len() as i32;
162        let max_core = current_core;
163
164        // 读取之前的绑核信息
165        _ = fs::File::create_new(FILE_CORE_AFFINITY);
166        let old_cpu_num = fs::read_to_string(FILE_CORE_AFFINITY)
167            .expect("open core_affinity file read_to_string error");
168        info!("ThreadPool old_cpu_num: {}", old_cpu_num);
169        if !old_cpu_num.is_empty() {
170            let old_cpu_num = old_cpu_num.replace("\n", ",");
171            let old_cpu_num = old_cpu_num.split(',').collect::<Vec<_>>();
172            if let Some(old_cpu_num) = old_cpu_num.last() {
173                if let Ok(old_cpu_num) = old_cpu_num.parse::<i32>() {
174                    current_core = old_cpu_num;
175                }
176            }
177        }
178        let skip = get_core_skip() as i32;
179        let mut bind_cores = vec!["share_thread".to_string()];
180
181        info!("threads_share cpu count: {num}, skip: {skip}");
182        let mut threads_共享 = Vec::with_capacity(num);
183        for _ in 0..num {
184            current_core -= skip;
185            if current_core < 0 {
186                current_core = max_core - 1;
187            }
188            let core = core_num
189                .get(current_core as usize)
190                .map(|x| x.clone())
191                .unwrap_or_else(|| {
192                    warn!("获取cpu核心数失败");
193                    core_affinity::CoreId { id: 0 }
194                });
195            bind_cores.push(core.id.to_string());
196            threads_共享.push(TaskExecutor::new(core, -1));
197        }
198        bind_cores.push("\n".to_string());
199        bind_cores.push("fast_thread".to_string());
200
201        if cpu_fraction_fast == 0 {
202            cpu_fraction_fast = cpu_num;
203        }
204        let mut num = cpu_num / cpu_fraction_fast;
205        if num == 0 {
206            num = 1;
207        }
208        debug!("theads_fraction_fast cpu count: {}", num);
209        let theads_高性能模式 = crate::vec::SyncVec::with_capacity(num);
210
211        for _ in 0..num {
212            current_core -= skip;
213            if current_core < 0 {
214                current_core = max_core;
215            }
216
217            let core = core_num
218                .get(current_core as usize)
219                .map(|x| x.clone())
220                .unwrap_or_else(|| {
221                    warn!("获取cpu核心数失败");
222                    core_affinity::CoreId { id: 0 }
223                });
224            bind_cores.push(core.id.to_string());
225
226            theads_高性能模式.push(TaskExecutor::new(core, 49));
227        }
228
229        debug!("fast_thread_pool_bind_cores: {:?}", bind_cores);
230        // fs::write(FILE_CORE_AFFINITY, bind_cores.join(","))
231        //     .expect("write core_affinity file write_all error");
232
233        use std::fs::OpenOptions;
234        use std::io::Write;
235
236        {
237            let mut file = OpenOptions::new()
238                .append(true)
239                .open(FILE_CORE_AFFINITY)
240                .unwrap();
241            // let _ = writeln!(file, "aaa");
242            if !old_cpu_num.is_empty() {
243                let _ = file.write_all("\n".as_bytes());
244            }
245            let _ = file.write_all(bind_cores.join(",").as_bytes());
246            file.flush().expect("ThreadPoolLite flush error");
247        }
248
249        // std::env::set_var("fast_thread_pool_bind_cores", bind_cores.join(","));
250
251        let r = ThreadPool {
252            threads_share: threads_共享,
253            threads_fast: theads_高性能模式,
254            // 高性能模式_任务数最少的线程: 0.into(),
255            threads_fast_idx: StockPool::new_default(),
256            // 高性能模式_记录_任务数: StockPool::new(),
257            stock_lock: StockPool::new_default(),
258            switch_thread_index: (-1).into(),
259            // core_num,
260            // current_core: -1.into(),
261        };
262
263        // let r1 = r.clone();
264        // std::thread::spawn(move || loop {
265        //     r1.loop_任务数最少的线程();
266        //     std::thread::sleep(std::time::Duration::from_millis(10));
267        // });
268        r
269    }
270
271    /// 获取拥有最少任务的线程索引
272    ///
273    /// 本函数遍历快速线程池中的线程,寻找任务计数最少的线程。
274    /// 如果找到一个任务计数为0的线程,它将立即返回该线程的索引,
275    /// 因为这表示该线程目前没有任务。否则,函数将返回任务计数最少的线程索引。
276    /// 这个信息用于调度新任务到拥有最少任务的线程,以平衡线程间的工作负载。
277    /*
278    pub fn count_task_min(&self) -> usize {
279        // 初始化最小任务计数为第一个线程的任务计数,最小索引为0
280        let mut min_count = self.theads_fast[0].count.load();
281        let mut min_index = 0;
282
283        // 遍历快速线程池中的线程
284        for (i, thread) in self.theads_fast.iter().enumerate() {
285            let count = thread.count.load();
286
287            // 如果找到一个任务计数为0的线程,立即返回其索引
288            if count == 0 {
289                min_index = i;
290                break;
291            }
292
293            // 如果当前线程的任务计数少于最小任务计数,更新最小任务计数和索引
294            if count < min_count {
295                min_count = count;
296                min_index = i;
297            }
298        }
299
300        // 返回任务计数最少的线程索引
301        min_index
302    }
303    */
304
305    /// 获取下一个要切换到的线程的索引,以实现线程之间的任务平衡。
306    ///
307    /// 这个方法通过循环的方式选择下一个线程索引,以确保任务能够均匀地分配给每个线程。
308    /// 它使用了一个互斥锁来保证在多线程环境下对索引的访问是安全的。
309    ///
310    /// # 返回值
311    /// 根据当前线程池的状态,计算并返回下一个应该执行任务的线程索引。
312    /// 这个方法旨在平衡线程间的任务分配,避免某个线程过载而其他线程闲置的情况。
313    ///
314    /// 参数 `i7` 作为一个辅助的计算参数,用于在无法立即获得锁时决定返回哪个线程索引。
315    ///
316    /// 返回值是一个枚举 `IdxStatus`,它可以指示线程应该记住当前计算出的索引(`Remember`),
317    /// 或者由于锁的竞争失败而丢弃当前的计算并使用另一个索引(`Discard`)。
318    /// 返回当前选择的线程索引。
319    pub fn count_task_min(&self, i7: i32) -> IdxStatus {
320        // 获取线程池中线程的数量,用于后续计算下一个任务线程的索引。
321        // 获取线程池中线程的数量
322        let len = self.threads_fast.len();
323
324        // 尝试获取用于控制任务分配的索引互斥锁,如果无法立即获得锁,则根据 `i7` 返回一个备选索引。
325        // 获取用于控制线程切换的索引的互斥锁
326        let mut min_index: spin::MutexGuard<i32> = match self.switch_thread_index.try_lock() {
327            Some(mutex) => mutex,
328            None => return IdxStatus::Discard(i7 as usize % len),
329        };
330
331        // 如果当前索引为0,将其设置为最大值,否则递减索引,以实现循环分配策略。
332        // 这样做是为了实现循环访问,避免索引越界
333        if *min_index == 0 || *min_index == -1 {
334            *min_index = len as i32 - 1;
335        } else {
336            *min_index -= 1;
337        }
338
339        // 返回之前复制的索引值,指示线程应该记住这个索引以供下次使用。
340        // 返回复制的索引值
341        let r = *min_index as usize;
342        self.threads_fast_idx[i7].store(Some(r));
343        IdxStatus::Remember(r)
344    }
345
346    #[inline(always)]
347    pub fn spawn<F>(&self, i7: i32, f: F)
348    where
349        F: FnOnce(),
350        F: Send + 'static,
351    {
352        let _lock = self.stock_lock[i7].clone();
353        let index = i7 % self.threads_share.len() as i32;
354        self.threads_share[index as usize].spawn(move |_core| {
355            let _lock = _lock.lock();
356            // #[cfg(debug_assertions)]
357            // print!("高性能模式, 共享线程({i7}): {index} ");
358            f();
359            drop(_lock);
360        });
361    }
362
363    #[inline(always)]
364    pub fn spawn_fast<F>(&self, i7: i32, f: F)
365    where
366        F: FnOnce(),
367        F: Send + 'static,
368    {
369        let mut on_fast_idx = -1;
370
371        // 找最少任务数的线程
372        #[cfg(not(feature = "thread_dispatch"))]
373        let thread_idx = self.threads_fast_idx[i7].load().unwrap_or_else(|| {
374            let min = self.count_task_min(i7);
375            let idx = min.get_idx();
376            if let IdxStatus::Remember(idx) = &min {
377                on_fast_idx = *idx as i32;
378            }
379            idx
380        });
381
382        // 如果当前任务堆积小于5个, 则使用当前线程; 否则就去找最少任务数的线程
383        // 有任务调度的线程方法, 如果有任务堆积则通过找任务数最少的线程来提交任务
384        #[cfg(feature = "thread_dispatch")]
385        let thread_idx = match self.threads_fast_idx[i7].load() {
386            Some(i) if self.threads_fast[i].count.load() < 1000 => i,
387            _ => {
388                let min = self.count_task_min(i7);
389                let idx = min.get_idx();
390                if let IdxStatus::Remember(idx) = &min {
391                    on_fast_idx = *idx as i32;
392                }
393                idx
394            }
395        };
396
397        // 提交任务
398        let lock = self.stock_lock[i7].clone();
399        self.threads_fast[thread_idx].spawn(move |core| {
400            let lock_v = lock.lock();
401            // print!(" {i7} theads_fast: {thread_idx} ");
402            f();
403            drop(lock_v);
404            if on_fast_idx != -1 {
405                debug!("on_fast thread; i7: {i7}, cpu: {core}");
406            }
407        });
408    }
409
410    #[inline(always)]
411    pub fn spawn_is_fast<F>(&self, i7: i32, is_fast: bool, f: F)
412    where
413        F: FnOnce(),
414        F: Send + 'static,
415    {
416        if is_fast {
417            self.spawn_fast(i7, f);
418        } else {
419            self.spawn(i7, f);
420        }
421    }
422}
423
pub enum IdxStatus {
    // Remember the index: it was freshly rotated and cached for this stock.
    Remember(usize),
    // Discard the index: the rotation lock was contended, this is a one-off fallback.
    Discard(usize),
}
430
431impl IdxStatus {
432    pub fn get_idx(&self) -> usize {
433        match self {
434            IdxStatus::Remember(idx) => *idx,
435            IdxStatus::Discard(idx) => *idx,
436        }
437    }
438}
439
/// Manual benchmark: floods the fast pool with tasks forever and logs
/// throughput/latency statistics every 3 seconds. Intended to be run by hand
/// and observed via logs — it never returns.
#[test]
fn _test_pool() {
    std::env::set_var("RUST_LOG", "debug");
    env_logger::init();

    init();
    let _core = use_last_core("use_last_core"); // TODO: 0
    let _lite1 = super::ThreadPoolLite::new(); // TODO: 1
    // BUG FIX: was `num`, an unused binding triggering a warning.
    let _num: super::ThreadPoolConstNum<5> = const_num::ThreadPoolConstNum::new(); // TODO: 3
    let pool = ThreadPool::new(2, 4); // TODO: 4
    let _core2 = use_last_core("use_last_core2"); // TODO: 0
    std::thread::sleep(std::time::Duration::from_millis(200));

    // Counters: executed tasks, summed latency (µs), tasks slower than 100µs.
    let count = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_total = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_exp = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));

    // Reporter thread: every 3s drain the counters (`fetch_and(0)` reads and
    // resets atomically) and log the statistics.
    let count_c = count.clone();
    let elapsed_total_c = elapsed_total.clone();
    let elapsed_exp_c = elapsed_exp.clone();
    std::thread::spawn(move || loop {
        std::thread::sleep(std::time::Duration::from_secs(3));
        let count = count_c.fetch_and(0);
        let elapsed_total = elapsed_total_c.fetch_and(0);
        let elapsed_exp = elapsed_exp_c.fetch_and(0);
        info!(
            "3秒钟执行任务数: {}, 所有任务耗时(微秒): {}, 平均耗时: {}, 耗时任务数(100微秒): {}, 耗时任务数占比: {:.0}/10000",
            count,
            elapsed_total,
            // BUG FIX: guard against division by zero when no task ran in
            // the 3s window (previously panicked the reporter thread).
            elapsed_total / count.max(1),
            elapsed_exp,
            elapsed_exp as f64 / count.max(1) as f64 * 10000.0,
        );
    });

    // Producer: submit 100 fast tasks, pause 110µs, repeat forever. Each
    // task records its queue-to-execution latency.
    loop {
        for i in 0..100 {
            let time_hs = std::time::Instant::now();
            let count = count.clone();
            let elapsed_total = elapsed_total.clone();
            let elapsed_exp = elapsed_exp.clone();
            // Two stock ids so the per-stock serialization is exercised.
            let i7 = if i % 3 == 0 { 1000001 } else { 1000002 };
            pool.spawn_is_fast(i7, true, move || {
                let micros = time_hs.elapsed().as_micros();
                count.fetch_add(1);
                elapsed_total.fetch_add(micros as i64);
                if micros > 100 {
                    elapsed_exp.fetch_add(1);
                }
            });
        }
        std::thread::sleep(std::time::Duration::from_micros(110));
    }
    // (The unreachable trailing sleep after the infinite loop was removed.)
}