fast-able 1.20.2

The world's martial arts are fast and unbreakable; 天下武功 唯快不破
Documentation
use std::{
    fs::{self, File, OpenOptions},
    io::{Error, ErrorKind, Read, Seek, SeekFrom, Write},
    ops::{Deref, DerefMut},
    sync::LazyLock,
    thread,
    time::Duration,
};

use arraystring::error;
use core_affinity::CoreId;
use fs2::FileExt;

use crate::fast_thread_pool::FILE_CORE_AFFINITY;

/// Get CPU core count
/// 获取cpu核心数
pub static CORES: LazyLock<Vec<CoreId>> = LazyLock::new(|| {
    let core_ids = core_affinity::get_core_ids().unwrap_or_else(|| {
        warn!("get core ids from core_affinity failed, use default empty vector");
        vec![]
    });

    // 测试代码, 模拟实时内核隔离, 将只能使用固定的cpu核心; 使用硬编码的固定核心列表而非系统检测的核心
    // let core_ids = vec![20, 21, 22, 23, 24, 25, 26, 27, 28].iter().map(|x| CoreId { id: *x }).collect::<Vec<_>>();

    debug!(
        "use core_affinity core_ids: {:?}",
        core_ids.iter().map(|x| x.id).collect::<Vec<_>>()
    );
    core_ids
});

/// File wrapper type with exclusive lock
/// Automatically acquires exclusive lock on creation, and unlocks on drop
/// 带有排他锁的文件包装类型
/// 在创建时自动获取排他锁,在释放时自动解锁
struct LockFile {
    file: File,
    locked: bool,
}

impl LockFile {
    /// Open file in read-write mode and try to acquire exclusive lock
    /// 以读写模式打开文件并尝试获取排他锁
    ///
    /// # Arguments
    /// * `path` - File path
    /// # 参数
    /// * `path` - 文件路径
    ///
    /// # Returns
    /// * `Result<LockFile, Error>` - Returns LockFile instance on success, error on failure
    /// # 返回值
    /// * `Result<LockFile, Error>` - 成功返回LockFile实例,失败返回错误
    fn open(path: &str) -> Result<Self, Error> {
        let file = OpenOptions::new().read(true).write(true).open(path)?;

        // 尝试获取排他锁
        match file.try_lock_exclusive() {
            Ok(_) => Ok(LockFile { file, locked: true }),
            Err(e) => Err(e),
        }
    }
}

// 实现Deref和DerefMut,这样LockFile可以自动调用内部File的方法
impl Deref for LockFile {
    type Target = File;

    fn deref(&self) -> &Self::Target {
        &self.file
    }
}

impl DerefMut for LockFile {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.file
    }
}

impl Drop for LockFile {
    fn drop(&mut self) {
        if self.locked {
            // 在析构时自动解锁
            let _ = fs2::FileExt::unlock(&self.file);
        }
    }
}

/// 单核心分配函数 - 使用最后一个闲置核心
pub fn use_last_core(use_name: &str) -> usize {
    use_last_core2(use_name, 1)[0]
}

