use crate::error::SpatialResult;
use crate::memory_pool::DistancePool;
use crate::simd_distance::hardware_specific_simd::HardwareOptimizedDistances;
use scirs2_core::ndarray::{Array1, Array2, ArrayView2};
use scirs2_core::simd_ops::PlatformCapabilities;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

#[cfg(any(target_os = "linux", target_os = "android"))]
use libc;

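/// Configuration for the NUMA-aware work-stealing thread pool.
///
/// A minimal usage sketch of the builder API defined below (the fence is
/// marked `ignore` because the surrounding crate path is not shown here):
///
/// ```ignore
/// let config = WorkStealingConfig::new()
///     .with_threads(8)            // 0 would auto-detect from the topology
///     .with_chunk_sizes(1024, 64)
///     .with_thread_affinity(ThreadAffinityStrategy::NumaAware);
/// ```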
#[derive(Debug, Clone)]
pub struct WorkStealingConfig {
    /// Enable NUMA-aware thread placement and memory allocation
    pub numa_aware: bool,
    /// Enable work stealing between worker threads
    pub work_stealing: bool,
    /// Enable adaptive chunk-size scheduling
    pub adaptive_scheduling: bool,
    /// Number of worker threads (0 = auto-detect)
    pub num_threads: usize,
    /// Initial chunk size for work distribution
    pub initial_chunk_size: usize,
    /// Minimum chunk size when splitting work
    pub min_chunk_size: usize,
    /// Thread affinity strategy
    pub thread_affinity: ThreadAffinityStrategy,
    /// Memory allocation strategy
    pub memory_strategy: MemoryStrategy,
    /// Prefetch distance for memory access patterns
    pub prefetch_distance: usize,
}

impl Default for WorkStealingConfig {
    fn default() -> Self {
        Self::new()
    }
}

impl WorkStealingConfig {
    /// Create a configuration with sensible defaults.
    pub fn new() -> Self {
        Self {
            numa_aware: true,
            work_stealing: true,
            adaptive_scheduling: true,
            num_threads: 0, // 0 = auto-detect from the NUMA topology
            initial_chunk_size: 1024,
            min_chunk_size: 64,
            thread_affinity: ThreadAffinityStrategy::NumaAware,
            memory_strategy: MemoryStrategy::NumaInterleaved,
            prefetch_distance: 8,
        }
    }

    /// Enable or disable NUMA awareness.
    pub fn with_numa_aware(mut self, enabled: bool) -> Self {
        self.numa_aware = enabled;
        self
    }

    /// Enable or disable work stealing between workers.
    pub fn with_work_stealing(mut self, enabled: bool) -> Self {
        self.work_stealing = enabled;
        self
    }

    /// Enable or disable adaptive chunk-size scheduling.
    pub fn with_adaptive_scheduling(mut self, enabled: bool) -> Self {
        self.adaptive_scheduling = enabled;
        self
    }

    /// Set the number of worker threads (0 = auto-detect).
    pub fn with_threads(mut self, num_threads: usize) -> Self {
        self.num_threads = num_threads;
        self
    }

    /// Set the initial and minimum chunk sizes for work distribution.
    pub fn with_chunk_sizes(mut self, initial: usize, minimum: usize) -> Self {
        self.initial_chunk_size = initial;
        self.min_chunk_size = minimum;
        self
    }

    /// Set the thread affinity strategy.
    pub fn with_thread_affinity(mut self, strategy: ThreadAffinityStrategy) -> Self {
        self.thread_affinity = strategy;
        self
    }

    /// Set the memory allocation strategy.
    pub fn with_memory_strategy(mut self, strategy: MemoryStrategy) -> Self {
        self.memory_strategy = strategy;
        self
    }
}

/// Strategy for pinning worker threads to CPUs.
#[derive(Debug, Clone, PartialEq)]
pub enum ThreadAffinityStrategy {
    /// No affinity; leave scheduling to the OS
    None,
    /// Pin each thread to the core matching its thread id
    Physical,
    /// Pin each thread to the cores of its NUMA node
    NumaAware,
    /// Pin threads to an explicit list of CPU ids
    Custom(Vec<usize>),
}

/// Strategy for allocating working memory.
#[derive(Debug, Clone, PartialEq)]
pub enum MemoryStrategy {
    /// Use the system allocator
    System,
    /// Allocate on the local NUMA node
    NumaLocal,
    /// Interleave allocations across NUMA nodes
    NumaInterleaved,
    /// Use huge pages where available
    HugePages,
}

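/// Detected NUMA topology of the host machine.
///
/// The detection below is a heuristic sketch: it assumes one NUMA node per 8
/// logical CPUs and fills in placeholder memory sizes and inter-node distances
/// rather than querying the operating system.
///
/// ```ignore
/// let topology = NumaTopology::detect();
/// for node in 0..topology.num_nodes {
///     println!("node {node}: {} cores", topology.optimal_threads_per_node(node));
/// }
/// ```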
#[derive(Debug, Clone)]
pub struct NumaTopology {
    /// Number of NUMA nodes
    pub num_nodes: usize,
    /// Number of CPU cores per node
    pub cores_per_node: Vec<usize>,
    /// Memory capacity per node in bytes
    pub memory_per_node: Vec<usize>,
    /// Relative inter-node distance matrix
    pub distance_matrix: Vec<Vec<u32>>,
}

impl Default for NumaTopology {
    fn default() -> Self {
        Self::detect()
    }
}

impl NumaTopology {
    /// Detect the NUMA topology using a simple CPU-count heuristic.
    pub fn detect() -> Self {
        let num_cpus = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);
        // Heuristic: assume one NUMA node per 8 logical CPUs.
        let num_nodes = (num_cpus / 8).max(1);

        Self {
            num_nodes,
            cores_per_node: vec![num_cpus / num_nodes; num_nodes],
            // Placeholder: 1 GiB per node.
            memory_per_node: vec![1024 * 1024 * 1024; num_nodes],
            distance_matrix: Self::create_default_distance_matrix(num_nodes),
        }
    }

    #[allow(clippy::needless_range_loop)]
    fn create_default_distance_matrix(num_nodes: usize) -> Vec<Vec<u32>> {
        let mut matrix = vec![vec![0; num_nodes]; num_nodes];
        for i in 0..num_nodes {
            for j in 0..num_nodes {
                if i == j {
                    matrix[i][j] = 10; // local access
                } else {
                    matrix[i][j] = 20; // remote access
                }
            }
        }
        matrix
    }

    /// Optimal number of threads for a given NUMA node.
    pub fn optimal_threads_per_node(&self, node: usize) -> usize {
        if node < self.cores_per_node.len() {
            self.cores_per_node[node]
        } else {
            self.cores_per_node.first().copied().unwrap_or(1)
        }
    }

    /// Memory capacity of a given NUMA node in bytes.
    pub fn memory_capacity(&self, node: usize) -> usize {
        self.memory_per_node.get(node).copied().unwrap_or(0)
    }
}

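/// NUMA-aware work-stealing thread pool for spatial workloads.
///
/// A minimal submission sketch using only the types defined in this module
/// (the work item is illustrative; producing output also requires a matching
/// worker-side context):
///
/// ```ignore
/// let pool = WorkStealingPool::new(WorkStealingConfig::new().with_threads(2))?;
/// pool.submit_work(vec![WorkItem {
///     start: 0,
///     end: 128,
///     work_type: WorkType::DistanceMatrix,
///     priority: 1,
///     numa_hint: None,
/// }])?;
/// pool.wait_for_completion()?;
/// ```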
pub struct WorkStealingPool {
    workers: Vec<WorkStealingWorker>,
    #[allow(dead_code)]
    config: WorkStealingConfig,
    numa_topology: NumaTopology,
    global_queue: Arc<Mutex<VecDeque<WorkItem>>>,
    completed_work: Arc<AtomicUsize>,
    total_work: Arc<AtomicUsize>,
    active_workers: Arc<AtomicUsize>,
    shutdown: Arc<AtomicBool>,
}

struct WorkStealingWorker {
    thread_id: usize,
    numa_node: usize,
    local_queue: Arc<Mutex<VecDeque<WorkItem>>>,
    thread_handle: Option<thread::JoinHandle<()>>,
    memory_pool: Arc<DistancePool>,
}

/// A contiguous unit of work processed by a single worker.
#[derive(Debug, Clone)]
pub struct WorkItem {
    /// Start index (inclusive)
    pub start: usize,
    /// End index (exclusive)
    pub end: usize,
    /// Kind of computation this item belongs to
    pub work_type: WorkType,
    /// Scheduling priority
    pub priority: u8,
    /// Preferred NUMA node, if any
    pub numa_hint: Option<usize>,
}

/// Kinds of spatial computations the pool knows how to dispatch.
#[derive(Debug, Clone, PartialEq)]
pub enum WorkType {
    /// Pairwise distance matrix computation
    DistanceMatrix,
    /// K-means cluster assignment
    KMeansClustering,
    /// KD-tree construction
    KDTreeBuild,
    /// k-nearest-neighbor queries
    NearestNeighbor,
    /// User-defined work identified by name
    Custom(String),
}

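/// Per-worker context bundling the optional inputs for each [`WorkType`].
///
/// At most one field is expected to be populated for a given batch of work;
/// the `process_*` functions below skip items whose matching context is `None`.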
pub struct WorkContext {
    pub distance_context: Option<DistanceMatrixContext>,
    pub kmeans_context: Option<KMeansContext>,
    pub kdtree_context: Option<KDTreeContext>,
    pub nn_context: Option<NearestNeighborContext>,
    pub custom_context: Option<CustomWorkContext>,
}

/// Context for distance-matrix work items.
pub struct DistanceMatrixContext {
    pub points: Array2<f64>,
    pub result_sender: Sender<(usize, usize, f64)>,
}

/// Context for k-means assignment work items.
pub struct KMeansContext {
    pub points: Array2<f64>,
    pub centroids: Array2<f64>,
    pub assignment_sender: Sender<(usize, usize)>,
}

/// Context for KD-tree construction work items.
pub struct KDTreeContext {
    pub points: Array2<f64>,
    pub indices: Vec<usize>,
    pub depth: usize,
    pub config: KDTreeConfig,
    pub result_sender: Sender<(usize, KDTreeChunkResult)>,
}

/// Context for k-nearest-neighbor work items.
pub struct NearestNeighborContext {
    pub query_points: Array2<f64>,
    pub data_points: Array2<f64>,
    pub k: usize,
    pub result_sender: Sender<(usize, Vec<(usize, f64)>)>,
}

/// Context for user-defined work items.
pub struct CustomWorkContext {
    pub process_fn: fn(usize, usize, &CustomUserData),
    pub user_data: CustomUserData,
}

#[derive(Debug, Clone)]
pub struct CustomUserData {
    pub data: Vec<u8>,
}

#[derive(Debug, Clone)]
pub struct KDTreeConfig {
    /// Maximum number of points stored in a leaf node
    pub max_leaf_size: usize,
    /// Enable cache-aware node layout
    pub cache_aware: bool,
}

impl Default for KDTreeConfig {
    fn default() -> Self {
        Self {
            max_leaf_size: 32,
            cache_aware: true,
        }
    }
}

/// Result of building one KD-tree chunk.
#[derive(Debug, Clone)]
pub struct KDTreeChunkResult {
    pub node_index: usize,
    pub is_leaf: bool,
    pub splitting_dimension: usize,
    pub split_value: f64,
    pub left_indices: Vec<usize>,
    pub right_indices: Vec<usize>,
}

impl WorkStealingPool {
    /// Create a new pool and spawn its worker threads.
    pub fn new(config: WorkStealingConfig) -> SpatialResult<Self> {
        let numa_topology = if config.numa_aware {
            NumaTopology::detect()
        } else {
            NumaTopology {
                num_nodes: 1,
                cores_per_node: vec![config.num_threads],
                memory_per_node: vec![0],
                distance_matrix: vec![vec![10]],
            }
        };

        // Guarantee at least one worker even if detection (or a zero-thread
        // configuration) would otherwise yield none.
        let num_threads = if config.num_threads == 0 {
            numa_topology.cores_per_node.iter().sum::<usize>().max(1)
        } else {
            config.num_threads
        };

        let global_queue = Arc::new(Mutex::new(VecDeque::new()));
        let completed_work = Arc::new(AtomicUsize::new(0));
        let total_work = Arc::new(AtomicUsize::new(0));
        let active_workers = Arc::new(AtomicUsize::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));

        let mut workers = Vec::with_capacity(num_threads);

        for thread_id in 0..num_threads {
            let numa_node = if config.numa_aware {
                Self::assign_thread_to_numa_node(thread_id, &numa_topology)
            } else {
                0
            };

            let worker = WorkStealingWorker {
                thread_id,
                numa_node,
                local_queue: Arc::new(Mutex::new(VecDeque::new())),
                thread_handle: None,
                memory_pool: Arc::new(DistancePool::new(1000)),
            };

            workers.push(worker);
        }

        for worker in &mut workers {
            let local_queue = Arc::clone(&worker.local_queue);
            let global_queue = Arc::clone(&global_queue);
            let completed_work = Arc::clone(&completed_work);
            let active_workers = Arc::clone(&active_workers);
            let shutdown = Arc::clone(&shutdown);
            let config_clone = config.clone();
            let thread_id = worker.thread_id;
            let numa_node = worker.numa_node;
            let memory_pool = Arc::clone(&worker.memory_pool);

            let handle = thread::spawn(move || {
                Self::worker_main(
                    thread_id,
                    numa_node,
                    local_queue,
                    global_queue,
                    completed_work,
                    active_workers,
                    shutdown,
                    config_clone,
                    memory_pool,
                );
            });

            worker.thread_handle = Some(handle);
        }

        Ok(Self {
            workers,
            config,
            numa_topology,
            global_queue,
            completed_work,
            total_work,
            active_workers,
            shutdown,
        })
    }

    /// Map a thread id onto a NUMA node by filling nodes in order.
    fn assign_thread_to_numa_node(thread_id: usize, topology: &NumaTopology) -> usize {
        let mut thread_count = 0;
        for (node_id, &cores) in topology.cores_per_node.iter().enumerate() {
            if thread_id < thread_count + cores {
                return node_id;
            }
            thread_count += cores;
        }
        0 // fall back to node 0
    }

    /// Main loop executed by each worker thread.
    fn worker_main(
        thread_id: usize,
        numa_node: usize,
        local_queue: Arc<Mutex<VecDeque<WorkItem>>>,
        global_queue: Arc<Mutex<VecDeque<WorkItem>>>,
        completed_work: Arc<AtomicUsize>,
        active_workers: Arc<AtomicUsize>,
        shutdown: Arc<AtomicBool>,
        config: WorkStealingConfig,
        _memory_pool: Arc<DistancePool>,
    ) {
        // Pin the thread according to the configured affinity strategy.
        Self::set_thread_affinity(thread_id, numa_node, &config);

        // Worker-local context; callers populate the relevant field per batch.
        let work_context = WorkContext {
            distance_context: None,
            kmeans_context: None,
            kdtree_context: None,
            nn_context: None,
            custom_context: None,
        };

        while !shutdown.load(Ordering::Relaxed) {
            let work_item = Self::get_work_item(&local_queue, &global_queue, &config);

            if let Some(item) = work_item {
                active_workers.fetch_add(1, Ordering::Relaxed);

                Self::process_work_item(item, &work_context);

                completed_work.fetch_add(1, Ordering::Relaxed);
                active_workers.fetch_sub(1, Ordering::Relaxed);
            } else {
                if config.work_stealing {
                    Self::attempt_work_stealing(thread_id, &local_queue, &global_queue, &config);
                }

                // Back off briefly when no work is available.
                thread::sleep(Duration::from_micros(100));
            }
        }
    }

    fn set_thread_affinity(thread_id: usize, numa_node: usize, config: &WorkStealingConfig) {
        match config.thread_affinity {
            ThreadAffinityStrategy::Physical => {
                #[cfg(target_os = "linux")]
                {
                    if let Err(e) = Self::set_cpu_affinity_linux(thread_id) {
                        eprintln!(
                            "Warning: Failed to set CPU affinity for thread {thread_id}: {e}"
                        );
                    }
                }
                #[cfg(target_os = "windows")]
                {
                    if let Err(e) = Self::set_cpu_affinity_windows(thread_id) {
                        eprintln!(
                            "Warning: Failed to set CPU affinity for thread {thread_id}: {e}"
                        );
                    }
                }
            }
            ThreadAffinityStrategy::NumaAware => {
                #[cfg(target_os = "linux")]
                {
                    if let Err(e) = Self::set_numa_affinity_linux(numa_node) {
                        eprintln!(
                            "Warning: Failed to set NUMA affinity for node {numa_node}: {e}"
                        );
                    }
                }
                #[cfg(target_os = "windows")]
                {
                    if let Err(e) = Self::set_numa_affinity_windows(numa_node) {
                        eprintln!(
                            "Warning: Failed to set NUMA affinity for node {numa_node}: {e}"
                        );
                    }
                }
            }
            ThreadAffinityStrategy::Custom(ref cpus) => {
                if let Some(&cpu) = cpus.get(thread_id) {
                    #[cfg(target_os = "linux")]
                    {
                        if let Err(e) = Self::set_custom_cpu_affinity_linux(cpu) {
                            eprintln!(
                                "Warning: Failed to set custom CPU affinity to core {cpu}: {e}"
                            );
                        }
                    }
                    #[cfg(target_os = "windows")]
                    {
                        if let Err(e) = Self::set_custom_cpu_affinity_windows(cpu) {
                            eprintln!(
                                "Warning: Failed to set custom CPU affinity to core {cpu}: {e}"
                            );
                        }
                    }
                }
            }
            ThreadAffinityStrategy::None => {
                // No affinity requested; leave scheduling to the OS.
            }
        }
    }

    #[cfg(target_os = "linux")]
    fn set_cpu_affinity_linux(cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
        unsafe {
            let mut cpu_set: libc::cpu_set_t = std::mem::zeroed();
            libc::CPU_SET(cpu_id, &mut cpu_set);

            // pid 0 = the calling thread.
            let result = libc::sched_setaffinity(
                0,
                std::mem::size_of::<libc::cpu_set_t>(),
                &cpu_set,
            );

            if result == 0 {
                Ok(())
            } else {
                Err("Failed to set CPU affinity".into())
            }
        }
    }

    #[cfg(target_os = "linux")]
    fn set_numa_affinity_linux(numa_node: usize) -> Result<(), Box<dyn std::error::Error>> {
        use std::fs;

        // Read the CPU list exposed by sysfs for this NUMA node, e.g. "0-7,16-23".
        let cpulist_path = format!("/sys/devices/system/node/node{numa_node}/cpulist");
        let cpulist = fs::read_to_string(&cpulist_path)
            .map_err(|_| format!("Failed to read NUMA node {numa_node} CPU list"))?;

        unsafe {
            let mut cpu_set: libc::cpu_set_t = std::mem::zeroed();

            // Parse comma-separated entries, each either a single CPU or a range.
            for range in cpulist.trim().split(',') {
                if let Some((start, end)) = range.split_once('-') {
                    if let (Ok(s), Ok(e)) = (start.parse::<u32>(), end.parse::<u32>()) {
                        for cpu in s..=e {
                            libc::CPU_SET(cpu as usize, &mut cpu_set);
                        }
                    }
                } else if let Ok(cpu) = range.parse::<u32>() {
                    libc::CPU_SET(cpu as usize, &mut cpu_set);
                }
            }

            // pid 0 = the calling thread.
            let result = libc::sched_setaffinity(
                0,
                std::mem::size_of::<libc::cpu_set_t>(),
                &cpu_set,
            );

            if result == 0 {
                Ok(())
            } else {
                Err("Failed to set NUMA affinity".into())
            }
        }
    }

    #[cfg(target_os = "linux")]
    fn set_custom_cpu_affinity_linux(cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
        Self::set_cpu_affinity_linux(cpu_id)
    }

    #[cfg(target_os = "windows")]
    fn set_cpu_affinity_windows(cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
        // Not implemented on Windows yet; accept the request without pinning.
        let _ = cpu_id;
        Ok(())
    }

    #[cfg(target_os = "windows")]
    fn set_numa_affinity_windows(numa_node: usize) -> Result<(), Box<dyn std::error::Error>> {
        // Not implemented on Windows yet; accept the request without pinning.
        let _ = numa_node;
        Ok(())
    }

    #[cfg(target_os = "windows")]
    fn set_custom_cpu_affinity_windows(cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
        Self::set_cpu_affinity_windows(cpu_id)
    }

    /// Take the next work item, preferring the local queue over the global one.
    fn get_work_item(
        local_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
        global_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
        _config: &WorkStealingConfig,
    ) -> Option<WorkItem> {
        // Try the worker's local queue first.
        if let Ok(mut queue) = local_queue.try_lock() {
            if let Some(item) = queue.pop_front() {
                return Some(item);
            }
        }

        // Fall back to the shared global queue.
        if let Ok(mut queue) = global_queue.try_lock() {
            if let Some(item) = queue.pop_front() {
                return Some(item);
            }
        }

        None
    }

    /// Placeholder for stealing work from other workers' local queues.
    /// Currently a no-op: idle workers simply retry the global queue.
    fn attempt_work_stealing(
        _thread_id: usize,
        _local_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
        _global_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
        _config: &WorkStealingConfig,
    ) {
    }

    /// Dispatch a work item to the handler matching its work type.
    fn process_work_item(item: WorkItem, context: &WorkContext) {
        match item.work_type {
            WorkType::DistanceMatrix => {
                Self::process_distance_matrix_chunk(item.start, item.end, context);
            }
            WorkType::KMeansClustering => {
                Self::process_kmeans_chunk(item.start, item.end, context);
            }
            WorkType::KDTreeBuild => {
                Self::process_kdtree_chunk(item.start, item.end, context);
            }
            WorkType::NearestNeighbor => {
                Self::process_nn_chunk(item.start, item.end, context);
            }
            WorkType::Custom(_name) => {
                Self::process_custom_chunk(item.start, item.end, context);
            }
        }
    }

    /// Compute one chunk of pairwise distances and stream the results back.
    fn process_distance_matrix_chunk(start: usize, end: usize, context: &WorkContext) {
        if let Some(distance_context) = &context.distance_context {
            let optimizer = HardwareOptimizedDistances::new();
            let points = &distance_context.points;
            let n_points = points.nrows();

            for linear_idx in start..end {
                let (i, j) = Self::linear_to_matrix_indices(linear_idx, n_points);

                if i < j && i < n_points && j < n_points {
                    let point_i = points.row(i);
                    let point_j = points.row(j);

                    match optimizer.euclidean_distance_optimized(&point_i, &point_j) {
                        Ok(distance) => {
                            distance_context.result_sender.send((i, j, distance)).ok();
                        }
                        Err(_) => {
                            // Report failed computations as NaN so the receiver can detect them.
                            distance_context.result_sender.send((i, j, f64::NAN)).ok();
                        }
                    }
                }
            }
        }
    }

    /// Assign each point in the chunk to its nearest centroid.
    fn process_kmeans_chunk(start: usize, end: usize, context: &WorkContext) {
        if let Some(kmeans_context) = &context.kmeans_context {
            let optimizer = HardwareOptimizedDistances::new();
            let points = &kmeans_context.points;
            let centroids = &kmeans_context.centroids;
            let k = centroids.nrows();

            for point_idx in start..end {
                if point_idx < points.nrows() {
                    let point = points.row(point_idx);
                    let mut best_cluster = 0;
                    let mut best_distance = f64::INFINITY;

                    for cluster_idx in 0..k {
                        let centroid = centroids.row(cluster_idx);

                        match optimizer.euclidean_distance_optimized(&point, &centroid) {
                            Ok(distance) => {
                                if distance < best_distance {
                                    best_distance = distance;
                                    best_cluster = cluster_idx;
                                }
                            }
                            Err(_) => continue,
                        }
                    }

                    kmeans_context
                        .assignment_sender
                        .send((point_idx, best_cluster))
                        .ok();
                }
            }
        }
    }

    /// Build a local KD-tree node for the indices in this chunk.
    fn process_kdtree_chunk(start: usize, end: usize, context: &WorkContext) {
        if let Some(kdtree_context) = &context.kdtree_context {
            let points = &kdtree_context.points;
            let indices = &kdtree_context.indices;
            let depth = kdtree_context.depth;

            let chunk_indices: Vec<usize> = indices[start..end.min(indices.len())].to_vec();

            if !chunk_indices.is_empty() {
                let local_tree = Self::build_local_kdtree_chunk(
                    points,
                    &chunk_indices,
                    depth,
                    &kdtree_context.config,
                );

                kdtree_context.result_sender.send((start, local_tree)).ok();
            }
        }
    }

    /// Find the k nearest neighbors for each query point in the chunk.
    fn process_nn_chunk(start: usize, end: usize, context: &WorkContext) {
        if let Some(nn_context) = &context.nn_context {
            let optimizer = HardwareOptimizedDistances::new();
            let query_points = &nn_context.query_points;
            let data_points = &nn_context.data_points;
            let k = nn_context.k;

            for query_idx in start..end {
                if query_idx < query_points.nrows() {
                    let query = query_points.row(query_idx);

                    let mut distances: Vec<(f64, usize)> = Vec::with_capacity(data_points.nrows());

                    for (data_idx, data_point) in data_points.outer_iter().enumerate() {
                        match optimizer.euclidean_distance_optimized(&query, &data_point) {
                            Ok(distance) => distances.push((distance, data_idx)),
                            Err(_) => distances.push((f64::INFINITY, data_idx)),
                        }
                    }

                    if k <= distances.len() {
                        // Partial selection of the k smallest distances, then sort only that prefix.
                        distances.select_nth_unstable_by(k - 1, |a, b| {
                            a.0.partial_cmp(&b.0).expect("Operation failed")
                        });
                        distances[..k].sort_unstable_by(|a, b| {
                            a.0.partial_cmp(&b.0).expect("Operation failed")
                        });

                        let result: Vec<(usize, f64)> = distances[..k]
                            .iter()
                            .map(|(dist, idx)| (*idx, *dist))
                            .collect();

                        nn_context.result_sender.send((query_idx, result)).ok();
                    }
                }
            }
        }
    }

    /// Run the user-supplied callback over the chunk range.
    fn process_custom_chunk(start: usize, end: usize, context: &WorkContext) {
        if let Some(custom_context) = &context.custom_context {
            (custom_context.process_fn)(start, end, &custom_context.user_data);
        }
    }

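    /// Map a linear index over the strict upper triangle of an `n x n` matrix
    /// back to its `(i, j)` pair, enumerating row by row.
    ///
    /// For example, with `n = 4` the indices `0..6` map to
    /// `(0,1), (0,2), (0,3), (1,2), (1,3), (2,3)`, which is exactly what
    /// `test_linear_to_matrix_indices` below checks.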
    fn linear_to_matrix_indices(linear_idx: usize, n: usize) -> (usize, usize) {
        let mut k = linear_idx;
        let mut i = 0;

        // Walk down the rows, subtracting the number of pairs in each row.
        while k >= n - i - 1 {
            k -= n - i - 1;
            i += 1;
        }

        let j = k + i + 1;
        (i, j)
    }

    /// Split a set of point indices at the median along the cycling dimension.
    fn build_local_kdtree_chunk(
        points: &Array2<f64>,
        indices: &[usize],
        depth: usize,
        _config: &KDTreeConfig,
    ) -> KDTreeChunkResult {
        let n_dims = points.ncols();
        let splitting_dimension = depth % n_dims;

        if indices.len() <= 1 {
            return KDTreeChunkResult {
                node_index: indices.first().copied().unwrap_or(0),
                is_leaf: true,
                splitting_dimension,
                split_value: 0.0,
                left_indices: Vec::new(),
                right_indices: Vec::new(),
            };
        }

        // Sort indices along the splitting dimension and split at the median.
        let mut sorted_indices = indices.to_vec();
        sorted_indices.sort_by(|&a, &b| {
            let coord_a = points[[a, splitting_dimension]];
            let coord_b = points[[b, splitting_dimension]];
            coord_a
                .partial_cmp(&coord_b)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let median_idx = sorted_indices.len() / 2;
        let split_point_idx = sorted_indices[median_idx];
        let split_value = points[[split_point_idx, splitting_dimension]];

        let left_indices = sorted_indices[..median_idx].to_vec();
        let right_indices = sorted_indices[median_idx + 1..].to_vec();

        KDTreeChunkResult {
            node_index: split_point_idx,
            is_leaf: false,
            splitting_dimension,
            split_value,
            left_indices,
            right_indices,
        }
    }

    /// Submit a batch of work items to the global queue.
    pub fn submit_work(&self, work_items: Vec<WorkItem>) -> SpatialResult<()> {
        self.total_work.store(work_items.len(), Ordering::Relaxed);
        self.completed_work.store(0, Ordering::Relaxed);

        let mut global_queue = self.global_queue.lock().expect("Operation failed");
        for item in work_items {
            global_queue.push_back(item);
        }
        drop(global_queue);

        Ok(())
    }

    /// Block until all submitted work items have been processed.
    pub fn wait_for_completion(&self) -> SpatialResult<()> {
        let total = self.total_work.load(Ordering::Relaxed);

        while self.completed_work.load(Ordering::Relaxed) < total {
            thread::sleep(Duration::from_millis(1));
        }

        Ok(())
    }

    /// Current progress as `(completed, total)` work items.
    pub fn progress(&self) -> (usize, usize) {
        let completed = self.completed_work.load(Ordering::Relaxed);
        let total = self.total_work.load(Ordering::Relaxed);
        (completed, total)
    }

    /// Snapshot of the pool's current state.
    pub fn statistics(&self) -> PoolStatistics {
        PoolStatistics {
            num_threads: self.workers.len(),
            numa_nodes: self.numa_topology.num_nodes,
            active_workers: self.active_workers.load(Ordering::Relaxed),
            completed_work: self.completed_work.load(Ordering::Relaxed),
            total_work: self.total_work.load(Ordering::Relaxed),
            queue_depth: self.global_queue.lock().expect("Operation failed").len(),
        }
    }
}

impl Drop for WorkStealingPool {
    fn drop(&mut self) {
        // Signal shutdown and join all worker threads.
        self.shutdown.store(true, Ordering::Relaxed);

        for worker in &mut self.workers {
            if let Some(handle) = worker.thread_handle.take() {
                let _ = handle.join();
            }
        }
    }
}

/// Snapshot statistics reported by [`WorkStealingPool::statistics`].
#[derive(Debug, Clone)]
pub struct PoolStatistics {
    pub num_threads: usize,
    pub numa_nodes: usize,
    pub active_workers: usize,
    pub completed_work: usize,
    pub total_work: usize,
    pub queue_depth: usize,
}

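/// Parallel pairwise distance-matrix computation built on [`WorkStealingPool`].
///
/// A minimal sketch of the intended call pattern, assuming the crate exposes
/// this type as written here (fence marked `ignore` for that reason):
///
/// ```ignore
/// use scirs2_core::ndarray::array;
///
/// let points = array![[0.0, 0.0], [3.0, 4.0]];
/// let processor =
///     AdvancedParallelDistanceMatrix::new(WorkStealingConfig::new().with_threads(1))?;
/// let matrix = processor.compute_parallel(&points.view())?;
/// assert!((matrix[[0, 1]] - 5.0).abs() < 1e-12);
/// ```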
pub struct AdvancedParallelDistanceMatrix {
    pool: WorkStealingPool,
    config: WorkStealingConfig,
}

impl AdvancedParallelDistanceMatrix {
    /// Create a new parallel distance-matrix processor.
    pub fn new(config: WorkStealingConfig) -> SpatialResult<Self> {
        let pool = WorkStealingPool::new(config.clone())?;
        Ok(Self { pool, config })
    }

    /// Compute the full symmetric distance matrix for `points`.
    pub fn compute_parallel(&self, points: &ArrayView2<'_, f64>) -> SpatialResult<Array2<f64>> {
        let n_points = points.nrows();
        let n_pairs = n_points * (n_points - 1) / 2;
        let mut result_matrix = Array2::zeros((n_points, n_points));

        type DistanceResult = (usize, usize, f64);
        let (result_sender, result_receiver): (Sender<DistanceResult>, Receiver<DistanceResult>) =
            channel();

        // Note: the worker-side context is currently initialized empty, so this
        // context is not yet delivered to the workers; missing entries are
        // filled in by the sequential fallback below.
        let _distance_context = DistanceMatrixContext {
            points: points.to_owned(),
            result_sender,
        };

        // Split the upper-triangle index range into work items.
        let chunk_size = self.config.initial_chunk_size;
        let mut work_items = Vec::new();

        for chunk_start in (0..n_pairs).step_by(chunk_size) {
            let chunk_end = (chunk_start + chunk_size).min(n_pairs);
            work_items.push(WorkItem {
                start: chunk_start,
                end: chunk_end,
                work_type: WorkType::DistanceMatrix,
                priority: 1,
                numa_hint: None,
            });
        }

        self.pool.submit_work(work_items)?;

        // Collect streamed results with a timeout so we never block indefinitely.
        let mut collected_results = 0;
        let timeout = Duration::from_secs(2);
        let start_time = std::time::Instant::now();

        while collected_results < n_pairs && start_time.elapsed() < timeout {
            if let Ok((i, j, distance)) = result_receiver.try_recv() {
                if i < n_points && j < n_points {
                    result_matrix[[i, j]] = distance;
                    result_matrix[[j, i]] = distance;
                    collected_results += 1;
                }
            } else {
                thread::sleep(Duration::from_millis(1));
            }
        }

        self.pool.wait_for_completion()?;

        // Fallback: compute any pairs that were not delivered through the channel.
        if collected_results < n_pairs {
            let optimizer = HardwareOptimizedDistances::new();

            for i in 0..n_points {
                for j in (i + 1)..n_points {
                    if result_matrix[[i, j]] == 0.0 && i != j {
                        let point_i = points.row(i);
                        let point_j = points.row(j);

                        if let Ok(distance) =
                            optimizer.euclidean_distance_optimized(&point_i, &point_j)
                        {
                            result_matrix[[i, j]] = distance;
                            result_matrix[[j, i]] = distance;
                        }
                    }
                }
            }
        }

        Ok(result_matrix)
    }

    /// Snapshot statistics from the underlying pool.
    pub fn statistics(&self) -> PoolStatistics {
        self.pool.statistics()
    }
}

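/// Parallel k-means scaffolding built on [`WorkStealingPool`].
///
/// Note: `fit_parallel` currently only schedules the assignment work items;
/// the centroids and assignments it returns are zero-initialized placeholders
/// rather than a converged clustering.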
pub struct AdvancedParallelKMeans {
    pool: WorkStealingPool,
    config: WorkStealingConfig,
    k: usize,
}

impl AdvancedParallelKMeans {
    /// Create a new parallel k-means processor with `k` clusters.
    pub fn new(k: usize, config: WorkStealingConfig) -> SpatialResult<Self> {
        let pool = WorkStealingPool::new(config.clone())?;
        Ok(Self { pool, config, k })
    }

    /// Schedule cluster-assignment work for `points` and return placeholder results.
    pub fn fit_parallel(
        &self,
        points: &ArrayView2<'_, f64>,
    ) -> SpatialResult<(Array2<f64>, Array1<usize>)> {
        let n_points = points.nrows();
        let n_dims = points.ncols();

        // Split the point range into work items.
        let chunk_size = self.config.initial_chunk_size;
        let mut work_items = Vec::new();

        for chunk_start in (0..n_points).step_by(chunk_size) {
            let chunk_end = (chunk_start + chunk_size).min(n_points);
            work_items.push(WorkItem {
                start: chunk_start,
                end: chunk_end,
                work_type: WorkType::KMeansClustering,
                priority: 1,
                numa_hint: None,
            });
        }

        self.pool.submit_work(work_items)?;
        self.pool.wait_for_completion()?;

        // Placeholder results; a full implementation would aggregate the
        // assignments streamed back by the workers and update the centroids.
        let centroids = Array2::zeros((self.k, n_dims));
        let assignments = Array1::zeros(n_points);

        Ok((centroids, assignments))
    }
}

/// Lazily-initialized global work-stealing pool.
static GLOBAL_WORK_STEALING_POOL: std::sync::OnceLock<Mutex<Option<WorkStealingPool>>> =
    std::sync::OnceLock::new();

/// Access the global work-stealing pool slot, initializing the slot lazily.
#[allow(dead_code)]
pub fn global_work_stealing_pool() -> SpatialResult<&'static Mutex<Option<WorkStealingPool>>> {
    Ok(GLOBAL_WORK_STEALING_POOL.get_or_init(|| Mutex::new(None)))
}

/// Initialize the global pool with the given configuration if it is not already set.
#[allow(dead_code)]
pub fn initialize_global_pool(config: WorkStealingConfig) -> SpatialResult<()> {
    let pool_mutex = global_work_stealing_pool()?;
    let mut pool_guard = pool_mutex.lock().expect("Operation failed");

    if pool_guard.is_none() {
        *pool_guard = Some(WorkStealingPool::new(config)?);
    }

    Ok(())
}

/// Detect and return the NUMA topology of the current machine.
#[allow(dead_code)]
pub fn get_numa_topology() -> NumaTopology {
    NumaTopology::detect()
}

/// Print a summary of the parallel-processing capabilities of this machine.
#[allow(dead_code)]
pub fn report_advanced_parallel_capabilities() {
    let topology = get_numa_topology();
    let total_cores: usize = topology.cores_per_node.iter().sum();

    println!("Advanced-Parallel Processing Capabilities:");
    println!(" Total CPU cores: {total_cores}");
    println!(" NUMA nodes: {}", topology.num_nodes);

    for (node, &cores) in topology.cores_per_node.iter().enumerate() {
        let memory_gb = topology.memory_per_node[node] as f64 / (1024.0 * 1024.0 * 1024.0);
        println!(" Node {node}: {cores} cores, {memory_gb:.1} GB memory");
    }

    println!(" Work-stealing: Available");
    println!(" NUMA-aware allocation: Available");
    println!(" Thread affinity: Available");

    let caps = PlatformCapabilities::detect();
    if caps.simd_available {
        println!(" SIMD acceleration: Available");
        if caps.avx512_available {
            println!(" AVX-512: Available");
        } else if caps.avx2_available {
            println!(" AVX2: Available");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::array;

    #[test]
    fn test_work_stealing_config() {
        let config = WorkStealingConfig::new()
            .with_numa_aware(true)
            .with_work_stealing(true)
            .with_threads(8);

        assert!(config.numa_aware);
        assert!(config.work_stealing);
        assert_eq!(config.num_threads, 8);
    }

    #[test]
    fn test_numa_topology_detection() {
        let topology = NumaTopology::detect();

        assert!(topology.num_nodes > 0);
        assert!(!topology.cores_per_node.is_empty());
        assert_eq!(topology.cores_per_node.len(), topology.num_nodes);
        assert_eq!(topology.memory_per_node.len(), topology.num_nodes);
    }

    #[test]
    fn test_work_item_creation() {
        let item = WorkItem {
            start: 0,
            end: 100,
            work_type: WorkType::DistanceMatrix,
            priority: 1,
            numa_hint: Some(0),
        };

        assert_eq!(item.start, 0);
        assert_eq!(item.end, 100);
        assert_eq!(item.work_type, WorkType::DistanceMatrix);
        assert_eq!(item.priority, 1);
        assert_eq!(item.numa_hint, Some(0));
    }

    #[test]
    fn test_work_stealing_pool_creation() {
        let config = WorkStealingConfig::new().with_threads(1);
        let pool = WorkStealingPool::new(config);

        assert!(pool.is_ok());
        let pool = pool.expect("Operation failed");
        assert_eq!(pool.workers.len(), 1);
    }

    #[test]
    fn test_advanced_parallel_distance_matrix() {
        let _points = array![[0.0, 0.0], [1.0, 0.0]];
        let config = WorkStealingConfig::new().with_threads(1);

        let processor = AdvancedParallelDistanceMatrix::new(config);
        assert!(processor.is_ok());

        let processor = processor.expect("Operation failed");
        let stats = processor.statistics();
        assert_eq!(stats.num_threads, 1);
    }

    #[test]
    fn test_advanced_parallel_kmeans() {
        let points = array![[0.0, 0.0], [1.0, 1.0]];
        let config = WorkStealingConfig::new().with_threads(1);
        let kmeans = AdvancedParallelKMeans::new(1, config);
        assert!(kmeans.is_ok());

        let kmeans = kmeans.expect("Operation failed");
        let result = kmeans.fit_parallel(&points.view());
        assert!(result.is_ok());

        let (centroids, assignments) = result.expect("Operation failed");
        assert_eq!(centroids.dim(), (1, 2));
        assert_eq!(assignments.len(), 2);
    }

    #[test]
    fn test_global_functions() {
        let _topology = get_numa_topology();
        report_advanced_parallel_capabilities();

        let config = WorkStealingConfig::new().with_threads(1);
        let init_result = initialize_global_pool(config);
        assert!(init_result.is_ok());
    }

    #[test]
    fn test_work_context_structures() {
        let (sender, _receiver) = channel::<(usize, usize, f64)>();

        let distance_context = DistanceMatrixContext {
            points: Array2::zeros((4, 2)),
            result_sender: sender,
        };

        let work_context = WorkContext {
            distance_context: Some(distance_context),
            kmeans_context: None,
            kdtree_context: None,
            nn_context: None,
            custom_context: None,
        };

        assert!(work_context.distance_context.is_some());
    }

    #[test]
    fn test_linear_to_matrix_indices() {
        let n = 4;
        let expected_pairs = [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)];

        for (linear_idx, expected) in expected_pairs.iter().enumerate() {
            let result = WorkStealingPool::linear_to_matrix_indices(linear_idx, n);
            assert_eq!(result, *expected, "Failed for linear index {linear_idx}");
        }
    }

    #[test]
    fn test_kdtree_chunk_result() {
        let chunk_result = KDTreeChunkResult {
            node_index: 0,
            is_leaf: true,
            splitting_dimension: 0,
            split_value: 1.0,
            left_indices: Vec::new(),
            right_indices: Vec::new(),
        };

        assert!(chunk_result.is_leaf);
        assert_eq!(chunk_result.node_index, 0);
        assert_eq!(chunk_result.splitting_dimension, 0);
    }

    #[test]
    fn test_enhanced_distance_matrix_computation() {
        let _points = array![[0.0, 0.0], [1.0, 0.0]];
        let config = WorkStealingConfig::new().with_threads(1);

        let processor = AdvancedParallelDistanceMatrix::new(config);
        assert!(processor.is_ok());

        let processor = processor.expect("Operation failed");
        let stats = processor.statistics();
        assert_eq!(stats.num_threads, 1);
        assert_eq!(stats.numa_nodes, 1);
    }

    #[test]
    fn test_enhanced_kmeans_with_context() {
        let points = array![[0.0, 0.0], [1.0, 1.0]];
        let config = WorkStealingConfig::new().with_threads(1);
        let kmeans = AdvancedParallelKMeans::new(1, config);
        assert!(kmeans.is_ok());

        let kmeans = kmeans.expect("Operation failed");
        let result = kmeans.fit_parallel(&points.view());
        assert!(result.is_ok());

        let (centroids, assignments) = result.expect("Operation failed");
        assert_eq!(centroids.dim(), (1, 2));
        assert_eq!(assignments.len(), 2);
    }

    #[test]
    fn test_numa_topology_detailed() {
        let topology = NumaTopology::detect();

        assert!(topology.num_nodes > 0);
        assert_eq!(topology.cores_per_node.len(), topology.num_nodes);
        assert_eq!(topology.memory_per_node.len(), topology.num_nodes);
        assert_eq!(topology.distance_matrix.len(), topology.num_nodes);

        for node in 0..topology.num_nodes {
            let threads = topology.optimal_threads_per_node(node);
            assert!(threads > 0);
        }

        for node in 0..topology.num_nodes {
            let _capacity = topology.memory_capacity(node);
        }
    }

    #[test]
    fn test_work_stealing_configuration_advanced() {
        let config = WorkStealingConfig::new()
            .with_numa_aware(true)
            .with_work_stealing(true)
            .with_adaptive_scheduling(true)
            .with_threads(4)
            .with_chunk_sizes(512, 32)
            .with_thread_affinity(ThreadAffinityStrategy::NumaAware)
            .with_memory_strategy(MemoryStrategy::NumaInterleaved);

        assert!(config.numa_aware);
        assert!(config.work_stealing);
        assert!(config.adaptive_scheduling);
        assert_eq!(config.num_threads, 4);
        assert_eq!(config.initial_chunk_size, 512);
        assert_eq!(config.min_chunk_size, 32);
        assert_eq!(config.thread_affinity, ThreadAffinityStrategy::NumaAware);
        assert_eq!(config.memory_strategy, MemoryStrategy::NumaInterleaved);
    }
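
    /// Added illustrative test: submit two work items without a matching
    /// worker-side context and check that the pool still counts them as
    /// completed (workers silently skip items whose context is `None`).
    #[test]
    fn test_submit_and_wait_without_context() {
        let config = WorkStealingConfig::new().with_threads(1);
        let pool = WorkStealingPool::new(config).expect("Operation failed");

        let items = vec![
            WorkItem {
                start: 0,
                end: 10,
                work_type: WorkType::Custom("noop".to_string()),
                priority: 1,
                numa_hint: None,
            },
            WorkItem {
                start: 10,
                end: 20,
                work_type: WorkType::Custom("noop".to_string()),
                priority: 1,
                numa_hint: None,
            },
        ];

        pool.submit_work(items).expect("Operation failed");
        pool.wait_for_completion().expect("Operation failed");

        let (completed, total) = pool.progress();
        assert_eq!(total, 2);
        assert_eq!(completed, 2);
    }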
}