use crate::error::{StatsError, StatsResult};
use scirs2_core::ndarray::ArrayView2;
use scirs2_core::numeric::{Float, NumCast, One, Zero};
use scirs2_core::{
    parallel_ops::*,
    simd_ops::{PlatformCapabilities, SimdUnifiedOps},
};
use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};

/// Configuration for advanced parallel statistical processing.
#[derive(Debug, Clone)]
pub struct AdvancedParallelConfig {
    /// Detected hardware characteristics.
    pub hardware: HardwareConfig,
    /// Parallel execution strategy.
    pub strategy: ParallelStrategy,
    /// Memory management settings.
    pub memory: MemoryConfig,
    /// Runtime optimization settings.
    pub optimization: OptimizationConfig,
    /// Fault-tolerance settings.
    pub fault_tolerance: FaultToleranceConfig,
    /// GPU acceleration settings.
    pub gpu: GpuConfig,
}

/// Detected hardware characteristics used for scheduling decisions.
#[derive(Debug, Clone)]
pub struct HardwareConfig {
    /// Number of available CPU cores.
    pub cpu_cores: usize,
    /// Number of NUMA nodes.
    pub numa_nodes: usize,
    /// CPU cache sizes in bytes.
    pub cachesizes: CacheSizes,
    /// Estimated memory bandwidth in GB/s.
    pub memory_bandwidth: f64,
    /// Detected SIMD capabilities of the platform.
    pub simd_capabilities: PlatformCapabilities,
    /// Available GPU devices.
    pub gpu_devices: Vec<GpuDevice>,
}

/// CPU cache sizes in bytes.
#[derive(Debug, Clone)]
pub struct CacheSizes {
    pub l1data: usize,
    pub l1_instruction: usize,
    pub l2_unified: usize,
    pub l3_shared: usize,
}

/// Description of a single GPU device.
#[derive(Debug, Clone)]
pub struct GpuDevice {
    pub device_id: usize,
    pub memory_gb: f64,
    pub compute_capability: f64,
    pub multiprocessors: usize,
    pub max_threads_per_block: usize,
}

/// Strategy used to parallelize a computation.
#[derive(Debug, Clone, Copy)]
pub enum ParallelStrategy {
    /// Cache-aware multi-threaded CPU execution.
    CpuOptimal,
    /// SIMD-vectorized CPU execution.
    CpuSimd,
    /// Split work between CPU and GPU.
    HybridCpuGpu,
    /// Run primarily on the GPU.
    GpuPrimary,
    /// Distribute work across multiple machines.
    Distributed,
    /// Choose a strategy at runtime based on the workload.
    Adaptive,
}

/// Memory management settings.
#[derive(Debug, Clone, Default)]
pub struct MemoryConfig {
    /// Total system RAM in bytes.
    pub system_ram: usize,
    /// Optional cap on memory use, in bytes.
    pub memory_limit: Option<usize>,
    /// Allow out-of-core processing for datasets larger than RAM.
    pub enable_out_of_core: bool,
    /// Chunk size in bytes for out-of-core processing.
    pub out_of_core_chunksize: usize,
    /// Use memory-mapped I/O where possible.
    pub enable_memory_mapping: bool,
    /// Size of the internal memory pool in bytes.
    pub memory_poolsize: usize,
    /// Enable periodic garbage collection of pooled memory.
    pub enable_gc: bool,
}

/// Runtime optimization settings.
#[derive(Debug, Clone)]
pub struct OptimizationConfig {
    /// Rebalance work across threads at runtime.
    pub adaptive_load_balancing: bool,
    /// Allow idle workers to steal queued tasks.
    pub work_stealing: bool,
    /// Schedule tasks to maximize cache reuse.
    pub cache_aware_scheduling: bool,
    /// Allocate memory on the NUMA node that uses it.
    pub numa_aware_allocation: bool,
    /// Grow or shrink the thread pool dynamically.
    pub dynamic_thread_scaling: bool,
    /// Interval between performance-monitoring samples.
    pub monitoring_interval: Duration,
    /// How aggressively to apply optimizations, in [0.0, 1.0].
    pub optimization_aggressiveness: f64,
}

/// Fault-tolerance settings.
#[derive(Debug, Clone)]
pub struct FaultToleranceConfig {
    /// Periodically checkpoint long-running computations.
    pub enable_checkpointing: bool,
    /// Interval between checkpoints.
    pub checkpoint_interval: Duration,
    /// Retry failed tasks.
    pub enable_retry: bool,
    /// Maximum number of retries per task.
    pub max_retries: usize,
    /// Fall back to a simpler strategy on repeated failure.
    pub enable_degradation: bool,
    /// Interval between worker health checks.
    pub health_check_interval: Duration,
}

/// GPU acceleration settings.
#[derive(Debug, Clone)]
pub struct GpuConfig {
    /// Enable GPU acceleration.
    pub enable_gpu: bool,
    /// Preferred GPU device ID, if any.
    pub preferred_device: Option<usize>,
    /// Optional cap on GPU memory use, in bytes.
    pub gpu_memory_limit: Option<usize>,
    /// Minimum data size in bytes before offloading to the GPU.
    pub transfer_threshold: usize,
    /// Use unified memory when available.
    pub enable_unified_memory: bool,
    /// Number of GPU streams to create.
    pub stream_count: usize,
}

impl Default for AdvancedParallelConfig {
    fn default() -> Self {
        let cpu_cores = num_threads();
        let system_ram = Self::detect_system_ram();

        Self {
            hardware: HardwareConfig {
                cpu_cores,
                numa_nodes: Self::detect_numa_nodes(),
                cachesizes: Self::detect_cachesizes(),
                memory_bandwidth: Self::detect_memory_bandwidth(),
                simd_capabilities: PlatformCapabilities::detect(),
                gpu_devices: Self::detect_gpu_devices(),
            },
            strategy: ParallelStrategy::Adaptive,
            memory: MemoryConfig {
                system_ram,
                // Leave a quarter of RAM for the OS and other processes.
                memory_limit: Some(system_ram * 3 / 4),
                enable_out_of_core: true,
                out_of_core_chunksize: 1024 * 1024 * 1024, // 1 GiB chunks
                enable_memory_mapping: true,
                memory_poolsize: system_ram / 8,
                enable_gc: true,
            },
            optimization: OptimizationConfig {
                adaptive_load_balancing: true,
                work_stealing: true,
                cache_aware_scheduling: true,
                numa_aware_allocation: true,
                dynamic_thread_scaling: true,
                monitoring_interval: Duration::from_millis(100),
                optimization_aggressiveness: 0.8,
            },
            fault_tolerance: FaultToleranceConfig {
                enable_checkpointing: false,
                checkpoint_interval: Duration::from_secs(60),
                enable_retry: true,
                max_retries: 3,
                enable_degradation: true,
                health_check_interval: Duration::from_secs(10),
            },
            gpu: GpuConfig {
                enable_gpu: false, // GPU support is not yet wired up
                preferred_device: None,
                gpu_memory_limit: None,
                transfer_threshold: 1024 * 1024, // 1 MiB
                enable_unified_memory: false,
                stream_count: 4,
            },
        }
    }
}

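// Example (sketch, not from the original API docs): the detected defaults
// can be overridden with struct-update syntax; the strategy chosen here is
// illustrative only.
//
//     let config = AdvancedParallelConfig {
//         strategy: ParallelStrategy::CpuSimd,
//         ..AdvancedParallelConfig::default()
//     };
//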
impl AdvancedParallelConfig {
    /// Detect total system RAM in bytes, falling back to a core-count heuristic.
    fn detect_system_ram() -> usize {
        #[cfg(target_os = "linux")]
        {
            use std::fs;
            if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") {
                for line in meminfo.lines() {
                    if line.starts_with("MemTotal:") {
                        if let Some(kb_str) = line.split_whitespace().nth(1) {
                            if let Ok(kb) = kb_str.parse::<usize>() {
                                return kb * 1024; // value is reported in KiB
                            }
                        }
                    }
                }
            }
        }

        #[cfg(target_os = "windows")]
        {
            // No direct probe here; honor an explicit override if provided.
            if let Ok(mem_str) = std::env::var("SCIRS_SYSTEM_RAM") {
                if let Ok(mem_gb) = mem_str.parse::<usize>() {
                    return mem_gb * 1024 * 1024 * 1024;
                }
            }
        }

        #[cfg(target_os = "macos")]
        {
            if let Ok(mem_str) = std::env::var("SCIRS_SYSTEM_RAM") {
                if let Ok(mem_gb) = mem_str.parse::<usize>() {
                    return mem_gb * 1024 * 1024 * 1024;
                }
            }
        }

        // Heuristic fallback: estimate RAM from the core count.
        let num_cores = num_threads().max(1);
        if num_cores >= 16 {
            32 * 1024 * 1024 * 1024
        } else if num_cores >= 8 {
            16 * 1024 * 1024 * 1024
        } else if num_cores >= 4 {
            8 * 1024 * 1024 * 1024
        } else {
            4 * 1024 * 1024 * 1024
        }
    }

    /// Detect the number of NUMA nodes, falling back to a core-count heuristic.
    fn detect_numa_nodes() -> usize {
        #[cfg(target_os = "linux")]
        {
            use std::fs;

            // Count node entries under /sys/devices/system/node.
            if let Ok(entries) = fs::read_dir("/sys/devices/system/node") {
                let mut numa_count = 0;
                for entry in entries.flatten() {
                    let name = entry.file_name();
                    if let Some(name_str) = name.to_str() {
                        if name_str.starts_with("node") && name_str[4..].parse::<usize>().is_ok() {
                            numa_count += 1;
                        }
                    }
                }
                if numa_count > 0 {
                    return numa_count;
                }
            }

            // Fall back to parsing `lscpu` output.
            if let Ok(output) = std::process::Command::new("lscpu").output() {
                if let Ok(output_str) = String::from_utf8(output.stdout) {
                    for line in output_str.lines() {
                        if line.contains("NUMA node(s):") {
                            if let Some(numa_str) = line.split(':').nth(1) {
                                if let Ok(numa_count) = numa_str.trim().parse::<usize>() {
                                    return numa_count;
                                }
                            }
                        }
                    }
                }
            }
        }

        // Heuristic fallback based on the core count.
        let num_cores = num_threads();
        if num_cores >= 32 {
            4
        } else if num_cores >= 16 {
            2
        } else {
            1
        }
    }

    /// Detect CPU cache sizes, falling back to typical values per core count.
    fn detect_cachesizes() -> CacheSizes {
        #[cfg(target_os = "linux")]
        {
            use std::fs;

            // Reasonable defaults if sysfs is unavailable.
            let mut l1data = 32 * 1024;
            let mut l1_instruction = 32 * 1024;
            let mut l2_unified = 256 * 1024;
            let mut l3_shared = 8 * 1024 * 1024;

            if let Ok(entries) = fs::read_dir("/sys/devices/system/cpu/cpu0/cache") {
                for entry in entries.flatten() {
                    let cache_path = entry.path();

                    if let Ok(level_str) = fs::read_to_string(cache_path.join("level")) {
                        if let Ok(level) = level_str.trim().parse::<u32>() {
                            if let Ok(size_str) = fs::read_to_string(cache_path.join("size")) {
                                // Sizes are reported as e.g. "32K" or "8M".
                                let size_str = size_str.trim();
                                let size = if size_str.ends_with('K') {
                                    size_str[..size_str.len() - 1].parse::<usize>().unwrap_or(0)
                                        * 1024
                                } else if size_str.ends_with('M') {
                                    size_str[..size_str.len() - 1].parse::<usize>().unwrap_or(0)
                                        * 1024
                                        * 1024
                                } else {
                                    size_str.parse::<usize>().unwrap_or(0)
                                };

                                if let Ok(type_str) = fs::read_to_string(cache_path.join("type")) {
                                    match (level, type_str.trim()) {
                                        (1, "Data") => l1data = size,
                                        (1, "Instruction") => l1_instruction = size,
                                        (2, "Unified") => l2_unified = size,
                                        (3, "Unified") => l3_shared = size,
                                        _ => {}
                                    }
                                }
                            }
                        }
                    }
                }
            }

            CacheSizes {
                l1data,
                l1_instruction,
                l2_unified,
                l3_shared,
            }
        }

        #[cfg(not(target_os = "linux"))]
        {
            // Typical cache hierarchies by core count.
            let num_cores = num_threads();
            if num_cores >= 16 {
                CacheSizes {
                    l1data: 48 * 1024,
                    l1_instruction: 32 * 1024,
                    l2_unified: 512 * 1024,
                    l3_shared: 32 * 1024 * 1024,
                }
            } else if num_cores >= 8 {
                CacheSizes {
                    l1data: 32 * 1024,
                    l1_instruction: 32 * 1024,
                    l2_unified: 256 * 1024,
                    l3_shared: 16 * 1024 * 1024,
                }
            } else {
                CacheSizes {
                    l1data: 32 * 1024,
                    l1_instruction: 32 * 1024,
                    l2_unified: 256 * 1024,
                    l3_shared: 6 * 1024 * 1024,
                }
            }
        }
    }

    /// Estimate memory bandwidth in GB/s with a small copy benchmark,
    /// falling back to typical values per core count.
    fn detect_memory_bandwidth() -> f64 {
        let testsize = 64 * 1024 * 1024; // 64 MiB per measurement
        let iterations = 10;

        let mut total_bandwidth = 0.0;
        let mut successful_tests = 0;

        for _ in 0..iterations {
            if let Some(bandwidth) = Self::measure_memory_bandwidth(testsize) {
                total_bandwidth += bandwidth;
                successful_tests += 1;
            }
        }

        if successful_tests > 0 {
            let avg_bandwidth = total_bandwidth / successful_tests as f64;
            avg_bandwidth.min(200.0) // clamp implausibly high measurements
        } else {
            // Typical bandwidths by core count.
            let num_cores = num_threads();
            if num_cores >= 16 {
                100.0
            } else if num_cores >= 8 {
                50.0
            } else {
                25.6
            }
        }
    }

    /// Run a single copy benchmark over `size` bytes and return GB/s, if measurable.
    fn measure_memory_bandwidth(size: usize) -> Option<f64> {
        let source = vec![1.0f64; size / 8]; // size is in bytes; f64 is 8 bytes
        let mut dest = vec![0.0f64; size / 8];

        // Warm up caches and page in the buffers.
        for i in 0..source.len().min(1000) {
            dest[i] = source[i];
        }

        let start = Instant::now();
        for _ in 0..4 {
            dest.copy_from_slice(&source);
            std::hint::black_box(&dest); // keep the copy from being optimized away
        }
        let duration = start.elapsed();

        if duration.as_nanos() > 0 {
            // 4 copies, each reading and writing `size` bytes.
            let bytes_transferred = (size * 4 * 2) as f64;
            let seconds = duration.as_secs_f64();
            let bandwidth_gbps = (bytes_transferred / seconds) / (1024.0 * 1024.0 * 1024.0);
            Some(bandwidth_gbps)
        } else {
            None
        }
    }

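    // Worked example for the estimate above: with the default 64 MiB buffer,
    // four copies move 64 MiB * 4 * 2 = 512 MiB. If they take 20 ms, the
    // reported bandwidth is 0.5 / 0.02 = 25 GB/s (before the 200.0 clamp).
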
    /// GPU device detection is not yet implemented; no devices are reported.
    fn detect_gpu_devices() -> Vec<GpuDevice> {
        vec![]
    }
}

/// Advanced parallel processor for large-scale statistical computation.
pub struct AdvancedParallelProcessor<F> {
    config: AdvancedParallelConfig,
    thread_pool: Option<ThreadPool>,
    performance_monitor: Arc<PerformanceMonitor>,
    memory_manager: Arc<MemoryManager>,
    gpu_context: Option<GpuContext>,
    _phantom: PhantomData<F>,
}

/// Work-stealing thread pool scaffold.
pub struct ThreadPool {
    workers: Vec<Worker>,
    work_queue: Arc<Mutex<VecDeque<Task>>>,
    shutdown: Arc<AtomicBool>,
    active_workers: Arc<AtomicUsize>,
}

/// Worker thread with a local task queue.
pub struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
    local_queue: VecDeque<Task>,
    numa_node: Option<usize>,
}

/// Unit of work submitted to the thread pool.
pub struct Task {
    id: u64,
    priority: u8,
    complexity: f64,
    datasize: usize,
    function: Box<dyn FnOnce() -> TaskResult + Send>,
}

/// Outcome of a completed task.
#[derive(Debug)]
pub struct TaskResult {
    pub success: bool,
    pub execution_time: Duration,
    pub memory_used: usize,
    pub error: Option<String>,
}

/// Collects and stores runtime performance metrics.
pub struct PerformanceMonitor {
    metrics: RwLock<PerformanceMetrics>,
    history: RwLock<VecDeque<PerformanceSnapshot>>,
    monitoring_active: AtomicBool,
}

/// Snapshot of memory usage statistics.
#[derive(Debug, Clone)]
pub struct MemoryUsageStats {
    /// Bytes currently allocated.
    pub current_allocated: usize,
    /// Peak bytes allocated.
    pub peak_allocated: usize,
    /// Total number of allocations.
    pub total_allocations: usize,
    /// Total number of deallocations.
    pub total_deallocations: usize,
    /// Estimated fragmentation ratio in [0.0, 1.0].
    pub fragmentation_ratio: f64,
}

/// Aggregate runtime performance metrics.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    pub throughput_ops_per_sec: f64,
    pub cpu_utilization: f64,
    pub memory_utilization: f64,
    pub cache_hit_ratio: f64,
    pub load_balance_factor: f64,
    pub average_task_time: Duration,
    pub active_threads: usize,
    pub completed_tasks: u64,
    pub failed_tasks: u64,
}

/// Timestamped copy of the performance metrics.
#[derive(Debug, Clone)]
pub struct PerformanceSnapshot {
    pub timestamp: Instant,
    pub metrics: PerformanceMetrics,
}

/// Tracks allocations and owns the pooled memory.
pub struct MemoryManager {
    allocated_memory: AtomicUsize,
    peak_memory: AtomicUsize,
    memory_pools: RwLock<HashMap<usize, MemoryPool>>,
    gc_enabled: AtomicBool,
}

/// Pool of fixed-size memory chunks.
pub struct MemoryPool {
    chunksize: usize,
    available_chunks: Mutex<Vec<*mut u8>>,
    total_chunks: AtomicUsize,
}

/// Handle to a GPU device context.
pub struct GpuContext {
    device_id: usize,
    available_memory: usize,
    stream_handles: Vec<GpuStream>,
    unified_memory_enabled: bool,
}

/// Handle to a single GPU stream.
pub struct GpuStream {
    stream_id: usize,
    active: AtomicBool,
    pending_operations: AtomicUsize,
}

impl<F> AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + SimdUnifiedOps
        + Zero
        + One
        + PartialOrd
        + Copy
        + Send
        + Sync
        + 'static
        + std::fmt::Display
        + scirs2_core::ndarray::ScalarOperand,
{
    pub fn new() -> Self {
        Self::with_config(AdvancedParallelConfig::default())
    }

    pub fn with_config(config: AdvancedParallelConfig) -> Self {
        let performance_monitor = Arc::new(PerformanceMonitor::new());
        let memory_manager = Arc::new(MemoryManager::new(&config.memory));

        let thread_pool = if config.hardware.cpu_cores > 1 {
            Some(ThreadPool::new(&config))
        } else {
            None
        };

        let gpu_context = if config.gpu.enable_gpu {
            GpuContext::new(&config.gpu).ok()
        } else {
            None
        };

        Self {
            config,
            thread_pool,
            performance_monitor,
            memory_manager,
            // Use the context constructed above (previously it was discarded
            // and this field was always `None`).
            gpu_context,
            _phantom: PhantomData,
        }
    }

    /// Process a dataset with the strategy best suited to its size and the
    /// available hardware.
    pub fn process_massivedataset<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
    where
        T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
        R: Send + Sync + 'static,
    {
        let strategy = self.select_optimal_strategy(data)?;

        match strategy {
            ParallelStrategy::CpuOptimal => self.process_cpu_optimal(data, operation),
            ParallelStrategy::CpuSimd => self.process_cpu_simd(data, operation),
            ParallelStrategy::HybridCpuGpu => self.process_hybrid_cpu_gpu(data, operation),
            ParallelStrategy::GpuPrimary => self.process_gpu_primary(data, operation),
            ParallelStrategy::Distributed => self.process_distributed(data, operation),
            ParallelStrategy::Adaptive => self.process_adaptive(data, operation),
        }
    }

    /// Choose a strategy from the data size and hardware configuration.
    fn select_optimal_strategy(&self, data: &ArrayView2<F>) -> StatsResult<ParallelStrategy> {
        let datasize = data.len() * std::mem::size_of::<F>();
        let (rows, cols) = data.dim();

        if datasize > self.config.memory.system_ram {
            // Larger than RAM: stay on the CPU (out-of-core path).
            Ok(ParallelStrategy::CpuOptimal)
        } else if self.config.gpu.enable_gpu && datasize > self.config.gpu.transfer_threshold {
            Ok(ParallelStrategy::HybridCpuGpu)
        } else if rows * cols > 1_000_000 {
            Ok(ParallelStrategy::CpuSimd)
        } else {
            Ok(ParallelStrategy::CpuOptimal)
        }
    }

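    // Worked example of the thresholds above: a 1000 x 1001 f64 matrix holds
    // 1_001_000 elements (~8 MB). That fits in RAM, the GPU is disabled by
    // default, and the element count exceeds 1_000_000, so CpuSimd is chosen.
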
    /// Chunk the rows across threads and run `operation` on each chunk.
    fn process_cpu_optimal<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
    where
        T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
        R: Send + Sync + 'static,
    {
        let (rows, _) = data.dim();
        let num_threads = self.config.hardware.cpu_cores;
        let chunksize = rows.div_ceil(num_threads);

        let results: Vec<_> = (0..num_threads)
            .into_par_iter()
            .map(|thread_id| {
                let start_row = thread_id * chunksize;
                let end_row = ((thread_id + 1) * chunksize).min(rows);

                if start_row < rows {
                    let chunk = data.slice(scirs2_core::ndarray::s![start_row..end_row, ..]);
                    operation(&chunk)
                } else {
                    Err(StatsError::InvalidArgument("Empty chunk".to_string()))
                }
            })
            .filter_map(|result| result.ok())
            .collect();

        // Note: chunk results are not merged; the first successful chunk's
        // result is returned. Callers that need a full reduction must
        // combine chunk results within `operation` itself.
        results.into_iter().next().ok_or_else(|| {
            StatsError::ComputationError("No successful parallel results".to_string())
        })
    }

    /// SIMD path. Currently delegates to `operation` directly; the SIMD
    /// processor is instantiated but not yet used.
    fn process_cpu_simd<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
    where
        T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
        R: Send + Sync + 'static,
    {
        let _simd_processor =
            crate::simd_comprehensive::AdvancedComprehensiveSimdProcessor::<F>::new();

        operation(data)
    }

    /// Hybrid CPU/GPU path. GPU execution is not yet implemented, so both
    /// branches currently fall back to the CPU path.
    fn process_hybrid_cpu_gpu<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
    where
        T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
        R: Send + Sync + 'static,
    {
        if let Some(_gpu_context) = &self.gpu_context {
            self.process_cpu_optimal(data, operation)
        } else {
            self.process_cpu_optimal(data, operation)
        }
    }

    /// GPU-primary path. GPU execution is not yet implemented; falls back
    /// to the CPU path.
    fn process_gpu_primary<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
    where
        T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
        R: Send + Sync + 'static,
    {
        self.process_cpu_optimal(data, operation)
    }

    /// Distributed path. Multi-machine execution is not yet implemented;
    /// falls back to the CPU path.
    fn process_distributed<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
    where
        T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
        R: Send + Sync + 'static,
    {
        self.process_cpu_optimal(data, operation)
    }

    /// Adaptive path: run the CPU path and record timing so future strategy
    /// selection can use the measurements.
    fn process_adaptive<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
    where
        T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
        R: Send + Sync + 'static,
    {
        let start_time = Instant::now();
        let result = self.process_cpu_optimal(data, operation)?;
        let duration = start_time.elapsed();

        self.performance_monitor
            .update_metrics(duration, data.len());

        Ok(result)
    }

    /// Return a snapshot of the current performance metrics.
    pub fn get_performance_metrics(&self) -> PerformanceMetrics {
        self.performance_monitor.get_current_metrics()
    }

    /// Return the active configuration.
    pub fn get_config(&self) -> &AdvancedParallelConfig {
        &self.config
    }

    /// Replace the active configuration.
    pub fn update_config(&mut self, config: AdvancedParallelConfig) {
        self.config = config;
    }
}

impl PerformanceMonitor {
    fn new() -> Self {
        Self {
            metrics: RwLock::new(PerformanceMetrics::default()),
            history: RwLock::new(VecDeque::new()),
            monitoring_active: AtomicBool::new(true),
        }
    }

    fn update_metrics(&self, execution_time: Duration, datasize: usize) {
        if let Ok(mut metrics) = self.metrics.write() {
            metrics.completed_tasks += 1;
            metrics.average_task_time = execution_time;

            // Throughput in elements processed per second.
            let ops_per_sec = if execution_time.as_secs_f64() > 0.0 {
                datasize as f64 / execution_time.as_secs_f64()
            } else {
                0.0
            };
            metrics.throughput_ops_per_sec = ops_per_sec;
        }
    }

    fn get_current_metrics(&self) -> PerformanceMetrics {
        self.metrics.read().unwrap().clone()
    }
}

impl Default for PerformanceMetrics {
    fn default() -> Self {
        Self {
            throughput_ops_per_sec: 0.0,
            cpu_utilization: 0.0,
            memory_utilization: 0.0,
            cache_hit_ratio: 0.0,
            load_balance_factor: 1.0,
            average_task_time: Duration::from_secs(0),
            active_threads: 0,
            completed_tasks: 0,
            failed_tasks: 0,
        }
    }
}

impl<F> Default for AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + SimdUnifiedOps
        + Zero
        + One
        + PartialOrd
        + Copy
        + Send
        + Sync
        + 'static
        + std::fmt::Display
        + scirs2_core::ndarray::ScalarOperand,
{
    fn default() -> Self {
        Self::new()
    }
}

/// Convenience alias for an `f64` processor.
pub type F64AdvancedParallelProcessor = AdvancedParallelProcessor<f64>;
/// Convenience alias for an `f32` processor.
pub type F32AdvancedParallelProcessor = AdvancedParallelProcessor<f32>;

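// Example (sketch): a mean over one chunk using the f64 alias. Under the
// CpuOptimal strategy the closure sees one row-chunk at a time and only the
// first chunk's result is returned, so global reductions must be combined
// by the caller.
//
//     let processor = F64AdvancedParallelProcessor::new();
//     let data = scirs2_core::ndarray::Array2::<f64>::zeros((100, 8));
//     let mean = processor
//         .process_massivedataset(&data.view(), |chunk| {
//             Ok(chunk.sum() / chunk.len() as f64)
//         })
//         .unwrap();
//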
/// Create an advanced parallel processor with the default configuration.
#[allow(dead_code)]
pub fn create_advanced_parallel_processor<F>() -> AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + SimdUnifiedOps
        + Zero
        + One
        + PartialOrd
        + Copy
        + Send
        + Sync
        + 'static
        + std::fmt::Display
        + scirs2_core::ndarray::ScalarOperand,
{
    AdvancedParallelProcessor::new()
}

/// Create an advanced parallel processor with a custom configuration.
#[allow(dead_code)]
pub fn create_optimized_parallel_processor<F>(
    config: AdvancedParallelConfig,
) -> AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + SimdUnifiedOps
        + Zero
        + One
        + PartialOrd
        + Copy
        + Send
        + Sync
        + 'static
        + std::fmt::Display
        + scirs2_core::ndarray::ScalarOperand,
{
    AdvancedParallelProcessor::with_config(config)
}

// SAFETY: the raw chunk pointers are only accessed while holding the pool's
// internal Mutex, so the pool can be shared across threads.
unsafe impl Send for MemoryPool {}
unsafe impl Sync for MemoryPool {}

#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::Array2;

    #[test]
    fn test_advanced_parallel_config_default() {
        let config = AdvancedParallelConfig::default();
        assert!(config.hardware.cpu_cores > 0);
        assert!(config.memory.system_ram > 0);
    }

    #[test]
    fn test_memory_bandwidth_detection() {
        let bandwidth = AdvancedParallelConfig::detect_memory_bandwidth();
        assert!(bandwidth > 0.0);
        assert!(bandwidth < 1000.0); // sanity bound in GB/s
    }

    #[test]
    fn test_cachesize_detection() {
        let cachesizes = AdvancedParallelConfig::detect_cachesizes();
        assert!(cachesizes.l1data > 0);
        assert!(cachesizes.l2_unified > cachesizes.l1data);
        assert!(cachesizes.l3_shared > cachesizes.l2_unified);
    }

    #[test]
    fn test_numa_detection() {
        let numa_nodes = AdvancedParallelConfig::detect_numa_nodes();
        assert!(numa_nodes > 0);
        assert!(numa_nodes <= 16); // sanity bound
    }

    #[test]
    fn test_advanced_parallel_processor_creation() {
        let processor = AdvancedParallelProcessor::<f64>::new();
        assert!(processor.config.hardware.cpu_cores > 0);
    }

    #[test]
    #[ignore = "timeout"]
    fn test_strategy_selection() {
        let processor = AdvancedParallelProcessor::<f64>::new();
        let smalldata = Array2::<f64>::zeros((10, 10));
        let strategy = processor
            .select_optimal_strategy(&smalldata.view())
            .unwrap();

        // Small data should stay on the CPU-optimal path.
        assert!(matches!(strategy, ParallelStrategy::CpuOptimal));
    }

    #[test]
    #[ignore = "timeout"]
    fn test_performance_monitor() {
        let monitor = PerformanceMonitor::new();
        let metrics = monitor.get_current_metrics();
        assert_eq!(metrics.completed_tasks, 0);
    }

    #[test]
    fn test_memory_manager() {
        let config = MemoryConfig::default();
        let manager = MemoryManager::new(&config);
        assert_eq!(manager.allocated_memory.load(Ordering::Relaxed), 0);
    }
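
    // A sketch test, not part of the original suite: exercises the chunked
    // CPU path end to end. It relies on the behavior noted in
    // `process_cpu_optimal` that only the first successful chunk's result
    // is returned; all-zero input keeps the expected value at 0.0.
    #[test]
    fn test_process_cpu_optimal_first_chunk() {
        let processor = AdvancedParallelProcessor::<f64>::new();
        let data = Array2::<f64>::zeros((64, 4));
        let result = processor
            .process_massivedataset(&data.view(), |chunk| Ok(chunk.sum()))
            .unwrap();
        assert_eq!(result, 0.0);
    }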
}

impl MemoryManager {
    fn new(config: &MemoryConfig) -> Self {
        Self {
            allocated_memory: AtomicUsize::new(0),
            peak_memory: AtomicUsize::new(0),
            memory_pools: RwLock::new(HashMap::new()),
            gc_enabled: AtomicBool::new(config.enable_gc),
        }
    }

    fn get_usage_stats(&self) -> MemoryUsageStats {
        MemoryUsageStats {
            current_allocated: self.allocated_memory.load(Ordering::Acquire),
            peak_allocated: self.peak_memory.load(Ordering::Acquire),
            // Allocation counters and fragmentation tracking are not yet implemented.
            total_allocations: 0,
            total_deallocations: 0,
            fragmentation_ratio: 0.0,
        }
    }
}

impl ThreadPool {
    fn new(config: &AdvancedParallelConfig) -> Self {
        let num_workers = config.hardware.cpu_cores;
        let work_queue = Arc::new(Mutex::new(VecDeque::new()));
        let shutdown = Arc::new(AtomicBool::new(false));
        let active_workers = Arc::new(AtomicUsize::new(0));

        let workers = (0..num_workers)
            .map(|id| Worker::new(id, work_queue.clone(), shutdown.clone()))
            .collect();

        Self {
            workers,
            work_queue,
            shutdown,
            active_workers,
        }
    }
}

impl Worker {
    fn new(
        id: usize,
        _work_queue: Arc<Mutex<VecDeque<Task>>>,
        _shutdown: Arc<AtomicBool>,
    ) -> Self {
        Self {
            id,
            thread: None, // worker threads are not spawned yet
            local_queue: VecDeque::new(),
            numa_node: None,
        }
    }
}

impl GpuContext {
    fn new(config: &GpuConfig) -> Result<Self, String> {
        // Placeholder: no real device initialization is performed yet.
        Ok(Self {
            device_id: config.preferred_device.unwrap_or(0),
            available_memory: config.gpu_memory_limit.unwrap_or(1024 * 1024 * 1024), // default 1 GiB
            stream_handles: Vec::new(),
            unified_memory_enabled: config.enable_unified_memory,
        })
    }
}