fast_able/fast_thread_pool/
pool.rs1use crate::fast_thread_pool::{get_core_skip, CORES};
2use crate::stock_pool::StockPool;
3use crossbeam::atomic::AtomicCell;
4use spin::Mutex;
5use std::{
6 fs,
7 sync::{Arc, LazyLock},
8};
9
10use super::TaskExecutor;
11
12pub const FILE_CORE_AFFINITY: &str = "./.core_affinity";
13
14pub fn init(is_realtime_system: bool) {
17 _ = fs::remove_file(FILE_CORE_AFFINITY);
18 warn!(
19 "thread_mod init; remove_file core_affinity: {}, is_realtime_system: {is_realtime_system}",
20 FILE_CORE_AFFINITY
21 );
22
23 if is_realtime_system {
24 fs::write(FILE_CORE_AFFINITY, "realtime_system,\n").unwrap();
25 }
26}
27
28
29pub fn set_core_affinity_and_realtime(core_u: usize, _realtime: i32) -> bool {
36 let ALL_CORES = CORES.clone();
37
38 let core = ALL_CORES.iter().find(|&&core| core.id == core_u);
39
40 let core = match core {
41 Some(core) => core,
42 None => match ALL_CORES.last() {
43 Some(core) => core,
44 None => return false,
45 },
46 };
47
48 let b = core_affinity::set_for_current(core.clone());
49
50 #[allow(unused_mut)]
51 let mut b2 = true;
52
53 #[allow(unused_mut)]
54 let mut realtime_msg = "";
55
56 static _IS_REALTIME_SYSTEM: LazyLock<bool> = LazyLock::new(|| {
57 std::fs::read_to_string(FILE_CORE_AFFINITY)
58 .unwrap_or_else(|_| "".to_string())
59 .contains("realtime_system")
60 });
61
62 #[cfg(target_os = "linux")]
63 if b && _realtime > 0 && *_IS_REALTIME_SYSTEM {
64 use libc::{EINVAL, EPERM, ESRCH, SCHED_RR, sched_param, sched_setscheduler};
65 let param = sched_param {
66 sched_priority: _realtime,
67 };
68 unsafe {
69 let ret = sched_setscheduler(0, SCHED_RR, ¶m);
71 if ret != 0 {
72 realtime_msg = ", set_realtime: false";
89 b2 = false;
90 } else {
91 realtime_msg = ", set_realtime: true";
92 }
93 }
94 }
95
96 debug!("thread core: {core_u}, set_core_affinity: {b}{realtime_msg}");
97 b && b2
98}
99
100pub struct ThreadPool {
113 pub threads_share: Vec<TaskExecutor>,
114 pub threads_fast: crate::vec::SyncVec<TaskExecutor>,
115
116 pub switch_thread_index: Mutex<i32>,
117
118 pub threads_fast_idx: StockPool<AtomicCell<Option<usize>>>,
121
122 pub stock_lock: StockPool<Arc<spin::Mutex<()>>>,
127 }
131
132impl ThreadPool {
151 pub fn new(mut cpu_fraction_fast: usize, cpu_fraction_share: usize) -> ThreadPool {
152 let cpu_num = CORES.len();
153
154 let skip = get_core_skip();
155 let cpu_num = cpu_num / skip;
156 let mut cpu_fraction_share_num = cpu_num / cpu_fraction_share;
157
158 if cpu_fraction_share_num == 0 {
160 cpu_fraction_share_num = 1;
161 warn!(
162 "cpu_fraction_share_num({cpu_fraction_share_num}) = cpu_num({cpu_num}) / cpu_fraction_share({cpu_fraction_share}), set cpu_fraction_share_num to 1"
163 );
164 }
165
166 debug!("threads_share cpu count: {cpu_fraction_share_num}, skip: {}", get_core_skip());
168 let mut threads_共享 = Vec::with_capacity(cpu_fraction_share_num);
169
170 let share_cores = super::use_last_core2("share_thread", cpu_fraction_share_num);
172
173 for core_id in &share_cores {
175 let core = core_affinity::CoreId { id: *core_id };
176 threads_共享.push(TaskExecutor::new(core, -1));
177 }
178
179 if cpu_fraction_fast == 0 {
181 cpu_fraction_fast = cpu_num;
182 }
183 let mut cpu_fraction_fast_num = cpu_num / cpu_fraction_fast;
184 if cpu_fraction_fast_num == 0 {
185 cpu_fraction_fast_num = 1;
186 warn!(
187 "cpu_fraction_fast_num({cpu_fraction_fast_num}) = cpu_num({cpu_num}) / cpu_fraction_fast({cpu_fraction_fast}), set cpu_fraction_fast_num to 1"
188 );
189 }
190 debug!("theads_fraction_fast cpu count: {}", cpu_fraction_fast_num);
191 let theads_高性能模式 = crate::vec::SyncVec::with_capacity(cpu_fraction_fast_num);
192
193 let fast_cores = super::use_last_core2("fast_thread", cpu_fraction_fast_num);
195
196 for core_id in &fast_cores {
198 let core = core_affinity::CoreId { id: *core_id };
199 theads_高性能模式.push(TaskExecutor::new(core, 49));
200 }
201
202 debug!("fast_thread_pool_bind_cores: share cores: {:?}, fast cores: {:?}", share_cores, fast_cores);
203
204 let r = ThreadPool {
205 threads_share: threads_共享,
206 threads_fast: theads_高性能模式,
207 threads_fast_idx: StockPool::new_default(),
209 stock_lock: StockPool::new_default(),
211 switch_thread_index: (-1).into(),
212 };
215
216 r
222 }
223
224 pub fn count_task_min(&self, i7: i32) -> IdxStatus {
273 let len = self.threads_fast.len();
276
277 let mut min_index: spin::MutexGuard<i32> = match self.switch_thread_index.try_lock() {
280 Some(mutex) => mutex,
281 None => return IdxStatus::Discard(i7 as usize % len),
282 };
283
284 if *min_index == 0 || *min_index == -1 {
287 *min_index = len as i32 - 1;
288 } else {
289 *min_index -= 1;
290 }
291
292 let r = *min_index as usize;
295 self.threads_fast_idx[i7].store(Some(r));
296 IdxStatus::Remember(r)
297 }
298
299 #[inline(always)]
300 pub fn spawn<F>(&self, i7: i32, f: F)
301 where
302 F: FnOnce(),
303 F: Send + 'static,
304 {
305 let _lock = self.stock_lock[i7].clone();
306 let index = i7 % self.threads_share.len() as i32;
307 self.threads_share[index as usize].spawn(move |_core| {
308 let _lock = _lock.lock();
309 f();
312 drop(_lock);
313 });
314 }
315
316 #[inline(always)]
317 pub fn spawn_fast<F>(&self, i7: i32, f: F)
318 where
319 F: FnOnce(),
320 F: Send + 'static,
321 {
322 let mut on_fast_idx = -1;
323
324 #[cfg(not(feature = "thread_dispatch"))]
326 let thread_idx = self.threads_fast_idx[i7].load().unwrap_or_else(|| {
327 let min = self.count_task_min(i7);
328 let idx = min.get_idx();
329 if let IdxStatus::Remember(idx) = &min {
330 on_fast_idx = *idx as i32;
331 }
332 idx
333 });
334
335 #[cfg(feature = "thread_dispatch")]
338 let thread_idx = match self.threads_fast_idx[i7].load() {
339 Some(i) if self.threads_fast.get_uncheck(i).count.load() < 1000 => i,
340 _ => {
341 let min = self.count_task_min(i7);
342 let idx = min.get_idx();
343 if let IdxStatus::Remember(idx) = &min {
344 on_fast_idx = *idx as i32;
345 }
346 idx
347 }
348 };
349
350 let lock = self.stock_lock[i7].clone();
352 self.threads_fast.get_uncheck(thread_idx).spawn(move |_core| {
353 let lock_v = lock.lock();
354 f();
356 drop(lock_v);
357 if on_fast_idx != -1 {
358 }
360 });
361 }
362
363 #[inline(always)]
364 pub fn spawn_is_fast<F>(&self, i7: i32, is_fast: bool, f: F)
365 where
366 F: FnOnce(),
367 F: Send + 'static,
368 {
369 if is_fast {
370 self.spawn_fast(i7, f);
371 } else {
372 self.spawn(i7, f);
373 }
374 }
375}
376
/// An executor index together with a flag saying whether it was recorded for later reuse.
pub enum IdxStatus {
    /// The index was stored and will be found again on the next lookup.
    Remember(usize),
    /// The index is a one-off pick and was not stored.
    Discard(usize),
}

