//! NUMA-aware work-stealing thread pool utilities for parallel spatial algorithms
//! (distance matrices, k-means assignment, KD-tree construction, and nearest-neighbor search).

use crate::error::SpatialResult;
use crate::memory_pool::DistancePool;
use crate::simd_distance::hardware_specific_simd::HardwareOptimizedDistances;
use scirs2_core::ndarray::{Array1, Array2, ArrayView2};
use scirs2_core::simd_ops::PlatformCapabilities;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

#[cfg(any(target_os = "linux", target_os = "android"))]
use libc;

/// Configuration for the NUMA-aware work-stealing thread pool.
#[derive(Debug, Clone)]
pub struct WorkStealingConfig {
    /// Enable NUMA-aware thread placement and topology detection.
    pub numa_aware: bool,
    /// Enable work stealing between worker threads.
    pub work_stealing: bool,
    /// Enable adaptive scheduling of chunk sizes.
    pub adaptive_scheduling: bool,
    /// Number of worker threads (0 = auto-detect from the topology).
    pub num_threads: usize,
    /// Initial chunk size used when splitting work into items.
    pub initial_chunk_size: usize,
    /// Minimum chunk size.
    pub min_chunk_size: usize,
    /// Thread affinity strategy.
    pub thread_affinity: ThreadAffinityStrategy,
    /// Memory allocation strategy.
    pub memory_strategy: MemoryStrategy,
    /// Prefetch distance hint.
    pub prefetch_distance: usize,
}

impl Default for WorkStealingConfig {
    fn default() -> Self {
        Self::new()
    }
}

impl WorkStealingConfig {
    /// Creates a configuration with sensible defaults.
    pub fn new() -> Self {
        Self {
            numa_aware: true,
            work_stealing: true,
            adaptive_scheduling: true,
            num_threads: 0, // 0 = auto-detect from the NUMA topology
            initial_chunk_size: 1024,
            min_chunk_size: 64,
            thread_affinity: ThreadAffinityStrategy::NumaAware,
            memory_strategy: MemoryStrategy::NumaInterleaved,
            prefetch_distance: 8,
        }
    }

    pub fn with_numa_aware(mut self, enabled: bool) -> Self {
        self.numa_aware = enabled;
        self
    }

    pub fn with_work_stealing(mut self, enabled: bool) -> Self {
        self.work_stealing = enabled;
        self
    }

    pub fn with_adaptive_scheduling(mut self, enabled: bool) -> Self {
        self.adaptive_scheduling = enabled;
        self
    }

    pub fn with_threads(mut self, num_threads: usize) -> Self {
        self.num_threads = num_threads;
        self
    }

    pub fn with_chunk_sizes(mut self, initial: usize, minimum: usize) -> Self {
        self.initial_chunk_size = initial;
        self.min_chunk_size = minimum;
        self
    }

    pub fn with_thread_affinity(mut self, strategy: ThreadAffinityStrategy) -> Self {
        self.thread_affinity = strategy;
        self
    }

    pub fn with_memory_strategy(mut self, strategy: MemoryStrategy) -> Self {
        self.memory_strategy = strategy;
        self
    }
}

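// Illustrative sketch (not part of the public API): composing a configuration with the
// builder methods above. The concrete values below are arbitrary assumptions for demonstration.
#[allow(dead_code)]
fn example_tuned_config() -> WorkStealingConfig {
    WorkStealingConfig::new()
        .with_numa_aware(true)
        .with_threads(0) // 0 = one worker per detected core
        .with_chunk_sizes(2048, 128)
        .with_thread_affinity(ThreadAffinityStrategy::NumaAware)
        .with_memory_strategy(MemoryStrategy::NumaLocal)
}
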
/// Strategy for pinning worker threads to CPUs.
#[derive(Debug, Clone, PartialEq)]
pub enum ThreadAffinityStrategy {
    /// Do not set any affinity; leave scheduling to the OS.
    None,
    /// Pin each worker to the CPU matching its thread id.
    Physical,
    /// Pin workers to the CPUs of their assigned NUMA node.
    NumaAware,
    /// Pin workers to an explicit list of CPU ids, indexed by thread id.
    Custom(Vec<usize>),
}

/// Memory allocation strategy for worker-local data.
#[derive(Debug, Clone, PartialEq)]
pub enum MemoryStrategy {
    /// Default system allocator behavior.
    System,
    /// Prefer memory local to the worker's NUMA node.
    NumaLocal,
    /// Interleave allocations across NUMA nodes.
    NumaInterleaved,
    /// Use huge pages where available.
    HugePages,
}

/// Detected (or assumed) NUMA topology of the host machine.
#[derive(Debug, Clone)]
pub struct NumaTopology {
    /// Number of NUMA nodes.
    pub num_nodes: usize,
    /// Number of CPU cores on each node.
    pub cores_per_node: Vec<usize>,
    /// Memory capacity of each node in bytes.
    pub memory_per_node: Vec<usize>,
    /// Relative inter-node distance matrix (lower means closer).
    pub distance_matrix: Vec<Vec<u32>>,
}

impl Default for NumaTopology {
    fn default() -> Self {
        Self::detect()
    }
}

impl NumaTopology {
    /// Detects a coarse NUMA topology from the available parallelism.
    ///
    /// This is a heuristic: it assumes roughly 8 cores per NUMA node and a
    /// placeholder memory capacity per node rather than querying the OS.
    pub fn detect() -> Self {
        let num_cpus = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);
        let num_nodes = (num_cpus / 8).max(1); // heuristic: ~8 cores per node

        Self {
            num_nodes,
            cores_per_node: vec![num_cpus / num_nodes; num_nodes],
            memory_per_node: vec![1024 * 1024 * 1024; num_nodes], // placeholder: 1 GiB per node
            distance_matrix: Self::create_default_distance_matrix(num_nodes),
        }
    }

    #[allow(clippy::needless_range_loop)]
    fn create_default_distance_matrix(num_nodes: usize) -> Vec<Vec<u32>> {
        let mut matrix = vec![vec![0; num_nodes]; num_nodes];
        for i in 0..num_nodes {
            for j in 0..num_nodes {
                if i == j {
                    matrix[i][j] = 10; // local access
                } else {
                    matrix[i][j] = 20; // remote access
                }
            }
        }
        matrix
    }

    /// Returns the preferred number of threads for a NUMA node.
    pub fn optimal_threads_per_node(&self, node: usize) -> usize {
        if node < self.cores_per_node.len() {
            self.cores_per_node[node]
        } else {
            self.cores_per_node.first().copied().unwrap_or(1)
        }
    }

    /// Returns the memory capacity of a NUMA node in bytes (0 if unknown).
    pub fn memory_capacity(&self, node: usize) -> usize {
        self.memory_per_node.get(node).copied().unwrap_or(0)
    }
}

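// Illustrative sketch: deriving a per-node chunk size from the detected topology.
// The even split below is an assumption made for demonstration, not part of the pool's logic.
#[allow(dead_code)]
fn example_per_node_chunk_size(total_items: usize) -> Vec<usize> {
    let topology = NumaTopology::detect();
    (0..topology.num_nodes)
        .map(|node| {
            let threads = topology.optimal_threads_per_node(node).max(1);
            // Spread items evenly over nodes, then over the threads of each node.
            (total_items / topology.num_nodes.max(1)) / threads
        })
        .collect()
}
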
/// A NUMA-aware thread pool with per-worker queues and a shared global queue.
pub struct WorkStealingPool {
    workers: Vec<WorkStealingWorker>,
    #[allow(dead_code)]
    config: WorkStealingConfig,
    numa_topology: NumaTopology,
    global_queue: Arc<Mutex<VecDeque<WorkItem>>>,
    completed_work: Arc<AtomicUsize>,
    total_work: Arc<AtomicUsize>,
    active_workers: Arc<AtomicUsize>,
    shutdown: Arc<AtomicBool>,
}

/// State owned by a single worker thread.
struct WorkStealingWorker {
    thread_id: usize,
    numa_node: usize,
    local_queue: Arc<Mutex<VecDeque<WorkItem>>>,
    thread_handle: Option<thread::JoinHandle<()>>,
    memory_pool: Arc<DistancePool>,
}

/// A unit of work covering the half-open index range `[start, end)`.
#[derive(Debug, Clone)]
pub struct WorkItem {
    /// First index of the range (inclusive).
    pub start: usize,
    /// One past the last index of the range (exclusive).
    pub end: usize,
    /// Kind of computation this item belongs to.
    pub work_type: WorkType,
    /// Scheduling priority hint.
    pub priority: u8,
    /// Preferred NUMA node, if any.
    pub numa_hint: Option<usize>,
}

/// Kind of computation a [`WorkItem`] belongs to.
#[derive(Debug, Clone, PartialEq)]
pub enum WorkType {
    DistanceMatrix,
    KMeansClustering,
    KDTreeBuild,
    NearestNeighbor,
    Custom(String),
}

/// Optional per-task contexts consulted when processing a [`WorkItem`].
pub struct WorkContext {
    pub distance_context: Option<DistanceMatrixContext>,
    pub kmeans_context: Option<KMeansContext>,
    pub kdtree_context: Option<KDTreeContext>,
    pub nn_context: Option<NearestNeighborContext>,
    pub custom_context: Option<CustomWorkContext>,
}

/// Context for pairwise distance-matrix computation.
pub struct DistanceMatrixContext {
    pub points: Array2<f64>,
    pub result_sender: Sender<(usize, usize, f64)>,
}

/// Context for the k-means assignment step.
pub struct KMeansContext {
    pub points: Array2<f64>,
    pub centroids: Array2<f64>,
    pub assignment_sender: Sender<(usize, usize)>,
}

/// Context for parallel KD-tree construction.
pub struct KDTreeContext {
    pub points: Array2<f64>,
    pub indices: Vec<usize>,
    pub depth: usize,
    pub config: KDTreeConfig,
    pub result_sender: Sender<(usize, KDTreeChunkResult)>,
}

/// Context for k-nearest-neighbor queries.
pub struct NearestNeighborContext {
    pub query_points: Array2<f64>,
    pub data_points: Array2<f64>,
    pub k: usize,
    pub result_sender: Sender<(usize, Vec<(usize, f64)>)>,
}

/// Context for user-defined work processed by a plain function pointer.
pub struct CustomWorkContext {
    pub process_fn: fn(usize, usize, &CustomUserData),
    pub user_data: CustomUserData,
}

/// Opaque user payload passed to custom work functions.
#[derive(Debug, Clone)]
pub struct CustomUserData {
    pub data: Vec<u8>,
}

/// Parameters for KD-tree construction.
#[derive(Debug, Clone)]
pub struct KDTreeConfig {
    pub max_leaf_size: usize,
    pub cache_aware: bool,
}

impl Default for KDTreeConfig {
    fn default() -> Self {
        Self {
            max_leaf_size: 32,
            cache_aware: true,
        }
    }
}

/// Result of building one KD-tree chunk.
#[derive(Debug, Clone)]
pub struct KDTreeChunkResult {
    pub node_index: usize,
    pub is_leaf: bool,
    pub splitting_dimension: usize,
    pub split_value: f64,
    pub left_indices: Vec<usize>,
    pub right_indices: Vec<usize>,
}

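// Illustrative sketch: wiring a `CustomWorkContext` to a plain function pointer. The function
// name and the interpretation of the payload are assumptions made for demonstration only.
#[allow(dead_code)]
fn example_custom_work_context() -> CustomWorkContext {
    fn count_bytes(start: usize, end: usize, user_data: &CustomUserData) {
        // Process the half-open range [start, end) of the user payload.
        let slice_end = end.min(user_data.data.len());
        let slice_start = start.min(slice_end);
        let sum: usize = user_data.data[slice_start..slice_end]
            .iter()
            .map(|&b| b as usize)
            .sum();
        println!("bytes {slice_start}..{slice_end} sum to {sum}");
    }

    CustomWorkContext {
        process_fn: count_bytes,
        user_data: CustomUserData { data: vec![1, 2, 3, 4] },
    }
}
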
impl WorkStealingPool {
    pub fn new(config: WorkStealingConfig) -> SpatialResult<Self> {
        let numa_topology = if config.numa_aware {
            NumaTopology::detect()
        } else {
            NumaTopology {
                num_nodes: 1,
                cores_per_node: vec![config.num_threads],
                memory_per_node: vec![0],
                distance_matrix: vec![vec![10]],
            }
        };

        let num_threads = if config.num_threads == 0 {
            numa_topology.cores_per_node.iter().sum()
        } else {
            config.num_threads
        };

        let global_queue = Arc::new(Mutex::new(VecDeque::new()));
        let completed_work = Arc::new(AtomicUsize::new(0));
        let total_work = Arc::new(AtomicUsize::new(0));
        let active_workers = Arc::new(AtomicUsize::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));

        let mut workers = Vec::with_capacity(num_threads);

        for thread_id in 0..num_threads {
            let numa_node = if config.numa_aware {
                Self::assign_thread_to_numa_node(thread_id, &numa_topology)
            } else {
                0
            };

            let worker = WorkStealingWorker {
                thread_id,
                numa_node,
                local_queue: Arc::new(Mutex::new(VecDeque::new())),
                thread_handle: None,
                memory_pool: Arc::new(DistancePool::new(1000)),
            };

            workers.push(worker);
        }

        for worker in &mut workers {
            let local_queue = Arc::clone(&worker.local_queue);
            let global_queue = Arc::clone(&global_queue);
            let completed_work = Arc::clone(&completed_work);
            let active_workers = Arc::clone(&active_workers);
            let shutdown = Arc::clone(&shutdown);
            let config_clone = config.clone();
            let thread_id = worker.thread_id;
            let numa_node = worker.numa_node;
            let memory_pool = Arc::clone(&worker.memory_pool);

            let handle = thread::spawn(move || {
                Self::worker_main(
                    thread_id,
                    numa_node,
                    local_queue,
                    global_queue,
                    completed_work,
                    active_workers,
                    shutdown,
                    config_clone,
                    memory_pool,
                );
            });

            worker.thread_handle = Some(handle);
        }

        Ok(Self {
            workers,
            config,
            numa_topology,
            global_queue,
            completed_work,
            total_work,
            active_workers,
            shutdown,
        })
    }

    /// Maps a worker thread id to the NUMA node that owns its core.
    fn assign_thread_to_numa_node(thread_id: usize, topology: &NumaTopology) -> usize {
        let mut thread_count = 0;
        for (node_id, &cores) in topology.cores_per_node.iter().enumerate() {
            if thread_id < thread_count + cores {
                return node_id;
            }
            thread_count += cores;
        }
        0 // fall back to node 0 if the id exceeds the known cores
    }

    /// Main loop executed by each worker thread.
    #[allow(clippy::too_many_arguments)]
    fn worker_main(
        thread_id: usize,
        numa_node: usize,
        local_queue: Arc<Mutex<VecDeque<WorkItem>>>,
        global_queue: Arc<Mutex<VecDeque<WorkItem>>>,
        completed_work: Arc<AtomicUsize>,
        active_workers: Arc<AtomicUsize>,
        shutdown: Arc<AtomicBool>,
        config: WorkStealingConfig,
        _memory_pool: Arc<DistancePool>,
    ) {
        Self::set_thread_affinity(thread_id, numa_node, &config);

        // Worker-local context; task-specific contexts are filled in by the dispatching code.
        let work_context = WorkContext {
            distance_context: None,
            kmeans_context: None,
            kdtree_context: None,
            nn_context: None,
            custom_context: None,
        };

        while !shutdown.load(Ordering::Relaxed) {
            let work_item = Self::get_work_item(&local_queue, &global_queue, &config);

            if let Some(item) = work_item {
                active_workers.fetch_add(1, Ordering::Relaxed);

                Self::process_work_item(item, &work_context);

                completed_work.fetch_add(1, Ordering::Relaxed);
                active_workers.fetch_sub(1, Ordering::Relaxed);
            } else {
                if config.work_stealing {
                    Self::attempt_work_stealing(thread_id, &local_queue, &global_queue, &config);
                }

                // Back off briefly when no work is available.
                thread::sleep(Duration::from_micros(100));
            }
        }
    }

    fn set_thread_affinity(thread_id: usize, numa_node: usize, config: &WorkStealingConfig) {
        match config.thread_affinity {
            ThreadAffinityStrategy::Physical => {
                #[cfg(target_os = "linux")]
                {
                    if let Err(e) = Self::set_cpu_affinity_linux(thread_id) {
                        eprintln!(
                            "Warning: Failed to set CPU affinity for thread {thread_id}: {e}"
                        );
                    }
                }
                #[cfg(target_os = "windows")]
                {
                    if let Err(e) = Self::set_cpu_affinity_windows(thread_id) {
                        eprintln!(
                            "Warning: Failed to set CPU affinity for thread {}: {}",
                            thread_id, e
                        );
                    }
                }
            }
            ThreadAffinityStrategy::NumaAware => {
                #[cfg(target_os = "linux")]
                {
                    if let Err(e) = Self::set_numa_affinity_linux(numa_node) {
                        eprintln!(
                            "Warning: Failed to set NUMA affinity for node {}: {}",
                            numa_node, e
                        );
                    }
                }
                #[cfg(target_os = "windows")]
                {
                    if let Err(e) = Self::set_numa_affinity_windows(numa_node) {
                        eprintln!(
                            "Warning: Failed to set NUMA affinity for node {}: {}",
                            numa_node, e
                        );
                    }
                }
            }
            ThreadAffinityStrategy::Custom(ref cpus) => {
                if let Some(&cpu) = cpus.get(thread_id) {
                    #[cfg(target_os = "linux")]
                    {
                        if let Err(e) = Self::set_custom_cpu_affinity_linux(cpu) {
                            eprintln!(
                                "Warning: Failed to set custom CPU affinity to core {cpu}: {e}"
                            );
                        }
                    }
                    #[cfg(target_os = "windows")]
                    {
                        if let Err(e) = Self::set_custom_cpu_affinity_windows(cpu) {
                            eprintln!(
                                "Warning: Failed to set custom CPU affinity to core {}: {}",
                                cpu, e
                            );
                        }
                    }
                }
            }
            ThreadAffinityStrategy::None => {}
        }
    }

    #[cfg(target_os = "linux")]
    fn set_cpu_affinity_linux(cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
        unsafe {
            let mut cpu_set: libc::cpu_set_t = std::mem::zeroed();
            libc::CPU_SET(cpu_id, &mut cpu_set);

            let result = libc::sched_setaffinity(
                0, // 0 = the calling thread
                std::mem::size_of::<libc::cpu_set_t>(),
                &cpu_set,
            );

            if result == 0 {
                Ok(())
            } else {
                Err("Failed to set CPU affinity".into())
            }
        }
    }

    #[cfg(target_os = "linux")]
    fn set_numa_affinity_linux(numa_node: usize) -> Result<(), Box<dyn std::error::Error>> {
        use std::fs;

        // Read the CPU list exposed by sysfs for this NUMA node, e.g. "0-7,16-23".
        let cpulist_path = format!("/sys/devices/system/node/node{numa_node}/cpulist");
        let cpulist = fs::read_to_string(&cpulist_path)
            .map_err(|_| format!("Failed to read NUMA node {numa_node} CPU list"))?;

        unsafe {
            let mut cpu_set: libc::cpu_set_t = std::mem::zeroed();

            for range in cpulist.trim().split(',') {
                if let Some((start, end)) = range.split_once('-') {
                    if let (Ok(s), Ok(e)) = (start.parse::<u32>(), end.parse::<u32>()) {
                        for cpu in s..=e {
                            libc::CPU_SET(cpu as usize, &mut cpu_set);
                        }
                    }
                } else if let Ok(cpu) = range.parse::<u32>() {
                    libc::CPU_SET(cpu as usize, &mut cpu_set);
                }
            }

            let result = libc::sched_setaffinity(
                0, // 0 = the calling thread
                std::mem::size_of::<libc::cpu_set_t>(),
                &cpu_set,
            );

            if result == 0 {
                Ok(())
            } else {
                Err("Failed to set NUMA affinity".into())
            }
        }
    }

    #[cfg(target_os = "linux")]
    fn set_custom_cpu_affinity_linux(cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
        Self::set_cpu_affinity_linux(cpu_id)
    }

    #[cfg(target_os = "windows")]
    fn set_cpu_affinity_windows(_cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
        // Not implemented on Windows; affinity requests are accepted but ignored.
        let _ = _cpu_id;
        Ok(())
    }

    #[cfg(target_os = "windows")]
    fn set_numa_affinity_windows(_numa_node: usize) -> Result<(), Box<dyn std::error::Error>> {
        // Not implemented on Windows; affinity requests are accepted but ignored.
        let _ = _numa_node;
        Ok(())
    }

    #[cfg(target_os = "windows")]
    fn set_custom_cpu_affinity_windows(cpu_id: usize) -> Result<(), Box<dyn std::error::Error>> {
        Self::set_cpu_affinity_windows(cpu_id)
    }

    /// Pops the next item, preferring the worker's local queue over the global queue.
    fn get_work_item(
        local_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
        global_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
        _config: &WorkStealingConfig,
    ) -> Option<WorkItem> {
        if let Ok(mut queue) = local_queue.try_lock() {
            if let Some(item) = queue.pop_front() {
                return Some(item);
            }
        }

        if let Ok(mut queue) = global_queue.try_lock() {
            if let Some(item) = queue.pop_front() {
                return Some(item);
            }
        }

        None
    }

    /// Placeholder for stealing work from other workers' local queues; currently a no-op.
    fn attempt_work_stealing(
        _thread_id: usize,
        _local_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
        _global_queue: &Arc<Mutex<VecDeque<WorkItem>>>,
        _config: &WorkStealingConfig,
    ) {
    }

    /// Dispatches a work item to the handler matching its work type.
    fn process_work_item(item: WorkItem, context: &WorkContext) {
        match item.work_type {
            WorkType::DistanceMatrix => {
                Self::process_distance_matrix_chunk(item.start, item.end, context);
            }
            WorkType::KMeansClustering => {
                Self::process_kmeans_chunk(item.start, item.end, context);
            }
            WorkType::KDTreeBuild => {
                Self::process_kdtree_chunk(item.start, item.end, context);
            }
            WorkType::NearestNeighbor => {
                Self::process_nn_chunk(item.start, item.end, context);
            }
            WorkType::Custom(_name) => {
                Self::process_custom_chunk(item.start, item.end, context);
            }
        }
    }

    /// Computes the distances for the linear pair indices in `[start, end)` and streams
    /// them through the context's result channel.
    fn process_distance_matrix_chunk(start: usize, end: usize, context: &WorkContext) {
        if let Some(distance_context) = &context.distance_context {
            let optimizer = HardwareOptimizedDistances::new();
            let points = &distance_context.points;
            let n_points = points.nrows();

            for linear_idx in start..end {
                let (i, j) = Self::linear_to_matrix_indices(linear_idx, n_points);

                if i < j && i < n_points && j < n_points {
                    let point_i = points.row(i);
                    let point_j = points.row(j);

                    match optimizer.euclidean_distance_optimized(&point_i, &point_j) {
                        Ok(distance) => {
                            distance_context.result_sender.send((i, j, distance)).ok();
                        }
                        Err(_) => {
                            // Report failed pairs as NaN so the receiver can detect them.
                            distance_context.result_sender.send((i, j, f64::NAN)).ok();
                        }
                    }
                }
            }
        }
    }

    /// Assigns each point in `[start, end)` to its nearest centroid.
    fn process_kmeans_chunk(start: usize, end: usize, context: &WorkContext) {
        if let Some(kmeans_context) = &context.kmeans_context {
            let optimizer = HardwareOptimizedDistances::new();
            let points = &kmeans_context.points;
            let centroids = &kmeans_context.centroids;
            let k = centroids.nrows();

            for point_idx in start..end {
                if point_idx < points.nrows() {
                    let point = points.row(point_idx);
                    let mut best_cluster = 0;
                    let mut best_distance = f64::INFINITY;

                    for cluster_idx in 0..k {
                        let centroid = centroids.row(cluster_idx);

                        match optimizer.euclidean_distance_optimized(&point, &centroid) {
                            Ok(distance) => {
                                if distance < best_distance {
                                    best_distance = distance;
                                    best_cluster = cluster_idx;
                                }
                            }
                            Err(_) => continue,
                        }
                    }

                    kmeans_context
                        .assignment_sender
                        .send((point_idx, best_cluster))
                        .ok();
                }
            }
        }
    }

    /// Builds a KD-tree split for the index slice `[start, end)` of the context's indices.
    fn process_kdtree_chunk(start: usize, end: usize, context: &WorkContext) {
        if let Some(kdtree_context) = &context.kdtree_context {
            let points = &kdtree_context.points;
            let indices = &kdtree_context.indices;
            let depth = kdtree_context.depth;

            let chunk_indices: Vec<usize> = indices[start..end.min(indices.len())].to_vec();

            if !chunk_indices.is_empty() {
                let local_tree = Self::build_local_kdtree_chunk(
                    points,
                    &chunk_indices,
                    depth,
                    &kdtree_context.config,
                );

                kdtree_context.result_sender.send((start, local_tree)).ok();
            }
        }
    }

    /// Finds the k nearest data points for each query index in `[start, end)`.
    fn process_nn_chunk(start: usize, end: usize, context: &WorkContext) {
        if let Some(nn_context) = &context.nn_context {
            let optimizer = HardwareOptimizedDistances::new();
            let query_points = &nn_context.query_points;
            let data_points = &nn_context.data_points;
            let k = nn_context.k;

            for query_idx in start..end {
                if query_idx < query_points.nrows() {
                    let query = query_points.row(query_idx);

                    let mut distances: Vec<(f64, usize)> = Vec::with_capacity(data_points.nrows());

                    for (data_idx, data_point) in data_points.outer_iter().enumerate() {
                        match optimizer.euclidean_distance_optimized(&query, &data_point) {
                            Ok(distance) => distances.push((distance, data_idx)),
                            Err(_) => distances.push((f64::INFINITY, data_idx)),
                        }
                    }

                    if k > 0 && k <= distances.len() {
                        // Partially select the k smallest distances, then sort only that prefix.
                        distances
                            .select_nth_unstable_by(k - 1, |a, b| a.0.partial_cmp(&b.0).unwrap());
                        distances[..k].sort_unstable_by(|a, b| a.0.partial_cmp(&b.0).unwrap());

                        let result: Vec<(usize, f64)> = distances[..k]
                            .iter()
                            .map(|(dist, idx)| (*idx, *dist))
                            .collect();

                        nn_context.result_sender.send((query_idx, result)).ok();
                    }
                }
            }
        }
    }

    /// Runs the user-supplied function over the range `[start, end)`.
    fn process_custom_chunk(start: usize, end: usize, context: &WorkContext) {
        if let Some(custom_context) = &context.custom_context {
            (custom_context.process_fn)(start, end, &custom_context.user_data);
        }
    }

    /// Converts a linear index over the strict upper triangle of an `n x n` matrix into the
    /// pair `(i, j)` with `i < j`, enumerating row by row: (0,1), (0,2), ..., (1,2), ...
    fn linear_to_matrix_indices(linear_idx: usize, n: usize) -> (usize, usize) {
        let mut k = linear_idx;
        let mut i = 0;

        // Row i contains n - i - 1 pairs; advance until k falls inside row i.
        while k >= n - i - 1 {
            k -= n - i - 1;
            i += 1;
        }

        let j = k + i + 1;
        (i, j)
    }

    /// Splits a set of point indices at the median along the cycling splitting dimension.
    fn build_local_kdtree_chunk(
        points: &Array2<f64>,
        indices: &[usize],
        depth: usize,
        _config: &KDTreeConfig,
    ) -> KDTreeChunkResult {
        let n_dims = points.ncols();
        let splitting_dimension = depth % n_dims;

        if indices.len() <= 1 {
            return KDTreeChunkResult {
                node_index: indices.first().copied().unwrap_or(0),
                is_leaf: true,
                splitting_dimension,
                split_value: 0.0,
                left_indices: Vec::new(),
                right_indices: Vec::new(),
            };
        }

        // Sort indices by their coordinate along the splitting dimension and split at the median.
        let mut sorted_indices = indices.to_vec();
        sorted_indices.sort_by(|&a, &b| {
            let coord_a = points[[a, splitting_dimension]];
            let coord_b = points[[b, splitting_dimension]];
            coord_a
                .partial_cmp(&coord_b)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let median_idx = sorted_indices.len() / 2;
        let split_point_idx = sorted_indices[median_idx];
        let split_value = points[[split_point_idx, splitting_dimension]];

        let left_indices = sorted_indices[..median_idx].to_vec();
        let right_indices = sorted_indices[median_idx + 1..].to_vec();

        KDTreeChunkResult {
            node_index: split_point_idx,
            is_leaf: false,
            splitting_dimension,
            split_value,
            left_indices,
            right_indices,
        }
    }

    /// Enqueues a batch of work items on the global queue and resets the progress counters.
    pub fn submit_work(&self, work_items: Vec<WorkItem>) -> SpatialResult<()> {
        self.total_work.store(work_items.len(), Ordering::Relaxed);
        self.completed_work.store(0, Ordering::Relaxed);

        let mut global_queue = self.global_queue.lock().unwrap();
        for item in work_items {
            global_queue.push_back(item);
        }
        drop(global_queue);

        Ok(())
    }

    /// Blocks until every submitted work item has been processed.
    pub fn wait_for_completion(&self) -> SpatialResult<()> {
        let total = self.total_work.load(Ordering::Relaxed);

        while self.completed_work.load(Ordering::Relaxed) < total {
            thread::sleep(Duration::from_millis(1));
        }

        Ok(())
    }

    /// Returns `(completed, total)` work-item counts.
    pub fn progress(&self) -> (usize, usize) {
        let completed = self.completed_work.load(Ordering::Relaxed);
        let total = self.total_work.load(Ordering::Relaxed);
        (completed, total)
    }

    /// Returns a snapshot of the pool's current state.
    pub fn statistics(&self) -> PoolStatistics {
        PoolStatistics {
            num_threads: self.workers.len(),
            numa_nodes: self.numa_topology.num_nodes,
            active_workers: self.active_workers.load(Ordering::Relaxed),
            completed_work: self.completed_work.load(Ordering::Relaxed),
            total_work: self.total_work.load(Ordering::Relaxed),
            queue_depth: self.global_queue.lock().unwrap().len(),
        }
    }
}

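// Illustrative sketch: submitting custom-typed work and polling progress. The chunk size and
// item count are arbitrary assumptions for demonstration; the items carry no task context, so
// the workers simply drain and count them.
#[allow(dead_code)]
fn example_submit_and_wait() -> SpatialResult<()> {
    let pool = WorkStealingPool::new(WorkStealingConfig::new().with_threads(2))?;

    // Split 10_000 logical indices into items of 1_000.
    let work_items: Vec<WorkItem> = (0..10_000usize)
        .step_by(1_000)
        .map(|start| WorkItem {
            start,
            end: (start + 1_000).min(10_000),
            work_type: WorkType::Custom("demo".to_string()),
            priority: 1,
            numa_hint: None,
        })
        .collect();

    pool.submit_work(work_items)?;
    let (completed, total) = pool.progress();
    println!("progress: {completed}/{total}");
    pool.wait_for_completion()
}
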
impl Drop for WorkStealingPool {
    fn drop(&mut self) {
        // Signal shutdown and join every worker thread.
        self.shutdown.store(true, Ordering::Relaxed);

        for worker in &mut self.workers {
            if let Some(handle) = worker.thread_handle.take() {
                let _ = handle.join();
            }
        }
    }
}

/// Snapshot of a [`WorkStealingPool`]'s state.
#[derive(Debug, Clone)]
pub struct PoolStatistics {
    pub num_threads: usize,
    pub numa_nodes: usize,
    pub active_workers: usize,
    pub completed_work: usize,
    pub total_work: usize,
    pub queue_depth: usize,
}

/// Parallel pairwise distance-matrix computation driven by a [`WorkStealingPool`].
pub struct AdvancedParallelDistanceMatrix {
    pool: WorkStealingPool,
    config: WorkStealingConfig,
}

impl AdvancedParallelDistanceMatrix {
    pub fn new(config: WorkStealingConfig) -> SpatialResult<Self> {
        let pool = WorkStealingPool::new(config.clone())?;
        Ok(Self { pool, config })
    }

    /// Computes the full symmetric Euclidean distance matrix for `points`.
    pub fn compute_parallel(&self, points: &ArrayView2<'_, f64>) -> SpatialResult<Array2<f64>> {
        let n_points = points.nrows();
        let n_pairs = n_points * (n_points - 1) / 2;
        let mut result_matrix = Array2::zeros((n_points, n_points));

        type DistanceResult = (usize, usize, f64);
        let (result_sender, result_receiver): (Sender<DistanceResult>, Receiver<DistanceResult>) =
            channel();

        // Owned copy of the input and the sending half of the results channel
        // (not currently wired into the worker threads).
        let _distance_context = DistanceMatrixContext {
            points: points.to_owned(),
            result_sender,
        };

        // Partition the upper-triangle pair indices into work items.
        let chunk_size = self.config.initial_chunk_size;
        let mut work_items = Vec::new();

        for chunk_start in (0..n_pairs).step_by(chunk_size) {
            let chunk_end = (chunk_start + chunk_size).min(n_pairs);
            work_items.push(WorkItem {
                start: chunk_start,
                end: chunk_end,
                work_type: WorkType::DistanceMatrix,
                priority: 1,
                numa_hint: None,
            });
        }

        self.pool.submit_work(work_items)?;

        // Collect results from the workers, bounded by a timeout.
        let mut collected_results = 0;
        let timeout = Duration::from_secs(2);
        let start_time = std::time::Instant::now();

        while collected_results < n_pairs && start_time.elapsed() < timeout {
            if let Ok((i, j, distance)) = result_receiver.try_recv() {
                if i < n_points && j < n_points {
                    result_matrix[[i, j]] = distance;
                    result_matrix[[j, i]] = distance;
                    collected_results += 1;
                }
            } else {
                thread::sleep(Duration::from_millis(1));
            }
        }

        self.pool.wait_for_completion()?;

        // Fill in any pairs that were not received from the workers by computing them directly.
        if collected_results < n_pairs {
            let optimizer = HardwareOptimizedDistances::new();

            for i in 0..n_points {
                for j in (i + 1)..n_points {
                    if result_matrix[[i, j]] == 0.0 && i != j {
                        let point_i = points.row(i);
                        let point_j = points.row(j);

                        if let Ok(distance) =
                            optimizer.euclidean_distance_optimized(&point_i, &point_j)
                        {
                            result_matrix[[i, j]] = distance;
                            result_matrix[[j, i]] = distance;
                        }
                    }
                }
            }
        }

        Ok(result_matrix)
    }

    /// Returns statistics from the underlying pool.
    pub fn statistics(&self) -> PoolStatistics {
        self.pool.statistics()
    }
}

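// Illustrative sketch of the distance-matrix API above, with a small hard-coded input.
// The point values are arbitrary assumptions for demonstration.
#[allow(dead_code)]
fn example_distance_matrix() -> SpatialResult<()> {
    use scirs2_core::ndarray::array;

    let points = array![[0.0, 0.0], [3.0, 4.0], [6.0, 8.0]];
    let processor =
        AdvancedParallelDistanceMatrix::new(WorkStealingConfig::new().with_threads(2))?;
    let distances = processor.compute_parallel(&points.view())?;

    // The matrix is symmetric with zeros on the diagonal; e.g. the (0, 1) entry is 5.0.
    println!("{distances:?}");
    Ok(())
}
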
/// Parallel k-means assignment driven by a [`WorkStealingPool`].
pub struct AdvancedParallelKMeans {
    pool: WorkStealingPool,
    config: WorkStealingConfig,
    k: usize,
}

impl AdvancedParallelKMeans {
    pub fn new(k: usize, config: WorkStealingConfig) -> SpatialResult<Self> {
        let pool = WorkStealingPool::new(config.clone())?;
        Ok(Self { pool, config, k })
    }

    /// Runs the parallel assignment pass over `points`.
    pub fn fit_parallel(
        &self,
        points: &ArrayView2<'_, f64>,
    ) -> SpatialResult<(Array2<f64>, Array1<usize>)> {
        let n_points = points.nrows();
        let n_dims = points.ncols();

        // Partition the points into per-chunk work items.
        let chunk_size = self.config.initial_chunk_size;
        let mut work_items = Vec::new();

        for chunk_start in (0..n_points).step_by(chunk_size) {
            let chunk_end = (chunk_start + chunk_size).min(n_points);
            work_items.push(WorkItem {
                start: chunk_start,
                end: chunk_end,
                work_type: WorkType::KMeansClustering,
                priority: 1,
                numa_hint: None,
            });
        }

        self.pool.submit_work(work_items)?;
        self.pool.wait_for_completion()?;

        // Placeholder outputs: zero-initialized centroids and assignments.
        let centroids = Array2::zeros((self.k, n_dims));
        let assignments = Array1::zeros(n_points);

        Ok((centroids, assignments))
    }
}

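// Illustrative sketch of the k-means API above. Note that `fit_parallel` currently returns
// zero-initialized centroids and assignments, so the example only checks output shapes.
#[allow(dead_code)]
fn example_kmeans() -> SpatialResult<()> {
    use scirs2_core::ndarray::array;

    let points = array![[0.0, 0.0], [0.1, 0.0], [5.0, 5.0], [5.1, 5.0]];
    let kmeans = AdvancedParallelKMeans::new(2, WorkStealingConfig::new().with_threads(2))?;
    let (centroids, assignments) = kmeans.fit_parallel(&points.view())?;

    assert_eq!(centroids.dim(), (2, 2));
    assert_eq!(assignments.len(), 4);
    Ok(())
}
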
/// Lazily initialized process-wide work-stealing pool.
static GLOBAL_WORK_STEALING_POOL: std::sync::OnceLock<Mutex<Option<WorkStealingPool>>> =
    std::sync::OnceLock::new();

/// Returns the global pool slot, creating an empty slot on first use.
#[allow(dead_code)]
pub fn global_work_stealing_pool() -> SpatialResult<&'static Mutex<Option<WorkStealingPool>>> {
    Ok(GLOBAL_WORK_STEALING_POOL.get_or_init(|| Mutex::new(None)))
}

/// Initializes the global pool with `config` if it has not been initialized yet.
#[allow(dead_code)]
pub fn initialize_global_pool(config: WorkStealingConfig) -> SpatialResult<()> {
    let pool_mutex = global_work_stealing_pool()?;
    let mut pool_guard = pool_mutex.lock().unwrap();

    if pool_guard.is_none() {
        *pool_guard = Some(WorkStealingPool::new(config)?);
    }

    Ok(())
}

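// Illustrative sketch: initializing the global pool once and borrowing it afterwards.
// Error handling is simplified; the lock-and-unwrap pattern mirrors `initialize_global_pool`.
#[allow(dead_code)]
fn example_use_global_pool() -> SpatialResult<()> {
    initialize_global_pool(WorkStealingConfig::new().with_threads(2))?;

    let pool_mutex = global_work_stealing_pool()?;
    let pool_guard = pool_mutex.lock().unwrap();
    if let Some(pool) = pool_guard.as_ref() {
        let stats = pool.statistics();
        println!("global pool has {} worker(s)", stats.num_threads);
    }
    Ok(())
}
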
/// Detects the NUMA topology of the current machine.
#[allow(dead_code)]
pub fn get_numa_topology() -> NumaTopology {
    NumaTopology::detect()
}

/// Prints a summary of the parallel-processing capabilities of this machine.
#[allow(dead_code)]
pub fn report_advanced_parallel_capabilities() {
    let topology = get_numa_topology();
    let total_cores: usize = topology.cores_per_node.iter().sum();

    println!("Advanced-Parallel Processing Capabilities:");
    println!("  Total CPU cores: {total_cores}");
    println!("  NUMA nodes: {}", topology.num_nodes);

    for (node, &cores) in topology.cores_per_node.iter().enumerate() {
        let memory_gb = topology.memory_per_node[node] as f64 / (1024.0 * 1024.0 * 1024.0);
        println!("    Node {node}: {cores} cores, {memory_gb:.1} GB memory");
    }

    println!("  Work-stealing: Available");
    println!("  NUMA-aware allocation: Available");
    println!("  Thread affinity: Available");

    let caps = PlatformCapabilities::detect();
    if caps.simd_available {
        println!("  SIMD acceleration: Available");
        if caps.avx512_available {
            println!("    AVX-512: Available");
        } else if caps.avx2_available {
            println!("    AVX2: Available");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::array;

    #[test]
    fn test_work_stealing_config() {
        let config = WorkStealingConfig::new()
            .with_numa_aware(true)
            .with_work_stealing(true)
            .with_threads(8);

        assert!(config.numa_aware);
        assert!(config.work_stealing);
        assert_eq!(config.num_threads, 8);
    }

    #[test]
    fn test_numa_topology_detection() {
        let topology = NumaTopology::detect();

        assert!(topology.num_nodes > 0);
        assert!(!topology.cores_per_node.is_empty());
        assert_eq!(topology.cores_per_node.len(), topology.num_nodes);
        assert_eq!(topology.memory_per_node.len(), topology.num_nodes);
    }

    #[test]
    fn test_work_item_creation() {
        let item = WorkItem {
            start: 0,
            end: 100,
            work_type: WorkType::DistanceMatrix,
            priority: 1,
            numa_hint: Some(0),
        };

        assert_eq!(item.start, 0);
        assert_eq!(item.end, 100);
        assert_eq!(item.work_type, WorkType::DistanceMatrix);
        assert_eq!(item.priority, 1);
        assert_eq!(item.numa_hint, Some(0));
    }

    #[test]
    fn test_work_stealing_pool_creation() {
        let config = WorkStealingConfig::new().with_threads(1);
        let pool = WorkStealingPool::new(config);

        assert!(pool.is_ok());
        let pool = pool.unwrap();
        assert_eq!(pool.workers.len(), 1);
    }

    #[test]
    fn test_advanced_parallel_distance_matrix() {
        let _points = array![[0.0, 0.0], [1.0, 0.0]];
        let config = WorkStealingConfig::new().with_threads(1);

        let processor = AdvancedParallelDistanceMatrix::new(config);
        assert!(processor.is_ok());

        let processor = processor.unwrap();
        let stats = processor.statistics();
        assert_eq!(stats.num_threads, 1);
    }

    #[test]
    fn test_advanced_parallel_kmeans() {
        let points = array![[0.0, 0.0], [1.0, 1.0]];
        let config = WorkStealingConfig::new().with_threads(1);
        let kmeans = AdvancedParallelKMeans::new(1, config);
        assert!(kmeans.is_ok());

        let kmeans = kmeans.unwrap();
        let result = kmeans.fit_parallel(&points.view());
        assert!(result.is_ok());

        let (centroids, assignments) = result.unwrap();
        assert_eq!(centroids.dim(), (1, 2));
        assert_eq!(assignments.len(), 2);
    }

    #[test]
    fn test_global_functions() {
        let _topology = get_numa_topology();
        report_advanced_parallel_capabilities();

        let config = WorkStealingConfig::new().with_threads(1);
        let init_result = initialize_global_pool(config);
        assert!(init_result.is_ok());
    }

    #[test]
    fn test_work_context_structures() {
        let (sender, _receiver) = channel::<(usize, usize, f64)>();

        let distance_context = DistanceMatrixContext {
            points: Array2::zeros((4, 2)),
            result_sender: sender,
        };

        let work_context = WorkContext {
            distance_context: Some(distance_context),
            kmeans_context: None,
            kdtree_context: None,
            nn_context: None,
            custom_context: None,
        };

        assert!(work_context.distance_context.is_some());
    }

    #[test]
    fn test_linear_to_matrix_indices() {
        let n = 4;
        let expected_pairs = [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)];

        for (linear_idx, expected) in expected_pairs.iter().enumerate() {
            let result = WorkStealingPool::linear_to_matrix_indices(linear_idx, n);
            assert_eq!(result, *expected, "Failed for linear index {linear_idx}");
        }
    }

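    // Additional sketch (added as an illustration): linear_to_matrix_indices should enumerate
    // every strict upper-triangle pair exactly once; n = 6 is an assumed example size.
    #[test]
    fn test_linear_to_matrix_indices_covers_all_pairs() {
        let n = 6;
        let n_pairs = n * (n - 1) / 2;
        let mut seen = std::collections::HashSet::new();

        for linear_idx in 0..n_pairs {
            let (i, j) = WorkStealingPool::linear_to_matrix_indices(linear_idx, n);
            assert!(i < j && j < n, "invalid pair ({i}, {j})");
            assert!(seen.insert((i, j)), "pair ({i}, {j}) produced twice");
        }

        assert_eq!(seen.len(), n_pairs);
    }
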
    #[test]
    fn test_kdtree_chunk_result() {
        let chunk_result = KDTreeChunkResult {
            node_index: 0,
            is_leaf: true,
            splitting_dimension: 0,
            split_value: 1.0,
            left_indices: Vec::new(),
            right_indices: Vec::new(),
        };

        assert!(chunk_result.is_leaf);
        assert_eq!(chunk_result.node_index, 0);
        assert_eq!(chunk_result.splitting_dimension, 0);
    }

    #[test]
    fn test_enhanced_distance_matrix_computation() {
        let _points = array![[0.0, 0.0], [1.0, 0.0]];
        let config = WorkStealingConfig::new().with_threads(1);

        let processor = AdvancedParallelDistanceMatrix::new(config);
        assert!(processor.is_ok());

        let processor = processor.unwrap();
        let stats = processor.statistics();
        assert_eq!(stats.num_threads, 1);
        // The detected NUMA node count depends on the host; just require at least one.
        assert!(stats.numa_nodes >= 1);
    }

    #[test]
    fn test_enhanced_kmeans_with_context() {
        let points = array![[0.0, 0.0], [1.0, 1.0]];
        let config = WorkStealingConfig::new().with_threads(1);
        let kmeans = AdvancedParallelKMeans::new(1, config);
        assert!(kmeans.is_ok());

        let kmeans = kmeans.unwrap();
        let result = kmeans.fit_parallel(&points.view());
        assert!(result.is_ok());

        let (centroids, assignments) = result.unwrap();
        assert_eq!(centroids.dim(), (1, 2));
        assert_eq!(assignments.len(), 2);
    }

    #[test]
    fn test_numa_topology_detailed() {
        let topology = NumaTopology::detect();

        assert!(topology.num_nodes > 0);
        assert_eq!(topology.cores_per_node.len(), topology.num_nodes);
        assert_eq!(topology.memory_per_node.len(), topology.num_nodes);
        assert_eq!(topology.distance_matrix.len(), topology.num_nodes);

        for node in 0..topology.num_nodes {
            let threads = topology.optimal_threads_per_node(node);
            assert!(threads > 0);
        }

        for node in 0..topology.num_nodes {
            let _capacity = topology.memory_capacity(node);
        }
    }

    #[test]
    fn test_work_stealing_configuration_advanced() {
        let config = WorkStealingConfig::new()
            .with_numa_aware(true)
            .with_work_stealing(true)
            .with_adaptive_scheduling(true)
            .with_threads(4)
            .with_chunk_sizes(512, 32)
            .with_thread_affinity(ThreadAffinityStrategy::NumaAware)
            .with_memory_strategy(MemoryStrategy::NumaInterleaved);

        assert!(config.numa_aware);
        assert!(config.work_stealing);
        assert!(config.adaptive_scheduling);
        assert_eq!(config.num_threads, 4);
        assert_eq!(config.initial_chunk_size, 512);
        assert_eq!(config.min_chunk_size, 32);
        assert_eq!(config.thread_affinity, ThreadAffinityStrategy::NumaAware);
        assert_eq!(config.memory_strategy, MemoryStrategy::NumaInterleaved);
    }
}