use rayon::ThreadPool;
use rayon::ThreadPoolBuildError;
use std::sync::Arc;
use tracing::{info, debug, warn, error};
use std::collections::HashMap;
/// Controls how an `OptimizedThreadPool` places its worker threads on CPUs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CpuAffinityConfig {
    /// Pin each worker thread to a single CPU core.
    pub enable_affinity: bool,
    /// Read the NUMA topology (Linux sysfs only) and use it when choosing cores.
    pub enable_numa_awareness: bool,
    /// Explicit CPU core IDs; workers are assigned to them round-robin by worker index.
    pub core_ids: Option<Vec<usize>>,
    /// NUMA node IDs requested by the caller; see `OptimizedThreadPool::new` for how they are
    /// applied.
    pub numa_nodes: Option<Vec<usize>>,
}

impl Default for CpuAffinityConfig {
    /// Affinity on, NUMA awareness off, and no explicit core or node lists.
    fn default() -> Self {
        Self {
            enable_affinity: true,
            enable_numa_awareness: false,
            core_ids: None,
            numa_nodes: None,
        }
    }
}
/// One NUMA node of the machine topology.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NumaNode {
    /// Node number (the `<N>` in sysfs `node<N>`).
    pub node_id: usize,
    /// CPU core IDs belonging to this node.
    pub cpu_cores: Vec<usize>,
    /// Node memory size in MiB (0 when it could not be read).
    pub memory_size_mb: u64,
    /// NUMA distance values for this node, as filled in by topology detection.
    pub distance: Vec<u64>,
}
/// A rayon [`ThreadPool`] together with the affinity configuration it was built with and
/// per-worker placement records.
#[derive(Debug)]
pub struct OptimizedThreadPool {
    /// The underlying rayon pool, shared via `Arc` so callers can hold it independently.
    pool: Arc<ThreadPool>,
    /// Configuration the pool was built with.
    config: CpuAffinityConfig,
    /// NUMA topology captured at construction, if NUMA awareness was enabled and detection
    /// found any nodes.
    numa_topology: Option<Vec<NumaNode>>,
    /// Placement records for worker threads, keyed by a `usize` worker id (intended to be the
    /// rayon worker index).
    thread_assignments: Arc<std::sync::RwLock<HashMap<usize, ThreadInfo>>>,
}
/// Placement record for one worker thread of an `OptimizedThreadPool`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadInfo {
    /// Worker index within the pool.
    pub thread_id: usize,
    /// CPU core associated with the thread, if any.
    pub cpu_core: Option<usize>,
    /// NUMA node associated with the thread, if any.
    pub numa_node: Option<usize>,
    /// When this record was created.
    pub created_at: std::time::Instant,
}
impl OptimizedThreadPool {
pub fn new(num_threads: Option<usize>, config: CpuAffinityConfig) -> Result<Self, ThreadPoolBuildError> {
let num_threads = num_threads.unwrap_or_else(|| {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or_else(|_| rayon::current_num_threads())
});
info!("Creating optimized thread pool with {} threads", num_threads);
let numa_topology = if config.enable_numa_awareness {
Self::detect_numa_topology()
} else {
None
};
let thread_assignments = Arc::new(std::sync::RwLock::new(HashMap::new()));
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("sol-worker-{}", i))
.spawn_handler(|thread| {
let thread_id = thread.index();
if config.enable_affinity {
if let Some(core_id) = Self::assign_cpu_core(thread_id, &config, &numa_topology) {
if let Err(e) = Self::set_thread_affinity(core_id) {
error!("Failed to set CPU affinity for thread {}: {}", thread_id, e);
} else {
info!("Thread {} assigned to CPU core {}", thread_id, core_id);
}
}
}
let thread_info = ThreadInfo {
thread_id,
cpu_core: Self::get_assigned_core(thread_id, &config, &numa_topology),
numa_node: Self::get_numa_node(thread_id, &numa_topology),
created_at: std::time::Instant::now(),
};
debug!("Created thread: {:?}", thread_info);
thread.run();
Ok(())
})
.build()?;
Ok(Self {
pool: Arc::new(pool),
config,
numa_topology,
thread_assignments,
})
}
pub fn pool(&self) -> Arc<ThreadPool> {
Arc::clone(&self.pool)
}
pub fn config(&self) -> &CpuAffinityConfig {
&self.config
}
pub fn get_numa_topology(&self) -> Option<&Vec<NumaNode>> {
self.numa_topology.as_ref()
}
pub fn get_thread_assignments(&self) -> std::sync::RwLockReadGuard<'_, HashMap<usize, ThreadInfo>> {
self.thread_assignments.read().unwrap()
}
fn detect_numa_topology() -> Option<Vec<NumaNode>> {
#[cfg(target_os = "linux")]
{
let mut nodes = Vec::new();
if let Ok(entries) = std::fs::read_dir("/sys/devices/system/node") {
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() && path.file_name().unwrap_or_default().to_str().unwrap_or("").starts_with("node") {
if let Some(node_str) = path.file_name().and_then(|n| n.to_str()) {
if let Ok(node_id) = node_str.strip_prefix("node").unwrap_or("0").parse::<usize>() {
let cpu_cores = Self::get_numa_node_cores(node_id);
let memory_size = Self::get_numa_node_memory(node_id);
nodes.push(NumaNode {
node_id,
cpu_cores,
memory_size_mb: memory_size,
distance: Self::get_numa_distances(node_id),
});
}
}
}
}
}
if !nodes.is_empty() {
info!("Detected {} NUMA nodes", nodes.len());
Some(nodes)
} else {
debug!("No NUMA nodes detected, falling back to UMA");
None
}
}
#[cfg(not(target_os = "linux"))]
{
debug!("NUMA detection not supported on this platform");
None
}
}
#[cfg(target_os = "linux")]
fn get_numa_node_cores(node_id: usize) -> Vec<usize> {
let mut cores = Vec::new();
let node_path = format!("/sys/devices/system/node/node{}/cpulist", node_id);
if let Ok(content) = std::fs::read_to_string(node_path) {
for part in content.trim().split(',') {
if part.contains('-') {
let mut range = part.split('-');
if let (Some(start), Some(end)) = (range.next(), range.next()) {
if let (Ok(start), Ok(end)) = (start.parse::<usize>(), end.parse::<usize>()) {
cores.extend(start..=end);
}
}
} else if let Ok(core) = part.parse::<usize>() {
cores.push(core);
}
}
}
cores
}
#[cfg(target_os = "linux")]
fn get_numa_node_memory(node_id: usize) -> u64 {
let node_path = format!("/sys/devices/system/node/node{}/meminfo", node_id);
if let Ok(content) = std::fs::read_to_string(node_path) {
for line in content.lines() {
if line.contains("MemTotal:") {
if let Some(kb_str) = line.split_whitespace().nth(3) {
if let Ok(kb) = kb_str.parse::<u64>() {
return kb / 1024; }
}
}
}
}
0 }
#[cfg(target_os = "linux")]
fn get_numa_distances(_node_id: usize) -> Vec<u64> {
vec![10, 20, 30, 40] }
fn assign_cpu_core(thread_id: usize, config: &CpuAffinityConfig, numa_topology: &Option<Vec<NumaNode>>) -> Option<usize> {
if let Some(ref core_ids) = config.core_ids {
return core_ids.get(thread_id % core_ids.len()).copied();
}
if let Some(ref numa_nodes) = numa_topology {
let node_id = thread_id % numa_nodes.len();
let node = &numa_nodes[node_id];
if !node.cpu_cores.is_empty() {
let core_index = thread_id % node.cpu_cores.len();
return Some(node.cpu_cores[core_index]);
}
}
Some(thread_id)
}
fn get_assigned_core(thread_id: usize, config: &CpuAffinityConfig, numa_topology: &Option<Vec<NumaNode>>) -> Option<usize> {
Self::assign_cpu_core(thread_id, config, numa_topology)
}
fn get_numa_node(thread_id: usize, numa_topology: &Option<Vec<NumaNode>>) -> Option<usize> {
if let Some(ref nodes) = numa_topology {
Some(thread_id % nodes.len())
} else {
None
}
}
fn set_thread_affinity(_core_id: usize) -> Result<(), String> {
#[cfg(target_os = "linux")]
{
use libc::{cpu_set_t, sched_setaffinity, sched_getaffinity};
use std::mem;
unsafe {
let mut cpuset: cpu_set_t = mem::zeroed();
libc::CPU_ZERO(&mut cpuset);
libc::CPU_SET(_core_id, &mut cpuset);
let result = sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &cpuset);
if result == 0 {
Ok(())
} else {
Err(format!("Failed to set CPU affinity: {}", std::io::Error::last_os_error()))
}
}
}
#[cfg(not(target_os = "linux"))]
{
warn!("CPU affinity not supported on this platform");
Ok(())
}
}
pub fn get_thread_affinity() -> Result<Vec<usize>, String> {
#[cfg(target_os = "linux")]
{
use libc::{cpu_set_t, sched_getaffinity};
use std::mem;
unsafe {
let mut cpuset: cpu_set_t = mem::zeroed();
let result = sched_getaffinity(0, mem::size_of::<cpu_set_t>(), &mut cpuset);
if result == 0 {
let mut cores = Vec::new();
for i in 0..libc::CPU_SETSIZE as usize {
if libc::CPU_ISSET(i, &cpuset) {
cores.push(i);
}
}
Ok(cores)
} else {
Err(format!("Failed to get CPU affinity: {}", std::io::Error::last_os_error()))
}
}
}
#[cfg(not(target_os = "linux"))]
{
Ok(vec![]) }
}
pub fn optimize_numa_memory(&self) {
if let Some(ref numa_nodes) = self.numa_topology {
info!("Optimizing memory allocation for {} NUMA nodes", numa_nodes.len());
for node in numa_nodes {
debug!("NUMA Node {}: {} cores, {} MB memory",
node.node_id, node.cpu_cores.len(), node.memory_size_mb);
}
}
}
pub fn get_metrics(&self) -> ThreadPoolMetrics {
ThreadPoolMetrics {
total_threads: self.pool.current_num_threads(),
active_threads: self.pool.current_thread_index().map_or(0, |i| i + 1),
cpu_affinity_enabled: self.config.enable_affinity,
numa_awareness_enabled: self.config.enable_numa_awareness,
numa_nodes: self.numa_topology.as_ref().map(|nodes| nodes.len()),
thread_assignments: self.thread_assignments.read().unwrap().len(),
}
}
}
/// Point-in-time snapshot returned by `OptimizedThreadPool::get_metrics`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadPoolMetrics {
    /// Number of worker threads in the pool.
    pub total_threads: usize,
    /// Calling worker's index + 1 when queried from inside the pool, otherwise 0.
    /// This is not a count of busy threads.
    pub active_threads: usize,
    /// Whether CPU pinning was requested.
    pub cpu_affinity_enabled: bool,
    /// Whether NUMA awareness was requested.
    pub numa_awareness_enabled: bool,
    /// Number of NUMA nodes in use, or `None` when no topology was captured.
    pub numa_nodes: Option<usize>,
    /// Number of entries in the per-thread assignment map.
    pub thread_assignments: usize,
}
/// Fluent builder for `OptimizedThreadPool`, starting from `CpuAffinityConfig::default()`.
#[derive(Debug, Clone)]
#[must_use = "a builder does nothing until `build` is called"]
pub struct OptimizedThreadPoolBuilder {
    num_threads: Option<usize>,
    config: CpuAffinityConfig,
}

