fast_able/fast_thread_pool/
const_num.rs

1use std::{fs, io::Write, sync::Arc};
2
3use core_affinity::CoreId;
4use crossbeam::atomic::AtomicCell;
5
6use super::{get_core_skip, TaskExecutor, FILE_CORE_AFFINITY};
7
/// A simple fixed-size thread pool for submitting tasks quickly.
///
/// The pool owns `N` [`TaskExecutor`]s. `new` creates each executor with a
/// `CoreId`, and `spawn` hands tasks to the executors in round-robin order.
/// Each executor presumably runs a single thread bound to a single core;
/// confirm against `TaskExecutor`.
pub struct ThreadPoolConstNum<const N: usize> {
    /// The `N` executors that submitted tasks are dispatched to.
    thread: [TaskExecutor; N],
    /// Submission counter; `spawn` takes it modulo `N` to pick the executor.
    cur_run_core: AtomicCell<usize>,
}
15
16impl<const N: usize> ThreadPoolConstNum<N> {
17    pub fn new() -> Self {
18        let cores = core_affinity::get_core_ids().unwrap_or_else(|| {
19            warn!("获取cpu核心数失败");
20            vec![]
21        });
22
23        // 获得之前已经绑定的核心
24        _ = fs::File::create_new(FILE_CORE_AFFINITY);
25        let old_cpu_num = fs::read_to_string(FILE_CORE_AFFINITY)
26            .expect("open core_affinity file read_to_string error");
27        let old_cpu_num = old_cpu_num
28            .replace("\n", ",")
29            .split(',')
30            .filter_map(|x| x.parse::<usize>().ok())
31            .collect::<Vec<_>>();
32
33        let old_last = old_cpu_num.last().map(|x| *x).unwrap_or_else(|| 0);
34        let mut use_core = if !old_cpu_num.is_empty() {
35            old_last
36        } else {
37            cores.last().map(|x| x.id).unwrap_or_else(|| 0)
38        };
39
40        let mut use_core_cur = vec![];
41        let mut threads = vec![];
42
43        let skip = get_core_skip();
44
45        for _ in 0..N {
46            if (use_core as i32) - (skip as i32) < 0 {
47                use_core = cores.len() + 1;
48            }
49            if use_core == 0 {
50                use_core = 2;
51            }
52            use_core -= skip;
53            use_core_cur.push(use_core);
54            threads.push(TaskExecutor::new(CoreId { id: use_core }, 49));
55        }
56
57        let r = ThreadPoolConstNum {
58            thread: threads
59                .try_into()
60                .expect("ThreadPoolLiteNum threads.try_into()"),
61            cur_run_core: 0.into(),
62        };
63
64        if !use_core_cur.is_empty() {
65            println!("old_cpu_num: {old_cpu_num:?}; use_core_cur {use_core_cur:?}");
66            let mut file = fs::OpenOptions::new()
67                .append(true)
68                .open(FILE_CORE_AFFINITY)
69                .unwrap();
70            // let _ = writeln!(file, "aaa");
71            if !old_cpu_num.is_empty() {
72                let _ = file.write_all("\n".as_bytes());
73            }
74            let _ = file.write_all(
75                format!(
76                    "ThreadPoolConstNum,{}",
77                    use_core_cur
78                        .iter()
79                        .map(|x| x.to_string())
80                        .collect::<Vec<_>>()
81                        .join(",")
82                )
83                .as_bytes(),
84            );
85            file.flush().expect("ThreadPoolLite flush error");
86        }
87
88        r
89    }
90
91    pub fn spawn<F>(&self, f: F)
92    where
93        F: FnOnce(),
94        F: Send + 'static,
95    {
96        let cur_index = self.cur_run_core.fetch_add(1);
97        self.thread[cur_index % N].spawn(|_| f());
98        if cur_index >= usize::MAX - N {
99            self.cur_run_core.store(0);
100        }
101    }
102}
103
104pub fn _test_ThreadPoolConstNum(test_count: u128) {
105    println!("------------------------start------------------------");
106    // println!("1%3: {}", 1 % 3);
107    // println!("2%3: {}", 2 % 3);
108    // println!("3%3: {}", 3 % 3);
109    // println!("4%3: {}", 4 % 3);
110    // println!("5%3: {}", 5 % 3);
111    // println!("6%3: {}", 6 % 3);
112    // println!("7%3: {}", 7 % 3);
113    // println!("8%3: {}", 8 % 3);
114
115    let pool = ThreadPoolConstNum::<5>::new();
116    std::thread::sleep(std::time::Duration::from_millis(200));
117    let com_time = Arc::new(AtomicCell::new(0_u128));
118    for _ in 0..test_count {
119        let com_time = com_time.clone();
120        let now = std::time::Instant::now();
121        pool.spawn(move || {
122            // println!("run _test_cur_index i: {}", i);
123            com_time.fetch_add(now.elapsed().as_nanos());
124        });
125    }
126    println!("------------------------ThreadPoolConstNum 任务提交完成------------------------");
127    std::thread::sleep(std::time::Duration::from_secs(1));
128    println!(
129        "ThreadPoolConstNum 线程开启平均耗时: {} ns",
130        com_time.load() as f64 / test_count as f64
131    );
132}