use crate::fast_thread_pool::{get_core_skip, CORES};
use crate::stock_pool::StockPool;
use crossbeam::atomic::AtomicCell;
use spin::Mutex;
use std::{
fs,
sync::{Arc, LazyLock},
};
use super::TaskExecutor;
pub const FILE_CORE_AFFINITY: &str = "./.core_affinity";
pub fn init(is_realtime_system: bool) {
_ = fs::remove_file(FILE_CORE_AFFINITY);
warn!(
"thread_mod init; remove_file core_affinity: {}, is_realtime_system: {is_realtime_system}",
FILE_CORE_AFFINITY
);
if is_realtime_system {
fs::write(FILE_CORE_AFFINITY, "realtime_system,\n").unwrap();
}
}
pub fn set_core_affinity_and_realtime(core_u: usize, _realtime: i32) -> bool {
let ALL_CORES = CORES.clone();
let core = ALL_CORES.iter().find(|&&core| core.id == core_u);
let core = match core {
Some(core) => core,
None => match ALL_CORES.last() {
Some(core) => core,
None => return false,
},
};
let b = core_affinity::set_for_current(core.clone());
#[allow(unused_mut)]
let mut b2 = true;
#[allow(unused_mut)]
let mut realtime_msg = "";
static _IS_REALTIME_SYSTEM: LazyLock<bool> = LazyLock::new(|| {
std::fs::read_to_string(FILE_CORE_AFFINITY)
.unwrap_or_else(|_| "".to_string())
.contains("realtime_system")
});
#[cfg(target_os = "linux")]
if b && _realtime > 0 && *_IS_REALTIME_SYSTEM {
use libc::{EINVAL, EPERM, ESRCH, SCHED_RR, sched_param, sched_setscheduler};
let param = sched_param {
sched_priority: _realtime,
};
unsafe {
let ret = sched_setscheduler(0, SCHED_RR, ¶m);
if ret != 0 {
realtime_msg = ", set_realtime: false";
b2 = false;
} else {
realtime_msg = ", set_realtime: true";
}
}
}
debug!("thread core: {core_u}, set_core_affinity: {b}{realtime_msg}");
b && b2
}
pub struct ThreadPool {
pub threads_share: Vec<TaskExecutor>,
pub threads_fast: crate::vec::SyncVec<TaskExecutor>,
pub switch_thread_index: Mutex<i32>,
pub threads_fast_idx: StockPool<AtomicCell<Option<usize>>>,
pub stock_lock: StockPool<Arc<spin::Mutex<()>>>,
}
impl ThreadPool {
pub fn new(mut cpu_fraction_fast: usize, cpu_fraction_share: usize) -> ThreadPool {
let cpu_num = CORES.len();
let skip = get_core_skip();
let cpu_num = cpu_num / skip;
let mut cpu_fraction_share_num = cpu_num / cpu_fraction_share;
if cpu_fraction_share_num == 0 {
cpu_fraction_share_num = 1;
warn!(
"cpu_fraction_share_num({cpu_fraction_share_num}) = cpu_num({cpu_num}) / cpu_fraction_share({cpu_fraction_share}), set cpu_fraction_share_num to 1"
);
}
debug!("threads_share cpu count: {cpu_fraction_share_num}, skip: {}", get_core_skip());
let mut threads_共享 = Vec::with_capacity(cpu_fraction_share_num);
let share_cores = super::use_last_core2("share_thread", cpu_fraction_share_num);
for core_id in &share_cores {
let core = core_affinity::CoreId { id: *core_id };
threads_共享.push(TaskExecutor::new(core, -1));
}
if cpu_fraction_fast == 0 {
cpu_fraction_fast = cpu_num;
}
let mut cpu_fraction_fast_num = cpu_num / cpu_fraction_fast;
if cpu_fraction_fast_num == 0 {
cpu_fraction_fast_num = 1;
warn!(
"cpu_fraction_fast_num({cpu_fraction_fast_num}) = cpu_num({cpu_num}) / cpu_fraction_fast({cpu_fraction_fast}), set cpu_fraction_fast_num to 1"
);
}
debug!("theads_fraction_fast cpu count: {}", cpu_fraction_fast_num);
let theads_高性能模式 = crate::vec::SyncVec::with_capacity(cpu_fraction_fast_num);
let fast_cores = super::use_last_core2("fast_thread", cpu_fraction_fast_num);
for core_id in &fast_cores {
let core = core_affinity::CoreId { id: *core_id };
theads_高性能模式.push(TaskExecutor::new(core, 49));
}
debug!("fast_thread_pool_bind_cores: share cores: {:?}, fast cores: {:?}", share_cores, fast_cores);
let r = ThreadPool {
threads_share: threads_共享,
threads_fast: theads_高性能模式,
threads_fast_idx: StockPool::new_default(),
stock_lock: StockPool::new_default(),
switch_thread_index: (-1).into(),
};
r
}
pub fn count_task_min(&self, i7: i32) -> IdxStatus {
let len = self.threads_fast.len();
let mut min_index: spin::MutexGuard<i32> = match self.switch_thread_index.try_lock() {
Some(mutex) => mutex,
None => return IdxStatus::Discard(i7 as usize % len),
};
if *min_index == 0 || *min_index == -1 {
*min_index = len as i32 - 1;
} else {
*min_index -= 1;
}
let r = *min_index as usize;
self.threads_fast_idx[i7].store(Some(r));
IdxStatus::Remember(r)
}
#[inline(always)]
pub fn spawn<F>(&self, i7: i32, f: F)
where
F: FnOnce(),
F: Send + 'static,
{
let _lock = self.stock_lock[i7].clone();
let index = i7 % self.threads_share.len() as i32;
self.threads_share[index as usize].spawn(move |_core| {
let _lock = _lock.lock();
f();
drop(_lock);
});
}
#[inline(always)]
pub fn spawn_fast<F>(&self, i7: i32, f: F)
where
F: FnOnce(),
F: Send + 'static,
{
let mut on_fast_idx = -1;
#[cfg(not(feature = "thread_dispatch"))]
let thread_idx = self.threads_fast_idx[i7].load().unwrap_or_else(|| {
let min = self.count_task_min(i7);
let idx = min.get_idx();
if let IdxStatus::Remember(idx) = &min {
on_fast_idx = *idx as i32;
}
idx
});
#[cfg(feature = "thread_dispatch")]
let thread_idx = match self.threads_fast_idx[i7].load() {
Some(i) if self.threads_fast.get_uncheck(i).count.load() < 1000 => i,
_ => {
let min = self.count_task_min(i7);
let idx = min.get_idx();
if let IdxStatus::Remember(idx) = &min {
on_fast_idx = *idx as i32;
}
idx
}
};
let lock = self.stock_lock[i7].clone();
self.threads_fast.get_uncheck(thread_idx).spawn(move |_core| {
let lock_v = lock.lock();
f();
drop(lock_v);
if on_fast_idx != -1 {
}
});
}
#[inline(always)]
pub fn spawn_is_fast<F>(&self, i7: i32, is_fast: bool, f: F)
where
F: FnOnce(),
F: Send + 'static,
{
if is_fast {
self.spawn_fast(i7, f);
} else {
self.spawn(i7, f);
}
}
}
pub enum IdxStatus {
Remember(usize),
Discard(usize),
}
impl IdxStatus {
pub fn get_idx(&self) -> usize {
match self {
IdxStatus::Remember(idx) => *idx,
IdxStatus::Discard(idx) => *idx,
}
}
}
#[test]
fn _test_pool() {
use crate::fast_thread_pool::*;
unsafe { std::env::set_var("RUST_LOG", "debug") };
env_logger::init();
init(true);
let _core = use_last_core("use_last_core");
info!("use_last_core_多个核心: {:?}", _core);
let pool = ThreadPool::new(2, 4); let _core = use_last_core2("use_last_core_多个核心2", 5);
let _core2 = use_last_core("use_last_core2");
let _core3 = use_last_core("use_last_core3");
let _core4 = use_last_core("use_last_core4");
let _core5 = use_last_core("use_last_core5");
let _core6 = use_last_core("use_last_core6");
std::thread::sleep(std::time::Duration::from_millis(200));
let count = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
let elapsed_total = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
let elapsed_exp = std::sync::Arc::new(crossbeam::atomic::AtomicCell::new(0_i64));
let count_c = count.clone();
let elapsed_total_c = elapsed_total.clone();
let elapsed_exp_c = elapsed_exp.clone();
std::thread::spawn(move || {
loop {
std::thread::sleep(std::time::Duration::from_secs(3));
let count = count_c.fetch_and(0);
let elapsed_total = elapsed_total_c.fetch_and(0);
let elapsed_exp = elapsed_exp_c.fetch_and(0);
info!(
"3秒钟执行任务数: {}, 所有任务耗时(微秒): {}, 平均耗时: {}, 耗时任务数(100微秒): {}, 耗时任务数占比: {:.0}/10000",
count,
elapsed_total,
elapsed_total / count,
elapsed_exp,
elapsed_exp as f64 / count as f64 * 10000.0,
);
}
});
loop {
for i in 0..100 {
let time_hs = std::time::Instant::now();
let count = count.clone();
let elapsed_total = elapsed_total.clone();
let elapsed_exp = elapsed_exp.clone();
let i7 = if i % 3 == 0 { 1000001 } else { 1000002 };
pool.spawn_is_fast(i7, true, move || {
let micros = time_hs.elapsed().as_micros();
count.fetch_add(1);
elapsed_total.fetch_add(micros as i64);
if micros > 100 {
elapsed_exp.fetch_add(1);
}
});
}
std::thread::sleep(std::time::Duration::from_micros(110));
}
std::thread::sleep(std::time::Duration::from_secs(9999));
}