impl OptimizedThreadPoolBuilder {
    /// Creates a builder with no explicit thread count and the default affinity config.
    pub fn new() -> Self {
        Self {
            num_threads: None,
            config: CpuAffinityConfig::default(),
        }
    }

    /// Sets the number of worker threads.
    pub fn num_threads(mut self, num_threads: usize) -> Self {
        self.num_threads = Some(num_threads);
        self
    }

    /// Enables or disables pinning each worker to a CPU core.
    pub fn enable_cpu_affinity(mut self, enable: bool) -> Self {
        self.config.enable_affinity = enable;
        self
    }

    /// Enables or disables NUMA topology detection.
    pub fn enable_numa_awareness(mut self, enable: bool) -> Self {
        self.config.enable_numa_awareness = enable;
        self
    }

    /// Sets the explicit CPU core IDs that workers are assigned to.
    pub fn core_ids(mut self, cores: Vec<usize>) -> Self {
        self.config.core_ids = Some(cores);
        self
    }

    /// Sets the requested NUMA node IDs (stored in `CpuAffinityConfig::numa_nodes`).
    pub fn numa_nodes(mut self, nodes: Vec<usize>) -> Self {
        self.config.numa_nodes = Some(nodes);
        self
    }

    /// Builds the pool.
    ///
    /// # Errors
    ///
    /// Propagates the `ThreadPoolBuildError` returned by `OptimizedThreadPool::new`.
    pub fn build(self) -> Result<OptimizedThreadPool, ThreadPoolBuildError> {
        OptimizedThreadPool::new(self.num_threads, self.config)
    }
}

