cpu_load 0.1.2

Real-time CPU load monitoring with intelligent core selection / 实时 CPU 负载监控与智能核心选择
Documentation
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::{
  sync::{
    Arc,
    atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
  },
  time::Duration,
};

use compio::time;
pub use iter::CpuLoadIter;
use sysinfo::{CpuRefreshKind, RefreshKind, System};

mod iter;

/// Time, in milliseconds, that the background sampler waits before taking
/// its very first CPU reading.
const INIT_DELAY_MS: u64 = 100;

/// Sampling period used by [`CpuLoad::new`]: one second between readings.
const DEFAULT_INTERVAL: Duration = Duration::from_millis(1_000);

/// CPU load monitor.
///
/// Holds the most recently published global and per-core load readings in
/// atomics, so they can be read through a shared reference, together with the
/// bookkeeping used by [`CpuLoad::idlest`] to hand out core indices.
#[derive(Debug)]
pub struct CpuLoad {
  /// Global CPU load as a percentage (0-100).
  global: AtomicU8,
  /// Load of each core as a percentage (0-100), indexed by core number.
  cores: Box<[AtomicU8]>,
  /// Delay between two consecutive samples.
  interval: Duration,
  /// Core indices ordered by ascending load as of the last re-sort.
  rank: Box<[AtomicUsize]>,
  /// Round-robin position inside `rank`.
  cursor: AtomicUsize,
  /// `true` while one caller is rewriting `rank`.
  sorting: AtomicBool,
}

impl CpuLoad {
  /// Refreshes `sys` and publishes the new readings into `this`.
  ///
  /// The global usage and every per-core usage are clamped to `0.0..=100.0`
  /// and converted to `u8` with `as` (the fractional part is discarded)
  /// before being stored with relaxed ordering. If `sys` reports a different
  /// number of CPUs than `this.cores` has slots, only the common prefix is
  /// updated.
  fn sample(sys: &mut System, this: &Self) {
    sys.refresh_cpu_all();

    let global = sys.global_cpu_usage().clamp(0.0, 100.0) as u8;
    this.global.store(global, Ordering::Relaxed);

    // `zip` stops at the shorter of the two sequences, so no index can go
    // out of bounds and no `unsafe` access is required.
    for (slot, cpu) in this.cores.iter().zip(sys.cpus()) {
      let usage = cpu.cpu_usage().clamp(0.0, 100.0) as u8;
      slot.store(usage, Ordering::Relaxed);
    }
  }

  /// Creates a monitor that samples with the default one-second interval.
  ///
  /// Shorthand for `CpuLoad::init(DEFAULT_INTERVAL)`.
  #[inline]
  pub fn new() -> Arc<Self> {
    Self::init(DEFAULT_INTERVAL)
  }

  /// Creates a monitor and starts its background sampling task.
  ///
  /// The number of per-core slots is fixed here from the CPUs reported by
  /// `sysinfo` (at least one slot is always allocated). All loads start at
  /// `0` and the ranking starts as the identity order `0, 1, 2, …`.
  ///
  /// A detached task is spawned with `compio::runtime::spawn`. It takes a
  /// first sample after `INIT_DELAY_MS` milliseconds and then one sample per
  /// `interval`. The task only keeps a `Weak` handle and stops once every
  /// `Arc` returned from here (and its clones) has been dropped.
  ///
  /// NOTE(review): presumably this must be called from inside a running
  /// compio runtime, since the task is spawned on it — confirm against the
  /// compio documentation.
  pub fn init(interval: Duration) -> Arc<Self> {
    // Throw-away `System`, used only to learn how many CPUs exist.
    let mut probe =
      System::new_with_specifics(RefreshKind::nothing().with_cpu(CpuRefreshKind::nothing()));
    probe.refresh_cpu_all();
    // Keep at least one slot so `idlest` always has something to return.
    let n = probe.cpus().len().max(1);

    let inst = Arc::new(Self {
      global: AtomicU8::new(0),
      cores: (0..n).map(|_| AtomicU8::new(0)).collect(),
      interval,
      // Identity order until `idlest` performs its first re-sort.
      rank: (0..n).map(AtomicUsize::new).collect(),
      cursor: AtomicUsize::new(0),
      sorting: AtomicBool::new(false),
    });

    let weak = Arc::downgrade(&inst);

    compio::runtime::spawn(async move {
      let mut sys =
        System::new_with_specifics(RefreshKind::nothing().with_cpu(CpuRefreshKind::everything()));

      // Short delay before the first sample, `interval` before every later one.
      let mut delay = Duration::from_millis(INIT_DELAY_MS);

      // The strong reference is taken only for the duration of one sample,
      // never across the sleep, so an idle task neither keeps the monitor
      // alive nor makes `Arc::try_unwrap` fail for the owner.
      while weak.strong_count() > 0 {
        time::sleep(delay).await;
        let Some(monitor) = weak.upgrade() else {
          break;
        };
        Self::sample(&mut sys, &monitor);
        delay = monitor.interval;
      }
    })
    .detach();

    inst
  }