impl IdxStatus {
    /// Returns the carried index, regardless of the variant.
    pub fn get_idx(&self) -> usize {
        match *self {
            IdxStatus::Remember(idx) | IdxStatus::Discard(idx) => idx,
        }
    }
}
392
/// Manual soak run for the fast group of [`ThreadPool`].
///
/// Submits batches of 100 tasks in an endless loop while a background thread logs, every
/// three seconds, the number of completed tasks, their total and average queueing delay in
/// microseconds, and how many took longer than 100 microseconds.
///
/// NOTE(review): the submit loop never ends, so this test does not terminate on its own.
#[test]
fn _test_pool() {
    use crate::fast_thread_pool::*;

    // SAFETY: `set_var` is only sound while no other thread touches the environment.
    // This runs first thing in the test, before the test itself spawns any thread.
    // NOTE(review): the test harness may run other tests concurrently — confirm.
    unsafe { std::env::set_var("RUST_LOG", "debug") };
    env_logger::init();

    init(true);
    let _core = use_last_core("use_last_core");
    info!("use_last_core_多个核心: {:?}", _core);
    let pool = ThreadPool::new(2, 4);
    let _core = use_last_core2("use_last_core_多个核心2", 5);
    let _core2 = use_last_core("use_last_core2");
    let _core3 = use_last_core("use_last_core3");
    let _core4 = use_last_core("use_last_core4");
    let _core5 = use_last_core("use_last_core5");
    let _core6 = use_last_core("use_last_core6");
    std::thread::sleep(std::time::Duration::from_millis(200));

    let count = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_total = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_exp = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));

    let count_c = count.clone();
    let elapsed_total_c = elapsed_total.clone();
    let elapsed_exp_c = elapsed_exp.clone();
    std::thread::spawn(move || {
        loop {
            std::thread::sleep(std::time::Duration::from_secs(3));
            // Read and reset each counter in one atomic step.
            let count = count_c.swap(0);
            let elapsed_total = elapsed_total_c.swap(0);
            let elapsed_exp = elapsed_exp_c.swap(0);

            // An idle window must not kill the reporter with a division by zero.
            let (average, slow_ratio) = if count > 0 {
                (
                    elapsed_total / count,
                    elapsed_exp as f64 / count as f64 * 10000.0,
                )
            } else {
                (0, 0.0)
            };
            info!(
                "3秒钟执行任务数: {}, 所有任务耗时(微秒): {}, 平均耗时: {}, 耗时任务数(100微秒): {}, 耗时任务数占比: {:.0}/10000",
                count,
                elapsed_total,
                average,
                elapsed_exp,
                slow_ratio,
            );
        }
    });

    loop {
        for i in 0..100 {
            let time_hs = std::time::Instant::now();
            let count = count.clone();
            let elapsed_total = elapsed_total.clone();
            let elapsed_exp = elapsed_exp.clone();
            // Two distinct keys, so tasks are serialised in two independent streams.
            let i7 = if i % 3 == 0 { 1000001 } else { 1000002 };
            pool.spawn_is_fast(i7, true, move || {
                // Delay between submission and the task actually starting to run.
                let micros = time_hs.elapsed().as_micros();
                count.fetch_add(1);
                elapsed_total.fetch_add(micros as i64);
                if micros > 100 {
                    elapsed_exp.fetch_add(1);
                }
            });
        }
        std::thread::sleep(std::time::Duration::from_micros(110));
    }
}