impl Default for OptimizedThreadPoolBuilder {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The pool must build, report the requested size, and actually execute submitted work.
    #[test]
    fn test_thread_pool_builder() {
        let pool = OptimizedThreadPoolBuilder::new()
            .num_threads(4)
            .enable_cpu_affinity(true)
            .enable_numa_awareness(false)
            .build()
            .expect("pool should build");
        assert_eq!(pool.get_metrics().total_threads, 4);
        assert_eq!(pool.pool().install(|| 21 * 2), 42);
        // Work run via `install` executes on one of the pool's workers.
        assert!(pool.pool().install(rayon::current_thread_index).is_some());
    }

    /// Querying the calling thread's affinity must succeed; on Linux it allows at least one CPU.
    #[test]
    fn test_cpu_affinity_detection() {
        let affinity = OptimizedThreadPool::get_thread_affinity();
        assert!(affinity.is_ok(), "affinity query failed: {:?}", affinity);
        if cfg!(target_os = "linux") {
            assert!(!affinity.unwrap().is_empty());
        }
    }

    /// Detection either finds nothing (`None`) or returns at least one node.
    #[test]
    fn test_numa_detection() {
        if let Some(nodes) = OptimizedThreadPool::detect_numa_topology() {
            assert!(!nodes.is_empty());
        }
    }
}