fast_able/fast_thread_pool/
pool.rs1use crate::fast_thread_pool::{get_core_skip, CORES};
2use crate::stock_pool::StockPool;
3use crossbeam::atomic::AtomicCell;
4use spin::Mutex;
5use std::{
6 fs,
7 sync::{Arc, LazyLock},
8};
9
10use super::TaskExecutor;
11
/// Path of the marker file used to record that the process runs on a realtime system.
///
/// [`init`] removes this file and, when realtime mode is requested, re-creates it with
/// the text `realtime_system`. [`set_core_affinity_and_realtime`] reads it once to decide
/// whether realtime scheduling should be attempted.
pub const FILE_CORE_AFFINITY: &str = "./.core_affinity";
13
14pub fn init(is_realtime_system: bool) {
16 _ = fs::remove_file(FILE_CORE_AFFINITY);
17 warn!(
18 "thread_mod init; remove_file core_affinity: {}, is_realtime_system: {is_realtime_system}",
19 FILE_CORE_AFFINITY
20 );
21
22 if is_realtime_system {
23 fs::write(FILE_CORE_AFFINITY, "realtime_system,\n").unwrap();
24 }
25}
26
27
28pub fn set_core_affinity_and_realtime(core_u: usize, _realtime: i32) -> bool {
32 let ALL_CORES = CORES.clone();
33
34 let core = ALL_CORES.iter().find(|&&core| core.id == core_u);
35
36 let core = match core {
37 Some(core) => core,
38 None => match ALL_CORES.last() {
39 Some(core) => core,
40 None => return false,
41 },
42 };
43
44 let b = core_affinity::set_for_current(core.clone());
45
46 #[allow(unused_mut)]
47 let mut b2 = true;
48
49 #[allow(unused_mut)]
50 let mut realtime_msg = "";
51
52 static _IS_REALTIME_SYSTEM: LazyLock<bool> = LazyLock::new(|| {
53 std::fs::read_to_string(FILE_CORE_AFFINITY)
54 .unwrap_or_else(|_| "".to_string())
55 .contains("realtime_system")
56 });
57
58 #[cfg(target_os = "linux")]
59 if b && _realtime > 0 && *_IS_REALTIME_SYSTEM {
60 use libc::{EINVAL, EPERM, ESRCH, SCHED_RR, sched_param, sched_setscheduler};
61 let param = sched_param {
62 sched_priority: _realtime,
63 };
64 unsafe {
65 let ret = sched_setscheduler(0, SCHED_RR, ¶m);
67 if ret != 0 {
68 realtime_msg = ", set_realtime: false";
85 b2 = false;
86 } else {
87 realtime_msg = ", set_realtime: true";
88 }
89 }
90 }
91
92 debug!("thread core: {core_u}, set_core_affinity: {b}{realtime_msg}");
93 b && b2
94}
95
96pub struct ThreadPool {
104 pub threads_share: Vec<TaskExecutor>,
105 pub threads_fast: crate::vec::SyncVec<TaskExecutor>,
106
107 pub switch_thread_index: Mutex<i32>,
108
109 pub threads_fast_idx: StockPool<AtomicCell<Option<usize>>>,
112
113 pub stock_lock: StockPool<Arc<spin::Mutex<()>>>,
118 }
122
123impl ThreadPool {
142 pub fn new(mut cpu_fraction_fast: usize, cpu_fraction_share: usize) -> ThreadPool {
143 let cpu_num = CORES.len();
144
145 let skip = get_core_skip();
146 let cpu_num = cpu_num / skip;
147 let mut cpu_fraction_share_num = cpu_num / cpu_fraction_share;
148
149 if cpu_fraction_share_num == 0 {
151 cpu_fraction_share_num = 1;
152 warn!(
153 "cpu_fraction_share_num({cpu_fraction_share_num}) = cpu_num({cpu_num}) / cpu_fraction_share({cpu_fraction_share}), set cpu_fraction_share_num to 1"
154 );
155 }
156
157 debug!("threads_share cpu count: {cpu_fraction_share_num}, skip: {}", get_core_skip());
159 let mut threads_共享 = Vec::with_capacity(cpu_fraction_share_num);
160
161 let share_cores = super::use_last_core2("share_thread", cpu_fraction_share_num);
163
164 for core_id in &share_cores {
166 let core = core_affinity::CoreId { id: *core_id };
167 threads_共享.push(TaskExecutor::new(core, -1));
168 }
169
170 if cpu_fraction_fast == 0 {
172 cpu_fraction_fast = cpu_num;
173 }
174 let mut cpu_fraction_fast_num = cpu_num / cpu_fraction_fast;
175 if cpu_fraction_fast_num == 0 {
176 cpu_fraction_fast_num = 1;
177 warn!(
178 "cpu_fraction_fast_num({cpu_fraction_fast_num}) = cpu_num({cpu_num}) / cpu_fraction_fast({cpu_fraction_fast}), set cpu_fraction_fast_num to 1"
179 );
180 }
181 debug!("theads_fraction_fast cpu count: {}", cpu_fraction_fast_num);
182 let theads_高性能模式 = crate::vec::SyncVec::with_capacity(cpu_fraction_fast_num);
183
184 let fast_cores = super::use_last_core2("fast_thread", cpu_fraction_fast_num);
186
187 for core_id in &fast_cores {
189 let core = core_affinity::CoreId { id: *core_id };
190 theads_高性能模式.push(TaskExecutor::new(core, 49));
191 }
192
193 debug!("fast_thread_pool_bind_cores: share cores: {:?}, fast cores: {:?}", share_cores, fast_cores);
194
195 let r = ThreadPool {
196 threads_share: threads_共享,
197 threads_fast: theads_高性能模式,
198 threads_fast_idx: StockPool::new_default(),
200 stock_lock: StockPool::new_default(),
202 switch_thread_index: (-1).into(),
203 };
206
207 r
213 }
214
215 pub fn count_task_min(&self, i7: i32) -> IdxStatus {
264 let len = self.threads_fast.len();
267
268 let mut min_index: spin::MutexGuard<i32> = match self.switch_thread_index.try_lock() {
271 Some(mutex) => mutex,
272 None => return IdxStatus::Discard(i7 as usize % len),
273 };
274
275 if *min_index == 0 || *min_index == -1 {
278 *min_index = len as i32 - 1;
279 } else {
280 *min_index -= 1;
281 }
282
283 let r = *min_index as usize;
286 self.threads_fast_idx[i7].store(Some(r));
287 IdxStatus::Remember(r)
288 }
289
290 #[inline(always)]
291 pub fn spawn<F>(&self, i7: i32, f: F)
292 where
293 F: FnOnce(),
294 F: Send + 'static,
295 {
296 let _lock = self.stock_lock[i7].clone();
297 let index = i7 % self.threads_share.len() as i32;
298 self.threads_share[index as usize].spawn(move |_core| {
299 let _lock = _lock.lock();
300 f();
303 drop(_lock);
304 });
305 }
306
307 #[inline(always)]
308 pub fn spawn_fast<F>(&self, i7: i32, f: F)
309 where
310 F: FnOnce(),
311 F: Send + 'static,
312 {
313 let mut on_fast_idx = -1;
314
315 #[cfg(not(feature = "thread_dispatch"))]
317 let thread_idx = self.threads_fast_idx[i7].load().unwrap_or_else(|| {
318 let min = self.count_task_min(i7);
319 let idx = min.get_idx();
320 if let IdxStatus::Remember(idx) = &min {
321 on_fast_idx = *idx as i32;
322 }
323 idx
324 });
325
326 #[cfg(feature = "thread_dispatch")]
329 let thread_idx = match self.threads_fast_idx[i7].load() {
330 Some(i) if self.threads_fast[i].count.load() < 1000 => i,
331 _ => {
332 let min = self.count_task_min(i7);
333 let idx = min.get_idx();
334 if let IdxStatus::Remember(idx) = &min {
335 on_fast_idx = *idx as i32;
336 }
337 idx
338 }
339 };
340
341 let lock = self.stock_lock[i7].clone();
343 self.threads_fast[thread_idx].spawn(move |_core| {
344 let lock_v = lock.lock();
345 f();
347 drop(lock_v);
348 if on_fast_idx != -1 {
349 }
351 });
352 }
353
354 #[inline(always)]
355 pub fn spawn_is_fast<F>(&self, i7: i32, is_fast: bool, f: F)
356 where
357 F: FnOnce(),
358 F: Send + 'static,
359 {
360 if is_fast {
361 self.spawn_fast(i7, f);
362 } else {
363 self.spawn(i7, f);
364 }
365 }
366}
367
/// Result of choosing a fast executor index in [`ThreadPool::count_task_min`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IdxStatus {
    /// The index came from the round-robin cursor and was stored for the key.
    Remember(usize),
    /// The cursor lock was busy; the index is a fallback that was not stored.
    Discard(usize),
}

