#![allow(dead_code)]
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use tokio::runtime::{Builder, Runtime};
/// Description of the CPU layout that the affinity code schedules against.
///
/// Values are produced by [`CpuTopology::detect`]; only the CPU counts come from
/// the `num_cpus` crate, the remaining fields are derived from those counts.
#[derive(Debug, Clone)]
pub struct CpuTopology {
    /// Total CPU count, taken from `num_cpus::get()` during detection.
    pub total_cpus: usize,
    /// Physical core count, taken from `num_cpus::get_physical()` during detection.
    pub physical_cores: usize,
    /// Logical core count; detection sets this to the same value as `total_cpus`.
    pub logical_cores: usize,
    /// Estimated number of NUMA nodes (detection assumes one node per four CPUs).
    pub numa_nodes: usize,
    /// One entry per CPU id in `0..total_cpus`.
    pub cpu_info: Vec<CpuInfo>,
    /// CPU the detecting thread was running on, when known.
    pub current_cpu: Option<usize>,
}
/// Per-CPU record stored in a CPU topology description.
#[derive(Debug, Clone)]
pub struct CpuInfo {
    /// Identifier of this CPU.
    pub id: usize,
    /// NUMA node this CPU is attributed to.
    pub numa_node: usize,
    /// Physical core this CPU is attributed to.
    pub physical_core: usize,
    /// Whether this CPU is treated as a hyperthread sibling rather than a primary core.
    pub is_hyperthread: bool,
    /// Level-1 cache size (presumably bytes — TODO confirm the unit with callers).
    pub cache_level1: usize,
    /// Level-2 cache size (presumably bytes — TODO confirm the unit with callers).
    pub cache_level2: usize,
    /// Level-3 cache size (presumably bytes — TODO confirm the unit with callers).
    pub cache_level3: usize,
    /// Clock frequency in MHz.
    pub frequency_mhz: usize,
}
impl CpuTopology {
pub fn detect() -> Self {
let total_cpus = num_cpus::get();
let physical_cores = num_cpus::get_physical();
let logical_cores = total_cpus;
let numa_nodes = (total_cpus / 4).max(1);
let mut cpu_info = Vec::new();
for cpu_id in 0..total_cpus {
let numa_node = cpu_id / (total_cpus / numa_nodes);
let physical_core = cpu_id / (total_cpus / physical_cores);
let is_hyperthread = total_cpus > physical_cores && (cpu_id % 2 == 1);
cpu_info.push(CpuInfo {
id: cpu_id,
numa_node,
physical_core,
is_hyperthread,
cache_level1: 32 * 1024, cache_level2: 256 * 1024, cache_level3: 8 * 1024 * 1024, frequency_mhz: 3000, });
}
Self {
total_cpus,
physical_cores,
logical_cores,
numa_nodes,
cpu_info,
current_cpu: Self::get_current_cpu(),
}
}
fn get_current_cpu() -> Option<usize> {
Some(0)
}
pub fn get_numa_cpus(&self, numa_node: usize) -> Vec<usize> {
self.cpu_info
.iter()
.filter(|cpu| cpu.numa_node == numa_node)
.map(|cpu| cpu.id)
.collect()
}
pub fn get_physical_cores(&self, numa_node: Option<usize>) -> Vec<usize> {
self.cpu_info
.iter()
.filter(|cpu| {
if let Some(node) = numa_node {
cpu.numa_node == node && !cpu.is_hyperthread
} else {
!cpu.is_hyperthread
}
})
.map(|cpu| cpu.id)
.collect()
}
pub fn get_isolated_cpus(&self) -> Vec<usize> {
self.cpu_info
.iter()
.filter(|cpu| cpu.id > 0 && !cpu.is_hyperthread)
.map(|cpu| cpu.id)
.collect()
}
}
/// Chooses a CPU for each registering thread and keeps per-thread and per-CPU bookkeeping.
pub struct ThreadAffinityManager {
    /// Topology the manager selects CPUs from.
    topology: CpuTopology,
    /// Latest assignment of every thread that registered, keyed by thread id.
    thread_assignments: Arc<RwLock<HashMap<thread::ThreadId, CpuAssignment>>>,
    /// Bookkeeping record for every CPU, keyed by CPU id.
    cpu_utilization: Arc<RwLock<HashMap<usize, CpuUtilization>>>,
    /// Policy used to pick a CPU for a new assignment.
    assignment_strategy: AffinityStrategy,
}
/// Record of one thread-to-CPU assignment.
#[derive(Debug, Clone)]
pub struct CpuAssignment {
    /// Id of the thread the assignment belongs to.
    pub thread_id: thread::ThreadId,
    /// Name supplied by the caller when the assignment was requested.
    pub thread_name: String,
    /// CPU chosen for the thread.
    pub assigned_cpu: usize,
    /// NUMA node of the chosen CPU.
    pub numa_node: usize,
    /// Moment the assignment was created.
    pub assignment_time: std::time::Instant,
    /// Workload category the thread was registered with.
    pub thread_type: ThreadType,
}
/// Workload category a thread declares when it asks for a CPU.
#[derive(Debug, Clone)]
pub enum ThreadType {
    /// Thread doing network I/O.
    NetworkIO,
    /// Thread processing messages.
    MessageProcessor,
    /// Thread doing storage I/O.
    StorageIO,
    /// Consumer thread.
    Consumer,
    /// Metrics thread.
    Metrics,
    /// Background thread.
    Background,
}
/// Bookkeeping kept for a single CPU.
#[derive(Debug, Clone)]
pub struct CpuUtilization {
    /// CPU this record describes.
    pub cpu_id: usize,
    /// Number of threads currently counted against this CPU.
    pub assigned_threads: usize,
    /// Utilization percentage.
    // NOTE(review): the code visible in this file only ever initialises this to 0.0 — confirm who updates it.
    pub utilization_percent: f64,
    /// Last time this record was modified.
    pub last_updated: std::time::Instant,
    /// Workload categories of the threads counted against this CPU.
    pub thread_types: Vec<ThreadType>,
}
/// Policy for picking the CPU a thread is assigned to.
#[derive(Debug, Clone)]
pub enum AffinityStrategy {
    /// Hand out CPU ids in rotation.
    RoundRobin,
    /// Pick a CPU based on NUMA node membership.
    NumaAware,
    /// Pick a CPU based on the thread's workload category.
    WorkloadOptimized,
    /// Pick among the CPUs the topology reports as isolation candidates.
    IsolatedCores,
    /// Pick a CPU taking hyperthread siblings into account.
    HyperThreadAware,
}
impl ThreadAffinityManager {
pub fn new(strategy: AffinityStrategy) -> Self {
let topology = CpuTopology::detect();
let mut cpu_utilization = HashMap::new();
for cpu in &topology.cpu_info {
cpu_utilization.insert(
cpu.id,
CpuUtilization {
cpu_id: cpu.id,
assigned_threads: 0,
utilization_percent: 0.0,
last_updated: std::time::Instant::now(),
thread_types: Vec::new(),
},
);
}
Self {
topology,
thread_assignments: Arc::new(RwLock::new(HashMap::new())),
cpu_utilization: Arc::new(RwLock::new(cpu_utilization)),
assignment_strategy: strategy,
}
}
pub fn assign_thread_affinity(
&self,
thread_name: &str,
thread_type: ThreadType,
) -> Result<usize, String> {
let current_thread_id = thread::current().id();
let assigned_cpu = self.select_optimal_cpu(&thread_type)?;
self.set_thread_cpu_affinity(assigned_cpu)?;
let assignment = CpuAssignment {
thread_id: current_thread_id,
thread_name: thread_name.to_string(),
assigned_cpu,
numa_node: self.topology.cpu_info[assigned_cpu].numa_node,
assignment_time: std::time::Instant::now(),
thread_type: thread_type.clone(),
};
self.thread_assignments
.write()
.insert(current_thread_id, assignment);
{
let mut utilization = self.cpu_utilization.write();
if let Some(cpu_util) = utilization.get_mut(&assigned_cpu) {
cpu_util.assigned_threads += 1;
cpu_util.thread_types.push(thread_type);
cpu_util.last_updated = std::time::Instant::now();
}
}
Ok(assigned_cpu)
}
fn select_optimal_cpu(&self, thread_type: &ThreadType) -> Result<usize, String> {
match self.assignment_strategy {
AffinityStrategy::RoundRobin => self.select_round_robin(),
AffinityStrategy::NumaAware => self.select_numa_aware(thread_type),
AffinityStrategy::WorkloadOptimized => self.select_workload_optimized(thread_type),
AffinityStrategy::IsolatedCores => self.select_isolated_core(thread_type),
AffinityStrategy::HyperThreadAware => self.select_hyperthread_aware(thread_type),
}
}
fn select_round_robin(&self) -> Result<usize, String> {
static NEXT_CPU: AtomicUsize = AtomicUsize::new(0);
let cpu_id = NEXT_CPU.fetch_add(1, Ordering::Relaxed) % self.topology.total_cpus;
Ok(cpu_id)
}
fn select_numa_aware(&self, _thread_type: &ThreadType) -> Result<usize, String> {
let current_numa = 0; let numa_cpus = self.topology.get_numa_cpus(current_numa);
if let Some(&cpu) = numa_cpus.first() {
Ok(cpu)
} else {
self.select_round_robin()
}
}
fn select_workload_optimized(&self, thread_type: &ThreadType) -> Result<usize, String> {
let utilization = self.cpu_utilization.read();
let preferred_cpus = match thread_type {
ThreadType::NetworkIO => {
self.topology.get_isolated_cpus()
}
ThreadType::MessageProcessor => {
self.topology.get_physical_cores(None)
}
ThreadType::StorageIO => {
(0..self.topology.total_cpus).collect()
}
ThreadType::Consumer | ThreadType::Metrics | ThreadType::Background => {
(0..self.topology.total_cpus).collect()
}
};
let best_cpu = preferred_cpus
.iter()
.min_by_key(|&&cpu| {
utilization
.get(&cpu)
.map(|u| u.assigned_threads)
.unwrap_or(0)
})
.copied();
best_cpu.ok_or_else(|| "No available CPU found".to_string())
}
fn select_isolated_core(&self, _thread_type: &ThreadType) -> Result<usize, String> {
let isolated_cpus = self.topology.get_isolated_cpus();
let utilization = self.cpu_utilization.read();
let best_cpu = isolated_cpus
.iter()
.min_by_key(|&&cpu| {
utilization
.get(&cpu)
.map(|u| u.assigned_threads)
.unwrap_or(0)
})
.copied();
best_cpu.ok_or_else(|| "No isolated CPU available".to_string())
}
fn select_hyperthread_aware(&self, thread_type: &ThreadType) -> Result<usize, String> {
let utilization = self.cpu_utilization.read();
let prefer_physical = matches!(
thread_type,
ThreadType::MessageProcessor | ThreadType::NetworkIO
);
let candidate_cpus: Vec<usize> = if prefer_physical {
self.topology.get_physical_cores(None)
} else {
(0..self.topology.total_cpus).collect()
};
let best_cpu = candidate_cpus
.iter()
.min_by_key(|&&cpu| {
utilization
.get(&cpu)
.map(|u| u.assigned_threads)
.unwrap_or(0)
})
.copied();
best_cpu.ok_or_else(|| "No suitable CPU found".to_string())
}
fn set_thread_cpu_affinity(&self, cpu_id: usize) -> Result<(), String> {
#[cfg(target_os = "linux")]
{
}
#[cfg(target_os = "macos")]
{
}
#[cfg(target_os = "windows")]
{
}
println!("Thread affinity set to CPU {}", cpu_id);
Ok(())
}
pub fn get_thread_assignment(&self, thread_id: thread::ThreadId) -> Option<CpuAssignment> {
self.thread_assignments.read().get(&thread_id).cloned()
}
pub fn get_cpu_utilization(&self) -> HashMap<usize, CpuUtilization> {
self.cpu_utilization.read().clone()
}
pub fn get_stats(&self) -> AffinityStats {
let assignments = self.thread_assignments.read();
let utilization = self.cpu_utilization.read();
let total_threads = assignments.len();
let active_cpus = utilization
.values()
.filter(|u| u.assigned_threads > 0)
.count();
let mut numa_distribution = HashMap::new();
for assignment in assignments.values() {
*numa_distribution.entry(assignment.numa_node).or_insert(0) += 1;
}
let mut type_distribution = HashMap::new();
for assignment in assignments.values() {
*type_distribution
.entry(format!("{:?}", assignment.thread_type))
.or_insert(0) += 1;
}
AffinityStats {
topology: self.topology.clone(),
strategy: self.assignment_strategy.clone(),
total_threads,
active_cpus,
cpu_utilization: utilization.clone(),
numa_distribution,
type_distribution,
assignments: assignments.clone(),
}
}
}
/// Point-in-time snapshot of an affinity manager's state.
#[derive(Debug, Clone)]
pub struct AffinityStats {
    /// Topology the manager was working with.
    pub topology: CpuTopology,
    /// Strategy the manager was configured with.
    pub strategy: AffinityStrategy,
    /// Number of threads that had an assignment.
    pub total_threads: usize,
    /// Number of CPUs with at least one assigned thread.
    pub active_cpus: usize,
    /// Per-CPU bookkeeping, keyed by CPU id.
    pub cpu_utilization: HashMap<usize, CpuUtilization>,
    /// Assigned-thread count per NUMA node.
    pub numa_distribution: HashMap<usize, usize>,
    /// Assigned-thread count per thread type, keyed by the type's `Debug` name.
    pub type_distribution: HashMap<String, usize>,
    /// Every recorded assignment, keyed by thread id.
    pub assignments: HashMap<thread::ThreadId, CpuAssignment>,
}
impl AffinityStats {
pub fn report(&self) -> String {
let mut report = String::new();
report.push_str("Thread Affinity Statistics:\n\n");
report.push_str(&format!(
"CPU Topology: {} total CPUs, {} physical cores, {} NUMA nodes\n",
self.topology.total_cpus, self.topology.physical_cores, self.topology.numa_nodes
));
report.push_str(&format!(
"Strategy: {:?}, {} threads on {} active CPUs\n",
self.strategy, self.total_threads, self.active_cpus
));
report.push_str("\nCPU Utilization:\n");
for (cpu_id, util) in &self.cpu_utilization {
if util.assigned_threads > 0 {
let cpu_info = &self.topology.cpu_info[*cpu_id];
report.push_str(&format!(
" CPU {}: {} threads, NUMA node {}, {}\n",
cpu_id,
util.assigned_threads,
cpu_info.numa_node,
if cpu_info.is_hyperthread {
"HT"
} else {
"Physical"
}
));
}
}
report.push_str("\nNUMA Distribution:\n");
for (numa_node, count) in &self.numa_distribution {
report.push_str(&format!(" Node {}: {} threads\n", numa_node, count));
}
report.push_str("\nThread Type Distribution:\n");
for (thread_type, count) in &self.type_distribution {
report.push_str(&format!(" {}: {} threads\n", thread_type, count));
}
report
}
pub fn cpu_efficiency(&self) -> f64 {
if self.topology.total_cpus == 0 {
0.0
} else {
self.active_cpus as f64 / self.topology.total_cpus as f64
}
}
pub fn numa_balance(&self) -> f64 {
if self.numa_distribution.is_empty() || self.total_threads == 0 {
return 1.0;
}
let expected_per_node = self.total_threads as f64 / self.topology.numa_nodes as f64;
let variance: f64 = self
.numa_distribution
.values()
.map(|&count| {
let diff = count as f64 - expected_per_node;
diff * diff
})
.sum::<f64>()
/ self.numa_distribution.len() as f64;
1.0 / (1.0 + variance.sqrt())
}
}
/// Pool of worker threads that each request a CPU assignment when they start.
pub struct AffinityThreadPool {
    /// Manager shared by all workers of the pool.
    affinity_manager: Arc<ThreadAffinityManager>,
    /// Handles and metadata of the spawned workers.
    workers: Vec<AffinityWorker>,
    /// Sending half of the queue the workers take boxed tasks from.
    task_queue: crossbeam_channel::Sender<Box<dyn FnOnce() + Send>>,
}
/// Metadata kept for one worker thread of a pool.
struct AffinityWorker {
    /// Join handle of the worker's OS thread.
    thread_handle: JoinHandle<()>,
    /// CPU recorded for this worker by the pool constructor.
    assigned_cpu: usize,
    /// Workload category the worker registers with.
    thread_type: ThreadType,
}
impl AffinityThreadPool {
pub fn new(
pool_size: usize,
strategy: AffinityStrategy,
default_thread_type: ThreadType,
) -> Self {
let affinity_manager = Arc::new(ThreadAffinityManager::new(strategy));
let (task_sender, task_receiver): (
_,
crossbeam_channel::Receiver<Box<dyn FnOnce() + Send>>,
) = crossbeam_channel::unbounded();
let mut workers = Vec::with_capacity(pool_size);
for i in 0..pool_size {
let affinity_manager_clone = affinity_manager.clone();
let task_receiver_clone = task_receiver.clone();
let thread_type_clone = default_thread_type.clone();
let thread_handle = thread::spawn(move || {
let thread_name = format!("affinity-worker-{}", i);
let assigned_cpu = affinity_manager_clone
.assign_thread_affinity(&thread_name, thread_type_clone.clone())
.expect("Failed to set thread affinity");
println!("Worker thread {} assigned to CPU {}", i, assigned_cpu);
while let Ok(task) = task_receiver_clone.recv() {
task();
}
});
workers.push(AffinityWorker {
thread_handle,
assigned_cpu: 0, thread_type: default_thread_type.clone(),
});
}
Self {
affinity_manager,
workers,
task_queue: task_sender,
}
}
pub fn execute<F>(&self, task: F) -> Result<(), String>
where
F: FnOnce() + Send + 'static,
{
self.task_queue
.send(Box::new(task))
.map_err(|_| "Failed to send task to worker".to_string())
}
pub fn get_affinity_stats(&self) -> AffinityStats {
self.affinity_manager.get_stats()
}
}
/// Tokio runtime paired with the affinity manager its threads register with.
pub struct AffinityTokioRuntime {
    /// The wrapped multi-threaded runtime.
    runtime: Runtime,
    /// Manager that records the assignments of the runtime's threads.
    affinity_manager: Arc<ThreadAffinityManager>,
}
impl AffinityTokioRuntime {
pub fn new(
num_threads: usize,
strategy: AffinityStrategy,
) -> Result<Self, Box<dyn std::error::Error>> {
let affinity_manager = Arc::new(ThreadAffinityManager::new(strategy));
let affinity_clone = affinity_manager.clone();
let runtime = Builder::new_multi_thread()
.worker_threads(num_threads)
.thread_name_fn(|| {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
format!("tokio-worker-{}", id)
})
.on_thread_start(move || {
let thread_name = thread::current()
.name()
.unwrap_or("tokio-worker")
.to_string();
let _ = affinity_clone.assign_thread_affinity(&thread_name, ThreadType::NetworkIO);
})
.enable_all()
.build()?;
Ok(Self {
runtime,
affinity_manager,
})
}
pub fn runtime(&self) -> &Runtime {
&self.runtime
}
pub fn get_affinity_stats(&self) -> AffinityStats {
self.affinity_manager.get_stats()
}
}
/// Tracks which CPUs are isolation candidates and whether isolation is switched on.
pub struct CpuIsolation {
    /// CPU ids considered for isolation.
    isolated_cpus: Vec<usize>,
    /// Whether isolation is currently flagged as active.
    isolation_active: bool,
}
impl CpuIsolation {
    /// Creates an inactive isolation tracker for the detected topology.
    ///
    /// The candidate CPUs are those returned by `CpuTopology::get_isolated_cpus`.
    pub fn new() -> Self {
        let topology = CpuTopology::detect();
        Self {
            isolated_cpus: topology.get_isolated_cpus(),
            isolation_active: false,
        }
    }

