fast_able/fast_thread_pool/
const_num.rs1use std::{fs, io::Write, sync::Arc};
2
3use core_affinity::CoreId;
4use crossbeam::atomic::AtomicCell;
5
6use super::{get_core_skip, TaskExecutor, FILE_CORE_AFFINITY};
7
8pub struct ThreadPoolConstNum<const N: usize> {
12 thread: [TaskExecutor; N],
13 cur_run_core: AtomicCell<usize>,
14}
15
16impl<const N: usize> ThreadPoolConstNum<N> {
17 pub fn new() -> Self {
18 let cores = core_affinity::get_core_ids().unwrap_or_else(|| {
19 warn!("获取cpu核心数失败");
20 vec![]
21 });
22
23 _ = fs::File::create_new(FILE_CORE_AFFINITY);
25 let old_cpu_num = fs::read_to_string(FILE_CORE_AFFINITY)
26 .expect("open core_affinity file read_to_string error");
27 let old_cpu_num = old_cpu_num
28 .replace("\n", ",")
29 .split(',')
30 .filter_map(|x| x.parse::<usize>().ok())
31 .collect::<Vec<_>>();
32
33 let old_last = old_cpu_num.last().map(|x| *x).unwrap_or_else(|| 0);
34 let mut use_core = if !old_cpu_num.is_empty() {
35 old_last
36 } else {
37 cores.last().map(|x| x.id).unwrap_or_else(|| 0)
38 };
39
40 let mut use_core_cur = vec![];
41 let mut threads = vec![];
42
43 let skip = get_core_skip();
44
45 for _ in 0..N {
46 if (use_core as i32) - (skip as i32) < 0 {
47 use_core = cores.len() + 1;
48 }
49 if use_core == 0 {
50 use_core = 2;
51 }
52 use_core -= skip;
53 use_core_cur.push(use_core);
54 threads.push(TaskExecutor::new(CoreId { id: use_core }, 49));
55 }
56
57 let r = ThreadPoolConstNum {
58 thread: threads
59 .try_into()
60 .expect("ThreadPoolLiteNum threads.try_into()"),
61 cur_run_core: 0.into(),
62 };
63
64 if !use_core_cur.is_empty() {
65 println!("old_cpu_num: {old_cpu_num:?}; use_core_cur {use_core_cur:?}");
66 let mut file = fs::OpenOptions::new()
67 .append(true)
68 .open(FILE_CORE_AFFINITY)
69 .unwrap();
70 if !old_cpu_num.is_empty() {
72 let _ = file.write_all("\n".as_bytes());
73 }
74 let _ = file.write_all(
75 format!(
76 "ThreadPoolConstNum,{}",
77 use_core_cur
78 .iter()
79 .map(|x| x.to_string())
80 .collect::<Vec<_>>()
81 .join(",")
82 )
83 .as_bytes(),
84 );
85 file.flush().expect("ThreadPoolLite flush error");
86 }
87
88 r
89 }
90
91 pub fn spawn<F>(&self, f: F)
92 where
93 F: FnOnce(),
94 F: Send + 'static,
95 {
96 let cur_index = self.cur_run_core.fetch_add(1);
97 self.thread[cur_index % N].spawn(|_| f());
98 if cur_index >= usize::MAX - N {
99 self.cur_run_core.store(0);
100 }
101 }
102}
103
104pub fn _test_ThreadPoolConstNum(test_count: u128) {
105 println!("------------------------start------------------------");
106 let pool = ThreadPoolConstNum::<5>::new();
116 std::thread::sleep(std::time::Duration::from_millis(200));
117 let com_time = Arc::new(AtomicCell::new(0_u128));
118 for _ in 0..test_count {
119 let com_time = com_time.clone();
120 let now = std::time::Instant::now();
121 pool.spawn(move || {
122 com_time.fetch_add(now.elapsed().as_nanos());
124 });
125 }
126 println!("------------------------ThreadPoolConstNum 任务提交完成------------------------");
127 std::thread::sleep(std::time::Duration::from_secs(1));
128 println!(
129 "ThreadPoolConstNum 线程开启平均耗时: {} ns",
130 com_time.load() as f64 / test_count as f64
131 );
132}