pub fn use_last_core2(use_name: &str, count: usize) -> Vec<usize> {
    let core = CORES.clone();

    // 获得之前已经绑定的核心
    _ = fs::File::create_new(FILE_CORE_AFFINITY);

    // 最大重试次数
    const MAX_RETRY: usize = 10;

    // 打开文件并获取排他锁
    let mut lock_file = {
        let mut retry_count = 0;
        loop {
            if retry_count >= MAX_RETRY {
                error!(
                    "open core_affinity file overflow max retry {MAX_RETRY}, conot continue execute, return default core: 0"
                );
                // 获取默认的核心ID并返回
                let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
                    warn!("get cpu core number failed; get use default core: 0");
                    core_affinity::CoreId { id: 0 }
                });
                return vec![use_core.id];
            }

            match LockFile::open(FILE_CORE_AFFINITY) {
                Ok(file) => break file,
                Err(e) => {
                    // 文件被锁定,等待一段时间后重试
                    warn!(
                        ".core_affinity is locked, waiting to release: {}, retry times: {}/{}",
                        e,
                        retry_count + 1,
                        MAX_RETRY
                    );
                    retry_count += 1;
                    thread::sleep(Duration::from_millis(150));
                }
            }
        }
    };

    let mut file_content = String::new();

    // 使用作用域和提前返回来确保在所有情况下都能正确释放文件锁
    let use_core_ids = {
        // 读取文件内容
        if let Err(e) = lock_file.read_to_string(&mut file_content) {
            error!("读取core_affinity文件内容失败: {},无法继续执行", e);
            // 获取默认的核心ID并返回
            let use_core = core.last().map(|x| x.clone()).unwrap_or_else(|| {
                warn!("获取cpu核心数失败");
                core_affinity::CoreId { id: 0 }
            });
            return vec![use_core.id];
        }

        // 解析出已分配的所有核心
        let mut used_cores = Vec::new();
        for line in file_content.lines() {
            // 跳过注释和空行
            if line.trim().is_empty() || line.trim().starts_with("realtime_system") {
                continue;
            }
            
            let parts: Vec<&str> = line.split(',').collect();
            // 跳过第一部分(名称),解析后面的所有核心ID
            for i in 1..parts.len() {
                if let Ok(core_id) = parts[i].trim().parse::<usize>() {
                    used_cores.push(core_id);
                }
            }
        }
        
        // debug!("已经分配的所有核心: {:?}", used_cores);

        // 获取所有核心ID并按照从大到小排序
        let mut all_cores: Vec<usize> = core.iter().map(|c| c.id).collect();
        all_cores.sort_by(|a, b| b.cmp(a)); // 从大到小排序
        
        if all_cores.is_empty() {
            let default_cores = (0..count).collect::<Vec<_>>();
            warn!("没有可用的核心,使用默认核心: {:?}", default_cores);
            return default_cores;
        }
        
        // debug!("所有核心(从大到小排序): {:?}", all_cores);
        
        // 找出未使用的核心
        let available_cores: Vec<usize> = all_cores
            .iter()
            .filter(|&id| !used_cores.contains(id))
            .cloned()
            .collect();
        
        // debug!("可用的未分配核心: {:?}", available_cores);
        
        // 创建已选择的核心列表
        let mut selected_cores = Vec::with_capacity(count);
        
        // 如果使用的是硬编码核心列表且所有核心都已被分配(即available_cores为空)
        // 则我们需要从高到低依次分配核心,而不是都分配同一个核心
        if available_cores.is_empty() {
            // debug!("所有核心都已被分配,将按照从高到低顺序循环使用");
            
            // 获取最后一次分配的核心
            let last_core = used_cores.last().cloned().unwrap_or(all_cores[0]);
            
            // 在all_cores中查找上一次使用的核心的位置
            let mut last_index = 0;
            for (i, &core_id) in all_cores.iter().enumerate() {
                if core_id == last_core {
                    last_index = i;
                    break;
                }
            }
            
            // 下一个要使用的核心索引 - 从这个索引开始分配
            let mut start_index = (last_index + 1) % all_cores.len();
            
            // debug!("上一次使用的核心: {}, 索引: {}, 下一个起始索引: {}", 
            //        last_core, last_index, start_index);
            
            // 按序循环选择核心
            for i in 0..count {
                let current_index = (start_index + i) % all_cores.len();
                selected_cores.push(all_cores[current_index]);
            }
        } else {
            // 首先使用未分配的核心
            let mut remaining = count;
            for &core_id in &available_cores {
                if remaining == 0 {
                    break;
                }
                selected_cores.push(core_id);
                remaining -= 1;
            }
            
            // 如果未分配的核心不足,则从所有核心开始循环使用
            if remaining > 0 {
                // debug!("可用核心不足,将按照从高到低顺序循环使用");
                
                // 获取最后一次分配的核心
                let last_core = if selected_cores.is_empty() {
                    used_cores.last().cloned().unwrap_or(all_cores[0])
                } else {
                    selected_cores.last().cloned().unwrap()
                };
                
                // 在all_cores中查找上一次使用的核心的位置
                let mut last_index = 0;
                for (i, &core_id) in all_cores.iter().enumerate() {
                    if core_id == last_core {
                        last_index = i;
                        break;
                    }
                }
                
                // 下一个要使用的核心索引 - 从这个索引开始分配
                let mut start_index = (last_index + 1) % all_cores.len();
                
                // debug!("上一次使用的核心: {}, 索引: {}, 下一个起始索引: {}", 
                //        last_core, last_index, start_index);
                
                // 从所有核心中循环选择剩余需要的核心
                for i in 0..remaining {
                    let current_index = (start_index + i) % all_cores.len();
                    selected_cores.push(all_cores[current_index]);
                }
            }
        }
        
        // debug!("本次分配的核心: {:?}", selected_cores);

        // 将文件指针移到文件末尾进行追加写入
        if let Err(e) = lock_file.seek(SeekFrom::End(0)) {
            error!("移动文件指针到文件末尾失败: {},无法写入新数据", e);
            return selected_cores;
        }

        // 构建写入格式: "{use_name},核心1,核心2,xxx"
        let mut write_content = format!("{use_name}");
        for core_id in &selected_cores {
            write_content.push_str(&format!(",{}", core_id));
        }
        write_content.push_str("\n");

        // 写入新数据
        if let Err(e) = lock_file.write_all(write_content.as_bytes()) {
            error!("写入数据到core_affinity文件失败: {}", e);
            return selected_cores;
        }

        // 刷新文件缓冲区
        if let Err(e) = lock_file.flush() {
            error!("刷新文件缓冲区失败: {}", e);
        }

        selected_cores
    };

    debug!("{use_name} use_cores: {:?}", use_core_ids);

    // 返回核心ID列表
    use_core_ids
}

fn read_linux_system_cpu_cores() -> Option<Vec<CoreId>> {
    // 读取Linux系统文件获取所有物理核心
    std::fs::read_to_string("/sys/devices/system/cpu/present")
        .ok()
        .and_then(|content| {
            parse_cpu_range(&content)
                .map(|cores| cores.into_iter().map(|id| CoreId { id }).collect())
        })
}

fn parse_cpu_range(content: &str) -> Option<Vec<usize>> {
    let mut cpus = Vec::new();
    let content = content.trim();

    if content.is_empty() {
        return None;
    }

    for part in content.split(',') {
        if let Some((start, end)) = part.split_once('-') {
            let start = start.parse::<usize>().ok()?;
            let end = end.parse::<usize>().ok()?;
            cpus.extend(start..=end);
        } else {
            let cpu = part.parse::<usize>().ok()?;
            cpus.push(cpu);
        }
    }

    Some(cpus)
}

#[cfg(feature = "deal_physical_cpu")]
pub fn get_core_skip() -> usize {
    let core_ids = num_cpus::get();
    let core_physical = num_cpus::get_physical();
    if core_ids / core_physical == 2 {
        warn!("core_ids: {core_ids}, core_physical: {core_physical}; skip 2");
        2
    } else {
        1
    }
}

#[cfg(not(feature = "deal_physical_cpu"))]
pub fn get_core_skip() -> usize {
    1
}