    /// Marks isolation as active and logs the affected CPUs.
    ///
    /// NOTE(review): this only flips the flag and prints; no operating-system
    /// setting is changed. Currently always returns `Ok(())`.
    pub fn enable_isolation(&mut self) -> Result<(), String> {
        println!("CPU isolation enabled for CPUs: {:?}", self.isolated_cpus);
        self.isolation_active = true;
        Ok(())
    }

    /// Marks isolation as inactive and logs the change.
    ///
    /// Like [`enable_isolation`](Self::enable_isolation) this is bookkeeping
    /// only and currently always returns `Ok(())`.
    pub fn disable_isolation(&mut self) -> Result<(), String> {
        println!("CPU isolation disabled");
        self.isolation_active = false;
        Ok(())
    }

    /// Returns the CPU ids considered for isolation.
    pub fn get_isolated_cpus(&self) -> &[usize] {
        &self.isolated_cpus
    }

    /// Returns whether isolation is currently flagged as active.
    pub fn is_isolation_active(&self) -> bool {
        self.isolation_active
    }
}

/// `Default` mirrors [`CpuIsolation::new`], as expected for a type with an
/// argument-less constructor.
impl Default for CpuIsolation {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Detection must report at least one CPU, one physical core and one CPU record.
    #[test]
    fn test_cpu_topology_detection() {
        let detected = CpuTopology::detect();
        assert!(detected.total_cpus > 0);
        assert!(detected.physical_cores > 0);
        assert!(!detected.cpu_info.is_empty());
        println!("CPU topology: {:?}", detected);
    }

    /// A round-robin assignment succeeds and is visible in the statistics.
    #[test]
    fn test_thread_affinity_manager() {
        let affinity = ThreadAffinityManager::new(AffinityStrategy::RoundRobin);
        let outcome = affinity.assign_thread_affinity("test-thread", ThreadType::MessageProcessor);
        assert!(outcome.is_ok());

        let snapshot = affinity.get_stats();
        assert!(snapshot.total_threads > 0);
        println!("Affinity stats: {}", snapshot.report());
    }

    /// Isolation starts inactive and follows enable/disable calls.
    #[test]
    fn test_cpu_isolation() {
        let mut tracker = CpuIsolation::new();
        assert!(!tracker.is_isolation_active());

        let enabled = tracker.enable_isolation();
        assert!(enabled.is_ok());
        assert!(tracker.is_isolation_active());

        let disabled = tracker.disable_isolation();
        assert!(disabled.is_ok());
        assert!(!tracker.is_isolation_active());
    }
}