fast_able/fast_thread_pool/
utils.rs1use std::{
2 fs::{self, File, OpenOptions},
3 io::{Error, ErrorKind, Read, Seek, SeekFrom, Write},
4 ops::{Deref, DerefMut},
5 sync::LazyLock,
6 thread,
7 time::Duration,
8};
9
10use arraystring::error;
11use core_affinity::CoreId;
12use fs2::FileExt;
13
14use crate::fast_thread_pool::FILE_CORE_AFFINITY;
15
16pub static CORES: LazyLock<Vec<CoreId>> = LazyLock::new(|| {
19 let core_ids = core_affinity::get_core_ids().unwrap_or_else(|| {
20 warn!("get core ids from core_affinity failed, use default empty vector");
21 vec![]
22 });
23
24 debug!(
28 "use core_affinity core_ids: {:?}",
29 core_ids.iter().map(|x| x.id).collect::<Vec<_>>()
30 );
31 core_ids
32});
33
34struct LockFile {
39 file: File,
40 locked: bool,
41}
42
43impl LockFile {
44 fn open(path: &str) -> Result<Self, Error> {
57 let file = OpenOptions::new().read(true).write(true).open(path)?;
58
59 match file.try_lock_exclusive() {
61 Ok(_) => Ok(LockFile { file, locked: true }),
62 Err(e) => Err(e),
63 }
64 }
65}
66
67impl Deref for LockFile {
69 type Target = File;
70
71 fn deref(&self) -> &Self::Target {
72 &self.file
73 }
74}
75
76impl DerefMut for LockFile {
77 fn deref_mut(&mut self) -> &mut Self::Target {
78 &mut self.file
79 }
80}
81
82impl Drop for LockFile {
83 fn drop(&mut self) {
84 if self.locked {
85 let _ = fs2::FileExt::unlock(&self.file);
87 }
88 }
89}
90
91pub fn use_last_core(use_name: &str) -> usize {
93 use_last_core2(use_name, 1)[0]
94}
95
96pub fn use_last_core2(use_name: &str, count: usize) -> Vec<usize> {
97 let core = CORES.clone();
98
99 _ = fs::File::create_new(FILE_CORE_AFFINITY);
101
102 const MAX_RETRY: usize = 10;
104
105 let mut lock_file = {
107 let mut retry_count = 0;
108 loop {
109 if retry_count >= MAX_RETRY {
110 error!(
111 "open core_affinity file overflow max retry {MAX_RETRY}, conot continue execute, return default core: 0"
112 );
113 let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
115 warn!("get cpu core number failed; get use default core: 0");
116 core_affinity::CoreId { id: 0 }
117 });
118 return vec![use_core.id];
119 }
120
121 match LockFile::open(FILE_CORE_AFFINITY) {
122 Ok(file) => break file,
123 Err(e) => {
124 warn!(
126 ".core_affinity is locked, waiting to release: {}, retry times: {}/{}",
127 e,
128 retry_count + 1,
129 MAX_RETRY
130 );
131 retry_count += 1;
132 thread::sleep(Duration::from_millis(150));
133 }
134 }
135 }
136 };
137
138 let mut file_content = String::new();
139
140 let use_core_ids = {
142 if let Err(e) = lock_file.read_to_string(&mut file_content) {
144 error!("读取core_affinity文件内容失败: {},无法继续执行", e);
145 let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
147 warn!("获取cpu核心数失败");
148 core_affinity::CoreId { id: 0 }
149 });
150 return vec![use_core.id];
151 }
152
153 let mut used_cores = Vec::new();
155 for line in file_content.lines() {
156 if line.trim().is_empty() || line.trim().starts_with("realtime_system") {
158 continue;
159 }
160
161 let parts: Vec<&str> = line.split(',').collect();
162 for i in 1..parts.len() {
164 if let Ok(core_id) = parts[i].trim().parse::<usize>() {
165 used_cores.push(core_id);
166 }
167 }
168 }
169
170 let mut all_cores: Vec<usize> = core.iter().map(|c| c.id).collect();
174 all_cores.sort_by(|a, b| b.cmp(a)); if all_cores.is_empty() {
177 let default_cores = (0..count).collect::<Vec<_>>();
178 warn!("没有可用的核心,使用默认核心: {:?}", default_cores);
179 return default_cores;
180 }
181
182 let available_cores: Vec<usize> = all_cores
186 .iter()
187 .filter(|&id| !used_cores.contains(id))
188 .cloned()
189 .collect();
190
191 let mut selected_cores = Vec::with_capacity(count);
195
196 if available_cores.is_empty() {
199 let last_core = used_cores.last().cloned().unwrap_or(all_cores[0]);
203
204 let mut last_index = 0;
206 for (i, &core_id) in all_cores.iter().enumerate() {
207 if core_id == last_core {
208 last_index = i;
209 break;
210 }
211 }
212
213 let mut start_index = (last_index + 1) % all_cores.len();
215
216 for i in 0..count {
221 let current_index = (start_index + i) % all_cores.len();
222 selected_cores.push(all_cores[current_index]);
223 }
224 } else {
225 let mut remaining = count;
227 for &core_id in &available_cores {
228 if remaining == 0 {
229 break;
230 }
231 selected_cores.push(core_id);
232 remaining -= 1;
233 }
234
235 if remaining > 0 {
237 let last_core = if selected_cores.is_empty() {
241 used_cores.last().cloned().unwrap_or(all_cores[0])
242 } else {
243 selected_cores.last().cloned().unwrap()
244 };
245
246 let mut last_index = 0;
248 for (i, &core_id) in all_cores.iter().enumerate() {
249 if core_id == last_core {
250 last_index = i;
251 break;
252 }
253 }
254
255 let mut start_index = (last_index + 1) % all_cores.len();
257
258 for i in 0..remaining {
263 let current_index = (start_index + i) % all_cores.len();
264 selected_cores.push(all_cores[current_index]);
265 }
266 }
267 }
268
269 if let Err(e) = lock_file.seek(SeekFrom::End(0)) {
273 error!("移动文件指针到文件末尾失败: {},无法写入新数据", e);
274 return selected_cores;
275 }
276
277 let mut write_content = format!("{use_name}");
279 for core_id in &selected_cores {
280 write_content.push_str(&format!(",{}", core_id));
281 }
282 write_content.push_str("\n");
283
284 if let Err(e) = lock_file.write_all(write_content.as_bytes()) {
286 error!("写入数据到core_affinity文件失败: {}", e);
287 return selected_cores;
288 }
289
290 if let Err(e) = lock_file.flush() {
292 error!("刷新文件缓冲区失败: {}", e);
293 }
294
295 selected_cores
296 };
297
298 debug!("{use_name} use_cores: {:?}", use_core_ids);
299
300 use_core_ids
302}
303
304fn read_linux_system_cpu_cores() -> Option<Vec<CoreId>> {
305 std::fs::read_to_string("/sys/devices/system/cpu/present")
307 .ok()
308 .and_then(|content| {
309 parse_cpu_range(&content)
310 .map(|cores| cores.into_iter().map(|id| CoreId { id }).collect())
311 })
312}
313
/// Parses a comma-separated CPU list such as `0-3,8,10-11` into the individual CPU numbers.
///
/// Each item is either a single number or an inclusive `start-end` range. Whitespace around
/// the whole input and around individual items is ignored.
///
/// Returns `None` when the input is empty, when an item is not a valid number, or when a
/// range is reversed (`start > end`). A reversed range would otherwise contribute nothing
/// and could produce an empty list reported as success.
fn parse_cpu_range(content: &str) -> Option<Vec<usize>> {
    let content = content.trim();
    if content.is_empty() {
        return None;
    }

    let mut cpus = Vec::new();
    for part in content.split(',') {
        let part = part.trim();
        match part.split_once('-') {
            Some((start, end)) => {
                let start = start.trim().parse::<usize>().ok()?;
                let end = end.trim().parse::<usize>().ok()?;
                if start > end {
                    return None;
                }
                cpus.extend(start..=end);
            }
            None => cpus.push(part.parse::<usize>().ok()?),
        }
    }

    Some(cpus)
}
335
/// Returns the step to use when walking over core ids (built with the `deal_physical_cpu`
/// feature).
///
/// The step is 2 when the integer quotient of logical CPUs (`num_cpus::get`) by physical
/// CPUs (`num_cpus::get_physical`) is exactly 2, in which case a warning with both counts
/// is logged; otherwise it is 1.
// NOTE(review): a quotient of 2 presumably indicates two hardware threads per physical
// core — confirm against how callers use the step.
#[cfg(feature = "deal_physical_cpu")]
pub fn get_core_skip() -> usize {
    let core_ids = num_cpus::get();
    let core_physical = num_cpus::get_physical();

    if core_ids / core_physical != 2 {
        return 1;
    }

    warn!("core_ids: {core_ids}, core_physical: {core_physical}; skip 2");
    2
}
347
/// Returns the step to use when walking over core ids.
///
/// Without the `deal_physical_cpu` feature no CPU counts are inspected and the step is
/// always 1.
#[cfg(not(feature = "deal_physical_cpu"))]
pub fn get_core_skip() -> usize {
    const EVERY_CORE: usize = 1;
    EVERY_CORE
}