use super::{numa::NumaTopology, WorkerConfig};
use std::sync::{Arc, Mutex};
/// Strategy for binding worker threads to CPU cores.
#[derive(Debug, Clone, Copy)]
pub enum AffinityStrategy {
    /// No binding; the OS scheduler places threads freely.
    None,
    /// Pin each thread to a dedicated core.
    Pinned,
    /// Spread threads evenly across NUMA nodes.
    NumaSpread,
    /// Pack threads onto as few NUMA nodes as possible before spilling over.
    NumaCompact,
    /// Use per-thread assignments registered via `set_thread_affinity`.
    Custom,
}
/// A set of cores a thread may run on, with an optional NUMA-node hint.
#[derive(Debug, Clone)]
pub struct CoreAffinity {
    pub core_ids: Vec<usize>,
    pub allow_migration: bool,
    pub numa_node: Option<usize>,
}
impl CoreAffinity {
    /// Bind to an explicit set of cores, with migration disabled.
    pub fn cores(core_ids: Vec<usize>) -> Self {
        Self {
            core_ids,
            allow_migration: false,
            numa_node: None,
        }
    }
    /// Bind to all cores of a NUMA node, with migration allowed within it.
    /// An out-of-range `node_id` yields an empty core set.
    pub fn numa_node(node_id: usize, topology: &NumaTopology) -> Self {
        let core_ids = if node_id < topology.cpus_per_node.len() {
            topology.cpus_per_node[node_id].clone()
        } else {
            vec![]
        };
        Self {
            core_ids,
            allow_migration: true,
            numa_node: Some(node_id),
        }
    }
pub fn with_migration(mut self, allow: bool) -> Self {
self.allow_migration = allow;
self
}
}
/// Generates and stores per-thread core assignments for a chosen strategy.
pub struct AffinityManager {
strategy: AffinityStrategy,
topology: NumaTopology,
thread_assignments: Arc<Mutex<Vec<Option<CoreAffinity>>>>,
}
impl AffinityManager {
pub fn new(strategy: AffinityStrategy, topology: NumaTopology) -> Self {
let thread_assignments = Arc::new(Mutex::new(Vec::new()));
Self {
strategy,
topology,
thread_assignments,
}
}
    /// Produce one `CoreAffinity` per thread according to the strategy;
    /// `AffinityStrategy::None` yields an empty list (no binding applied).
    pub fn generate_assignments(&self, num_threads: usize) -> Vec<CoreAffinity> {
        match self.strategy {
            AffinityStrategy::None => vec![],
            AffinityStrategy::Pinned => self.generate_pinned_assignments(num_threads),
            AffinityStrategy::NumaSpread => self.generate_numa_spread_assignments(num_threads),
            AffinityStrategy::NumaCompact => self.generate_numa_compact_assignments(num_threads),
            AffinityStrategy::Custom => self
                .thread_assignments
                .lock()
                .expect("thread assignment lock poisoned")
                .iter()
                .filter_map(|opt| opt.clone())
                .collect(),
        }
    }
    /// One thread per core, in ascending core order; threads beyond the
    /// total core count are dropped.
    fn generate_pinned_assignments(&self, num_threads: usize) -> Vec<CoreAffinity> {
        let total_cores: usize = self
            .topology
            .cpus_per_node
            .iter()
            .map(|node| node.len())
            .sum();
        let effective_threads = std::cmp::min(num_threads, total_cores);
        let mut all_cores: Vec<usize> = self
            .topology
            .cpus_per_node
            .iter()
            .flat_map(|node| node.iter().cloned())
            .collect();
        all_cores.sort_unstable();
        (0..effective_threads)
            .map(|i| CoreAffinity::cores(vec![all_cores[i]]))
            .collect()
    }
    /// Distribute threads evenly across NUMA nodes; the first
    /// `num_threads % num_nodes` nodes take one extra thread each.
    fn generate_numa_spread_assignments(&self, num_threads: usize) -> Vec<CoreAffinity> {
        // Guard the divisions below against a degenerate zero-node topology.
        if self.topology.num_nodes == 0 {
            return Vec::new();
        }
        let mut assignments = Vec::new();
        let threads_per_node = num_threads / self.topology.num_nodes;
        let extra_threads = num_threads % self.topology.num_nodes;
        for (node_id, cores) in self.topology.cpus_per_node.iter().enumerate() {
            let node_threads = threads_per_node + if node_id < extra_threads { 1 } else { 0 };
            for i in 0..node_threads {
                if i < cores.len() {
                    // Enough physical cores on this node: pin one thread per core.
                    assignments.push(CoreAffinity::cores(vec![cores[i]]).with_migration(false));
                } else {
                    // Oversubscribed: let the extra thread float within the node.
                    assignments.push(
                        CoreAffinity::numa_node(node_id, &self.topology).with_migration(true),
                    );
                }
            }
        }
        assignments
    }
    /// Fill each node before moving to the next: pin up to one thread per
    /// core, then oversubscribe with migratable threads up to 2x the node's
    /// core count before spilling to the next node.
    fn generate_numa_compact_assignments(&self, num_threads: usize) -> Vec<CoreAffinity> {
        let mut assignments = Vec::new();
        let mut remaining_threads = num_threads;
        for (node_id, cores) in self.topology.cpus_per_node.iter().enumerate() {
            if remaining_threads == 0 {
                break;
            }
            let node_capacity = cores.len();
            let threads_for_node = std::cmp::min(remaining_threads, node_capacity);
            for core in cores.iter().take(threads_for_node) {
                assignments.push(CoreAffinity::cores(vec![*core]).with_migration(false));
            }
            if remaining_threads > node_capacity {
                for _ in node_capacity..remaining_threads.min(node_capacity * 2) {
                    assignments.push(
                        CoreAffinity::numa_node(node_id, &self.topology).with_migration(true),
                    );
                }
            }
            // Each node absorbs at most `node_capacity * 2` threads in total.
            remaining_threads = remaining_threads.saturating_sub(node_capacity * 2);
        }
        assignments
    }
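    // Worked example of the compact packing: with two 4-core nodes and 6
    // requested threads, node 0 takes 4 pinned threads plus 2 migratable
    // ones (6 <= 2 * 4) and node 1 takes none; with 10 threads, node 0
    // absorbs 8 (4 pinned + 4 migratable) and node 1 pins the remaining 2.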
    /// Register a custom per-thread assignment (consumed by `Custom`).
    pub fn set_thread_affinity(&self, thread_id: usize, affinity: CoreAffinity) {
        let mut assignments = self
            .thread_assignments
            .lock()
            .expect("thread assignment lock poisoned");
        while assignments.len() <= thread_id {
            assignments.push(None);
        }
        assignments[thread_id] = Some(affinity);
    }
    pub fn get_thread_affinity(&self, thread_id: usize) -> Option<CoreAffinity> {
        let assignments = self
            .thread_assignments
            .lock()
            .expect("thread assignment lock poisoned");
        assignments.get(thread_id).and_then(|opt| opt.clone())
    }
    /// Apply `affinity` to the calling thread, dispatching on the target OS.
    /// The parameter is intentionally `_`-prefixed: it is unused on
    /// platforms without affinity support.
    pub fn apply_current_thread_affinity(&self, _affinity: &CoreAffinity) -> Result<(), String> {
#[cfg(target_os = "linux")]
{
self.apply_linux_affinity(_affinity)
}
#[cfg(target_os = "windows")]
{
self.apply_windows_affinity(_affinity)
}
#[cfg(not(any(target_os = "linux", target_os = "windows")))]
{
eprintln!("CPU affinity not supported on this platform");
Ok(())
}
}
#[cfg(target_os = "linux")]
fn apply_linux_affinity(&self, affinity: &CoreAffinity) -> Result<(), String> {
if !affinity.core_ids.is_empty() {
let core_list = affinity
.core_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(",");
std::env::set_var("GOMP_CPU_AFFINITY", &core_list);
std::env::set_var("KMP_AFFINITY", format!("explicit,proclist=[{core_list}]"));
}
if let Some(numa_node) = affinity.numa_node {
std::env::set_var("NUMA_NODE_HINT", numa_node.to_string());
}
Ok(())
}
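    // A lower-level alternative (sketch, left in comments because it assumes
    // the `libc` crate as a dependency): ask the kernel directly via
    // `sched_setaffinity` instead of relying on OpenMP environment hints.
    //
    //     #[cfg(target_os = "linux")]
    //     fn pin_current_thread(core_ids: &[usize]) -> Result<(), String> {
    //         unsafe {
    //             let mut set: libc::cpu_set_t = std::mem::zeroed();
    //             libc::CPU_ZERO(&mut set);
    //             for &id in core_ids {
    //                 libc::CPU_SET(id, &mut set);
    //             }
    //             // pid 0 targets the calling thread.
    //             let rc = libc::sched_setaffinity(
    //                 0,
    //                 std::mem::size_of::<libc::cpu_set_t>(),
    //                 &set,
    //             );
    //             if rc != 0 {
    //                 return Err("sched_setaffinity failed".to_string());
    //             }
    //         }
    //         Ok(())
    //     }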
#[cfg(target_os = "windows")]
fn apply_windows_affinity(&self, affinity: &CoreAffinity) -> Result<(), String> {
if !affinity.core_ids.is_empty() {
let core_mask: u64 = affinity
.core_ids
.iter()
.fold(0u64, |mask, &core_id| mask | (1u64 << core_id));
std::env::set_var("THREAD_AFFINITY_MASK", format!("0x{:x}", core_mask));
}
Ok(())
}
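    // The kernel-level Windows equivalent (sketch, assuming the `winapi`
    // crate with its `processthreadsapi` and `winbase` features) would be:
    //
    //     use winapi::um::processthreadsapi::GetCurrentThread;
    //     use winapi::um::winbase::SetThreadAffinityMask;
    //     // Returns the previous mask, or 0 on failure.
    //     let prev = unsafe { SetThreadAffinityMask(GetCurrentThread(), core_mask as usize) };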
    /// Suggest a strategy from the workload profile, thread count, and topology.
    pub fn recommend_strategy(
        &self,
        num_threads: usize,
        workload_type: WorkloadType,
    ) -> AffinityStrategy {
        match workload_type {
            WorkloadType::CpuBound => {
                // Dedicated cores while they last; otherwise balance across nodes.
                if num_threads <= self.total_cores() {
                    AffinityStrategy::Pinned
                } else {
                    AffinityStrategy::NumaSpread
                }
            }
            WorkloadType::MemoryBound => {
                // Pack threads near their allocations to preserve locality.
                if self.topology.num_nodes > 1 {
                    AffinityStrategy::NumaCompact
                } else {
                    AffinityStrategy::Pinned
                }
            }
            WorkloadType::Balanced => {
                if self.topology.num_nodes > 1 && num_threads >= self.topology.num_nodes {
                    AffinityStrategy::NumaSpread
                } else {
                    AffinityStrategy::Pinned
                }
            }
            // Latency-sensitive work gets a fixed core: warm caches, no
            // migration jitter.
            WorkloadType::Latency => AffinityStrategy::Pinned,
        }
}
fn total_cores(&self) -> usize {
self.topology
.cpus_per_node
.iter()
.map(|node| node.len())
.sum()
}
}
/// Coarse workload profile used when recommending an affinity strategy.
#[derive(Debug, Clone, Copy)]
pub enum WorkloadType {
    /// Dominated by computation; scales with dedicated cores.
    CpuBound,
    /// Dominated by memory traffic; sensitive to NUMA placement.
    MemoryBound,
    /// A mix of compute and memory work.
    Balanced,
    /// Latency-sensitive; benefits from a fixed core and warm caches.
    Latency,
}
/// Couples an `AffinityManager` with a `WorkerConfig` so work can be run
/// under a chosen affinity strategy.
pub struct AffinityThreadPool {
affinity_manager: AffinityManager,
config: WorkerConfig,
}
impl AffinityThreadPool {
pub fn new(strategy: AffinityStrategy, topology: NumaTopology, config: WorkerConfig) -> Self {
let affinity_manager = AffinityManager::new(strategy, topology);
Self {
affinity_manager,
config,
}
}
    /// Run `work` on the calling thread after applying the first generated
    /// assignment to it. This does not spawn worker threads; a full pool
    /// would hand one assignment to each worker it creates.
    pub fn execute_with_affinity<F, R>(&self, work: F) -> R
where
F: FnOnce() -> R + Send,
R: Send,
{
let num_threads = self.config.workers.unwrap_or(
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4),
);
let assignments = self.affinity_manager.generate_assignments(num_threads);
if let Some(affinity) = assignments.first() {
if let Err(e) = self
.affinity_manager
.apply_current_thread_affinity(affinity)
{
eprintln!("Warning: Failed to set thread affinity: {e}");
}
}
work()
}
pub fn get_affinity_info(&self) -> AffinityInfo {
let num_threads = self.config.workers.unwrap_or(
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4),
);
let assignments = self.affinity_manager.generate_assignments(num_threads);
AffinityInfo {
strategy: self.affinity_manager.strategy,
num_threads,
assignments,
topology: self.affinity_manager.topology.clone(),
}
}
}
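// Usage sketch (topology field layout follows the tests at the bottom of
// this file):
//
//     let topology = NumaTopology {
//         num_nodes: 2,
//         cpus_per_node: vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7]],
//         memory_bandwidth: vec![vec![100.0, 50.0], vec![50.0, 100.0]],
//     };
//     let pool = AffinityThreadPool::new(
//         AffinityStrategy::Pinned,
//         topology,
//         WorkerConfig::new().with_workers(4),
//     );
//     let sum = pool.execute_with_affinity(|| (0..1_000_000u64).sum::<u64>());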
/// Snapshot of a pool's strategy, thread count, assignments, and topology.
#[derive(Debug, Clone)]
pub struct AffinityInfo {
pub strategy: AffinityStrategy,
pub num_threads: usize,
pub assignments: Vec<CoreAffinity>,
pub topology: NumaTopology,
}
impl AffinityInfo {
pub fn print_summary(&self) {
println!("=== Thread Affinity Summary ===");
println!("Strategy: {:?}", self.strategy);
println!("Number of threads: {}", self.num_threads);
println!("NUMA nodes: {}", self.topology.num_nodes);
for (node_id, cores) in self.topology.cpus_per_node.iter().enumerate() {
println!(" Node {node_id}: CPUs {cores:?}");
}
println!("Thread assignments:");
for (thread_id, affinity) in self.assignments.iter().enumerate() {
println!(
" Thread {}: cores {:?}, migration: {}, NUMA: {:?}",
thread_id, affinity.core_ids, affinity.allow_migration, affinity.numa_node
);
}
println!("==============================");
}
    /// Derive utilization ratios from the assignments. With no assignments
    /// (e.g. `AffinityStrategy::None`) the per-core ratios divide by zero
    /// and come out NaN or infinite; callers should read that as
    /// "unconstrained".
    pub fn efficiency_metrics(&self) -> AffinityEfficiencyMetrics {
let cores_used: std::collections::HashSet<usize> = self
.assignments
.iter()
.flat_map(|affinity| affinity.core_ids.iter().cloned())
.collect();
let total_cores: usize = self
.topology
.cpus_per_node
.iter()
.map(|node| node.len())
.sum();
let numa_nodes_used: std::collections::HashSet<usize> = self
.assignments
.iter()
.filter_map(|affinity| affinity.numa_node)
.collect();
let threads_with_migration: usize = self
.assignments
.iter()
.filter(|affinity| affinity.allow_migration)
.count();
AffinityEfficiencyMetrics {
core_utilization: cores_used.len() as f64 / total_cores as f64,
numa_spread: numa_nodes_used.len() as f64 / self.topology.num_nodes as f64,
migration_ratio: threads_with_migration as f64 / self.num_threads as f64,
threads_per_core: self.num_threads as f64 / cores_used.len() as f64,
}
}
}
#[derive(Debug, Clone)]
pub struct AffinityEfficiencyMetrics {
    /// Fraction of physical cores appearing in at least one assignment.
    pub core_utilization: f64,
    /// Fraction of NUMA nodes hosting at least one node-bound assignment.
    pub numa_spread: f64,
    /// Fraction of threads allowed to migrate between cores.
    pub migration_ratio: f64,
    /// Average number of threads sharing each used core.
    pub threads_per_core: f64,
}
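// Interpretation example: 4 threads pinned one-per-core on an 8-core machine
// give core_utilization = 4 / 8 = 0.5, migration_ratio = 0.0, and
// threads_per_core = 4 / 4 = 1.0; numa_spread counts only assignments that
// carry a NUMA hint, so fully pinned layouts report 0.0.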
pub mod utils {
use super::*;
    /// Recommend a strategy without keeping a manager around.
    pub fn auto_detect_strategy(
workload_type: WorkloadType,
num_threads: usize,
topology: &NumaTopology,
) -> AffinityStrategy {
let manager = AffinityManager::new(AffinityStrategy::None, topology.clone());
manager.recommend_strategy(num_threads, workload_type)
}
    /// Build a pool sized for a dense-matrix workload: matrices over one
    /// million elements are treated as memory-bound, smaller ones as
    /// CPU-bound.
    pub fn create_matrix_thread_pool(
        matrix_size: (usize, usize),
        topology: NumaTopology,
    ) -> AffinityThreadPool {
        let workload_type = if matrix_size.0 * matrix_size.1 > 1_000_000 {
            WorkloadType::MemoryBound
        } else {
            WorkloadType::CpuBound
        };
let num_threads = std::cmp::min(
topology.cpus_per_node.iter().map(|node| node.len()).sum(),
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4),
);
let strategy = auto_detect_strategy(workload_type, num_threads, &topology);
let config = WorkerConfig::new().with_workers(num_threads);
AffinityThreadPool::new(strategy, topology, config)
}
    /// Time each built-in strategy against `workload` and report throughput
    /// as total returned work per wall-clock second.
    pub fn benchmark_affinity_strategies<F>(
workload: F,
topology: NumaTopology,
config: WorkerConfig,
) -> Vec<(AffinityStrategy, f64)>
where
F: Fn() -> f64 + Clone + Send + Sync,
{
let strategies = vec![
AffinityStrategy::None,
AffinityStrategy::Pinned,
AffinityStrategy::NumaSpread,
AffinityStrategy::NumaCompact,
];
let mut results = Vec::new();
for strategy in strategies {
let pool = AffinityThreadPool::new(strategy, topology.clone(), config.clone());
            // Warm up caches and affinity state before the timed runs.
            for _ in 0..3 {
pool.execute_with_affinity(&workload);
}
let start = std::time::Instant::now();
let iterations = 10;
let mut total_work = 0.0;
for _ in 0..iterations {
total_work += pool.execute_with_affinity(&workload);
}
let elapsed = start.elapsed().as_secs_f64();
let throughput = total_work / elapsed;
results.push((strategy, throughput));
}
results
}
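    // Usage sketch: rank the strategies for a synthetic workload whose
    // return value measures work done, so throughput is work per second.
    //
    //     let results = benchmark_affinity_strategies(
    //         || (0..10_000u64).map(|x| x as f64).sum::<f64>(),
    //         topology.clone(),
    //         WorkerConfig::new().with_workers(4),
    //     );
    //     for (strategy, throughput) in results {
    //         println!("{strategy:?}: {throughput:.2} work/s");
    //     }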
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_core_affinity_creation() {
let affinity = CoreAffinity::cores(vec![0, 1, 2]);
assert_eq!(affinity.core_ids, vec![0, 1, 2]);
assert!(!affinity.allow_migration);
assert_eq!(affinity.numa_node, None);
}
#[test]
fn test_numa_affinity_creation() {
let topology = NumaTopology {
num_nodes: 2,
cpus_per_node: vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7]],
memory_bandwidth: vec![vec![100.0, 50.0], vec![50.0, 100.0]],
};
let affinity = CoreAffinity::numa_node(1, &topology);
assert_eq!(affinity.core_ids, vec![4, 5, 6, 7]);
assert!(affinity.allow_migration);
assert_eq!(affinity.numa_node, Some(1));
}
#[test]
fn test_affinity_strategy_recommendation() {
let topology = NumaTopology {
num_nodes: 2,
cpus_per_node: vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7]],
memory_bandwidth: vec![vec![100.0, 50.0], vec![50.0, 100.0]],
};
let manager = AffinityManager::new(AffinityStrategy::None, topology);
assert!(matches!(
manager.recommend_strategy(4, WorkloadType::CpuBound),
AffinityStrategy::Pinned
));
assert!(matches!(
manager.recommend_strategy(4, WorkloadType::MemoryBound),
AffinityStrategy::NumaCompact
));
}
#[test]
fn test_pinned_assignments() {
let topology = NumaTopology {
num_nodes: 2,
cpus_per_node: vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7]],
memory_bandwidth: vec![vec![100.0, 50.0], vec![50.0, 100.0]],
};
let manager = AffinityManager::new(AffinityStrategy::Pinned, topology);
let assignments = manager.generate_assignments(4);
assert_eq!(assignments.len(), 4);
for (i, assignment) in assignments.iter().enumerate() {
assert_eq!(assignment.core_ids.len(), 1);
assert_eq!(assignment.core_ids[0], i);
assert!(!assignment.allow_migration);
}
}
#[test]
fn test_numa_spread_assignments() {
let topology = NumaTopology {
num_nodes: 2,
cpus_per_node: vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7]],
memory_bandwidth: vec![vec![100.0, 50.0], vec![50.0, 100.0]],
};
let manager = AffinityManager::new(AffinityStrategy::NumaSpread, topology);
let assignments = manager.generate_assignments(4);
assert_eq!(assignments.len(), 4);
let node0_threads = assignments
.iter()
.filter(|a| {
a.core_ids.contains(&0)
|| a.core_ids.contains(&1)
|| a.core_ids.contains(&2)
|| a.core_ids.contains(&3)
})
.count();
let node1_threads = assignments
.iter()
.filter(|a| {
a.core_ids.contains(&4)
|| a.core_ids.contains(&5)
|| a.core_ids.contains(&6)
|| a.core_ids.contains(&7)
})
.count();
assert_eq!(node0_threads, 2);
assert_eq!(node1_threads, 2);
}
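    // Additional check for the compact strategy, matching the worked example
    // above `generate_numa_compact_assignments`.
    #[test]
    fn test_numa_compact_assignments() {
        let topology = NumaTopology {
            num_nodes: 2,
            cpus_per_node: vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7]],
            memory_bandwidth: vec![vec![100.0, 50.0], vec![50.0, 100.0]],
        };
        let manager = AffinityManager::new(AffinityStrategy::NumaCompact, topology);
        let assignments = manager.generate_assignments(6);
        assert_eq!(assignments.len(), 6);
        // Threads 0-3 are pinned to node 0's cores...
        assert!(assignments[..4].iter().all(|a| !a.allow_migration));
        // ...and threads 4-5 float within node 0; node 1 stays untouched.
        assert!(assignments[4..]
            .iter()
            .all(|a| a.allow_migration && a.numa_node == Some(0)));
    }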
}