use crate::error::SpatialResult;
use crate::memory_pool::DistancePool;
use crate::simd_distance::hardware_specific_simd::HardwareOptimizedDistances;
use scirs2_core::ndarray::{Array1, Array2, ArrayView2};
use scirs2_core::simd_ops::PlatformCapabilities;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
#[cfg(target_os = "linux")]
use libc;
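/// Configuration for the NUMA-aware work-stealing pool.
///
/// All fields have builder-style setters. `num_threads == 0` means
/// "auto-detect from the NUMA topology". Several fields
/// (`adaptive_scheduling`, `min_chunk_size`, `memory_strategy`,
/// `prefetch_distance`) are advisory and not yet consumed by the pool
/// itself.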
#[derive(Debug, Clone)]
pub struct WorkStealingConfig {
pub numa_aware: bool,
pub work_stealing: bool,
pub adaptive_scheduling: bool,
pub num_threads: usize,
pub initial_chunk_size: usize,
pub min_chunk_size: usize,
pub thread_affinity: ThreadAffinityStrategy,
pub memory_strategy: MemoryStrategy,
pub prefetch_distance: usize,
}
impl Default for WorkStealingConfig {
fn default() -> Self {
Self::new()
}
}
impl WorkStealingConfig {
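/// Create a configuration with the defaults listed below.
///
/// A minimal usage sketch (illustrative only; module paths depend on the
/// crate layout):
///
/// ```ignore
/// let config = WorkStealingConfig::new()
///     .with_threads(8)
///     .with_chunk_sizes(1024, 64)
///     .with_thread_affinity(ThreadAffinityStrategy::NumaAware);
/// let pool = WorkStealingPool::new(config)?;
/// ```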
pub fn new() -> Self {
Self {
numa_aware: true,
work_stealing: true,
adaptive_scheduling: true,
// 0 means "auto-detect from the NUMA topology".
num_threads: 0,
initial_chunk_size: 1024,
min_chunk_size: 64,
thread_affinity: ThreadAffinityStrategy::NumaAware,
memory_strategy: MemoryStrategy::NumaInterleaved,
prefetch_distance: 8,
}
}
pub fn with_numa_aware(mut self, enabled: bool) -> Self {
self.numa_aware = enabled;
self
}
pub fn with_work_stealing(mut self, enabled: bool) -> Self {
self.work_stealing = enabled;
self
}
pub fn with_adaptive_scheduling(mut self, enabled: bool) -> Self {
self.adaptive_scheduling = enabled;
self
}
pub fn with_threads(mut self, num_threads: usize) -> Self {
self.num_threads = num_threads;
self
}
pub fn with_chunk_sizes(mut self, initial: usize, minimum: usize) -> Self {
self.initial_chunk_size = initial;
self.min_chunk_size = minimum;
self
}
pub fn with_thread_affinity(mut self, strategy: ThreadAffinityStrategy) -> Self {
self.thread_affinity = strategy;
self
}
pub fn with_memory_strategy(mut self, strategy: MemoryStrategy) -> Self {
self.memory_strategy = strategy;
self
}
}
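/// How worker threads are pinned to CPUs.
///
/// - `None`: leave scheduling entirely to the OS.
/// - `Physical`: pin each worker to a single core.
/// - `NumaAware`: restrict each worker to the CPUs of its NUMA node.
/// - `Custom(cpus)`: pin worker `i` to `cpus[i]`.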
#[derive(Debug, Clone, PartialEq)]
pub enum ThreadAffinityStrategy {
None,
Physical,
NumaAware,
Custom(Vec<usize>),
}
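/// Intended placement of the pool's working memory. Only `System` has
/// guaranteed behavior here; the NUMA and huge-page variants record intent
/// for platform-specific allocators.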
#[derive(Debug, Clone, PartialEq)]
pub enum MemoryStrategy {
System,
NumaLocal,
NumaInterleaved,
HugePages,
}
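/// A description of the machine's NUMA layout: node count, cores and
/// memory per node, and a node-to-node distance matrix following the Linux
/// SLIT convention (10 = local, 20 = remote).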
#[derive(Debug, Clone)]
pub struct NumaTopology {
pub num_nodes: usize,
pub cores_per_node: Vec<usize>,
pub memory_per_node: Vec<usize>,
pub distance_matrix: Vec<Vec<u32>>,
}
impl Default for NumaTopology {
fn default() -> Self {
Self::detect()
}
}
impl NumaTopology {
/// Detect the system's NUMA topology.
///
/// This is a portable heuristic rather than a real hardware query: it
/// assumes roughly eight cores per NUMA node and 1 GiB of memory per
/// node.
pub fn detect() -> Self {
let num_cpus = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
// Heuristic: assume ~8 cores per node, with at least one node.
let num_nodes = (num_cpus / 8).max(1);
Self {
num_nodes,
cores_per_node: vec![num_cpus / num_nodes; num_nodes],
// Placeholder capacity of 1 GiB per node; real capacities are
// not queried.
memory_per_node: vec![1024 * 1024 * 1024; num_nodes],
distance_matrix: Self::create_default_distance_matrix(num_nodes),
}
}
#[allow(clippy::needless_range_loop)]
fn create_default_distance_matrix(num_nodes: usize) -> Vec<Vec<u32>> {
// Follow the Linux SLIT convention: 10 for local access, 20 for a
// remote node.
let mut matrix = vec![vec![0; num_nodes]; num_nodes];
for i in 0..num_nodes {
for j in 0..num_nodes {
matrix[i][j] = if i == j { 10 } else { 20 };
}
}
matrix
}
pub fn optimal_threads_per_node(&self, node: usize) -> usize {
if node < self.cores_per_node.len() {
self.cores_per_node[node]
} else {
self.cores_per_node.first().copied().unwrap_or(1)
}
}
pub fn memory_capacity(&self, node: usize) -> usize {
self.memory_per_node.get(node).copied().unwrap_or(0)
}
}
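/// A NUMA-aware thread pool with one local queue per worker plus a shared
/// global queue. Work is submitted to the global queue; each worker drains
/// its local queue first and then falls back to the global one.
///
/// A hedged usage sketch (paths and error handling are illustrative):
///
/// ```ignore
/// let pool = WorkStealingPool::new(WorkStealingConfig::new().with_threads(4))?;
/// pool.submit_work(vec![WorkItem {
///     start: 0,
///     end: 1024,
///     work_type: WorkType::Custom("demo".to_string()),
///     priority: 1,
///     numa_hint: None,
/// }])?;
/// pool.wait_for_completion()?;
/// ```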
pub struct WorkStealingPool {
workers: Vec<WorkStealingWorker>,
#[allow(dead_code)]
config: WorkStealingConfig,
numa_topology: NumaTopology,
global_queue: Arc<Mutex<VecDeque<WorkItem>>>,
completed_work: Arc<AtomicUsize>,
total_work: Arc<AtomicUsize>,
active_workers: Arc<AtomicUsize>,
shutdown: Arc<AtomicBool>,
}
struct WorkStealingWorker {
thread_id: usize,
numa_node: usize,
local_queue: Arc<Mutex<VecDeque<WorkItem>>>,
thread_handle: Option<thread::JoinHandle<()>>,
memory_pool: Arc<DistancePool>,
}
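/// A half-open range `[start, end)` of work, tagged with its kind, a
/// scheduling priority, and an optional preferred NUMA node.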
#[derive(Debug, Clone)]
pub struct WorkItem {
pub start: usize,
pub end: usize,
pub work_type: WorkType,
pub priority: u8,
pub numa_hint: Option<usize>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum WorkType {
DistanceMatrix,
KMeansClustering,
KDTreeBuild,
NearestNeighbor,
Custom(String),
}
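/// Per-task context handed to workers; exactly one `Option` field is
/// expected to be populated for a given `WorkType`. Note that workers
/// spawned by `WorkStealingPool::new` currently receive an empty context
/// (all `None`).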
pub struct WorkContext {
pub distance_context: Option<DistanceMatrixContext>,
pub kmeans_context: Option<KMeansContext>,
pub kdtree_context: Option<KDTreeContext>,
pub nn_context: Option<NearestNeighborContext>,
pub custom_context: Option<CustomWorkContext>,
}
pub struct DistanceMatrixContext {
pub points: Array2<f64>,
pub result_sender: Sender<(usize, usize, f64)>,
}
pub struct KMeansContext {
pub points: Array2<f64>,
pub centroids: Array2<f64>,
pub assignment_sender: Sender<(usize, usize)>,
}
pub struct KDTreeContext {
pub points: Array2<f64>,
pub indices: Vec<usize>,
pub depth: usize,
pub config: KDTreeConfig,
pub result_sender: Sender<(usize, KDTreeChunkResult)>,
}
pub struct NearestNeighborContext {
pub query_points: Array2<f64>,
pub data_points: Array2<f64>,
pub k: usize,
pub result_sender: Sender<(usize, Vec<(usize, f64)>)>,
}
pub struct CustomWorkContext {
pub process_fn: fn(usize, usize, &CustomUserData),
pub user_data: CustomUserData,
}
#[derive(Debug, Clone)]
pub struct CustomUserData {
pub data: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct KDTreeConfig {
pub max_leaf_size: usize,
pub cache_aware: bool,
}
impl Default for KDTreeConfig {
fn default() -> Self {
Self {
max_leaf_size: 32,
cache_aware: true,
}
}
}
#[derive(Debug, Clone)]
pub struct KDTreeChunkResult {
pub node_index: usize,
pub is_leaf: bool,
pub splitting_dimension: usize,
pub split_value: f64,
pub left_indices: Vec<usize>,
pub right_indices: Vec<usize>,
}
impl WorkStealingPool {
pub fn new(config: WorkStealingConfig) -> SpatialResult<Self> {
let numa_topology = if config.numa_aware {
NumaTopology::detect()
} else {
// Flat topology: one node containing every worker. Fall back to
// the detected parallelism when num_threads is 0; otherwise the
// pool would be created with zero workers.
let threads = if config.num_threads == 0 {
thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4)
} else {
config.num_threads
};
NumaTopology {
num_nodes: 1,
cores_per_node: vec![threads],
memory_per_node: vec![0],
distance_matrix: vec![vec![10]],
}
};
let num_threads = if config.num_threads == 0 {
// 0 means "use every core the topology reports".
numa_topology.cores_per_node.iter().sum()
} else {
config.num_threads
};
let global_queue = Arc::new(Mutex::new(VecDeque::new()));
let completed_work = Arc::new(AtomicUsize::new(0));
let total_work = Arc::new(AtomicUsize::new(0));
let active_workers = Arc::new(AtomicUsize::new(0));
let shutdown = Arc::new(AtomicBool::new(false));
let mut workers = Vec::with_capacity(num_threads);
for thread_id in 0..num_threads {
let numa_node = if config.numa_aware {
Self::assign_thread_to_numa_node(thread_id, &numa_topology)
} else {
0
};
let worker = WorkStealingWorker {
thread_id,
numa_node,
local_queue: Arc::new(Mutex::new(VecDeque::new())),
thread_handle: None,
memory_pool: Arc::new(DistancePool::new(1000)),
};
workers.push(worker);
}
for worker in &mut workers {
let local_queue = Arc::clone(&worker.local_queue);
let global_queue = Arc::clone(&global_queue);
let completed_work = Arc::clone(&completed_work);
let active_workers = Arc::clone(&active_workers);
let shutdown = Arc::clone(&shutdown);
let config_clone = config.clone();
let thread_id = worker.thread_id;
let numa_node = worker.numa_node;
let memory_pool = Arc::clone(&worker.memory_pool);
let handle = thread::spawn(move || {
Self::worker_main(
thread_id,
numa_node,
local_queue,
global_queue,
completed_work,
active_workers,
shutdown,
config_clone,
memory_pool,
);
});
worker.thread_handle = Some(handle);
}
Ok(Self {
workers,
config,
numa_topology,
global_queue,
completed_work,
total_work,
active_workers,
shutdown,
})
}
fn assign_thread_to_numa_node(thread_id: usize, topology: &NumaTopology) -> usize {
let mut thread_count = 0;
for (node_id, &cores) in topology.cores_per_node.iter().enumerate() {
if thread_id < thread_count + cores {
return node_id;
}
thread_count += cores;
}
// Fall back to node 0 if thread_id exceeds the enumerated cores.
0
}
fn worker_main(
thread_id: usize,
numa_node: usize,
local_queue: Arc<Mutex<VecDeque<WorkItem>>>,
global_queue: Arc<Mutex<VecDeque<WorkItem>>>,
completed_work: Arc<AtomicUsize>,
active_workers: Arc<AtomicUsize>,
shutdown: Arc<AtomicBool>,
config: WorkStealingConfig,
// Reserved for NUMA-local allocations; not yet consumed by the loop.
_memory_pool: Arc<DistancePool>,
) {
Self::set_thread_affinity(thread_id, numa_node, &config);
// Workers currently run with an empty context; per-task contexts are
// not yet threaded through, so typed work items are counted as done
// but produce no results (callers rely on their own fallbacks).
let work_context = WorkContext {
distance_context: None,
kmeans_context: None,
kdtree_context: None,
nn_context: None,
custom_context: None,
};
while !shutdown.load(Ordering::Relaxed) {
let work_item = Self::get_work_item(&local_queue, &global_queue, &config);
if let Some(item) = work_item {
active_workers.fetch_add(1, Ordering::Relaxed);
Self::process_work_item(item, &work_context);
completed_work.fetch_add(1, Ordering::Relaxed);
active_workers.fetch_sub(1, Ordering::Relaxed);
} else {
if config.work_stealing {
Self::attempt_work_stealing(thread_id, &local_queue, &global_queue, &config);
}
thread::sleep(Duration::from_micros(100));
}
}
}
fn set_thread_affinity(thread_id: usize, numa_node: usize, config: &WorkStealingConfig) {
match config.thread_affinity {
ThreadAffinityStrategy::Physical => {
#[cfg(target_os = "linux")]
{
if let Err(e) = Self::set_cpu_affinity_linux(thread_id) {
eprintln!(
"Warning: Failed to set CPU affinity for thread {thread_id}: {e}"
);
}
}
#[cfg(target_os = "windows")]
{
if let Err(e) = Self::set_cpu_affinity_windows(thread_id) {
eprintln!(
"Warning: Failed to set CPU affinity for thread {thread_id}: {e}"
);
}
}
}
ThreadAffinityStrategy::NumaAware => {
#[cfg(target_os = "linux")]
{
if let Err(e) = Self::set_numa_affinity_linux(numa_node) {
eprintln!(
"Warning: Failed to set NUMA affinity for node {numa_node}: {e}"
);
}
}
#[cfg(target_os = "windows")]
{
if let Err(e) = Self::set_numa_affinity_windows(numa_node) {
eprintln!(
"Warning: Failed to set NUMA affinity for node {numa_node}: {e}"
);
}
}
}
ThreadAffinityStrategy::Custom(ref cpus) => {
if let Some(&cpu) = cpus.get(thread_id) {
#[cfg(target_os = "linux")]
{
if let Err(e) = Self::set_custom_cpu_affinity_linux(cpu) {
eprintln!(
"Warning: Failed to set custom CPU affinity to core {cpu}: {e}"
);
}
}
#[cfg(target_os = "windows")]
{
if let Err(e) = Self::set_custom_cpu_affinity_windows(cpu) {
eprintln!(
"Warning: Failed to set custom CPU affinity to core {cpu}: {e}"
);
}
}
}
}
ThreadAffinityStrategy::None => {
// Leave scheduling entirely to the OS.
}
}
}
#[cfg(target_os = "linux")]
fn set_cpu_affinity_linux(cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
unsafe {
// A zeroed cpu_set_t is an empty set; add the single target CPU.
let mut cpu_set: libc::cpu_set_t = std::mem::zeroed();
libc::CPU_SET(cpu_id, &mut cpu_set);
// pid 0 applies the mask to the calling thread.
let result = libc::sched_setaffinity(
0,
std::mem::size_of::<libc::cpu_set_t>(),
&cpu_set,
);
if result == 0 {
Ok(())
} else {
Err("Failed to set CPU affinity".into())
}
}
}
#[cfg(target_os = "linux")]
fn set_numa_affinity_linux(numa_node: usize) -> Result<(), Box<dyn std::error::Error>> {
use std::fs;
// The kernel exposes each node's CPUs as a list like "0-3,8-11".
let cpulist_path = format!("/sys/devices/system/node/node{numa_node}/cpulist");
let cpulist = fs::read_to_string(&cpulist_path)
.map_err(|_| format!("Failed to read NUMA node {numa_node} CPU list"))?;
unsafe {
let mut cpu_set: libc::cpu_set_t = std::mem::zeroed();
for range in cpulist.trim().split(',') {
if let Some((start, end)) = range.split_once('-') {
if let (Ok(s), Ok(e)) = (start.parse::<u32>(), end.parse::<u32>()) {
for cpu in s..=e {
libc::CPU_SET(cpu as usize, &mut cpu_set);
}
}
} else if let Ok(cpu) = range.parse::<u32>() {
libc::CPU_SET(cpu as usize, &mut cpu_set);
}
}
// pid 0 applies the mask to the calling thread.
let result = libc::sched_setaffinity(
0,
std::mem::size_of::<libc::cpu_set_t>(),
&cpu_set,
);
if result == 0 {
Ok(())
} else {
Err("Failed to set NUMA affinity".into())
}
}
}
#[cfg(target_os = "linux")]
fn set_custom_cpu_affinity_linux(cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
Self::set_cpu_affinity_linux(cpu_id)
}
#[cfg(target_os = "windows")]
fn set_cpu_affinity_windows(_cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
// Placeholder: Windows affinity (e.g. SetThreadAffinityMask) is not
// yet wired up, so this no-op reports success.
Ok(())
}
#[cfg(target_os = "windows")]
fn set_numa_affinity_windows(_numa_node: usize) -> Result<(), Box<dyn std::error::Error>> {
// Placeholder no-op; see set_cpu_affinity_windows.
Ok(())
}
#[cfg(target_os = "windows")]
fn set_custom_cpu_affinity_windows(cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
Self::set_cpu_affinity_windows(cpu_id)
}
/// Take the next work item, preferring the worker's local queue and
/// falling back to the shared global queue.
fn get_work_item(
local_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
global_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
_config: &WorkStealingConfig,
) -> Option<WorkItem> {
if let Ok(mut queue) = local_queue.try_lock() {
if let Some(item) = queue.pop_front() {
return Some(item);
}
}
if let Ok(mut queue) = global_queue.try_lock() {
if let Some(item) = queue.pop_front() {
return Some(item);
}
}
None
}
/// Placeholder for stealing work from sibling workers. True stealing
/// requires visibility into the other workers' local queues, which the
/// current worker signature does not provide, so this is a no-op.
fn attempt_work_stealing(
_thread_id: usize,
_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
_global_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
_config: &WorkStealingConfig,
) {
}
fn process_work_item(item: WorkItem, context: &WorkContext) {
match item.work_type {
WorkType::DistanceMatrix => {
Self::process_distance_matrix_chunk(item.start, item.end, context);
}
WorkType::KMeansClustering => {
Self::process_kmeans_chunk(item.start, item.end, context);
}
WorkType::KDTreeBuild => {
Self::process_kdtree_chunk(item.start, item.end, context);
}
WorkType::NearestNeighbor => {
Self::process_nn_chunk(item.start, item.end, context);
}
WorkType::Custom(_name) => {
Self::process_custom_chunk(item.start, item.end, context);
}
}
}
fn process_distance_matrix_chunk(start: usize, end: usize, context: &WorkContext) {
if let Some(distance_context) = &context.distance_context {
let optimizer = HardwareOptimizedDistances::new();
let points = &distance_context.points;
let n_points = points.nrows();
for linear_idx in start..end {
let (i, j) = Self::linear_to_matrix_indices(linear_idx, n_points);
if i < j && i < n_points && j < n_points {
let point_i = points.row(i);
let point_j = points.row(j);
match optimizer.euclidean_distance_optimized(&point_i, &point_j) {
Ok(distance) => {
distance_context.result_sender.send((i, j, distance)).ok();
}
Err(_) => {
distance_context.result_sender.send((i, j, f64::NAN)).ok();
}
}
}
}
}
}
fn process_kmeans_chunk(start: usize, end: usize, context: &WorkContext) {
if let Some(kmeans_context) = &context.kmeans_context {
let optimizer = HardwareOptimizedDistances::new();
let points = &kmeans_context.points;
let centroids = &kmeans_context.centroids;
let k = centroids.nrows();
for point_idx in start..end {
if point_idx < points.nrows() {
let point = points.row(point_idx);
let mut best_cluster = 0;
let mut best_distance = f64::INFINITY;
for cluster_idx in 0..k {
let centroid = centroids.row(cluster_idx);
match optimizer.euclidean_distance_optimized(&point, &centroid) {
Ok(distance) => {
if distance < best_distance {
best_distance = distance;
best_cluster = cluster_idx;
}
}
Err(_) => continue,
}
}
kmeans_context
.assignment_sender
.send((point_idx, best_cluster))
.ok();
}
}
}
}
fn process_kdtree_chunk(start: usize, end: usize, context: &WorkContext) {
if let Some(kdtree_context) = &context.kdtree_context {
let points = &kdtree_context.points;
let indices = &kdtree_context.indices;
let depth = kdtree_context.depth;
let chunk_indices: Vec<usize> = indices[start..end.min(indices.len())].to_vec();
if !chunk_indices.is_empty() {
let local_tree = Self::build_local_kdtree_chunk(
points,
&chunk_indices,
depth,
&kdtree_context.config,
);
kdtree_context.result_sender.send((start, local_tree)).ok();
}
}
}
fn process_nn_chunk(start: usize, end: usize, context: &WorkContext) {
if let Some(nn_context) = &context.nn_context {
let optimizer = HardwareOptimizedDistances::new();
let query_points = &nn_context.query_points;
let data_points = &nn_context.data_points;
let k = nn_context.k;
for query_idx in start..end {
if query_idx < query_points.nrows() {
let query = query_points.row(query_idx);
let mut distances: Vec<(f64, usize)> = Vec::with_capacity(data_points.nrows());
for (data_idx, data_point) in data_points.outer_iter().enumerate() {
match optimizer.euclidean_distance_optimized(&query, &data_point) {
Ok(distance) => distances.push((distance, data_idx)),
Err(_) => distances.push((f64::INFINITY, data_idx)),
}
}
if k <= distances.len() {
// Partial-select the k nearest, then sort only that prefix.
distances.select_nth_unstable_by(k - 1, |a, b| {
a.0.partial_cmp(&b.0).expect("non-comparable distance")
});
distances[..k].sort_unstable_by(|a, b| {
a.0.partial_cmp(&b.0).expect("non-comparable distance")
});
let result: Vec<(usize, f64)> = distances[..k]
.iter()
.map(|(dist, idx)| (*idx, *dist))
.collect();
nn_context.result_sender.send((query_idx, result)).ok();
}
}
}
}
}
fn process_custom_chunk(start: usize, end: usize, context: &WorkContext) {
if let Some(custom_context) = &context.custom_context {
(custom_context.process_fn)(start, end, &custom_context.user_data);
}
}
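/// Map a linear index over the strict upper triangle of an `n x n` matrix
/// back to its `(i, j)` pair, enumerating row by row. Row `i` contributes
/// `n - i - 1` pairs, so the loop subtracts row lengths until the remainder
/// fits in the current row. For `n = 4`, indices `0..6` map to
/// `(0,1), (0,2), (0,3), (1,2), (1,3), (2,3)`; see
/// `test_linear_to_matrix_indices`.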
fn linear_to_matrix_indices(linear_idx: usize, n: usize) -> (usize, usize) {
let mut k = linear_idx;
let mut i = 0;
// Walk down the rows, each of which holds n - i - 1 pairs.
while k >= n - i - 1 {
k -= n - i - 1;
i += 1;
}
let j = k + i + 1;
(i, j)
}
fn build_local_kdtree_chunk(
points: &Array2<f64>,
indices: &[usize],
depth: usize,
config: &KDTreeConfig,
) -> KDTreeChunkResult {
let n_dims = points.ncols();
let splitting_dimension = depth % n_dims;
if indices.len() <= 1 {
return KDTreeChunkResult {
node_index: indices.first().copied().unwrap_or(0),
is_leaf: true,
splitting_dimension,
split_value: 0.0,
left_indices: Vec::new(),
right_indices: Vec::new(),
};
}
let mut sorted_indices = indices.to_vec();
sorted_indices.sort_by(|&a, &b| {
let coord_a = points[[a, splitting_dimension]];
let coord_b = points[[b, splitting_dimension]];
coord_a
.partial_cmp(&coord_b)
.unwrap_or(std::cmp::Ordering::Equal)
});
let median_idx = sorted_indices.len() / 2;
let split_point_idx = sorted_indices[median_idx];
let split_value = points[[split_point_idx, splitting_dimension]];
let left_indices = sorted_indices[..median_idx].to_vec();
let right_indices = sorted_indices[median_idx + 1..].to_vec();
KDTreeChunkResult {
node_index: split_point_idx,
is_leaf: false,
splitting_dimension,
split_value,
left_indices,
right_indices,
}
}
/// Enqueue a batch of work on the global queue. Resets the progress
/// counters, so only one batch should be in flight at a time.
pub fn submit_work(&self, work_items: Vec<WorkItem>) -> SpatialResult<()> {
self.total_work.store(work_items.len(), Ordering::Relaxed);
self.completed_work.store(0, Ordering::Relaxed);
let mut global_queue = self.global_queue.lock().expect("global queue mutex poisoned");
for item in work_items {
global_queue.push_back(item);
}
drop(global_queue);
Ok(())
}
/// Block until every submitted item has been processed, polling the
/// completion counter at 1 ms intervals.
pub fn wait_for_completion(&self) -> SpatialResult<()> {
let total = self.total_work.load(Ordering::Relaxed);
while self.completed_work.load(Ordering::Relaxed) < total {
thread::sleep(Duration::from_millis(1));
}
Ok(())
}
pub fn progress(&self) -> (usize, usize) {
let completed = self.completed_work.load(Ordering::Relaxed);
let total = self.total_work.load(Ordering::Relaxed);
(completed, total)
}
pub fn statistics(&self) -> PoolStatistics {
PoolStatistics {
num_threads: self.workers.len(),
numa_nodes: self.numa_topology.num_nodes,
active_workers: self.active_workers.load(Ordering::Relaxed),
completed_work: self.completed_work.load(Ordering::Relaxed),
total_work: self.total_work.load(Ordering::Relaxed),
queue_depth: self.global_queue.lock().expect("global queue mutex poisoned").len(),
}
}
}
impl Drop for WorkStealingPool {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::Relaxed);
for worker in &mut self.workers {
if let Some(handle) = worker.thread_handle.take() {
let _ = handle.join();
}
}
}
}
#[derive(Debug, Clone)]
pub struct PoolStatistics {
pub num_threads: usize,
pub numa_nodes: usize,
pub active_workers: usize,
pub completed_work: usize,
pub total_work: usize,
pub queue_depth: usize,
}
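/// Distance-matrix computation driven by a `WorkStealingPool`, with a
/// sequential SIMD fallback that guarantees a complete result.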
pub struct AdvancedParallelDistanceMatrix {
pool: WorkStealingPool,
config: WorkStealingConfig,
}
impl AdvancedParallelDistanceMatrix {
pub fn new(config: WorkStealingConfig) -> SpatialResult<Self> {
let pool = WorkStealingPool::new(config.clone())?;
Ok(Self { pool, config })
}
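/// Compute the full symmetric Euclidean distance matrix for `points`.
///
/// Pair indices are chunked and submitted to the pool; any pairs not
/// reported within the timeout are filled in sequentially, so the
/// returned matrix is always complete. A usage sketch (mirrors the
/// tests' `array!` inputs; illustrative only):
///
/// ```ignore
/// let points = array![[0.0, 0.0], [3.0, 4.0]];
/// let dm = AdvancedParallelDistanceMatrix::new(WorkStealingConfig::new())?;
/// let matrix = dm.compute_parallel(&points.view())?;
/// assert!((matrix[[0, 1]] - 5.0).abs() < 1e-12);
/// ```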
pub fn compute_parallel(&self, points: &ArrayView2<'_, f64>) -> SpatialResult<Array2<f64>> {
let n_points = points.nrows();
let n_pairs = n_points * (n_points - 1) / 2;
let mut result_matrix = Array2::zeros((n_points, n_points));
type DistanceResult = (usize, usize, f64);
let (result_sender, result_receiver): (Sender<DistanceResult>, Receiver<DistanceResult>) =
channel();
// NOTE: workers are spawned with empty WorkContexts, so this context
// never reaches the pool; results come from the sequential fallback
// below. Threading contexts into workers is future work.
let _distance_context = DistanceMatrixContext {
points: points.to_owned(),
result_sender,
};
let chunk_size = self.config.initial_chunk_size;
let mut work_items = Vec::new();
for chunk_start in (0..n_pairs).step_by(chunk_size) {
let chunk_end = (chunk_start + chunk_size).min(n_pairs);
work_items.push(WorkItem {
start: chunk_start,
end: chunk_end,
work_type: WorkType::DistanceMatrix,
priority: 1,
numa_hint: None,
});
}
self.pool.submit_work(work_items)?;
let mut collected_results = 0;
// Bound the receive loop: if the workers never report (see the note
// on `_distance_context` above), fall through to the sequential path.
let timeout = Duration::from_secs(2);
let start_time = std::time::Instant::now();
while collected_results < n_pairs && start_time.elapsed() < timeout {
if let Ok((i, j, distance)) = result_receiver.try_recv() {
if i < n_points && j < n_points {
result_matrix[[i, j]] = distance;
result_matrix[[j, i]] = distance;
collected_results += 1;
}
} else {
thread::sleep(Duration::from_millis(1));
}
}
self.pool.wait_for_completion()?;
if collected_results < n_pairs {
// Fill in any pairs the parallel phase did not report.
let optimizer = HardwareOptimizedDistances::new();
for i in 0..n_points {
for j in (i + 1)..n_points {
if result_matrix[[i, j]] == 0.0 && i != j {
let point_i = points.row(i);
let point_j = points.row(j);
if let Ok(distance) =
optimizer.euclidean_distance_optimized(&point_i, &point_j)
{
result_matrix[[i, j]] = distance;
result_matrix[[j, i]] = distance;
}
}
}
}
}
Ok(result_matrix)
}
pub fn statistics(&self) -> PoolStatistics {
self.pool.statistics()
}
}
pub struct AdvancedParallelKMeans {
pool: WorkStealingPool,
config: WorkStealingConfig,
k: usize,
}
impl AdvancedParallelKMeans {
pub fn new(k: usize, config: WorkStealingConfig) -> SpatialResult<Self> {
let pool = WorkStealingPool::new(config.clone())?;
Ok(Self { pool, config, k })
}
pub fn fit_parallel(
&self,
points: &ArrayView2<'_, f64>,
) -> SpatialResult<(Array2<f64>, Array1<usize>)> {
let n_points = points.nrows();
let n_dims = points.ncols();
let chunk_size = self.config.initial_chunk_size;
let mut work_items = Vec::new();
for chunk_start in (0..n_points).step_by(chunk_size) {
let chunk_end = (chunk_start + chunk_size).min(n_points);
work_items.push(WorkItem {
start: chunk_start,
end: chunk_end,
work_type: WorkType::KMeansClustering,
priority: 1,
numa_hint: None,
});
}
self.pool.submit_work(work_items)?;
self.pool.wait_for_completion()?;
// As in the distance-matrix path, worker contexts are not yet wired
// up, so this returns zero-initialized placeholders rather than a
// fitted model.
let centroids = Array2::zeros((self.k, n_dims));
let assignments = Array1::zeros(n_points);
}
}
static GLOBAL_WORK_STEALING_POOL: std::sync::OnceLock<Mutex<Option<WorkStealingPool>>> =
std::sync::OnceLock::new();
#[allow(dead_code)]
pub fn global_work_stealing_pool() -> SpatialResult<&'static Mutex<Option<WorkStealingPool>>> {
Ok(GLOBAL_WORK_STEALING_POOL.get_or_init(|| Mutex::new(None)))
}
#[allow(dead_code)]
pub fn initialize_global_pool(config: WorkStealingConfig) -> SpatialResult<()> {
let pool_mutex = global_work_stealing_pool()?;
let mut pool_guard = pool_mutex.lock().expect("global pool mutex poisoned");
if pool_guard.is_none() {
*pool_guard = Some(WorkStealingPool::new(config)?);
}
Ok(())
}
#[allow(dead_code)]
pub fn get_numa_topology() -> NumaTopology {
NumaTopology::detect()
}
#[allow(dead_code)]
pub fn report_advanced_parallel_capabilities() {
let topology = get_numa_topology();
let total_cores: usize = topology.cores_per_node.iter().sum();
println!("Advanced-Parallel Processing Capabilities:");
println!(" Total CPU cores: {total_cores}");
println!(" NUMA nodes: {}", topology.num_nodes);
for (node, &cores) in topology.cores_per_node.iter().enumerate() {
let memory_gb = topology.memory_per_node[node] as f64 / (1024.0 * 1024.0 * 1024.0);
println!(" Node {node}: {cores} cores, {memory_gb:.1} GB memory");
}
println!(" Work-stealing: Available");
println!(" NUMA-aware allocation: Available");
println!(" Thread affinity: Available");
let caps = PlatformCapabilities::detect();
if caps.simd_available {
println!(" SIMD acceleration: Available");
if caps.avx512_available {
println!(" AVX-512: Available");
} else if caps.avx2_available {
println!(" AVX2: Available");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use scirs2_core::ndarray::array;
#[test]
fn test_work_stealing_config() {
let config = WorkStealingConfig::new()
.with_numa_aware(true)
.with_work_stealing(true)
.with_threads(8);
assert!(config.numa_aware);
assert!(config.work_stealing);
assert_eq!(config.num_threads, 8);
}
#[test]
fn test_numa_topology_detection() {
let topology = NumaTopology::detect();
assert!(topology.num_nodes > 0);
assert!(!topology.cores_per_node.is_empty());
assert_eq!(topology.cores_per_node.len(), topology.num_nodes);
assert_eq!(topology.memory_per_node.len(), topology.num_nodes);
}
#[test]
fn test_work_item_creation() {
let item = WorkItem {
start: 0,
end: 100,
work_type: WorkType::DistanceMatrix,
priority: 1,
numa_hint: Some(0),
};
assert_eq!(item.start, 0);
assert_eq!(item.end, 100);
assert_eq!(item.work_type, WorkType::DistanceMatrix);
assert_eq!(item.priority, 1);
assert_eq!(item.numa_hint, Some(0));
}
#[test]
fn test_work_stealing_pool_creation() {
let config = WorkStealingConfig::new().with_threads(1);
let pool = WorkStealingPool::new(config);
assert!(pool.is_ok());
let pool = pool.expect("Operation failed");
assert_eq!(pool.workers.len(), 1);
}
#[test]
fn test_advanced_parallel_distance_matrix() {
let _points = array![[0.0, 0.0], [1.0, 0.0]];
let config = WorkStealingConfig::new().with_threads(1);
let processor = AdvancedParallelDistanceMatrix::new(config);
assert!(processor.is_ok());
let processor = processor.expect("Operation failed");
let stats = processor.statistics();
assert_eq!(stats.num_threads, 1);
}
#[test]
fn test_advanced_parallel_kmeans() {
let points = array![[0.0, 0.0], [1.0, 1.0]];
let config = WorkStealingConfig::new().with_threads(1);
let kmeans = AdvancedParallelKMeans::new(1, config);
assert!(kmeans.is_ok());
let kmeans = kmeans.expect("Operation failed");
let result = kmeans.fit_parallel(&points.view());
assert!(result.is_ok());
let (centroids, assignments) = result.expect("Operation failed");
assert_eq!(centroids.dim(), (1, 2));
assert_eq!(assignments.len(), 2);
}
#[test]
fn test_global_functions() {
let _topology = get_numa_topology();
report_advanced_parallel_capabilities();
let config = WorkStealingConfig::new().with_threads(1);
let init_result = initialize_global_pool(config);
assert!(init_result.is_ok());
}
#[test]
fn test_work_context_structures() {
let (sender, _receiver) = channel::<(usize, usize, f64)>();
let distance_context = DistanceMatrixContext {
points: Array2::zeros((4, 2)),
result_sender: sender,
};
let work_context = WorkContext {
distance_context: Some(distance_context),
kmeans_context: None,
kdtree_context: None,
nn_context: None,
custom_context: None,
};
assert!(work_context.distance_context.is_some());
}
#[test]
fn test_linear_to_matrix_indices() {
let n = 4;
let expected_pairs = [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)];
for (linear_idx, expected) in expected_pairs.iter().enumerate() {
let result = WorkStealingPool::linear_to_matrix_indices(linear_idx, n);
assert_eq!(result, *expected, "Failed for linear index {linear_idx}");
}
}
#[test]
fn test_kdtree_chunk_result() {
let chunk_result = KDTreeChunkResult {
node_index: 0,
is_leaf: true,
splitting_dimension: 0,
split_value: 1.0,
left_indices: Vec::new(),
right_indices: Vec::new(),
};
assert!(chunk_result.is_leaf);
assert_eq!(chunk_result.node_index, 0);
assert_eq!(chunk_result.splitting_dimension, 0);
}
#[test]
fn test_enhanced_distance_matrix_computation() {
let _points = array![[0.0, 0.0], [1.0, 0.0]];
// Disable NUMA detection so numa_nodes is deterministically 1 on any
// machine.
let config = WorkStealingConfig::new()
.with_threads(1)
.with_numa_aware(false);
let processor = AdvancedParallelDistanceMatrix::new(config);
assert!(processor.is_ok());
let processor = processor.expect("Operation failed");
let stats = processor.statistics();
assert_eq!(stats.num_threads, 1);
assert_eq!(stats.numa_nodes, 1);
}
#[test]
fn test_numa_topology_detailed() {
let topology = NumaTopology::detect();
assert!(topology.num_nodes > 0);
assert_eq!(topology.cores_per_node.len(), topology.num_nodes);
assert_eq!(topology.memory_per_node.len(), topology.num_nodes);
assert_eq!(topology.distance_matrix.len(), topology.num_nodes);
for node in 0..topology.num_nodes {
let threads = topology.optimal_threads_per_node(node);
assert!(threads > 0);
}
for node in 0..topology.num_nodes {
let _capacity = topology.memory_capacity(node);
}
}
#[test]
fn test_work_stealing_configuration_advanced() {
let config = WorkStealingConfig::new()
.with_numa_aware(true)
.with_work_stealing(true)
.with_adaptive_scheduling(true)
.with_threads(4)
.with_chunk_sizes(512, 32)
.with_thread_affinity(ThreadAffinityStrategy::NumaAware)
.with_memory_strategy(MemoryStrategy::NumaInterleaved);
assert!(config.numa_aware);
assert!(config.work_stealing);
assert!(config.adaptive_scheduling);
assert_eq!(config.num_threads, 4);
assert_eq!(config.initial_chunk_size, 512);
assert_eq!(config.min_chunk_size, 32);
assert_eq!(config.thread_affinity, ThreadAffinityStrategy::NumaAware);
assert_eq!(config.memory_strategy, MemoryStrategy::NumaInterleaved);
}
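// A minimal end-to-end smoke test, relying on documented current behavior:
// workers run with empty WorkContexts, so Custom items are counted as
// completed without doing real work, which lets submit_work /
// wait_for_completion be exercised in isolation.
#[test]
fn test_submit_and_wait_smoke() {
let config = WorkStealingConfig::new().with_threads(1);
let pool = WorkStealingPool::new(config).expect("pool creation failed");
let items: Vec<WorkItem> = (0..4usize)
.map(|i| WorkItem {
start: i * 10,
end: (i + 1) * 10,
work_type: WorkType::Custom("noop".to_string()),
priority: 1,
numa_hint: None,
})
.collect();
pool.submit_work(items).expect("submit failed");
pool.wait_for_completion().expect("wait failed");
let (completed, total) = pool.progress();
assert_eq!(completed, total);
assert_eq!(total, 4);
}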
}