impl IdxStatus {
    /// Returns the executor index carried by either variant.
    pub fn get_idx(&self) -> usize {
        match self {
            Self::Remember(idx) | Self::Discard(idx) => *idx,
        }
    }
}
383
/// Manual soak test that measures dispatch latency of [`ThreadPool::spawn_is_fast`].
///
/// The main loop submits 100 tasks, sleeps 110 microseconds and repeats forever; each task
/// records how long it waited between submission and execution. A reporter thread logs
/// and resets the counters every three seconds. Because the test never returns it is
/// ignored by default.
#[test]
#[ignore = "endless soak test; run explicitly with `cargo test _test_pool -- --ignored`"]
fn _test_pool() {
    use crate::fast_thread_pool::*;

    // SAFETY: done first, before this test starts any thread of its own.
    // NOTE(review): other tests running in the same process could still touch the
    // environment concurrently; run this test on its own.
    unsafe { std::env::set_var("RUST_LOG", "debug") };
    env_logger::init();

    init(true);
    let _core = use_last_core("use_last_core");
    info!("use_last_core_多个核心: {:?}", _core);
    let pool = ThreadPool::new(2, 4);
    let _core = use_last_core2("use_last_core_多个核心2", 5);
    let _core2 = use_last_core("use_last_core2");
    let _core3 = use_last_core("use_last_core3");
    let _core4 = use_last_core("use_last_core4");
    let _core5 = use_last_core("use_last_core5");
    let _core6 = use_last_core("use_last_core6");
    std::thread::sleep(std::time::Duration::from_millis(200));

    let count = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_total = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
    let elapsed_exp = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));

    let count_c = count.clone();
    let elapsed_total_c = elapsed_total.clone();
    let elapsed_exp_c = elapsed_exp.clone();
    std::thread::spawn(move || loop {
        std::thread::sleep(std::time::Duration::from_secs(3));
        // `swap(0)` reads each counter and resets it in a single atomic step.
        let count = count_c.swap(0);
        let elapsed_total = elapsed_total_c.swap(0);
        let elapsed_exp = elapsed_exp_c.swap(0);
        // An idle window has `count == 0`; dividing by it would panic this thread
        // (integer division) or print NaN (float division).
        let (average, slow_ratio) = if count > 0 {
            (
                elapsed_total / count,
                elapsed_exp as f64 / count as f64 * 10000.0,
            )
        } else {
            (0, 0.0)
        };
        info!(
            "3秒钟执行任务数: {}, 所有任务耗时(微秒): {}, 平均耗时: {}, 耗时任务数(100微秒): {}, 耗时任务数占比: {:.0}/10000",
            count,
            elapsed_total,
            average,
            elapsed_exp,
            slow_ratio,
        );
    });

    loop {
        for i in 0..100 {
            let submitted_at = std::time::Instant::now();
            let count = count.clone();
            let elapsed_total = elapsed_total.clone();
            let elapsed_exp = elapsed_exp.clone();
            // Only two distinct keys are used, so tasks contend on two per-key locks.
            let i7 = if i % 3 == 0 { 1000001 } else { 1000002 };
            pool.spawn_is_fast(i7, true, move || {
                let micros = submitted_at.elapsed().as_micros();
                count.fetch_add(1);
                elapsed_total.fetch_add(micros as i64);
                // Tasks that waited longer than 100 microseconds count as slow.
                if micros > 100 {
                    elapsed_exp.fetch_add(1);
                }
            });
        }
        std::thread::sleep(std::time::Duration::from_micros(110));
    }
}