  /// Returns a core index chosen by round-robin over the load ranking.
  ///
  /// Calls walk through `rank` (core indices sorted by ascending load) one
  /// position at a time. Once the cursor has reached the number of cores, the
  /// calling thread that wins the `sorting` flag re-sorts the ranking from the
  /// current per-core loads, resets the cursor to `0` and returns the first
  /// entry of the new ranking. While a re-sort is in progress, other callers
  /// do not wait: they keep reading `rank` as it currently is.
  ///
  /// Note that, because the cursor is reset to `0` after a re-sort, the next
  /// call also returns the first entry of the ranking.
  ///
  /// Returns `0` if there are no cores.
  pub fn idlest(&self) -> usize {
    let n = self.rank.len();
    if n == 0 {
      return 0;
    }

    // Someone else is rewriting `rank`: serve from it as-is instead of
    // blocking. Every entry is always a valid core index.
    if self.sorting.load(Ordering::Acquire) {
      let pos = self.cursor.fetch_add(1, Ordering::Relaxed) % n;
      return self.rank[pos].load(Ordering::Relaxed);
    }

    let cur = self.cursor.fetch_add(1, Ordering::Relaxed);

    // A full pass over `rank` has been handed out: re-sort, but only in the
    // single caller that manages to flip `sorting` from `false` to `true`.
    if cur >= n
      && self
        .sorting
        .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
        .is_ok()
    {
      let mut order: Vec<usize> = (0..n).collect();
      order.sort_unstable_by_key(|&core| self.cores[core].load(Ordering::Relaxed));

      for (slot, core) in self.rank.iter().zip(order) {
        slot.store(core, Ordering::Relaxed);
      }

      self.cursor.store(0, Ordering::Relaxed);
      self.sorting.store(false, Ordering::Release);

      // `n > 0` was checked above, so the first entry exists.
      return self.rank[0].load(Ordering::Relaxed);
    }

    // `cur % n < n == rank.len()`.
    self.rank[cur % n].load(Ordering::Relaxed)
  }

  /// Returns the most recently published global CPU load (0-100).
  #[inline]
  pub fn global(&self) -> u8 {
    self.global.load(Ordering::Relaxed)
  }

  /// Returns the most recently published load (0-100) of core `idx`, or
  /// `None` if `idx` is not a valid core index.
  #[inline]
  pub fn core(&self, idx: usize) -> Option<u8> {
    self.cores.get(idx).map(|v| v.load(Ordering::Relaxed))
  }

  /// Returns the number of per-core slots tracked by this monitor.
  #[inline]
  pub fn len(&self) -> usize {
    self.cores.len()
  }

  /// Returns `true` if no cores are tracked.
  ///
  /// `init` always allocates at least one slot, so this is `false` for any
  /// monitor created through it.
  #[inline]
  pub fn is_empty(&self) -> bool {
    self.cores.is_empty()
  }
}

impl<'a> IntoIterator for &'a CpuLoad {
  type Item = u8;
  type IntoIter = CpuLoadIter<'a>;

  /// Builds a [`CpuLoadIter`] over this monitor's per-core load slots.
  ///
  /// Items are `u8`; presumably one load value per core in index order (the
  /// iterator itself is defined in the `iter` module).
  #[inline]
  fn into_iter(self) -> CpuLoadIter<'a> {
    CpuLoadIter::new(&self.cores)
  }
}

impl Default for CpuLoad {
  /// Builds a monitor through [`CpuLoad::new`] and takes it out of its `Arc`.
  ///
  /// The value returned here is NOT connected to a sampler. The background
  /// task spawned by `init` only holds a `Weak` to the `Arc` allocation, and
  /// that handle can no longer be upgraded once the `Arc` has been unwrapped.
  /// The task then stops, and the readings in the returned value are never
  /// updated again. Use [`CpuLoad::new`] or [`CpuLoad::init`] and keep the
  /// `Arc` when live readings are needed.
  ///
  /// If the `Arc` unexpectedly has other strong references, a snapshot copy
  /// of its current readings is returned instead.
  #[inline]
  fn default() -> Self {
    match Arc::try_unwrap(Self::new()) {
      Ok(inner) => inner,
      // Not expected to happen: some other strong reference exists, so fall
      // back to copying the current state field by field.
      Err(shared) => Self {
        global: AtomicU8::new(shared.global.load(Ordering::Relaxed)),
        cores: shared
          .cores
          .iter()
          .map(|slot| AtomicU8::new(slot.load(Ordering::Relaxed)))
          .collect(),
        interval: shared.interval,
        rank: shared
          .rank
          .iter()
          .map(|slot| AtomicUsize::new(slot.load(Ordering::Relaxed)))
          .collect(),
        cursor: AtomicUsize::new(shared.cursor.load(Ordering::Relaxed)),
        // `sorting` marks a re-sort in progress on `shared`. Nobody is
        // sorting the copy, and a copied `true` would never be cleared,
        // which would stop the copy from ever re-sorting.
        sorting: AtomicBool::new(false),
      },
    }
  }
}