// fast_able/fast_thread_pool/pool.rs
use crate::{fast_thread_pool::use_last_core, stock_pool::StockPool};
2use core_affinity::CoreId;
3use crossbeam::{atomic::AtomicCell, queue};
4use spin::Mutex;
5use std::{fmt::Debug, fs, io::Write, sync::{Arc, LazyLock}, thread};
6
7use super::{const_num, TaskExecutor};
8
9pub const FILE_CORE_AFFINITY: &str = "./.core_affinity";
10
11pub fn init() {
12 _ = fs::remove_file(FILE_CORE_AFFINITY);
13 warn!(
14 "thread_mod init; remove_file core_affinity: {:?}",
15 FILE_CORE_AFFINITY
16 );
17}
18
/// Returns the stride used when walking the logical-core list.
///
/// When each physical core exposes two logical cores (SMT/hyper-threading is
/// on), the stride is 2 so successive workers land on distinct physical cores;
/// otherwise it is 1.
#[cfg(feature = "deal_physical_cpu")]
pub fn get_core_skip() -> usize {
    let core_ids = num_cpus::get();
    let core_physical = num_cpus::get_physical();
    // Guard clause: no SMT detected — use every logical core.
    if core_ids / core_physical != 2 {
        return 1;
    }
    warn!("core_ids: {core_ids}, core_physical: {core_physical}; skip 2");
    2
}
30
/// Returns the stride used when walking the logical-core list.
///
/// Physical-CPU handling is disabled, so every logical core is usable.
#[cfg(not(feature = "deal_physical_cpu"))]
pub fn get_core_skip() -> usize {
    const DEFAULT_SKIP: usize = 1;
    DEFAULT_SKIP
}
35
36
37pub fn set_core_affinity_and_realtime(core_u: usize, _realtime: i32) -> bool {
41 static ALL_CORES: LazyLock<Vec<CoreId>> =
42 LazyLock::new(|| core_affinity::get_core_ids().unwrap_or_else(|| Vec::with_capacity(0)));
43
44 let core = ALL_CORES.iter().find(|&&core| core.id == core_u);
45
46 let core = match core {
47 Some(core) => core,
48 None => match ALL_CORES.last() {
49 Some(core) => core,
50 None => return false,
51 },
52 };
53
54 let mut b = core_affinity::set_for_current(core.clone());
55 debug!("core: {core_u}, bind: {b}");
56
57 b = false;
58
59 #[cfg(target_os = "linux")]
60 if b && _realtime > 0 {
61 use libc::{sched_param, sched_setscheduler, SCHED_RR};
62 let param = sched_param {
63 sched_priority: _realtime, };
65 unsafe {
66 b = sched_setscheduler(
67 0, SCHED_RR, ¶m as *const sched_param,
70 ) != -1;
71 }
72 }
73
74 debug!("core: {core_u}, set_realtime: {b}");
97
98 b
99}
100
101
/// A two-tier, core-pinned worker pool: "share" threads (priority -1) and
/// "fast" threads (realtime priority 49, see `ThreadPool::new`).
pub struct ThreadPool {
    /// Share-tier executors; `spawn` picks one by `i7 % len`.
    pub threads_share: Vec<TaskExecutor>,
    /// Fast-tier executors used by `spawn_fast`.
    pub threads_fast: crate::vec::SyncVec<TaskExecutor>,

    /// Rotating cursor over `threads_fast` used by `count_task_min`
    /// (-1 marks "not yet used").
    pub switch_thread_index: Mutex<i32>,

    /// Per-`i7` cached fast-thread index so a key sticks to one thread
    /// (`None` until first assignment).
    pub threads_fast_idx: StockPool<AtomicCell<Option<usize>>>,

