fast_able/fast_thread_pool/
utils.rs

1use std::{
2    fs::{self, File, OpenOptions},
3    io::{Error, ErrorKind, Read, Seek, SeekFrom, Write},
4    ops::{Deref, DerefMut},
5    sync::LazyLock,
6    thread,
7    time::Duration,
8};
9
10use arraystring::error;
11use core_affinity::CoreId;
12use fs2::FileExt;
13
14use crate::fast_thread_pool::FILE_CORE_AFFINITY;
15
16/// Get CPU core count
17/// 获取cpu核心数
18pub static CORES: LazyLock<Vec<CoreId>> = LazyLock::new(|| {
19    let core_ids = core_affinity::get_core_ids().unwrap_or_else(|| {
20        warn!("get core ids from core_affinity failed, use default empty vector");
21        vec![]
22    });
23
24    // 测试代码, 模拟实时内核隔离, 将只能使用固定的cpu核心; 使用硬编码的固定核心列表而非系统检测的核心
25    // let core_ids = vec![20, 21, 22, 23, 24, 25, 26, 27, 28].iter().map(|x| CoreId { id: *x }).collect::<Vec<_>>();
26
27    debug!(
28        "use core_affinity core_ids: {:?}",
29        core_ids.iter().map(|x| x.id).collect::<Vec<_>>()
30    );
31    core_ids
32});
33
34/// File wrapper type with exclusive lock
35/// Automatically acquires exclusive lock on creation, and unlocks on drop
36/// 带有排他锁的文件包装类型
37/// 在创建时自动获取排他锁,在释放时自动解锁
38struct LockFile {
39    file: File,
40    locked: bool,
41}
42
43impl LockFile {
44    /// Open file in read-write mode and try to acquire exclusive lock
45    /// 以读写模式打开文件并尝试获取排他锁
46    ///
47    /// # Arguments
48    /// * `path` - File path
49    /// # 参数
50    /// * `path` - 文件路径
51    ///
52    /// # Returns
53    /// * `Result<LockFile, Error>` - Returns LockFile instance on success, error on failure
54    /// # 返回值
55    /// * `Result<LockFile, Error>` - 成功返回LockFile实例,失败返回错误
56    fn open(path: &str) -> Result<Self, Error> {
57        let file = OpenOptions::new().read(true).write(true).open(path)?;
58
59        // 尝试获取排他锁
60        match file.try_lock_exclusive() {
61            Ok(_) => Ok(LockFile { file, locked: true }),
62            Err(e) => Err(e),
63        }
64    }
65}
66
67// 实现Deref和DerefMut,这样LockFile可以自动调用内部File的方法
68impl Deref for LockFile {
69    type Target = File;
70
71    fn deref(&self) -> &Self::Target {
72        &self.file
73    }
74}
75
76impl DerefMut for LockFile {
77    fn deref_mut(&mut self) -> &mut Self::Target {
78        &mut self.file
79    }
80}
81
82impl Drop for LockFile {
83    fn drop(&mut self) {
84        if self.locked {
85            // 在析构时自动解锁
86            let _ = fs2::FileExt::unlock(&self.file);
87        }
88    }
89}
90
91/// 单核心分配函数 - 使用最后一个闲置核心
92pub fn use_last_core(use_name: &str) -> usize {
93    use_last_core2(use_name, 1)[0]
94}
95
96pub fn use_last_core2(use_name: &str, count: usize) -> Vec<usize> {
97    let core = CORES.clone();
98
99    // 获得之前已经绑定的核心
100    _ = fs::File::create_new(FILE_CORE_AFFINITY);
101
102    // 最大重试次数
103    const MAX_RETRY: usize = 10;
104
105    // 打开文件并获取排他锁
106    let mut lock_file = {
107        let mut retry_count = 0;
108        loop {
109            if retry_count >= MAX_RETRY {
110                error!(
111                    "open core_affinity file overflow max retry {MAX_RETRY}, conot continue execute, return default core: 0"
112                );
113                // 获取默认的核心ID并返回
114                let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
115                    warn!("get cpu core number failed; get use default core: 0");
116                    core_affinity::CoreId { id: 0 }
117                });
118                return vec![use_core.id];
119            }
120
121            match LockFile::open(FILE_CORE_AFFINITY) {
122                Ok(file) => break file,
123                Err(e) => {
124                    // 文件被锁定,等待一段时间后重试
125                    warn!(
126                        ".core_affinity is locked, waiting to release: {}, retry times: {}/{}",
127                        e,
128                        retry_count + 1,
129                        MAX_RETRY
130                    );
131                    retry_count += 1;
132                    thread::sleep(Duration::from_millis(150));
133                }
134            }
135        }
136    };
137
138    let mut file_content = String::new();
139
140    // 使用作用域和提前返回来确保在所有情况下都能正确释放文件锁
141    let use_core_ids = {
142        // 读取文件内容
143        if let Err(e) = lock_file.read_to_string(&mut file_content) {
144            error!("读取core_affinity文件内容失败: {},无法继续执行", e);
145            // 获取默认的核心ID并返回
146            let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
147                warn!("获取cpu核心数失败");
148                core_affinity::CoreId { id: 0 }
149            });
150            return vec![use_core.id];
151        }
152
153        // 解析出已分配的所有核心
154        let mut used_cores = Vec::new();
155        for line in file_content.lines() {
156            // 跳过注释和空行
157            if line.trim().is_empty() || line.trim().starts_with("realtime_system") {
158                continue;
159            }
160            
161            let parts: Vec<&str> = line.split(',').collect();
162            // 跳过第一部分(名称),解析后面的所有核心ID
163            for i in 1..parts.len() {
164                if let Ok(core_id) = parts[i].trim().parse::<usize>() {
165                    used_cores.push(core_id);
166                }
167            }
168        }
169        
170        // debug!("已经分配的所有核心: {:?}", used_cores);
171
172        // 获取所有核心ID并按照从大到小排序
173        let mut all_cores: Vec<usize> = core.iter().map(|c| c.id).collect();
174        all_cores.sort_by(|a, b| b.cmp(a)); // 从大到小排序
175        
176        if all_cores.is_empty() {
177            let default_cores = (0..count).collect::<Vec<_>>();
178            warn!("没有可用的核心,使用默认核心: {:?}", default_cores);
179            return default_cores;
180        }
181        
182        // debug!("所有核心(从大到小排序): {:?}", all_cores);
183        
184        // 找出未使用的核心
185        let available_cores: Vec<usize> = all_cores
186            .iter()
187            .filter(|&id| !used_cores.contains(id))
188            .cloned()
189            .collect();
190        
191        // debug!("可用的未分配核心: {:?}", available_cores);
192        
193        // 创建已选择的核心列表
194        let mut selected_cores = Vec::with_capacity(count);
195        
196        // 如果使用的是硬编码核心列表且所有核心都已被分配(即available_cores为空)
197        // 则我们需要从高到低依次分配核心,而不是都分配同一个核心
198        if available_cores.is_empty() {
199            // debug!("所有核心都已被分配,将按照从高到低顺序循环使用");
200            
201            // 获取最后一次分配的核心
202            let last_core = used_cores.last().cloned().unwrap_or(all_cores[0]);
203            
204            // 在all_cores中查找上一次使用的核心的位置
205            let mut last_index = 0;
206            for (i, &core_id) in all_cores.iter().enumerate() {
207                if core_id == last_core {
208                    last_index = i;
209                    break;
210                }
211            }
212            
213            // 下一个要使用的核心索引 - 从这个索引开始分配
214            let mut start_index = (last_index + 1) % all_cores.len();
215            
216            // debug!("上一次使用的核心: {}, 索引: {}, 下一个起始索引: {}", 
217            //        last_core, last_index, start_index);
218            
219            // 按序循环选择核心
220            for i in 0..count {
221                let current_index = (start_index + i) % all_cores.len();
222                selected_cores.push(all_cores[current_index]);
223            }
224        } else {
225            // 首先使用未分配的核心
226            let mut remaining = count;
227            for &core_id in &available_cores {
228                if remaining == 0 {
229                    break;
230                }
231                selected_cores.push(core_id);
232                remaining -= 1;
233            }
234            
235            // 如果未分配的核心不足,则从所有核心开始循环使用
236            if remaining > 0 {
237                // debug!("可用核心不足,将按照从高到低顺序循环使用");
238                
239                // 获取最后一次分配的核心
240                let last_core = if selected_cores.is_empty() {
241                    used_cores.last().cloned().unwrap_or(all_cores[0])
242                } else {
243                    selected_cores.last().cloned().unwrap()
244                };
245                
246                // 在all_cores中查找上一次使用的核心的位置
247                let mut last_index = 0;
248                for (i, &core_id) in all_cores.iter().enumerate() {
249                    if core_id == last_core {
250                        last_index = i;
251                        break;
252                    }
253                }
254                
255                // 下一个要使用的核心索引 - 从这个索引开始分配
256                let mut start_index = (last_index + 1) % all_cores.len();
257                
258                // debug!("上一次使用的核心: {}, 索引: {}, 下一个起始索引: {}", 
259                //        last_core, last_index, start_index);
260                
261                // 从所有核心中循环选择剩余需要的核心
262                for i in 0..remaining {
263                    let current_index = (start_index + i) % all_cores.len();
264                    selected_cores.push(all_cores[current_index]);
265                }
266            }
267        }
268        
269        // debug!("本次分配的核心: {:?}", selected_cores);
270
271        // 将文件指针移到文件末尾进行追加写入
272        if let Err(e) = lock_file.seek(SeekFrom::End(0)) {
273            error!("移动文件指针到文件末尾失败: {},无法写入新数据", e);
274            return selected_cores;
275        }
276
277        // 构建写入格式: "{use_name},核心1,核心2,xxx"
278        let mut write_content = format!("{use_name}");
279        for core_id in &selected_cores {
280            write_content.push_str(&format!(",{}", core_id));
281        }
282        write_content.push_str("\n");
283
284        // 写入新数据
285        if let Err(e) = lock_file.write_all(write_content.as_bytes()) {
286            error!("写入数据到core_affinity文件失败: {}", e);
287            return selected_cores;
288        }
289
290        // 刷新文件缓冲区
291        if let Err(e) = lock_file.flush() {
292            error!("刷新文件缓冲区失败: {}", e);
293        }
294
295        selected_cores
296    };
297
298    debug!("{use_name} use_cores: {:?}", use_core_ids);
299
300    // 返回核心ID列表
301    use_core_ids
302}
303
304fn read_linux_system_cpu_cores() -> Option<Vec<CoreId>> {
305    // 读取Linux系统文件获取所有物理核心
306    std::fs::read_to_string("/sys/devices/system/cpu/present")
307        .ok()
308        .and_then(|content| {
309            parse_cpu_range(&content)
310                .map(|cores| cores.into_iter().map(|id| CoreId { id }).collect())
311        })
312}
313
314fn parse_cpu_range(content: &str) -> Option<Vec<usize>> {
315    let mut cpus = Vec::new();
316    let content = content.trim();
317
318    if content.is_empty() {
319        return None;
320    }
321
322    for part in content.split(',') {
323        if let Some((start, end)) = part.split_once('-') {
324            let start = start.parse::<usize>().ok()?;
325            let end = end.parse::<usize>().ok()?;
326            cpus.extend(start..=end);
327        } else {
328            let cpu = part.parse::<usize>().ok()?;
329            cpus.push(cpu);
330        }
331    }
332
333    Some(cpus)
334}
335
336#[cfg(feature = "deal_physical_cpu")]
337pub fn get_core_skip() -> usize {
338    let core_ids = num_cpus::get();
339    let core_physical = num_cpus::get_physical();
340    if core_ids / core_physical == 2 {
341        warn!("core_ids: {core_ids}, core_physical: {core_physical}; skip 2");
342        2
343    } else {
344        1
345    }
346}
347
348#[cfg(not(feature = "deal_physical_cpu"))]
349pub fn get_core_skip() -> usize {
350    1
351}