1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
use std::{
fs::{self, File, OpenOptions},
io::{Error, ErrorKind, Read, Seek, SeekFrom, Write},
ops::{Deref, DerefMut},
sync::LazyLock,
thread,
time::Duration,
};
use arraystring::error;
use core_affinity::CoreId;
use fs2::FileExt;
use crate::fast_thread_pool::FILE_CORE_AFFINITY;
/// Get CPU core count
/// 获取cpu核心数
pub static CORES: LazyLock<Vec<CoreId>> = LazyLock::new(|| {
let core_ids = core_affinity::get_core_ids().unwrap_or_else(|| {
warn!("get core ids from core_affinity failed, use default empty vector");
vec![]
});
// 测试代码, 模拟实时内核隔离, 将只能使用固定的cpu核心; 使用硬编码的固定核心列表而非系统检测的核心
// let core_ids = vec![20, 21, 22, 23, 24, 25, 26, 27, 28].iter().map(|x| CoreId { id: *x }).collect::<Vec<_>>();
debug!(
"use core_affinity core_ids: {:?}",
core_ids.iter().map(|x| x.id).collect::<Vec<_>>()
);
core_ids
});
/// File wrapper type with exclusive lock
/// Automatically acquires exclusive lock on creation, and unlocks on drop
/// 带有排他锁的文件包装类型
/// 在创建时自动获取排他锁,在释放时自动解锁
struct LockFile {
file: File,
locked: bool,
}
impl LockFile {
/// Open file in read-write mode and try to acquire exclusive lock
/// 以读写模式打开文件并尝试获取排他锁
///
/// # Arguments
/// * `path` - File path
/// # 参数
/// * `path` - 文件路径
///
/// # Returns
/// * `Result<LockFile, Error>` - Returns LockFile instance on success, error on failure
/// # 返回值
/// * `Result<LockFile, Error>` - 成功返回LockFile实例,失败返回错误
fn open(path: &str) -> Result<Self, Error> {
let file = OpenOptions::new().read(true).write(true).open(path)?;
// 尝试获取排他锁
match file.try_lock_exclusive() {
Ok(_) => Ok(LockFile { file, locked: true }),
Err(e) => Err(e),
}
}
}
// 实现Deref和DerefMut,这样LockFile可以自动调用内部File的方法
impl Deref for LockFile {
type Target = File;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for LockFile {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
impl Drop for LockFile {
fn drop(&mut self) {
if self.locked {
// 在析构时自动解锁
let _ = fs2::FileExt::unlock(&self.file);
}
}
}
/// 单核心分配函数 - 使用最后一个闲置核心
pub fn use_last_core(use_name: &str) -> usize {
use_last_core2(use_name, 1)[0]
}
pub fn use_last_core2(use_name: &str, count: usize) -> Vec<usize> {
let core = CORES.clone();
// 获得之前已经绑定的核心
_ = fs::File::create_new(FILE_CORE_AFFINITY);
// 最大重试次数
const MAX_RETRY: usize = 10;
// 打开文件并获取排他锁
let mut lock_file = {
let mut retry_count = 0;
loop {
if retry_count >= MAX_RETRY {
error!(
"open core_affinity file overflow max retry {MAX_RETRY}, conot continue execute, return default core: 0"
);
// 获取默认的核心ID并返回
let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
warn!("get cpu core number failed; get use default core: 0");
core_affinity::CoreId { id: 0 }
});
return vec![use_core.id];
}
match LockFile::open(FILE_CORE_AFFINITY) {
Ok(file) => break file,
Err(e) => {
// 文件被锁定,等待一段时间后重试
warn!(
".core_affinity is locked, waiting to release: {}, retry times: {}/{}",
e,
retry_count + 1,
MAX_RETRY
);
retry_count += 1;
thread::sleep(Duration::from_millis(150));
}
}
}
};
let mut file_content = String::new();
// 使用作用域和提前返回来确保在所有情况下都能正确释放文件锁
let use_core_ids = {
// 读取文件内容
if let Err(e) = lock_file.read_to_string(&mut file_content) {
error!("读取core_affinity文件内容失败: {},无法继续执行", e);
// 获取默认的核心ID并返回
let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
warn!("获取cpu核心数失败");
core_affinity::CoreId { id: 0 }
});
return vec![use_core.id];
}
// 解析出已分配的所有核心
let mut used_cores = Vec::new();
for line in file_content.lines() {
// 跳过注释和空行
if line.trim().is_empty() || line.trim().starts_with("realtime_system") {
continue;
}
let parts: Vec<&str> = line.split(',').collect();
// 跳过第一部分(名称),解析后面的所有核心ID
for i in 1..parts.len() {
if let Ok(core_id) = parts[i].trim().parse::<usize>() {
used_cores.push(core_id);
}
}
}
// debug!("已经分配的所有核心: {:?}", used_cores);
// 获取所有核心ID并按照从大到小排序
let mut all_cores: Vec<usize> = core.iter().map(|c| c.id).collect();
all_cores.sort_by(|a, b| b.cmp(a)); // 从大到小排序
if all_cores.is_empty() {
let default_cores = (0..count).collect::<Vec<_>>();
warn!("没有可用的核心,使用默认核心: {:?}", default_cores);
return default_cores;
}
// debug!("所有核心(从大到小排序): {:?}", all_cores);
// 找出未使用的核心
let available_cores: Vec<usize> = all_cores
.iter()
.filter(|&id| !used_cores.contains(id))
.cloned()
.collect();
// debug!("可用的未分配核心: {:?}", available_cores);
// 创建已选择的核心列表
let mut selected_cores = Vec::with_capacity(count);
// 如果使用的是硬编码核心列表且所有核心都已被分配(即available_cores为空)
// 则我们需要从高到低依次分配核心,而不是都分配同一个核心
if available_cores.is_empty() {
// debug!("所有核心都已被分配,将按照从高到低顺序循环使用");
// 获取最后一次分配的核心
let last_core = used_cores.last().cloned().unwrap_or(all_cores[0]);
// 在all_cores中查找上一次使用的核心的位置
let mut last_index = 0;
for (i, &core_id) in all_cores.iter().enumerate() {
if core_id == last_core {
last_index = i;
break;
}
}
// 下一个要使用的核心索引 - 从这个索引开始分配
let mut start_index = (last_index + 1) % all_cores.len();
// debug!("上一次使用的核心: {}, 索引: {}, 下一个起始索引: {}",
// last_core, last_index, start_index);
// 按序循环选择核心
for i in 0..count {
let current_index = (start_index + i) % all_cores.len();
selected_cores.push(all_cores[current_index]);
}
} else {
// 首先使用未分配的核心
let mut remaining = count;
for &core_id in &available_cores {
if remaining == 0 {
break;
}
selected_cores.push(core_id);
remaining -= 1;
}
// 如果未分配的核心不足,则从所有核心开始循环使用
if remaining > 0 {
// debug!("可用核心不足,将按照从高到低顺序循环使用");
// 获取最后一次分配的核心
let last_core = if selected_cores.is_empty() {
used_cores.last().cloned().unwrap_or(all_cores[0])
} else {
selected_cores.last().cloned().unwrap()
};
// 在all_cores中查找上一次使用的核心的位置
let mut last_index = 0;
for (i, &core_id) in all_cores.iter().enumerate() {
if core_id == last_core {
last_index = i;
break;
}
}
// 下一个要使用的核心索引 - 从这个索引开始分配
let mut start_index = (last_index + 1) % all_cores.len();
// debug!("上一次使用的核心: {}, 索引: {}, 下一个起始索引: {}",
// last_core, last_index, start_index);
// 从所有核心中循环选择剩余需要的核心
for i in 0..remaining {
let current_index = (start_index + i) % all_cores.len();
selected_cores.push(all_cores[current_index]);
}
}
}
// debug!("本次分配的核心: {:?}", selected_cores);
// 将文件指针移到文件末尾进行追加写入
if let Err(e) = lock_file.seek(SeekFrom::End(0)) {
error!("移动文件指针到文件末尾失败: {},无法写入新数据", e);
return selected_cores;
}
// 构建写入格式: "{use_name},核心1,核心2,xxx"
let mut write_content = format!("{use_name}");
for core_id in &selected_cores {
write_content.push_str(&format!(",{}", core_id));
}
write_content.push_str("\n");
// 写入新数据
if let Err(e) = lock_file.write_all(write_content.as_bytes()) {
error!("写入数据到core_affinity文件失败: {}", e);
return selected_cores;
}
// 刷新文件缓冲区
if let Err(e) = lock_file.flush() {
error!("刷新文件缓冲区失败: {}", e);
}
selected_cores
};
debug!("{use_name} use_cores: {:?}", use_core_ids);
// 返回核心ID列表
use_core_ids
}
fn read_linux_system_cpu_cores() -> Option<Vec<CoreId>> {
// 读取Linux系统文件获取所有物理核心
std::fs::read_to_string("/sys/devices/system/cpu/present")
.ok()
.and_then(|content| {
parse_cpu_range(&content)
.map(|cores| cores.into_iter().map(|id| CoreId { id }).collect())
})
}
fn parse_cpu_range(content: &str) -> Option<Vec<usize>> {
let mut cpus = Vec::new();
let content = content.trim();
if content.is_empty() {
return None;
}
for part in content.split(',') {
if let Some((start, end)) = part.split_once('-') {
let start = start.parse::<usize>().ok()?;
let end = end.parse::<usize>().ok()?;
cpus.extend(start..=end);
} else {
let cpu = part.parse::<usize>().ok()?;
cpus.push(cpu);
}
}
Some(cpus)
}
#[cfg(feature = "deal_physical_cpu")]
pub fn get_core_skip() -> usize {
let core_ids = num_cpus::get();
let core_physical = num_cpus::get_physical();
if core_ids / core_physical == 2 {
warn!("core_ids: {core_ids}, core_physical: {core_physical}; skip 2");
2
} else {
1
}
}
#[cfg(not(feature = "deal_physical_cpu"))]
pub fn get_core_skip() -> usize {
1
}