1use crate::error::{StatsError, StatsResult};
8use scirs2_core::ndarray::{s, Array2, ArrayView1, ArrayView2};
9use scirs2_core::numeric::{Float, NumCast, Zero};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex, RwLock};
13use std::thread;
14use std::time::{Duration, Instant};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct AdvancedParallelConfig {
19 pub adaptive_work_distribution: bool,
21 pub numa_aware: bool,
23 pub hybrid_processing: bool,
25 pub work_stealing: WorkStealingStrategy,
27 pub load_balancing: LoadBalancingAlgorithm,
29 pub thread_pool_config: ThreadPoolConfig,
31 pub memory_strategy: ParallelMemoryStrategy,
33 pub performance_monitoring: bool,
35 pub min_worksize: usize,
37 pub max_parallel_depth: usize,
39 pub enable_simd_in_parallel: bool,
41 pub cache_optimization: CacheOptimizationLevel,
43}
44
45impl Default for AdvancedParallelConfig {
46 fn default() -> Self {
47 Self {
48 adaptive_work_distribution: true,
49 numa_aware: true,
50 hybrid_processing: false,
51 work_stealing: WorkStealingStrategy::Adaptive,
52 load_balancing: LoadBalancingAlgorithm::DynamicRoundRobin,
53 thread_pool_config: ThreadPoolConfig::default(),
54 memory_strategy: ParallelMemoryStrategy::CacheAware,
55 performance_monitoring: true,
56 min_worksize: 1000,
57 max_parallel_depth: 3,
58 enable_simd_in_parallel: true,
59 cache_optimization: CacheOptimizationLevel::Aggressive,
60 }
61 }
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66pub enum WorkStealingStrategy {
67 None,
69 Random,
71 LocalityAware,
73 Adaptive,
75 NumaAware,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub enum LoadBalancingAlgorithm {
82 StaticRoundRobin,
84 DynamicRoundRobin,
86 WorkBased,
88 PerformanceBased,
90 Hierarchical,
92 MLBased,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct ThreadPoolConfig {
99 pub num_workers: Option<usize>,
101 pub thread_priority: ThreadPriority,
103 pub affinity_strategy: ThreadAffinityStrategy,
105 pub stacksize: Option<usize>,
107 pub idle_timeout: Duration,
109 pub scaling_strategy: ScalingStrategy,
111}
112
113impl Default for ThreadPoolConfig {
114 fn default() -> Self {
115 Self {
116 num_workers: None, thread_priority: ThreadPriority::Normal,
118 affinity_strategy: ThreadAffinityStrategy::NUMA,
119 stacksize: Some(2 * 1024 * 1024), idle_timeout: Duration::from_secs(60),
121 scaling_strategy: ScalingStrategy::Adaptive,
122 }
123 }
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
128pub enum ThreadPriority {
129 Low,
130 Normal,
131 High,
132 RealTime,
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
137pub enum ThreadAffinityStrategy {
138 None,
140 NUMA,
142 CoreBased,
144 Custom(Vec<usize>),
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
150pub enum ScalingStrategy {
151 Fixed,
153 Adaptive,
155 PerformanceBased,
157 ResourceAware,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
163pub enum ParallelMemoryStrategy {
164 Simple,
166 CacheAware,
168 NumaAware,
170 PoolBased,
172 LockFree,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
178pub enum CacheOptimizationLevel {
179 None,
181 Basic,
183 Aggressive,
185 HardwareSpecific,
187}
188
189#[derive(Debug, Clone)]
191pub struct WorkUnit<T> {
192 pub id: usize,
194 pub data: T,
196 pub cost: f64,
198 pub dependencies: Vec<usize>,
200 pub priority: WorkPriority,
202 pub numa_node: Option<usize>,
204}
205
206#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
208pub enum WorkPriority {
209 Low = 0,
210 Normal = 1,
211 High = 2,
212 Critical = 3,
213}
214
215#[derive(Debug, Clone)]
217pub struct ParallelExecutionContext {
218 pub thread_id: usize,
220 pub numa_node: usize,
222 pub memory_pool: Option<Arc<Mutex<MemoryPool>>>,
224 pub counters: PerformanceCounters,
226 pub cache: ThreadLocalCache,
228}
229
230#[derive(Debug, Clone, Default)]
232pub struct PerformanceCounters {
233 pub tasks_completed: usize,
235 pub total_time: Duration,
237 pub cache_hits: usize,
239 pub cache_misses: usize,
241 pub work_stolen: usize,
243 pub work_given: usize,
245 pub memory_allocations: usize,
247 pub memory_deallocations: usize,
249}
250
251#[derive(Debug, Clone)]
253pub struct ThreadLocalCache {
254 pub results: HashMap<String, CachedResult>,
256 pub stats: CacheStatistics,
258}
259
260#[derive(Debug, Clone)]
262pub struct CachedResult {
263 pub value: Vec<f64>,
265 pub timestamp: Instant,
267 pub access_count: usize,
269 pub recompute_cost: f64,
271}
272
273#[derive(Debug, Clone, Default)]
275pub struct CacheStatistics {
276 pub total_accesses: usize,
278 pub hits: usize,
280 pub misses: usize,
282 pub evictions: usize,
284}
285
286#[derive(Debug)]
288pub struct MemoryPool {
289 blocks: Vec<Vec<u8>>,
291 available: Vec<usize>,
293 blocksize: usize,
295 total_allocations: usize,
297}
298
299impl MemoryPool {
300 pub fn new(num_blocks: usize, blocksize: usize) -> Self {
302 let mut blocks = Vec::with_capacity(num_blocks);
303 let mut available = Vec::with_capacity(num_blocks);
304
305 for i in 0..num_blocks {
306 blocks.push(vec![0u8; blocksize]);
307 available.push(i);
308 }
309
310 Self {
311 blocks,
312 available,
313 blocksize,
314 total_allocations: 0,
315 }
316 }
317
318 pub fn allocate(&mut self) -> Option<*mut u8> {
320 if let Some(index) = self.available.pop() {
321 self.total_allocations += 1;
322 Some(self.blocks[index].as_mut_ptr())
323 } else {
324 None
325 }
326 }
327
328 pub fn deallocate(&mut self, ptr: *mut u8) {
330 for (i, block) in self.blocks.iter().enumerate() {
332 if ptr == block.as_ptr() as *mut u8 {
333 self.available.push(i);
334 break;
335 }
336 }
337 }
338}
339
340#[derive(Debug, Clone)]
342pub struct AdvancedParallelResult<T> {
343 pub result: T,
345 pub metrics: ParallelExecutionMetrics,
347 pub analysis: ParallelPerformanceAnalysis,
349 pub resource_utilization: ResourceUtilization,
351}
352
353#[derive(Debug, Clone)]
355pub struct ParallelExecutionMetrics {
356 pub total_time: Duration,
358 pub parallel_time: Duration,
360 pub sequential_time: Duration,
362 pub sync_time: Duration,
364 pub threads_used: usize,
366 pub load_balance_efficiency: f64,
368 pub parallel_efficiency: f64,
370 pub speedup: f64,
372 pub work_distribution_quality: f64,
374}
375
376#[derive(Debug, Clone)]
378pub struct ParallelPerformanceAnalysis {
379 pub bottlenecks: Vec<PerformanceBottleneck>,
381 pub scaling_analysis: ScalingAnalysis,
383 pub optimization_opportunities: Vec<OptimizationOpportunity>,
385 pub performance_rating: PerformanceRating,
387}
388
389#[derive(Debug, Clone)]
391pub struct PerformanceBottleneck {
392 pub bottleneck_type: BottleneckType,
394 pub severity: f64,
396 pub description: String,
398 pub mitigation: String,
400}
401
402#[derive(Debug, Clone)]
404pub enum BottleneckType {
405 MemoryBandwidth,
407 CacheContention,
409 LoadImbalance,
411 SynchronizationOverhead,
413 NumaEffects,
415 FalseSharing,
417 ContextSwitching,
419 InsufficientParallelism,
421}
422
423#[derive(Debug, Clone)]
425pub struct ScalingAnalysis {
426 pub theoretical_max_speedup: f64,
428 pub achieved_speedup: f64,
430 pub parallel_fraction: f64,
432 pub serial_bottleneck_impact: f64,
434 pub scaling_efficiency: HashMap<usize, f64>,
436 pub optimal_thread_count: usize,
438}
439
440#[derive(Debug, Clone)]
442pub struct OptimizationOpportunity {
443 pub opportunity_type: OptimizationType,
445 pub potential_improvement: f64,
447 pub complexity: OptimizationComplexity,
449 pub description: String,
451 pub implementation_steps: Vec<String>,
453}
454
455#[derive(Debug, Clone)]
457pub enum OptimizationType {
458 WorkDistribution,
460 MemoryLayout,
462 CacheOptimization,
464 SimdIntegration,
466 AlgorithmSelection,
468 ResourceAllocation,
470}
471
472#[derive(Debug, Clone)]
474pub enum OptimizationComplexity {
475 Low,
476 Medium,
477 High,
478 VeryHigh,
479}
480
481#[derive(Debug, Clone)]
483pub enum PerformanceRating {
484 Excellent,
485 Good,
486 Acceptable,
487 Poor,
488 Unacceptable,
489}
490
491#[derive(Debug, Clone)]
493pub struct PerformanceStatistics {
494 pub total_operations: usize,
496 pub average_speedup: f64,
498 pub best_strategies: Vec<String>,
500 pub hardware_utilization: HardwareUtilization,
502}
503
504#[derive(Debug, Clone)]
506pub struct HardwareUtilization {
507 pub simd_utilization: f64,
509 pub memory_bandwidth_utilization: f64,
511 pub cache_efficiency: f64,
513 pub energy_efficiency: Option<f64>,
515}
516
517#[derive(Debug, Clone)]
519pub struct ResourceUtilization {
520 pub cpu_utilization: Vec<f64>,
522 pub memory_utilization: f64,
524 pub cache_utilization: CacheUtilization,
526 pub numa_utilization: Vec<f64>,
528 pub energy_consumption: Option<f64>,
530}
531
532#[derive(Debug, Clone)]
534pub struct CacheUtilization {
535 pub l1_hit_rate: f64,
537 pub l2_hit_rate: f64,
539 pub l3_hit_rate: f64,
541 pub cache_line_utilization: f64,
543}
544
545pub struct AdvancedParallelStatsProcessor {
547 config: AdvancedParallelConfig,
548 execution_contexts: Vec<Arc<RwLock<ParallelExecutionContext>>>,
549 work_queue: Arc<Mutex<Vec<WorkUnit<Vec<f64>>>>>,
550 performance_history: Arc<Mutex<Vec<ParallelExecutionMetrics>>>,
551 optimization_cache: Arc<RwLock<HashMap<String, OptimizationStrategy>>>,
552}
553
554#[derive(Debug, Clone)]
556pub struct OptimizationStrategy {
557 pub name: String,
559 pub thread_count: usize,
561 pub work_distribution: WorkDistributionMethod,
563 pub memory_layout: MemoryLayoutStrategy,
565 pub expected_performance: f64,
567}
568
569#[derive(Debug, Clone)]
571pub enum WorkDistributionMethod {
572 EqualChunks,
574 SizeBased,
576 CostBased,
578 Adaptive,
580 LocalityAware,
582}
583
584#[derive(Debug, Clone)]
586pub enum MemoryLayoutStrategy {
587 Contiguous,
589 Interleaved,
591 NumaAware,
593 CacheOptimized,
595}
596
597impl AdvancedParallelStatsProcessor {
598 pub fn new(config: AdvancedParallelConfig) -> StatsResult<Self> {
600 let num_threads = config
601 .thread_pool_config
602 .num_workers
603 .unwrap_or_else(|| scirs2_core::parallel_ops::current_num_threads().max(1));
604
605 let mut execution_contexts = Vec::with_capacity(num_threads);
606
607 for i in 0..num_threads {
608 let context = ParallelExecutionContext {
609 thread_id: i,
610 numa_node: i % 2, memory_pool: Some(Arc::new(Mutex::new(MemoryPool::new(100, 4096)))),
612 counters: PerformanceCounters::default(),
613 cache: ThreadLocalCache {
614 results: HashMap::new(),
615 stats: CacheStatistics::default(),
616 },
617 };
618 execution_contexts.push(Arc::new(RwLock::new(context)));
619 }
620
621 Ok(Self {
622 config,
623 execution_contexts,
624 work_queue: Arc::new(Mutex::new(Vec::new())),
625 performance_history: Arc::new(Mutex::new(Vec::new())),
626 optimization_cache: Arc::new(RwLock::new(HashMap::new())),
627 })
628 }
629
630 pub fn default() -> StatsResult<Self> {
632 Self::new(AdvancedParallelConfig::default())
633 }
634
635 pub fn mean_advanced_parallel<F>(
637 &self,
638 data: ArrayView1<F>,
639 ) -> StatsResult<AdvancedParallelResult<F>>
640 where
641 F: Float + NumCast + Send + Sync + Zero + std::iter::Sum + std::fmt::Display,
642 {
643 let start_time = Instant::now();
644
645 let strategy = self.select_optimization_strategy("mean", data.len())?;
647
648 let work_units = self.create_work_units(&data, &strategy)?;
650
651 let partial_results = self.execute_parallel_work(&work_units)?;
653
654 let result = self.combine_mean_results(&partial_results, data.len())?;
656
657 let total_time = start_time.elapsed();
659 let metrics = self.calculate_execution_metrics(total_time, &work_units)?;
660 let analysis = self.analyze_performance(&metrics)?;
661 let resource_utilization = self.measure_resource_utilization()?;
662
663 self.update_performance_history(&metrics);
665
666 Ok(AdvancedParallelResult {
667 result,
668 metrics,
669 analysis,
670 resource_utilization,
671 })
672 }
673
674 pub fn variance_advanced_parallel<F>(
676 &self,
677 data: ArrayView1<F>,
678 ddof: usize,
679 ) -> StatsResult<AdvancedParallelResult<F>>
680 where
681 F: Float + NumCast + Send + Sync + Zero + std::iter::Sum + std::fmt::Display,
682 {
683 let start_time = Instant::now();
684
685 let mean_result = self.mean_advanced_parallel(data)?;
687 let mean_val = mean_result.result;
688
689 let strategy = self.select_optimization_strategy("variance", data.len())?;
691
692 let work_units = self.create_variance_work_units(&data, mean_val, ddof, &strategy)?;
694
695 let partial_results = self.execute_parallel_work(&work_units)?;
697
698 let result = self.combine_variance_results(&partial_results, data.len(), ddof)?;
700
701 let total_time = start_time.elapsed();
702 let metrics = self.calculate_execution_metrics(total_time, &work_units)?;
703 let analysis = self.analyze_performance(&metrics)?;
704 let resource_utilization = self.measure_resource_utilization()?;
705
706 self.update_performance_history(&metrics);
707
708 Ok(AdvancedParallelResult {
709 result,
710 metrics,
711 analysis,
712 resource_utilization,
713 })
714 }
715
716 pub fn correlation_matrix_advanced_parallel<F>(
718 &self,
719 data: ArrayView2<F>,
720 ) -> StatsResult<AdvancedParallelResult<Array2<F>>>
721 where
722 F: Float + NumCast + Send + Sync + Zero + std::iter::Sum + Clone + std::fmt::Display,
723 {
724 let start_time = Instant::now();
725 let (n_rows, n_cols) = data.dim();
726
727 let mut correlation_work = Vec::new();
729 let mut work_id = 0;
730
731 for i in 0..n_cols {
732 for j in i..n_cols {
733 let col_i = data.column(i).to_owned();
734 let col_j = data.column(j).to_owned();
735
736 correlation_work.push(WorkUnit {
737 id: work_id,
738 data: (
739 col_i.into_raw_vec_and_offset().0,
740 col_j.into_raw_vec_and_offset().0,
741 i,
742 j,
743 ),
744 cost: (n_rows as f64).sqrt(), dependencies: Vec::new(),
746 priority: WorkPriority::Normal,
747 numa_node: Some(work_id % 2),
748 });
749 work_id += 1;
750 }
751 }
752
753 let correlation_results = self.execute_correlation_work(correlation_work.as_slice())?;
755
756 let mut result_matrix = Array2::zeros((n_cols, n_cols));
758 for ((i, j), correlation) in correlation_results {
759 result_matrix[[i, j]] = correlation;
760 if i != j {
761 result_matrix[[j, i]] = correlation; }
763 }
764
765 let total_time = start_time.elapsed();
766 let metrics = self.calculate_matrix_execution_metrics(total_time, &correlation_work)?;
767 let analysis = self.analyze_performance(&metrics)?;
768 let resource_utilization = self.measure_resource_utilization()?;
769
770 self.update_performance_history(&metrics);
771
772 Ok(AdvancedParallelResult {
773 result: result_matrix,
774 metrics,
775 analysis,
776 resource_utilization,
777 })
778 }
779
780 fn select_optimization_strategy(
782 &self,
783 operation: &str,
784 datasize: usize,
785 ) -> StatsResult<OptimizationStrategy> {
786 let cache_key = format!("{}_{}", operation, datasize / 1000); if let Ok(cache) = self.optimization_cache.read() {
790 if let Some(strategy) = cache.get(&cache_key) {
791 return Ok(strategy.clone());
792 }
793 }
794
795 let optimal_threads = self.calculate_optimal_thread_count(datasize);
797 let work_distribution = if datasize > 1_000_000 {
798 WorkDistributionMethod::CostBased
799 } else if datasize > 100_000 {
800 WorkDistributionMethod::SizeBased
801 } else {
802 WorkDistributionMethod::EqualChunks
803 };
804
805 let memory_layout = if self.config.numa_aware {
806 MemoryLayoutStrategy::NumaAware
807 } else if self.config.cache_optimization != CacheOptimizationLevel::None {
808 MemoryLayoutStrategy::CacheOptimized
809 } else {
810 MemoryLayoutStrategy::Contiguous
811 };
812
813 let strategy = OptimizationStrategy {
814 name: format!("{}_optimized", operation),
815 thread_count: optimal_threads,
816 work_distribution,
817 memory_layout,
818 expected_performance: self.estimate_performance(optimal_threads, datasize),
819 };
820
821 if let Ok(mut cache) = self.optimization_cache.write() {
823 cache.insert(cache_key, strategy.clone());
824 }
825
826 Ok(strategy)
827 }
828
829 fn calculate_optimal_thread_count(&self, datasize: usize) -> usize {
831 let available_threads = self.execution_contexts.len();
832 let min_work_per_thread = self.config.min_worksize;
833
834 let max_useful_threads = (datasize / min_work_per_thread).max(1);
836
837 let numa_optimal = if self.config.numa_aware {
839 (available_threads / 2) * 2
841 } else {
842 available_threads
843 };
844
845 max_useful_threads.min(numa_optimal).min(available_threads)
846 }
847
848 fn estimate_performance(&self, thread_count: usize, datasize: usize) -> f64 {
850 let sequential_time = datasize as f64;
852 let parallel_efficiency = 0.8; let parallel_time = sequential_time / (thread_count as f64 * parallel_efficiency);
854
855 sequential_time / parallel_time
856 }
857
858 fn create_work_units<F>(
860 &self,
861 data: &ArrayView1<F>,
862 strategy: &OptimizationStrategy,
863 ) -> StatsResult<Vec<WorkUnit<Vec<f64>>>>
864 where
865 F: Float + NumCast + std::fmt::Display,
866 {
867 let mut work_units = Vec::new();
868 let datasize = data.len();
869 let chunksize = datasize / strategy.thread_count;
870
871 for i in 0..strategy.thread_count {
872 let start = i * chunksize;
873 let end = if i == strategy.thread_count - 1 {
874 datasize
875 } else {
876 (i + 1) * chunksize
877 };
878
879 let chunkdata: Vec<f64> = data
880 .slice(s![start..end])
881 .iter()
882 .map(|&x| x.to_f64().unwrap_or(0.0))
883 .collect();
884
885 work_units.push(WorkUnit {
886 id: i,
887 data: chunkdata,
888 cost: (end - start) as f64,
889 dependencies: Vec::new(),
890 priority: WorkPriority::Normal,
891 numa_node: if self.config.numa_aware {
892 Some(i % 2) } else {
894 None
895 },
896 });
897 }
898
899 Ok(work_units)
900 }
901
902 fn create_variance_work_units<F>(
904 &self,
905 data: &ArrayView1<F>,
906 mean_val: F,
907 _ddof: usize,
908 strategy: &OptimizationStrategy,
909 ) -> StatsResult<Vec<WorkUnit<Vec<f64>>>>
910 where
911 F: Float + NumCast + std::fmt::Display,
912 {
913 let mut work_units = Vec::new();
914 let datasize = data.len();
915 let chunksize = datasize / strategy.thread_count;
916 let mean_f64 = mean_val.to_f64().unwrap_or(0.0);
917
918 for i in 0..strategy.thread_count {
919 let start = i * chunksize;
920 let end = if i == strategy.thread_count - 1 {
921 datasize
922 } else {
923 (i + 1) * chunksize
924 };
925
926 let chunkdata: Vec<f64> = data
927 .slice(s![start..end])
928 .iter()
929 .map(|&x| {
930 let _val = x.to_f64().unwrap_or(0.0);
931 let diff = _val - mean_f64;
932 diff * diff
933 })
934 .collect();
935
936 work_units.push(WorkUnit {
937 id: i,
938 data: chunkdata,
939 cost: (end - start) as f64,
940 dependencies: Vec::new(),
941 priority: WorkPriority::Normal,
942 numa_node: if self.config.numa_aware {
943 Some(i % 2)
944 } else {
945 None
946 },
947 });
948 }
949
950 Ok(work_units)
951 }
952
953 fn execute_parallel_work(&self, workunits: &[WorkUnit<Vec<f64>>]) -> StatsResult<Vec<f64>> {
955 let num_threads = workunits.len();
956 let results = Arc::new(Mutex::new(vec![0.0; num_threads]));
957 let work_units = Arc::new(workunits.to_vec());
958
959 thread::scope(|s| {
960 let handles: Vec<_> = (0..num_threads)
961 .map(|thread_id| {
962 let results = Arc::clone(&results);
963 let work_units = Arc::clone(&work_units);
964
965 s.spawn(move || {
966 let work_unit = &work_units[thread_id];
967 let sum: f64 = work_unit.data.iter().sum();
968
969 if let Ok(mut results) = results.lock() {
970 results[thread_id] = sum;
971 }
972 })
973 })
974 .collect();
975
976 for handle in handles {
978 let _ = handle.join();
979 }
980 });
981
982 let results = results.lock().expect("Operation failed").clone();
983 Ok(results)
984 }
985
986 fn execute_correlation_work<F>(
988 &self,
989 work_units: &[WorkUnit<(Vec<F>, Vec<F>, usize, usize)>],
990 ) -> StatsResult<Vec<((usize, usize), F)>>
991 where
992 F: Float + NumCast + Send + Sync + Clone + std::iter::Sum + std::fmt::Display,
993 {
994 let num_work_units = work_units.len();
995 let results = Arc::new(Mutex::new(Vec::with_capacity(num_work_units)));
996 let work_units = Arc::new(work_units.to_vec());
997
998 thread::scope(|s| {
999 let handles: Vec<_> = (0..num_work_units)
1000 .map(|work_id| {
1001 let results = Arc::clone(&results);
1002 let work_units = Arc::clone(&work_units);
1003
1004 s.spawn(move || {
1005 let work_unit = &work_units[work_id];
1006 let (ref x, ref y, i, j) = work_unit.data;
1007
1008 let correlation = self.compute_correlation(x, y).unwrap_or(F::zero());
1010
1011 if let Ok(mut results) = results.lock() {
1012 results.push(((i, j), correlation));
1013 }
1014 })
1015 })
1016 .collect();
1017
1018 for handle in handles {
1019 let _ = handle.join();
1020 }
1021 });
1022
1023 let results = results.lock().expect("Operation failed").clone();
1024 Ok(results)
1025 }
1026
1027 fn compute_correlation<F>(&self, x: &[F], y: &[F]) -> StatsResult<F>
1029 where
1030 F: Float + NumCast + Send + Sync + Clone + std::iter::Sum + std::fmt::Display,
1031 {
1032 if x.len() != y.len() || x.is_empty() {
1033 return Ok(F::zero());
1034 }
1035
1036 let n = F::from(x.len()).expect("Operation failed");
1037 let sum_x: F = x.iter().cloned().sum();
1038 let sum_y: F = y.iter().cloned().sum();
1039 let sum_xx: F = x.iter().map(|&xi| xi * xi).sum();
1040 let sum_yy: F = y.iter().map(|&yi| yi * yi).sum();
1041 let sum_xy: F = x.iter().zip(y).map(|(&xi, &yi)| xi * yi).sum();
1042
1043 let numerator = n * sum_xy - sum_x * sum_y;
1044 let denominator = ((n * sum_xx - sum_x * sum_x) * (n * sum_yy - sum_y * sum_y)).sqrt();
1045
1046 if denominator == F::zero() {
1047 Ok(F::zero())
1048 } else {
1049 Ok(numerator / denominator)
1050 }
1051 }
1052
1053 fn combine_mean_results<F>(&self, partial_results: &[f64], totalcount: usize) -> StatsResult<F>
1055 where
1056 F: Float + NumCast + std::fmt::Display,
1057 {
1058 let total_sum: f64 = partial_results.iter().sum();
1059 let mean = total_sum / totalcount as f64;
1060 F::from(mean).ok_or_else(|| {
1061 StatsError::ComputationError("Failed to convert mean result".to_string())
1062 })
1063 }
1064
1065 fn combine_variance_results<F>(
1067 &self,
1068 partial_results: &[f64],
1069 total_count: usize,
1070 ddof: usize,
1071 ) -> StatsResult<F>
1072 where
1073 F: Float + NumCast + std::fmt::Display,
1074 {
1075 let total_sum_sq_dev: f64 = partial_results.iter().sum();
1076 let variance = total_sum_sq_dev / (total_count - ddof) as f64;
1077 F::from(variance).ok_or_else(|| {
1078 StatsError::ComputationError("Failed to convert variance result".to_string())
1079 })
1080 }
1081
1082 fn calculate_execution_metrics(
1084 &self,
1085 total_time: Duration,
1086 work_units: &[WorkUnit<Vec<f64>>],
1087 ) -> StatsResult<ParallelExecutionMetrics> {
1088 let threads_used = work_units.len();
1089 let total_work: f64 = work_units.iter().map(|wu| wu.cost).sum();
1090 let avg_work_per_thread = total_work / threads_used as f64;
1091
1092 let work_variance = work_units
1094 .iter()
1095 .map(|wu| (wu.cost - avg_work_per_thread).powi(2))
1096 .sum::<f64>()
1097 / threads_used as f64;
1098 let load_balance_efficiency = 1.0 - (work_variance.sqrt() / avg_work_per_thread).min(1.0);
1099
1100 let sequential_time_estimate = total_time.mul_f64(threads_used as f64);
1102 let parallel_efficiency = total_time.as_secs_f64() / sequential_time_estimate.as_secs_f64();
1103
1104 let speedup = threads_used as f64 * parallel_efficiency;
1105
1106 Ok(ParallelExecutionMetrics {
1107 total_time,
1108 parallel_time: total_time.mul_f64(0.9), sequential_time: total_time.mul_f64(0.1), sync_time: total_time.mul_f64(0.05), threads_used,
1112 load_balance_efficiency,
1113 parallel_efficiency,
1114 speedup,
1115 work_distribution_quality: load_balance_efficiency,
1116 })
1117 }
1118
1119 fn calculate_matrix_execution_metrics<F>(
1121 &self,
1122 total_time: Duration,
1123 work_units: &[WorkUnit<(Vec<F>, Vec<F>, usize, usize)>],
1124 ) -> StatsResult<ParallelExecutionMetrics>
1125 where
1126 F: Float + NumCast + Send + Sync + Clone + std::iter::Sum + std::fmt::Display,
1127 {
1128 let threads_used = work_units.len();
1129 let _total_work: f64 = work_units.iter().map(|wu| wu.cost).sum();
1130 let load_balance_efficiency = 0.85; Ok(ParallelExecutionMetrics {
1133 total_time,
1134 parallel_time: total_time.mul_f64(0.85),
1135 sequential_time: total_time.mul_f64(0.15),
1136 sync_time: total_time.mul_f64(0.08),
1137 threads_used,
1138 load_balance_efficiency,
1139 parallel_efficiency: 0.8, speedup: threads_used as f64 * 0.8,
1141 work_distribution_quality: load_balance_efficiency,
1142 })
1143 }
1144
1145 fn analyze_performance(
1147 &self,
1148 metrics: &ParallelExecutionMetrics,
1149 ) -> StatsResult<ParallelPerformanceAnalysis> {
1150 let mut bottlenecks = Vec::new();
1151
1152 if metrics.load_balance_efficiency < 0.8 {
1154 bottlenecks.push(PerformanceBottleneck {
1155 bottleneck_type: BottleneckType::LoadImbalance,
1156 severity: 1.0 - metrics.load_balance_efficiency,
1157 description: "Load imbalance detected among threads".to_string(),
1158 mitigation: "Consider dynamic work distribution".to_string(),
1159 });
1160 }
1161
1162 if metrics.sync_time.as_secs_f64() / metrics.total_time.as_secs_f64() > 0.1 {
1164 bottlenecks.push(PerformanceBottleneck {
1165 bottleneck_type: BottleneckType::SynchronizationOverhead,
1166 severity: metrics.sync_time.as_secs_f64() / metrics.total_time.as_secs_f64(),
1167 description: "High synchronization overhead".to_string(),
1168 mitigation: "Reduce synchronization points or use lock-free algorithms".to_string(),
1169 });
1170 }
1171
1172 let performance_rating = if metrics.parallel_efficiency > 0.9 {
1174 PerformanceRating::Excellent
1175 } else if metrics.parallel_efficiency > 0.7 {
1176 PerformanceRating::Good
1177 } else if metrics.parallel_efficiency > 0.5 {
1178 PerformanceRating::Acceptable
1179 } else if metrics.parallel_efficiency > 0.3 {
1180 PerformanceRating::Poor
1181 } else {
1182 PerformanceRating::Unacceptable
1183 };
1184
1185 Ok(ParallelPerformanceAnalysis {
1186 bottlenecks,
1187 scaling_analysis: ScalingAnalysis {
1188 theoretical_max_speedup: metrics.threads_used as f64,
1189 achieved_speedup: metrics.speedup,
1190 parallel_fraction: 0.9, serial_bottleneck_impact: 0.1, scaling_efficiency: HashMap::new(), optimal_thread_count: metrics.threads_used,
1194 },
1195 optimization_opportunities: Vec::new(), performance_rating,
1197 })
1198 }
1199
1200 fn measure_resource_utilization(&self) -> StatsResult<ResourceUtilization> {
1202 Ok(ResourceUtilization {
1204 cpu_utilization: vec![0.8; self.execution_contexts.len()], memory_utilization: 0.6, cache_utilization: CacheUtilization {
1207 l1_hit_rate: 0.95,
1208 l2_hit_rate: 0.85,
1209 l3_hit_rate: 0.75,
1210 cache_line_utilization: 0.8,
1211 },
1212 numa_utilization: vec![0.8, 0.8], energy_consumption: None, })
1215 }
1216
1217 fn update_performance_history(&self, metrics: &ParallelExecutionMetrics) {
1219 if let Ok(mut history) = self.performance_history.lock() {
1220 history.push(metrics.clone());
1221
1222 if history.len() > 1000 {
1224 history.remove(0);
1225 }
1226 }
1227 }
1228
1229 pub fn get_performance_statistics(&self) -> PerformanceStatistics {
1231 if let Ok(history) = self.performance_history.lock() {
1232 let total_operations = history.len();
1233 let avg_speedup = if !history.is_empty() {
1234 history.iter().map(|m| m.speedup).sum::<f64>() / history.len() as f64
1235 } else {
1236 0.0
1237 };
1238
1239 let avg_efficiency = if !history.is_empty() {
1240 history.iter().map(|m| m.parallel_efficiency).sum::<f64>() / history.len() as f64
1241 } else {
1242 0.0
1243 };
1244
1245 PerformanceStatistics {
1246 total_operations,
1247 average_speedup: avg_speedup,
1248 best_strategies: Vec::new(), hardware_utilization: HardwareUtilization {
1250 simd_utilization: 0.7, memory_bandwidth_utilization: 0.6,
1252 cache_efficiency: avg_efficiency,
1253 energy_efficiency: None,
1254 },
1255 }
1256 } else {
1257 PerformanceStatistics {
1258 total_operations: 0,
1259 average_speedup: 0.0,
1260 best_strategies: Vec::new(),
1261 hardware_utilization: HardwareUtilization {
1262 simd_utilization: 0.0,
1263 memory_bandwidth_utilization: 0.0,
1264 cache_efficiency: 0.0,
1265 energy_efficiency: None,
1266 },
1267 }
1268 }
1269 }
1270}
1271
1272#[allow(dead_code)]
1274pub fn create_advanced_parallel_processor() -> StatsResult<AdvancedParallelStatsProcessor> {
1275 AdvancedParallelStatsProcessor::default()
1276}
1277
1278#[allow(dead_code)]
1279pub fn mean_advanced_parallel<F>(data: ArrayView1<F>) -> StatsResult<AdvancedParallelResult<F>>
1280where
1281 F: Float + NumCast + Send + Sync + Zero + std::iter::Sum + std::fmt::Display,
1282{
1283 let processor = AdvancedParallelStatsProcessor::default()?;
1284 processor.mean_advanced_parallel(data)
1285}
1286
1287#[allow(dead_code)]
1288pub fn variance_advanced_parallel<F>(
1289 data: ArrayView1<F>,
1290 ddof: usize,
1291) -> StatsResult<AdvancedParallelResult<F>>
1292where
1293 F: Float + NumCast + Send + Sync + Zero + std::iter::Sum + std::fmt::Display,
1294{
1295 let processor = AdvancedParallelStatsProcessor::default()?;
1296 processor.variance_advanced_parallel(data, ddof)
1297}
1298
1299#[cfg(test)]
1300mod tests {
1301 use super::*;
1302 use scirs2_core::ndarray::array;
1303
1304 #[test]
1305 fn test_advanced_parallel_config() {
1306 let config = AdvancedParallelConfig::default();
1307 assert!(config.adaptive_work_distribution);
1308 assert!(config.numa_aware);
1309 assert!(config.performance_monitoring);
1310 }
1311
1312 #[test]
1313 fn test_processor_creation() {
1314 let processor = AdvancedParallelStatsProcessor::default().expect("Operation failed");
1315 assert!(!processor.execution_contexts.is_empty());
1316 }
1317
1318 #[test]
1319 fn test_optimization_strategy_selection() {
1320 let processor = AdvancedParallelStatsProcessor::default().expect("Operation failed");
1321 let strategy = processor
1322 .select_optimization_strategy("mean", 10000)
1323 .expect("Operation failed");
1324
1325 assert!(!strategy.name.is_empty());
1326 assert!(strategy.thread_count > 0);
1327 assert!(strategy.expected_performance > 0.0);
1328 }
1329
1330 #[test]
1331 fn test_work_unit_creation() {
1332 let processor = AdvancedParallelStatsProcessor::default().expect("Operation failed");
1333 let data = array![1.0f64, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];
1334 let strategy = OptimizationStrategy {
1335 name: "test".to_string(),
1336 thread_count: 2,
1337 work_distribution: WorkDistributionMethod::EqualChunks,
1338 memory_layout: MemoryLayoutStrategy::Contiguous,
1339 expected_performance: 2.0,
1340 };
1341
1342 let work_units = processor
1343 .create_work_units(&data.view(), &strategy)
1344 .expect("Operation failed");
1345 assert_eq!(work_units.len(), 2);
1346 assert!(!work_units[0].data.is_empty());
1347 assert!(!work_units[1].data.is_empty());
1348 }
1349
1350 #[test]
1351 fn test_correlation_computation() {
1352 let processor = AdvancedParallelStatsProcessor::default().expect("Operation failed");
1353 let x = vec![1.0, 2.0, 3.0, 4.0, 5.0];
1354 let y = vec![2.0, 4.0, 6.0, 8.0, 10.0];
1355
1356 let correlation = processor
1357 .compute_correlation(&x, &y)
1358 .expect("Operation failed");
1359 assert!((correlation - 1.0).abs() < 1e-10); }
1361
1362 #[test]
1363 fn test_performance_metrics_calculation() {
1364 let processor = AdvancedParallelStatsProcessor::default().expect("Operation failed");
1365 let work_units = vec![
1366 WorkUnit {
1367 id: 0,
1368 data: vec![1.0, 2.0],
1369 cost: 100.0,
1370 dependencies: Vec::new(),
1371 priority: WorkPriority::Normal,
1372 numa_node: None,
1373 },
1374 WorkUnit {
1375 id: 1,
1376 data: vec![3.0, 4.0],
1377 cost: 120.0,
1378 dependencies: Vec::new(),
1379 priority: WorkPriority::Normal,
1380 numa_node: None,
1381 },
1382 ];
1383
1384 let metrics = processor
1385 .calculate_execution_metrics(Duration::from_millis(100), &work_units)
1386 .expect("Operation failed");
1387
1388 assert_eq!(metrics.threads_used, 2);
1389 assert!(metrics.load_balance_efficiency > 0.0);
1390 assert!(metrics.speedup > 0.0);
1391 }
1392}