pick_fast 0.1.1

High-performance weighted random load balancer for selecting low-latency nodes with atomic EMA weight updates. / 高性能加权随机负载均衡器,用于随机选择低延迟节点,支持基于原子操作的指数移动平均权重更新。
Documentation
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::{
  marker::PhantomData,
  ops::Deref,
  sync::atomic::{AtomicU32, Ordering},
};

/// 策略 Trait: 定义耗时与权重的转换逻辑
pub trait Rank {
  /// 计算权重
  /// - 输入: `latency_us` (微秒)
  /// - 输出: `weight` (权重值)
  fn calc(latency_us: u32) -> u32;

  /// 初始权重 (默认为 1,实现慢启动)
  fn init() -> u32 {
    1
  }
}

/// 默认策略: 倒数模型 (Inverse)
/// 逻辑: Weight = BASE / Latency
/// 场景: 适用于 N <= 256 的通用负载均衡
pub struct Inverse;

impl Rank for Inverse {
  #[inline(always)]
  fn calc(latency: u32) -> u32 {
    // 调整为 2^22 (约 4,194,304)
    // 计算: 256 * 2^22 = 2^30 (约10亿) < u32::MAX (约42亿)
    // 即使有 256 个节点且延时均为 1us,也不会溢出。
    const BASE: u32 = 1 << 22;

    // 精度说明:
    // 1ms (1000us) -> 权重 4194
    // 50ms(50000us)-> 权重 83
    // 1s  (10^6us) -> 权重 4
    // 精度足够区分不同节点节点的性能差异。

    BASE / latency.max(1)
  }
}

/// 选中节点的句柄
/// 包含数据引用和索引,用完后可以直接作为 &T 使用
pub struct Handle<'a, T> {
  pub index: usize,
  pub data: &'a T,
}

impl<'a, T> Deref for Handle<'a, T> {
  type Target = T;
  fn deref(&self) -> &Self::Target {
    self.data
  }
}

// ========================================================================
// 3. 核心结构 (PickFast)
// ========================================================================

/// 极速加权负载均衡器
///
/// 泛型参数:
/// - `T`: 节点数据类型
/// - `N`: 节点数量 (编译期常量)
/// - `M`: 权重模型 (默认为 `Inverse`)
#[repr(align(64))] // 防止 Cache Line 伪共享
pub struct PickFast<T, const N: usize, M = Inverse> {
  /// 节点数据 (只读)
  pub li: [T; N],

  /// 节点权重 (原子)
  pub weight_li: [AtomicU32; N],

  /// 总权重 (原子缓存)
  total: AtomicU32,

  _marker: PhantomData<M>,
}

unsafe impl<T: Sync, const N: usize, M> Sync for PickFast<T, N, M> {}
unsafe impl<T: Send, const N: usize, M> Send for PickFast<T, N, M> {}

impl<T, const N: usize, M: Rank> PickFast<T, N, M> {
  /// 创建一个新的选择器
  pub fn new(li: [T; N]) -> Self {
    assert!(N > 0, "PickFast: N must be > 0");
    // 如果 N 非常大,这里给个运行时提醒 (虽然 Inverse 模型已适配到 256)
    // 实际上 2^22 支持 N=1024 也没问题,只是精度稍微降低
    if N > 256 {
      log::warn!("PickFast N={N} is large, ensure Rank won't overflow u32");
    }

    let init_val = M::init();

    // 1. 初始化权重数组
    let weight_li = [const { AtomicU32::new(1) }; N];
    for w in &weight_li {
      w.store(init_val, Ordering::Relaxed);
    }

    // 2. 初始化总权重
    let total = AtomicU32::new(init_val * (N as u32));

    Self {
      li,
      weight_li,
      total,
      _marker: PhantomData,
    }
  }

  /// 极速挑选 (Pick)
  ///
  /// O(1) 获取总权重 -> O(N) 扫描
  #[inline(always)]
  pub fn pick(&self) -> Handle<'_, T> {
    let total_w = self.total.load(Ordering::Relaxed);

    if total_w == 0 {
      return Handle {
        index: 0,
        data: &self.li[0],
      };
    }

    // 随机目标
    let target = fastrand::u32(0..total_w);
    let mut sum = 0;

    // 扫描
    for (i, w) in self.weight_li.iter().enumerate() {
      sum += w.load(Ordering::Relaxed);
      if sum > target {
        return Handle {
          index: i,
          data: &self.li[i],
        };
      }
    }

    // 兜底 (处理并发更新时的微小窗口)
    Handle {
      index: N - 1,
      data: &self.li[N - 1],
    }
  }

  /// 设定观测值 (Set)
  ///
  /// 传入观测值 (如耗时),内部自动计算权重并平滑更新
  #[inline(always)]
  pub fn set(&self, index: usize, val: u32) {
    if index >= N {
      return;
    }

    let target_w = M::calc(val);

    // CAS 更新单节点权重
    // EMA公式: New = (Old + Target) / 2
    let _ = self.weight_li[index]
      .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old| {
        Some((old + target_w) >> 1)
      })
      .map(|prev| {
        // 修正总权重
        let new_w = (prev + target_w) >> 1;
        if new_w > prev {
          self.total.fetch_add(new_w - prev, Ordering::Relaxed);
        } else {
          self.total.fetch_sub(prev - new_w, Ordering::Relaxed);
        }
      });
  }
}