fast_able/fast_thread_pool/
utils.rs

1use std::{
2    fs::{self, File, OpenOptions},
3    io::{Error, ErrorKind, Read, Seek, SeekFrom, Write},
4    ops::{Deref, DerefMut},
5    sync::LazyLock,
6    thread,
7    time::Duration,
8};
9
10use arraystring::error;
11use core_affinity::CoreId;
12use fs2::FileExt;
13
14use crate::fast_thread_pool::FILE_CORE_AFFINITY;
15
16/// 获取cpu核心数
17pub static CORES: LazyLock<Vec<CoreId>> = LazyLock::new(|| {
18    let core_ids = core_affinity::get_core_ids().unwrap_or_else(|| {
19        warn!("get core ids from core_affinity failed, use default empty vector");
20        vec![]
21    });
22
23    // 测试代码, 模拟实时内核隔离, 将只能使用固定的cpu核心; 使用硬编码的固定核心列表而非系统检测的核心
24    // let core_ids = vec![20, 21, 22, 23, 24, 25, 26, 27, 28].iter().map(|x| CoreId { id: *x }).collect::<Vec<_>>();
25
26    debug!(
27        "use core_affinity core_ids: {:?}",
28        core_ids.iter().map(|x| x.id).collect::<Vec<_>>()
29    );
30    core_ids
31});
32
33/// 带有排他锁的文件包装类型
34/// 在创建时自动获取排他锁,在释放时自动解锁
35struct LockFile {
36    file: File,
37    locked: bool,
38}
39
40impl LockFile {
41    /// 以读写模式打开文件并尝试获取排他锁
42    ///
43    /// # 参数
44    /// * `path` - 文件路径
45    ///
46    /// # 返回值
47    /// * `Result<LockFile, Error>` - 成功返回LockFile实例,失败返回错误
48    fn open(path: &str) -> Result<Self, Error> {
49        let file = OpenOptions::new().read(true).write(true).open(path)?;
50
51        // 尝试获取排他锁
52        match file.try_lock_exclusive() {
53            Ok(_) => Ok(LockFile { file, locked: true }),
54            Err(e) => Err(e),
55        }
56    }
57}
58
59// 实现Deref和DerefMut,这样LockFile可以自动调用内部File的方法
60impl Deref for LockFile {
61    type Target = File;
62
63    fn deref(&self) -> &Self::Target {
64        &self.file
65    }
66}
67
68impl DerefMut for LockFile {
69    fn deref_mut(&mut self) -> &mut Self::Target {
70        &mut self.file
71    }
72}
73
74impl Drop for LockFile {
75    fn drop(&mut self) {
76        if self.locked {
77            // 在析构时自动解锁
78            let _ = fs2::FileExt::unlock(&self.file);
79        }
80    }
81}
82
83/// 单核心分配函数 - 使用最后一个闲置核心
84pub fn use_last_core(use_name: &str) -> usize {
85    use_last_core2(use_name, 1)[0]
86}
87
88pub fn use_last_core2(use_name: &str, count: usize) -> Vec<usize> {
89    let core = CORES.clone();
90
91    // 获得之前已经绑定的核心
92    _ = fs::File::create_new(FILE_CORE_AFFINITY);
93
94    // 最大重试次数
95    const MAX_RETRY: usize = 10;
96
97    // 打开文件并获取排他锁
98    let mut lock_file = {
99        let mut retry_count = 0;
100        loop {
101            if retry_count >= MAX_RETRY {
102                error!(
103                    "open core_affinity file overflow max retry {MAX_RETRY}, conot continue execute, return default core: 0"
104                );
105                // 获取默认的核心ID并返回
106                let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
107                    warn!("get cpu core number failed; get use default core: 0");
108                    core_affinity::CoreId { id: 0 }
109                });
110                return vec![use_core.id];
111            }
112
113            match LockFile::open(FILE_CORE_AFFINITY) {
114                Ok(file) => break file,
115                Err(e) => {
116                    // 文件被锁定,等待一段时间后重试
117                    warn!(
118                        ".core_affinity is locked, waiting to release: {}, retry times: {}/{}",
119                        e,
120                        retry_count + 1,
121                        MAX_RETRY
122                    );
123                    retry_count += 1;
124                    thread::sleep(Duration::from_millis(150));
125                }
126            }
127        }
128    };
129
130    let mut file_content = String::new();
131
132    // 使用作用域和提前返回来确保在所有情况下都能正确释放文件锁
133    let use_core_ids = {
134        // 读取文件内容
135        if let Err(e) = lock_file.read_to_string(&mut file_content) {
136            error!("读取core_affinity文件内容失败: {},无法继续执行", e);
137            // 获取默认的核心ID并返回
138            let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
139                warn!("获取cpu核心数失败");
140                core_affinity::CoreId { id: 0 }
141            });
142            return vec![use_core.id];
143        }
144
145        // 解析出已分配的所有核心
146        let mut used_cores = Vec::new();
147        for line in file_content.lines() {
148            // 跳过注释和空行
149            if line.trim().is_empty() || line.trim().starts_with("realtime_system") {
150                continue;
151            }
152            
153            let parts: Vec<&str> = line.split(',').collect();
154            // 跳过第一部分(名称),解析后面的所有核心ID
155            for i in 1..parts.len() {
156                if let Ok(core_id) = parts[i].trim().parse::<usize>() {
157                    used_cores.push(core_id);
158                }
159            }
160        }
161        
162        // debug!("已经分配的所有核心: {:?}", used_cores);
163
164        // 获取所有核心ID并按照从大到小排序
165        let mut all_cores: Vec<usize> = core.iter().map(|c| c.id).collect();
166        all_cores.sort_by(|a, b| b.cmp(a)); // 从大到小排序
167        
168        if all_cores.is_empty() {
169            let default_cores = (0..count).collect::<Vec<_>>();
170            warn!("没有可用的核心,使用默认核心: {:?}", default_cores);
171            return default_cores;
172        }
173        
174        // debug!("所有核心(从大到小排序): {:?}", all_cores);
175        
176        // 找出未使用的核心
177        let available_cores: Vec<usize> = all_cores
178            .iter()
179            .filter(|&id| !used_cores.contains(id))
180            .cloned()
181            .collect();
182        
183        // debug!("可用的未分配核心: {:?}", available_cores);
184        
185        // 创建已选择的核心列表
186        let mut selected_cores = Vec::with_capacity(count);
187        
188        // 如果使用的是硬编码核心列表且所有核心都已被分配(即available_cores为空)
189        // 则我们需要从高到低依次分配核心,而不是都分配同一个核心
190        if available_cores.is_empty() {
191            // debug!("所有核心都已被分配,将按照从高到低顺序循环使用");
192            
193            // 获取最后一次分配的核心
194            let last_core = used_cores.last().cloned().unwrap_or(all_cores[0]);
195            
196            // 在all_cores中查找上一次使用的核心的位置
197            let mut last_index = 0;
198            for (i, &core_id) in all_cores.iter().enumerate() {
199                if core_id == last_core {
200                    last_index = i;
201                    break;
202                }
203            }
204            
205            // 下一个要使用的核心索引 - 从这个索引开始分配
206            let mut start_index = (last_index + 1) % all_cores.len();
207            
208            // debug!("上一次使用的核心: {}, 索引: {}, 下一个起始索引: {}", 
209            //        last_core, last_index, start_index);
210            
211            // 按序循环选择核心
212            for i in 0..count {
213                let current_index = (start_index + i) % all_cores.len();
214                selected_cores.push(all_cores[current_index]);
215            }
216        } else {
217            // 首先使用未分配的核心
218            let mut remaining = count;
219            for &core_id in &available_cores {
220                if remaining == 0 {
221                    break;
222                }
223                selected_cores.push(core_id);
224                remaining -= 1;
225            }
226            
227            // 如果未分配的核心不足,则从所有核心开始循环使用
228            if remaining > 0 {
229                // debug!("可用核心不足,将按照从高到低顺序循环使用");
230                
231                // 获取最后一次分配的核心
232                let last_core = if selected_cores.is_empty() {
233                    used_cores.last().cloned().unwrap_or(all_cores[0])
234                } else {
235                    selected_cores.last().cloned().unwrap()
236                };
237                
238                // 在all_cores中查找上一次使用的核心的位置
239                let mut last_index = 0;
240                for (i, &core_id) in all_cores.iter().enumerate() {
241                    if core_id == last_core {
242                        last_index = i;
243                        break;
244                    }
245                }
246                
247                // 下一个要使用的核心索引 - 从这个索引开始分配
248                let mut start_index = (last_index + 1) % all_cores.len();
249                
250                // debug!("上一次使用的核心: {}, 索引: {}, 下一个起始索引: {}", 
251                //        last_core, last_index, start_index);
252                
253                // 从所有核心中循环选择剩余需要的核心
254                for i in 0..remaining {
255                    let current_index = (start_index + i) % all_cores.len();
256                    selected_cores.push(all_cores[current_index]);
257                }
258            }
259        }
260        
261        // debug!("本次分配的核心: {:?}", selected_cores);
262
263        // 将文件指针移到文件末尾进行追加写入
264        if let Err(e) = lock_file.seek(SeekFrom::End(0)) {
265            error!("移动文件指针到文件末尾失败: {},无法写入新数据", e);
266            return selected_cores;
267        }
268
269        // 构建写入格式: "{use_name},核心1,核心2,xxx"
270        let mut write_content = format!("{use_name}");
271        for core_id in &selected_cores {
272            write_content.push_str(&format!(",{}", core_id));
273        }
274        write_content.push_str("\n");
275
276        // 写入新数据
277        if let Err(e) = lock_file.write_all(write_content.as_bytes()) {
278            error!("写入数据到core_affinity文件失败: {}", e);
279            return selected_cores;
280        }
281
282        // 刷新文件缓冲区
283        if let Err(e) = lock_file.flush() {
284            error!("刷新文件缓冲区失败: {}", e);
285        }
286
287        selected_cores
288    };
289
290    debug!("{use_name} use_cores: {:?}", use_core_ids);
291
292    // 返回核心ID列表
293    use_core_ids
294}
295
296fn read_linux_system_cpu_cores() -> Option<Vec<CoreId>> {
297    // 读取Linux系统文件获取所有物理核心
298    std::fs::read_to_string("/sys/devices/system/cpu/present")
299        .ok()
300        .and_then(|content| {
301            parse_cpu_range(&content)
302                .map(|cores| cores.into_iter().map(|id| CoreId { id }).collect())
303        })
304}
305
306fn parse_cpu_range(content: &str) -> Option<Vec<usize>> {
307    let mut cpus = Vec::new();
308    let content = content.trim();
309
310    if content.is_empty() {
311        return None;
312    }
313
314    for part in content.split(',') {
315        if let Some((start, end)) = part.split_once('-') {
316            let start = start.parse::<usize>().ok()?;
317            let end = end.parse::<usize>().ok()?;
318            cpus.extend(start..=end);
319        } else {
320            let cpu = part.parse::<usize>().ok()?;
321            cpus.push(cpu);
322        }
323    }
324
325    Some(cpus)
326}
327
328#[cfg(feature = "deal_physical_cpu")]
329pub fn get_core_skip() -> usize {
330    let core_ids = num_cpus::get();
331    let core_physical = num_cpus::get_physical();
332    if core_ids / core_physical == 2 {
333        warn!("core_ids: {core_ids}, core_physical: {core_physical}; skip 2");
334        2
335    } else {
336        1
337    }
338}
339
340#[cfg(not(feature = "deal_physical_cpu"))]
341pub fn get_core_skip() -> usize {
342    1
343}