    /// Per-`i7` lock taken inside each task closure so tasks with the same
    /// key never run concurrently.
    pub stock_lock: StockPool<Arc<spin::Mutex<()>>>,
}
128
129impl ThreadPool {
148 pub fn new(mut cpu_fraction_fast: usize, cpu_fraction_share: usize) -> ThreadPool {
149 let cpu_num = core_affinity::get_core_ids()
150 .unwrap_or_else(|| vec![])
151 .len();
152
153 let skip = get_core_skip();
154 let cpu_num = cpu_num / skip;
155 let num = cpu_num / cpu_fraction_share;
156
157 let core_num = core_affinity::get_core_ids().unwrap_or_else(|| {
158 warn!("获取cpu核心数失败");
159 vec![]
160 });
161 let mut current_core: i32 = core_num.len() as i32;
162 let max_core = current_core;
163
164 _ = fs::File::create_new(FILE_CORE_AFFINITY);
166 let old_cpu_num = fs::read_to_string(FILE_CORE_AFFINITY)
167 .expect("open core_affinity file read_to_string error");
168 info!("ThreadPool old_cpu_num: {}", old_cpu_num);
169 if !old_cpu_num.is_empty() {
170 let old_cpu_num = old_cpu_num.replace("\n", ",");
171 let old_cpu_num = old_cpu_num.split(',').collect::<Vec<_>>();
172 if let Some(old_cpu_num) = old_cpu_num.last() {
173 if let Ok(old_cpu_num) = old_cpu_num.parse::<i32>() {
174 current_core = old_cpu_num;
175 }
176 }
177 }
178 let skip = get_core_skip() as i32;
179 let mut bind_cores = vec!["share_thread".to_string()];
180
181 info!("threads_share cpu count: {num}, skip: {skip}");
182 let mut threads_共享 = Vec::with_capacity(num);
183 for _ in 0..num {
184 current_core -= skip;
185 if current_core < 0 {
186 current_core = max_core - 1;
187 }
188 let core = core_num
189 .get(current_core as usize)
190 .map(|x| x.clone())
191 .unwrap_or_else(|| {
192 warn!("获取cpu核心数失败");
193 core_affinity::CoreId { id: 0 }
194 });
195 bind_cores.push(core.id.to_string());
196 threads_共享.push(TaskExecutor::new(core, -1));
197 }
198 bind_cores.push("\n".to_string());
199 bind_cores.push("fast_thread".to_string());
200
201 if cpu_fraction_fast == 0 {
202 cpu_fraction_fast = cpu_num;
203 }
204 let mut num = cpu_num / cpu_fraction_fast;
205 if num == 0 {
206 num = 1;
207 }
208 debug!("theads_fraction_fast cpu count: {}", num);
209 let theads_高性能模式 = crate::vec::SyncVec::with_capacity(num);
210
211 for _ in 0..num {
212 current_core -= skip;
213 if current_core < 0 {
214 current_core = max_core;
215 }
216
217 let core = core_num
218 .get(current_core as usize)
219 .map(|x| x.clone())
220 .unwrap_or_else(|| {
221 warn!("获取cpu核心数失败");
222 core_affinity::CoreId { id: 0 }
223 });
224 bind_cores.push(core.id.to_string());
225
226 theads_高性能模式.push(TaskExecutor::new(core, 49));
227 }
228
229 debug!("fast_thread_pool_bind_cores: {:?}", bind_cores);
230 use std::fs::OpenOptions;
234 use std::io::Write;
235
236 {
237 let mut file = OpenOptions::new()
238 .append(true)
239 .open(FILE_CORE_AFFINITY)
240 .unwrap();
241 if !old_cpu_num.is_empty() {
243 let _ = file.write_all("\n".as_bytes());
244 }
245 let _ = file.write_all(bind_cores.join(",").as_bytes());
246 file.flush().expect("ThreadPoolLite flush error");
247 }
248
249 let r = ThreadPool {
252 threads_share: threads_共享,
253 threads_fast: theads_高性能模式,
254 threads_fast_idx: StockPool::new_default(),
256 stock_lock: StockPool::new_default(),
258 switch_thread_index: (-1).into(),
259 };
262
263 r
269 }
270
271 pub fn count_task_min(&self, i7: i32) -> IdxStatus {
320 let len = self.threads_fast.len();
323
324 let mut min_index: spin::MutexGuard<i32> = match self.switch_thread_index.try_lock() {
327 Some(mutex) => mutex,
328 None => return IdxStatus::Discard(i7 as usize % len),
329 };
330
331 if *min_index == 0 || *min_index == -1 {
334 *min_index = len as i32 - 1;
335 } else {
336 *min_index -= 1;
337 }
338
339 let r = *min_index as usize;
342 self.threads_fast_idx[i7].store(Some(r));
343 IdxStatus::Remember(r)
344 }
345
346 #[inline(always)]
347 pub fn spawn<F>(&self, i7: i32, f: F)
348 where
349 F: FnOnce(),
350 F: Send + 'static,
351 {
352 let _lock = self.stock_lock[i7].clone();
353 let index = i7 % self.threads_share.len() as i32;
354 self.threads_share[index as usize].spawn(move |_core| {
355 let _lock = _lock.lock();
356 f();
359 drop(_lock);
360 });
361 }
362
363 #[inline(always)]
364 pub fn spawn_fast<F>(&self, i7: i32, f: F)
365 where
366 F: FnOnce(),
367 F: Send + 'static,
368 {
369 let mut on_fast_idx = -1;
370
371 #[cfg(not(feature = "thread_dispatch"))]
373 let thread_idx = self.threads_fast_idx[i7].load().unwrap_or_else(|| {
374 let min = self.count_task_min(i7);
375 let idx = min.get_idx();
376 if let IdxStatus::Remember(idx) = &min {
377 on_fast_idx = *idx as i32;
378 }
379 idx
380 });
381
382 #[cfg(feature = "thread_dispatch")]
385 let thread_idx = match self.threads_fast_idx[i7].load() {
386 Some(i) if self.threads_fast[i].count.load() < 1000 => i,
387 _ => {
388 let min = self.count_task_min(i7);
389 let idx = min.get_idx();
390 if let IdxStatus::Remember(idx) = &min {
391 on_fast_idx = *idx as i32;
392 }
393 idx
394 }
395 };
396
397 let lock = self.stock_lock[i7].clone();
399 self.threads_fast[thread_idx].spawn(move |core| {
400 let lock_v = lock.lock();
401 f();
403 drop(lock_v);
404 if on_fast_idx != -1 {
405 debug!("on_fast thread; i7: {i7}, cpu: {core}");
406 }
407 });
408 }
409
410 #[inline(always)]
411 pub fn spawn_is_fast<F>(&self, i7: i32, is_fast: bool, f: F)
412 where
413 F: FnOnce(),
414 F: Send + 'static,
415 {
416 if is_fast {
417 self.spawn_fast(i7, f);
418 } else {
419 self.spawn(i7, f);
420 }
421 }
422}
423
/// Result of a fast-thread selection in `ThreadPool::count_task_min`.
pub enum IdxStatus {
    /// Index was freshly assigned and cached in `threads_fast_idx`.
    Remember(usize),
    /// Fallback index chosen under lock contention; not cached.
    Discard(usize),
}
430
431impl IdxStatus {
432 pub fn get_idx(&self) -> usize {
433 match self {
434 IdxStatus::Remember(idx) => *idx,
435 IdxStatus::Discard(idx) => *idx,
436 }
437 }
438}
439
/// Manual load test: floods the pool with fast tasks and reports throughput
/// and latency every 3 seconds. Runs forever; intended for interactive use.
#[test]
fn _test_pool() {
    std::env::set_var("RUST_LOG", "debug");
    env_logger::init();

    init();
    let _core = use_last_core("use_last_core");
    let _lite1 = super::ThreadPoolLite::new();
    let _num: super::ThreadPoolConstNum<5> = const_num::ThreadPoolConstNum::new();
    let pool = ThreadPool::new(2, 4);
    let _core2 = use_last_core("use_last_core2");
    std::thread::sleep(std::time::Duration::from_millis(200));

    // Counters accumulated by the tasks and drained by the reporter thread.
    let count = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_total = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_exp = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));

    let count_c = count.clone();
    let elapsed_total_c = elapsed_total.clone();
    let elapsed_exp_c = elapsed_exp.clone();
    std::thread::spawn(move || loop {
        std::thread::sleep(std::time::Duration::from_secs(3));
        // fetch_and(0) atomically takes the window's value and resets it.
        let count = count_c.fetch_and(0);
        let elapsed_total = elapsed_total_c.fetch_and(0);
        let elapsed_exp = elapsed_exp_c.fetch_and(0);
        // BUG FIX: guard the average against a zero task count, which
        // previously killed this thread with an integer divide-by-zero
        // whenever no task completed within a 3 s window.
        let avg = if count == 0 { 0 } else { elapsed_total / count };
        info!(
            "3秒钟执行任务数: {}, 所有任务耗时(微秒): {}, 平均耗时: {}, 耗时任务数(100微秒): {}, 耗时任务数占比: {:.0}/10000",
            count,
            elapsed_total,
            avg,
            elapsed_exp,
            elapsed_exp as f64 / count as f64 * 10000.0,
        );
    });

    loop {
        for i in 0..100 {
            let time_hs = std::time::Instant::now();
            let count = count.clone();
            let elapsed_total = elapsed_total.clone();
            let elapsed_exp = elapsed_exp.clone();
            // Alternate between two keys so both sticky routes get exercised.
            let i7 = if i % 3 == 0 { 1000001 } else { 1000002 };
            pool.spawn_is_fast(i7, true, move || {
                let micros = time_hs.elapsed().as_micros();
                count.fetch_add(1);
                elapsed_total.fetch_add(micros as i64);
                if micros > 100 {
                    elapsed_exp.fetch_add(1);
                }
            });
        }
        std::thread::sleep(std::time::Duration::from_micros(110));
    }
}