fast_able/fast_thread_pool/
utils.rs1use std::{
2 fs::{self, File, OpenOptions},
3 io::{Error, ErrorKind, Read, Seek, SeekFrom, Write},
4 ops::{Deref, DerefMut},
5 sync::LazyLock,
6 thread,
7 time::Duration,
8};
9
10use arraystring::error;
11use core_affinity::CoreId;
12use fs2::FileExt;
13
14use crate::fast_thread_pool::FILE_CORE_AFFINITY;
15
16pub static CORES: LazyLock<Vec<CoreId>> = LazyLock::new(|| {
18 let core_ids = core_affinity::get_core_ids().unwrap_or_else(|| {
19 warn!("get core ids from core_affinity failed, use default empty vector");
20 vec![]
21 });
22
23 debug!(
27 "use core_affinity core_ids: {:?}",
28 core_ids.iter().map(|x| x.id).collect::<Vec<_>>()
29 );
30 core_ids
31});
32
33struct LockFile {
36 file: File,
37 locked: bool,
38}
39
40impl LockFile {
41 fn open(path: &str) -> Result<Self, Error> {
49 let file = OpenOptions::new().read(true).write(true).open(path)?;
50
51 match file.try_lock_exclusive() {
53 Ok(_) => Ok(LockFile { file, locked: true }),
54 Err(e) => Err(e),
55 }
56 }
57}
58
59impl Deref for LockFile {
61 type Target = File;
62
63 fn deref(&self) -> &Self::Target {
64 &self.file
65 }
66}
67
68impl DerefMut for LockFile {
69 fn deref_mut(&mut self) -> &mut Self::Target {
70 &mut self.file
71 }
72}
73
74impl Drop for LockFile {
75 fn drop(&mut self) {
76 if self.locked {
77 let _ = fs2::FileExt::unlock(&self.file);
79 }
80 }
81}
82
83pub fn use_last_core(use_name: &str) -> usize {
85 use_last_core2(use_name, 1)[0]
86}
87
88pub fn use_last_core2(use_name: &str, count: usize) -> Vec<usize> {
89 let core = CORES.clone();
90
91 _ = fs::File::create_new(FILE_CORE_AFFINITY);
93
94 const MAX_RETRY: usize = 10;
96
97 let mut lock_file = {
99 let mut retry_count = 0;
100 loop {
101 if retry_count >= MAX_RETRY {
102 error!(
103 "open core_affinity file overflow max retry {MAX_RETRY}, conot continue execute, return default core: 0"
104 );
105 let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
107 warn!("get cpu core number failed; get use default core: 0");
108 core_affinity::CoreId { id: 0 }
109 });
110 return vec![use_core.id];
111 }
112
113 match LockFile::open(FILE_CORE_AFFINITY) {
114 Ok(file) => break file,
115 Err(e) => {
116 warn!(
118 ".core_affinity is locked, waiting to release: {}, retry times: {}/{}",
119 e,
120 retry_count + 1,
121 MAX_RETRY
122 );
123 retry_count += 1;
124 thread::sleep(Duration::from_millis(150));
125 }
126 }
127 }
128 };
129
130 let mut file_content = String::new();
131
132 let use_core_ids = {
134 if let Err(e) = lock_file.read_to_string(&mut file_content) {
136 error!("读取core_affinity文件内容失败: {},无法继续执行", e);
137 let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
139 warn!("获取cpu核心数失败");
140 core_affinity::CoreId { id: 0 }
141 });
142 return vec![use_core.id];
143 }
144
145 let mut used_cores = Vec::new();
147 for line in file_content.lines() {
148 if line.trim().is_empty() || line.trim().starts_with("realtime_system") {
150 continue;
151 }
152
153 let parts: Vec<&str> = line.split(',').collect();
154 for i in 1..parts.len() {
156 if let Ok(core_id) = parts[i].trim().parse::<usize>() {
157 used_cores.push(core_id);
158 }
159 }
160 }
161
162 let mut all_cores: Vec<usize> = core.iter().map(|c| c.id).collect();
166 all_cores.sort_by(|a, b| b.cmp(a)); if all_cores.is_empty() {
169 let default_cores = (0..count).collect::<Vec<_>>();
170 warn!("没有可用的核心,使用默认核心: {:?}", default_cores);
171 return default_cores;
172 }
173
174 let available_cores: Vec<usize> = all_cores
178 .iter()
179 .filter(|&id| !used_cores.contains(id))
180 .cloned()
181 .collect();
182
183 let mut selected_cores = Vec::with_capacity(count);
187
188 if available_cores.is_empty() {
191 let last_core = used_cores.last().cloned().unwrap_or(all_cores[0]);
195
196 let mut last_index = 0;
198 for (i, &core_id) in all_cores.iter().enumerate() {
199 if core_id == last_core {
200 last_index = i;
201 break;
202 }
203 }
204
205 let mut start_index = (last_index + 1) % all_cores.len();
207
208 for i in 0..count {
213 let current_index = (start_index + i) % all_cores.len();
214 selected_cores.push(all_cores[current_index]);
215 }
216 } else {
217 let mut remaining = count;
219 for &core_id in &available_cores {
220 if remaining == 0 {
221 break;
222 }
223 selected_cores.push(core_id);
224 remaining -= 1;
225 }
226
227 if remaining > 0 {
229 let last_core = if selected_cores.is_empty() {
233 used_cores.last().cloned().unwrap_or(all_cores[0])
234 } else {
235 selected_cores.last().cloned().unwrap()
236 };
237
238 let mut last_index = 0;
240 for (i, &core_id) in all_cores.iter().enumerate() {
241 if core_id == last_core {
242 last_index = i;
243 break;
244 }
245 }
246
247 let mut start_index = (last_index + 1) % all_cores.len();
249
250 for i in 0..remaining {
255 let current_index = (start_index + i) % all_cores.len();
256 selected_cores.push(all_cores[current_index]);
257 }
258 }
259 }
260
261 if let Err(e) = lock_file.seek(SeekFrom::End(0)) {
265 error!("移动文件指针到文件末尾失败: {},无法写入新数据", e);
266 return selected_cores;
267 }
268
269 let mut write_content = format!("{use_name}");
271 for core_id in &selected_cores {
272 write_content.push_str(&format!(",{}", core_id));
273 }
274 write_content.push_str("\n");
275
276 if let Err(e) = lock_file.write_all(write_content.as_bytes()) {
278 error!("写入数据到core_affinity文件失败: {}", e);
279 return selected_cores;
280 }
281
282 if let Err(e) = lock_file.flush() {
284 error!("刷新文件缓冲区失败: {}", e);
285 }
286
287 selected_cores
288 };
289
290 debug!("{use_name} use_cores: {:?}", use_core_ids);
291
292 use_core_ids
294}
295
296fn read_linux_system_cpu_cores() -> Option<Vec<CoreId>> {
297 std::fs::read_to_string("/sys/devices/system/cpu/present")
299 .ok()
300 .and_then(|content| {
301 parse_cpu_range(&content)
302 .map(|cores| cores.into_iter().map(|id| CoreId { id }).collect())
303 })
304}
305
/// Parses a CPU list such as `"0-3,5,7-8"` into the individual CPU numbers.
///
/// The list is a comma-separated sequence of single numbers and inclusive
/// `start-end` ranges. Whitespace around the whole input, around each entry
/// and around range bounds is ignored.
///
/// Returns `None` when the input is empty or blank, when any entry is not a
/// valid unsigned number, or when a range is reversed (`start > end`).
fn parse_cpu_range(content: &str) -> Option<Vec<usize>> {
    let content = content.trim();
    if content.is_empty() {
        return None;
    }

    let mut cpus = Vec::new();
    for part in content.split(',') {
        let part = part.trim();
        match part.split_once('-') {
            Some((start, end)) => {
                let start = start.trim().parse::<usize>().ok()?;
                let end = end.trim().parse::<usize>().ok()?;
                // A reversed range is malformed; do not silently treat it as empty.
                if start > end {
                    return None;
                }
                cpus.extend(start..=end);
            }
            None => cpus.push(part.parse::<usize>().ok()?),
        }
    }

    Some(cpus)
}
327
/// Returns the core step to use when the `deal_physical_cpu` feature is enabled.
///
/// Yields `2` (and logs a warning) when the integer quotient of
/// `num_cpus::get()` over `num_cpus::get_physical()` is exactly 2; otherwise `1`.
#[cfg(feature = "deal_physical_cpu")]
pub fn get_core_skip() -> usize {
    let logical = num_cpus::get();
    let physical = num_cpus::get_physical();

    if logical / physical != 2 {
        return 1;
    }

    warn!(
        "core_ids: {core_ids}, core_physical: {core_physical}; skip 2",
        core_ids = logical,
        core_physical = physical
    );
    2
}
339
/// Returns the core step to use when the `deal_physical_cpu` feature is
/// disabled: always `1`.
#[cfg(not(feature = "deal_physical_cpu"))]
pub fn get_core_skip() -> usize {
    const DEFAULT_SKIP: usize = 1;
    DEFAULT_SKIP
}