1use crate::error::{StatsError, StatsResult};
13use scirs2_core::ndarray::ArrayView2;
14use scirs2_core::numeric::{Float, NumCast, One, Zero};
15use scirs2_core::{
16 parallel_ops::*,
17 simd_ops::{PlatformCapabilities, SimdUnifiedOps},
18};
19use std::collections::{HashMap, VecDeque};
20use std::marker::PhantomData;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::{Arc, Mutex, RwLock};
23use std::thread;
24use std::time::{Duration, Instant};
25
/// Top-level configuration for the advanced parallel processor.
///
/// Groups hardware detection results with the tunable knobs for strategy
/// selection, memory management, runtime optimization, fault tolerance,
/// and GPU usage.
#[derive(Debug, Clone)]
pub struct AdvancedParallelConfig {
    /// Detected hardware characteristics (cores, caches, NUMA, SIMD, GPUs).
    pub hardware: HardwareConfig,
    /// Execution strategy used by `process_massivedataset`.
    pub strategy: ParallelStrategy,
    /// Memory limits, pooling, and out-of-core settings.
    pub memory: MemoryConfig,
    /// Runtime scheduling/optimization behavior.
    pub optimization: OptimizationConfig,
    /// Checkpointing, retry, and degradation settings.
    pub fault_tolerance: FaultToleranceConfig,
    /// GPU enablement and transfer thresholds.
    pub gpu: GpuConfig,
}
42
/// Detected hardware characteristics of the host machine.
#[derive(Debug, Clone)]
pub struct HardwareConfig {
    /// Number of logical CPU cores available to the thread pool.
    pub cpu_cores: usize,
    /// Number of NUMA nodes (at least 1).
    pub numa_nodes: usize,
    /// Per-level CPU cache sizes in bytes.
    pub cachesizes: CacheSizes,
    /// Estimated memory bandwidth in GB/s.
    pub memory_bandwidth: f64,
    /// SIMD capabilities detected for this platform.
    pub simd_capabilities: PlatformCapabilities,
    /// Available GPU devices (currently always empty; see `detect_gpu_devices`).
    pub gpu_devices: Vec<GpuDevice>,
}
59
/// CPU cache sizes in bytes, per level.
#[derive(Debug, Clone)]
pub struct CacheSizes {
    /// L1 data cache size.
    pub l1data: usize,
    /// L1 instruction cache size.
    pub l1_instruction: usize,
    /// Unified L2 cache size.
    pub l2_unified: usize,
    /// Shared L3 cache size.
    pub l3_shared: usize,
}
68
/// Description of a single GPU device.
#[derive(Debug, Clone)]
pub struct GpuDevice {
    /// Device index as reported by the GPU runtime.
    pub device_id: usize,
    /// Total device memory in gigabytes.
    pub memory_gb: f64,
    /// Compute capability (e.g. 8.6).
    pub compute_capability: f64,
    /// Number of streaming multiprocessors.
    pub multiprocessors: usize,
    /// Maximum threads per block supported by the device.
    pub max_threads_per_block: usize,
}
78
/// Execution strategy for parallel processing.
#[derive(Debug, Clone, Copy)]
pub enum ParallelStrategy {
    /// Chunked multi-threaded CPU execution.
    CpuOptimal,
    /// CPU execution with SIMD acceleration.
    CpuSimd,
    /// Split work between CPU and GPU.
    HybridCpuGpu,
    /// Run primarily on the GPU.
    GpuPrimary,
    /// Distribute across multiple machines.
    Distributed,
    /// Let the processor pick a strategy at runtime.
    Adaptive,
}
95
/// Memory management configuration.
#[derive(Debug, Clone, Default)]
pub struct MemoryConfig {
    /// Total system RAM in bytes (detected or configured).
    pub system_ram: usize,
    /// Optional hard cap on memory usage in bytes.
    pub memory_limit: Option<usize>,
    /// Allow spilling to disk when data exceeds memory.
    pub enable_out_of_core: bool,
    /// Chunk size in bytes for out-of-core processing.
    pub out_of_core_chunksize: usize,
    /// Use memory-mapped I/O where possible.
    pub enable_memory_mapping: bool,
    /// Size of the internal memory pool in bytes.
    pub memory_poolsize: usize,
    /// Enable periodic reclamation of pooled memory.
    pub enable_gc: bool,
}
114
/// Runtime optimization and scheduling configuration.
#[derive(Debug, Clone)]
pub struct OptimizationConfig {
    /// Rebalance work between threads based on observed load.
    pub adaptive_load_balancing: bool,
    /// Allow idle workers to steal queued tasks.
    pub work_stealing: bool,
    /// Schedule with cache locality in mind.
    pub cache_aware_scheduling: bool,
    /// Prefer NUMA-local memory allocation.
    pub numa_aware_allocation: bool,
    /// Grow/shrink the worker count dynamically.
    pub dynamic_thread_scaling: bool,
    /// How often the performance monitor samples metrics.
    pub monitoring_interval: Duration,
    /// 0.0 (conservative) to 1.0 (aggressive) optimization bias.
    pub optimization_aggressiveness: f64,
}
133
/// Fault-tolerance configuration (checkpointing, retries, degradation).
#[derive(Debug, Clone)]
pub struct FaultToleranceConfig {
    /// Periodically checkpoint long-running computations.
    pub enable_checkpointing: bool,
    /// Interval between checkpoints.
    pub checkpoint_interval: Duration,
    /// Retry failed tasks.
    pub enable_retry: bool,
    /// Maximum retry attempts per task.
    pub max_retries: usize,
    /// Fall back to degraded (e.g. sequential) execution on failure.
    pub enable_degradation: bool,
    /// Interval between worker health checks.
    pub health_check_interval: Duration,
}
150
/// GPU usage configuration.
#[derive(Debug, Clone)]
pub struct GpuConfig {
    /// Master switch for GPU execution paths.
    pub enable_gpu: bool,
    /// Preferred device index; `None` selects device 0.
    pub preferred_device: Option<usize>,
    /// Optional cap on GPU memory usage in bytes.
    pub gpu_memory_limit: Option<usize>,
    /// Minimum data size in bytes before host↔device transfer is worthwhile.
    pub transfer_threshold: usize,
    /// Use unified (managed) memory if the runtime supports it.
    pub enable_unified_memory: bool,
    /// Number of GPU streams to create.
    pub stream_count: usize,
}
167
impl Default for AdvancedParallelConfig {
    /// Build a configuration by probing the host hardware and applying
    /// conservative defaults (GPU disabled, adaptive strategy, 75% RAM cap).
    fn default() -> Self {
        let cpu_cores = num_threads();
        let system_ram = Self::detect_system_ram();

        Self {
            hardware: HardwareConfig {
                cpu_cores,
                numa_nodes: Self::detect_numa_nodes(),
                cachesizes: Self::detect_cachesizes(),
                memory_bandwidth: Self::detect_memory_bandwidth(),
                simd_capabilities: PlatformCapabilities::detect(),
                gpu_devices: Self::detect_gpu_devices(),
            },
            strategy: ParallelStrategy::Adaptive,
            memory: MemoryConfig {
                system_ram,
                // Leave a quarter of RAM for the OS and other processes.
                memory_limit: Some(system_ram * 3 / 4),
                enable_out_of_core: true,
                // 64 MiB chunks on 32-bit targets, 1 GiB on 64-bit.
                out_of_core_chunksize: {
                    #[cfg(target_pointer_width = "32")]
                    {
                        64 * 1024 * 1024
                    }
                    #[cfg(target_pointer_width = "64")]
                    {
                        1024 * 1024 * 1024
                    }
                },
                enable_memory_mapping: true,
                memory_poolsize: system_ram / 8,
                enable_gc: true,
            },
            optimization: OptimizationConfig {
                adaptive_load_balancing: true,
                work_stealing: true,
                cache_aware_scheduling: true,
                numa_aware_allocation: true,
                dynamic_thread_scaling: true,
                monitoring_interval: Duration::from_millis(100),
                optimization_aggressiveness: 0.8,
            },
            fault_tolerance: FaultToleranceConfig {
                // Checkpointing is off by default; it adds I/O overhead.
                enable_checkpointing: false,
                checkpoint_interval: Duration::from_secs(60),
                enable_retry: true,
                max_retries: 3,
                enable_degradation: true,
                health_check_interval: Duration::from_secs(10),
            },
            gpu: GpuConfig {
                // GPU paths are opt-in.
                enable_gpu: false,
                preferred_device: None,
                gpu_memory_limit: None,
                // 1 MiB minimum before a transfer is considered worthwhile.
                transfer_threshold: 1024 * 1024,
                enable_unified_memory: false,
                stream_count: 4,
            },
        }
    }
}
229
230impl AdvancedParallelConfig {
231 fn detect_system_ram() -> usize {
233 #[cfg(target_os = "linux")]
235 {
236 use std::fs;
237 if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") {
238 for line in meminfo.lines() {
239 if line.starts_with("MemTotal:") {
240 if let Some(kb_str) = line.split_whitespace().nth(1) {
241 if let Ok(kb) = kb_str.parse::<usize>() {
242 return kb * 1024; }
244 }
245 }
246 }
247 }
248 }
249
250 #[cfg(target_os = "windows")]
251 {
252 if let Ok(mem_str) = std::env::var("SCIRS_SYSTEM_RAM") {
255 if let Ok(mem_gb) = mem_str.parse::<usize>() {
256 #[cfg(target_pointer_width = "32")]
257 {
258 return (mem_gb * 1024 * 1024 * 1024).min(2 * 1024 * 1024 * 1024);
259 }
260 #[cfg(target_pointer_width = "64")]
261 {
262 return mem_gb * 1024 * 1024 * 1024;
263 }
264 }
265 }
266 }
267
268 #[cfg(target_os = "macos")]
269 {
270 if let Ok(mem_str) = std::env::var("SCIRS_SYSTEM_RAM") {
273 if let Ok(mem_gb) = mem_str.parse::<usize>() {
274 #[cfg(target_pointer_width = "32")]
275 {
276 return (mem_gb * 1024 * 1024 * 1024).min(2 * 1024 * 1024 * 1024);
277 }
278 #[cfg(target_pointer_width = "64")]
279 {
280 return mem_gb * 1024 * 1024 * 1024;
281 }
282 }
283 }
284 }
285
286 let num_cores = num_threads().max(1);
288
289 #[cfg(target_pointer_width = "32")]
290 {
291 if num_cores >= 16 {
292 2 * 1024 * 1024 * 1024 } else if num_cores >= 8 {
294 1024 * 1024 * 1024 } else if num_cores >= 4 {
296 512 * 1024 * 1024 } else {
298 256 * 1024 * 1024 }
300 }
301 #[cfg(target_pointer_width = "64")]
302 {
303 if num_cores >= 16 {
304 32usize * 1024 * 1024 * 1024 } else if num_cores >= 8 {
306 16usize * 1024 * 1024 * 1024 } else if num_cores >= 4 {
308 8usize * 1024 * 1024 * 1024 } else {
310 4usize * 1024 * 1024 * 1024 }
312 }
313 }
314
315 fn detect_numa_nodes() -> usize {
317 #[cfg(target_os = "linux")]
319 {
320 use std::fs;
321
322 if let Ok(entries) = fs::read_dir("/sys/devices/system/node") {
324 let mut numa_count = 0;
325 for entry in entries {
326 if let Ok(entry) = entry {
327 let name = entry.file_name();
328 if let Some(name_str) = name.to_str() {
329 if name_str.starts_with("node")
330 && name_str[4..].parse::<usize>().is_ok()
331 {
332 numa_count += 1;
333 }
334 }
335 }
336 }
337 if numa_count > 0 {
338 return numa_count;
339 }
340 }
341
342 if let Ok(output) = std::process::Command::new("lscpu").output() {
344 if let Ok(output_str) = String::from_utf8(output.stdout) {
345 for line in output_str.lines() {
346 if line.contains("NUMA node(s):") {
347 if let Some(numa_str) = line.split(':').nth(1) {
348 if let Ok(numa_count) = numa_str.trim().parse::<usize>() {
349 return numa_count;
350 }
351 }
352 }
353 }
354 }
355 }
356 }
357
358 let num_cores = num_threads();
360 if num_cores >= 32 {
361 4 } else if num_cores >= 16 {
363 2 } else {
365 1 }
367 }
368
369 fn detect_cachesizes() -> CacheSizes {
371 #[cfg(target_os = "linux")]
373 {
374 use std::fs;
375
376 let mut l1data = 32 * 1024;
377 let mut l1_instruction = 32 * 1024;
378 let mut l2_unified = 256 * 1024;
379 let mut l3_shared = 8 * 1024 * 1024;
380
381 if let Ok(entries) = fs::read_dir("/sys/devices/system/cpu/cpu0/cache") {
383 for entry in entries {
384 if let Ok(entry) = entry {
385 let cache_path = entry.path();
386
387 if let Ok(level_str) = fs::read_to_string(cache_path.join("level")) {
389 if let Ok(level) = level_str.trim().parse::<u32>() {
390 if let Ok(size_str) = fs::read_to_string(cache_path.join("size")) {
392 let size_str = size_str.trim();
393 let size = if size_str.ends_with('K') {
394 size_str[..size_str.len() - 1].parse::<usize>().unwrap_or(0)
395 * 1024
396 } else if size_str.ends_with('M') {
397 size_str[..size_str.len() - 1].parse::<usize>().unwrap_or(0)
398 * 1024
399 * 1024
400 } else {
401 size_str.parse::<usize>().unwrap_or(0)
402 };
403
404 if let Ok(type_str) =
406 fs::read_to_string(cache_path.join("type"))
407 {
408 match (level, type_str.trim()) {
409 (1, "Data") => l1data = size,
410 (1, "Instruction") => l1_instruction = size,
411 (2, "Unified") => l2_unified = size,
412 (3, "Unified") => l3_shared = size,
413 _ => {} }
415 }
416 }
417 }
418 }
419 }
420 }
421 }
422
423 CacheSizes {
424 l1data,
425 l1_instruction,
426 l2_unified,
427 l3_shared,
428 }
429 }
430
431 #[cfg(not(target_os = "linux"))]
432 {
433 let num_cores = num_threads();
435
436 if num_cores >= 16 {
438 CacheSizes {
440 l1data: 48 * 1024, l1_instruction: 32 * 1024, l2_unified: 512 * 1024, l3_shared: 32 * 1024 * 1024, }
445 } else if num_cores >= 8 {
446 CacheSizes {
448 l1data: 32 * 1024, l1_instruction: 32 * 1024, l2_unified: 256 * 1024, l3_shared: 16 * 1024 * 1024, }
453 } else {
454 CacheSizes {
456 l1data: 32 * 1024, l1_instruction: 32 * 1024, l2_unified: 256 * 1024, l3_shared: 6 * 1024 * 1024, }
461 }
462 }
463 }
464
465 fn detect_memory_bandwidth() -> f64 {
467 let testsize = 64 * 1024 * 1024; let iterations = 10;
470
471 let mut total_bandwidth = 0.0;
472 let mut successful_tests = 0;
473
474 for _ in 0..iterations {
475 if let Some(bandwidth) = Self::measure_memory_bandwidth(testsize) {
476 total_bandwidth += bandwidth;
477 successful_tests += 1;
478 }
479 }
480
481 if successful_tests > 0 {
482 let avg_bandwidth = total_bandwidth / successful_tests as f64;
483 avg_bandwidth.min(200.0) } else {
486 let num_cores = num_threads();
488 if num_cores >= 16 {
489 100.0 } else if num_cores >= 8 {
491 50.0 } else {
493 25.6 }
495 }
496 }
497
498 fn measure_memory_bandwidth(size: usize) -> Option<f64> {
500 use std::time::Instant;
501
502 let source = vec![1.0f64; size / 8]; let mut dest = vec![0.0f64; size / 8];
505
506 for i in 0..source.len().min(1000) {
508 dest[i] = source[i];
509 }
510
511 let start = Instant::now();
513
514 for _ in 0..4 {
516 dest.copy_from_slice(&source);
517 std::hint::black_box(&dest);
519 }
520
521 let duration = start.elapsed();
522
523 if duration.as_nanos() > 0 {
524 let bytes_transferred = (size * 4 * 2) as f64; let seconds = duration.as_secs_f64();
526 let bandwidth_gbps = (bytes_transferred / seconds) / (1024.0 * 1024.0 * 1024.0);
527 Some(bandwidth_gbps)
528 } else {
529 None
530 }
531 }
532
533 fn detect_gpu_devices() -> Vec<GpuDevice> {
535 vec![]
537 }
538}
539
/// Advanced parallel processor for large statistical workloads.
///
/// Owns the configuration, an optional worker thread pool, a performance
/// monitor, a memory manager, and (optionally) a GPU context.
pub struct AdvancedParallelProcessor<F> {
    config: AdvancedParallelConfig,
    // `None` on single-core machines (see `with_config`).
    thread_pool: Option<ThreadPool>,
    performance_monitor: Arc<PerformanceMonitor>,
    memory_manager: Arc<MemoryManager>,
    gpu_context: Option<GpuContext>,
    // Ties the processor to element type `F` without storing any `F` values.
    _phantom: PhantomData<F>,
}
549
/// Thread pool skeleton with a shared global work queue.
pub struct ThreadPool {
    workers: Vec<Worker>,
    // Global FIFO queue shared by all workers.
    work_queue: Arc<Mutex<VecDeque<Task>>>,
    // Set to request that workers exit.
    shutdown: Arc<AtomicBool>,
    // Count of workers currently executing a task.
    active_workers: Arc<AtomicUsize>,
}
557
/// A single pool worker.
///
/// NOTE(review): `Worker::new` does not spawn a thread yet, so `thread` is
/// always `None` in the current implementation.
pub struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
    // Worker-local queue intended for work stealing.
    local_queue: VecDeque<Task>,
    // NUMA node this worker is pinned to, if any.
    numa_node: Option<usize>,
}
565
/// A unit of work queued on the thread pool.
pub struct Task {
    id: u64,
    // Higher value = higher scheduling priority (presumably; not yet
    // consumed by a scheduler — TODO confirm when scheduling lands).
    priority: u8,
    // Relative cost estimate used for load balancing.
    complexity: f64,
    // Size in bytes of the data this task touches.
    datasize: usize,
    // The work itself; consumed exactly once.
    function: Box<dyn FnOnce() -> TaskResult + Send>,
}
574
/// Outcome of executing a [`Task`].
#[derive(Debug)]
pub struct TaskResult {
    /// Whether the task completed without error.
    pub success: bool,
    /// Wall-clock time the task took.
    pub execution_time: Duration,
    /// Bytes of memory the task used.
    pub memory_used: usize,
    /// Error description when `success` is false.
    pub error: Option<String>,
}
583
/// Collects runtime performance metrics for the processor.
pub struct PerformanceMonitor {
    // Current aggregate metrics, guarded for concurrent readers/writers.
    metrics: RwLock<PerformanceMetrics>,
    // Time-series of past snapshots (not yet populated by `update_metrics`).
    history: RwLock<VecDeque<PerformanceSnapshot>>,
    monitoring_active: AtomicBool,
}
590
/// Snapshot of memory-manager counters.
#[derive(Debug, Clone)]
pub struct MemoryUsageStats {
    /// Bytes currently allocated.
    pub current_allocated: usize,
    /// High-water mark of allocated bytes.
    pub peak_allocated: usize,
    /// Total number of allocations performed.
    pub total_allocations: usize,
    /// Total number of deallocations performed.
    pub total_deallocations: usize,
    /// Estimated fragmentation in [0, 1].
    pub fragmentation_ratio: f64,
}
605
/// Aggregate performance metrics maintained by [`PerformanceMonitor`].
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Elements processed per second for the most recent task.
    pub throughput_ops_per_sec: f64,
    /// CPU utilization in [0, 1] (not yet populated).
    pub cpu_utilization: f64,
    /// Memory utilization in [0, 1] (not yet populated).
    pub memory_utilization: f64,
    /// Cache hit ratio in [0, 1] (not yet populated).
    pub cache_hit_ratio: f64,
    /// 1.0 means perfectly balanced load across threads.
    pub load_balance_factor: f64,
    /// Average wall-clock time per completed task.
    pub average_task_time: Duration,
    /// Threads currently executing work.
    pub active_threads: usize,
    /// Tasks completed successfully.
    pub completed_tasks: u64,
    /// Tasks that failed.
    pub failed_tasks: u64,
}
619
/// A timestamped copy of the metrics, for history tracking.
#[derive(Debug, Clone)]
pub struct PerformanceSnapshot {
    /// When the snapshot was taken.
    pub timestamp: Instant,
    /// Metrics at that instant.
    pub metrics: PerformanceMetrics,
}
626
/// Tracks allocation counters and owns size-keyed memory pools.
pub struct MemoryManager {
    // Bytes currently allocated through this manager.
    allocated_memory: AtomicUsize,
    // High-water mark of `allocated_memory`.
    peak_memory: AtomicUsize,
    // Pools keyed by chunk size in bytes.
    memory_pools: RwLock<HashMap<usize, MemoryPool>>,
    gc_enabled: AtomicBool,
}
634
/// Fixed-chunk-size memory pool.
///
/// NOTE(review): chunks are raw pointers; allocation/free logic that would
/// populate `available_chunks` is not visible in this file.
pub struct MemoryPool {
    chunksize: usize,
    // Free list of chunk pointers; all access must go through this Mutex.
    available_chunks: Mutex<Vec<*mut u8>>,
    total_chunks: AtomicUsize,
}
641
/// Per-device GPU execution context (descriptor only; no runtime handle).
pub struct GpuContext {
    device_id: usize,
    // Budget in bytes available for GPU allocations.
    available_memory: usize,
    stream_handles: Vec<GpuStream>,
    unified_memory_enabled: bool,
}
649
/// A single GPU command stream.
pub struct GpuStream {
    stream_id: usize,
    // Whether the stream currently has work in flight.
    active: AtomicBool,
    // Number of operations queued but not yet completed.
    pending_operations: AtomicUsize,
}
656
657impl<F> AdvancedParallelProcessor<F>
658where
659 F: Float
660 + NumCast
661 + SimdUnifiedOps
662 + Zero
663 + One
664 + PartialOrd
665 + Copy
666 + Send
667 + Sync
668 + 'static
669 + std::fmt::Display
670 + scirs2_core::ndarray::ScalarOperand,
671{
672 pub fn new() -> Self {
674 Self::with_config(AdvancedParallelConfig::default())
675 }
676
677 pub fn with_config(config: AdvancedParallelConfig) -> Self {
679 let performance_monitor = Arc::new(PerformanceMonitor::new());
680 let memory_manager = Arc::new(MemoryManager::new(&config.memory));
681
682 let thread_pool = if config.hardware.cpu_cores > 1 {
683 Some(ThreadPool::new(&config))
684 } else {
685 None
686 };
687
688 let gpu_context = if config.gpu.enable_gpu {
689 GpuContext::new(&config.gpu).ok()
690 } else {
691 None
692 };
693
694 Self {
695 config,
696 thread_pool,
697 performance_monitor,
698 memory_manager,
699 gpu_context: None,
700 _phantom: PhantomData,
701 }
702 }
703
704 pub fn process_massivedataset<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
706 where
707 T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
708 R: Send + Sync + 'static,
709 {
710 let strategy = self.select_optimal_strategy(data)?;
712
713 match strategy {
714 ParallelStrategy::CpuOptimal => self.process_cpu_optimal(data, operation),
715 ParallelStrategy::CpuSimd => self.process_cpu_simd(data, operation),
716 ParallelStrategy::HybridCpuGpu => self.process_hybrid_cpu_gpu(data, operation),
717 ParallelStrategy::GpuPrimary => self.process_gpu_primary(data, operation),
718 ParallelStrategy::Distributed => self.process_distributed(data, operation),
719 ParallelStrategy::Adaptive => self.process_adaptive(data, operation),
720 }
721 }
722
723 fn select_optimal_strategy(&self, data: &ArrayView2<F>) -> StatsResult<ParallelStrategy> {
725 let datasize = data.len() * std::mem::size_of::<F>();
726 let (rows, cols) = data.dim();
727
728 if datasize > self.config.memory.system_ram {
730 Ok(ParallelStrategy::CpuOptimal)
732 } else if self.config.gpu.enable_gpu && datasize > self.config.gpu.transfer_threshold {
733 Ok(ParallelStrategy::HybridCpuGpu)
735 } else if rows * cols > 1_000_000 {
736 Ok(ParallelStrategy::CpuSimd)
738 } else {
739 Ok(ParallelStrategy::CpuOptimal)
741 }
742 }
743
744 fn process_cpu_optimal<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
746 where
747 T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
748 R: Send + Sync + 'static,
749 {
750 let (rows, cols) = data.dim();
751 let num_threads = self.config.hardware.cpu_cores;
752 let chunksize = rows.div_ceil(num_threads);
753
754 let results: Vec<_> = (0..num_threads)
756 .into_par_iter()
757 .map(|thread_id| {
758 let start_row = thread_id * chunksize;
759 let end_row = ((thread_id + 1) * chunksize).min(rows);
760
761 if start_row < rows {
762 let chunk = data.slice(scirs2_core::ndarray::s![start_row..end_row, ..]);
763 operation(&chunk)
764 } else {
765 Err(StatsError::InvalidArgument("Empty chunk".to_string()))
767 }
768 })
769 .filter_map(|result| result.ok())
770 .collect();
771
772 results.into_iter().next().ok_or_else(|| {
775 StatsError::ComputationError("No successful parallel results".to_string())
776 })
777 }
778
779 fn process_cpu_simd<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
781 where
782 T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
783 R: Send + Sync + 'static,
784 {
785 let _simd_processor =
787 crate::simd_comprehensive::AdvancedComprehensiveSimdProcessor::<F>::new();
788
789 operation(data)
792 }
793
794 fn process_hybrid_cpu_gpu<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
796 where
797 T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
798 R: Send + Sync + 'static,
799 {
800 if let Some(_gpu_context) = &self.gpu_context {
801 self.process_cpu_optimal(data, operation)
804 } else {
805 self.process_cpu_optimal(data, operation)
806 }
807 }
808
809 fn process_gpu_primary<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
811 where
812 T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
813 R: Send + Sync + 'static,
814 {
815 self.process_cpu_optimal(data, operation)
817 }
818
819 fn process_distributed<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
821 where
822 T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
823 R: Send + Sync + 'static,
824 {
825 self.process_cpu_optimal(data, operation)
827 }
828
829 fn process_adaptive<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
831 where
832 T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
833 R: Send + Sync + 'static,
834 {
835 let start_time = Instant::now();
837 let result = self.process_cpu_optimal(data, operation)?;
838 let duration = start_time.elapsed();
839
840 self.performance_monitor
842 .update_metrics(duration, data.len());
843
844 Ok(result)
845 }
846
847 pub fn get_performance_metrics(&self) -> PerformanceMetrics {
849 self.performance_monitor.get_current_metrics()
850 }
851
852 pub fn get_config(&self) -> &AdvancedParallelConfig {
854 &self.config
855 }
856
857 pub fn update_config(&mut self, config: AdvancedParallelConfig) {
859 self.config = config;
860 }
861}
862
863impl PerformanceMonitor {
864 fn new() -> Self {
865 Self {
866 metrics: RwLock::new(PerformanceMetrics::default()),
867 history: RwLock::new(VecDeque::new()),
868 monitoring_active: AtomicBool::new(true),
869 }
870 }
871
872 fn update_metrics(&self, execution_time: Duration, datasize: usize) {
873 if let Ok(mut metrics) = self.metrics.write() {
874 metrics.completed_tasks += 1;
875 metrics.average_task_time = execution_time;
876
877 let ops_per_sec = if execution_time.as_secs_f64() > 0.0 {
879 datasize as f64 / execution_time.as_secs_f64()
880 } else {
881 0.0
882 };
883 metrics.throughput_ops_per_sec = ops_per_sec;
884 }
885 }
886
887 fn get_current_metrics(&self) -> PerformanceMetrics {
888 self.metrics.read().expect("Operation failed").clone()
889 }
890}
891
892impl Default for PerformanceMetrics {
893 fn default() -> Self {
894 Self {
895 throughput_ops_per_sec: 0.0,
896 cpu_utilization: 0.0,
897 memory_utilization: 0.0,
898 cache_hit_ratio: 0.0,
899 load_balance_factor: 1.0,
900 average_task_time: Duration::from_secs(0),
901 active_threads: 0,
902 completed_tasks: 0,
903 failed_tasks: 0,
904 }
905 }
906}
907
908impl<F> Default for AdvancedParallelProcessor<F>
909where
910 F: Float
911 + NumCast
912 + SimdUnifiedOps
913 + Zero
914 + One
915 + PartialOrd
916 + Copy
917 + Send
918 + Sync
919 + 'static
920 + std::fmt::Display
921 + scirs2_core::ndarray::ScalarOperand,
922{
923 fn default() -> Self {
924 Self::new()
925 }
926}
927
/// Convenience alias for a double-precision processor.
pub type F64AdvancedParallelProcessor = AdvancedParallelProcessor<f64>;
/// Convenience alias for a single-precision processor.
pub type F32AdvancedParallelProcessor = AdvancedParallelProcessor<f32>;
931
932#[allow(dead_code)]
934pub fn create_advanced_parallel_processor<F>() -> AdvancedParallelProcessor<F>
935where
936 F: Float
937 + NumCast
938 + SimdUnifiedOps
939 + Zero
940 + One
941 + PartialOrd
942 + Copy
943 + Send
944 + Sync
945 + 'static
946 + std::fmt::Display
947 + scirs2_core::ndarray::ScalarOperand,
948{
949 AdvancedParallelProcessor::new()
950}
951
952#[allow(dead_code)]
953pub fn create_optimized_parallel_processor<F>(
954 config: AdvancedParallelConfig,
955) -> AdvancedParallelProcessor<F>
956where
957 F: Float
958 + NumCast
959 + SimdUnifiedOps
960 + Zero
961 + One
962 + PartialOrd
963 + Copy
964 + Send
965 + Sync
966 + 'static
967 + std::fmt::Display
968 + scirs2_core::ndarray::ScalarOperand,
969{
970 AdvancedParallelProcessor::with_config(config)
971}
972
// SAFETY(review): `MemoryPool` contains raw `*mut u8` chunk pointers, which
// are not `Send`/`Sync` by default. These impls are sound only if every
// access to `available_chunks` goes through its `Mutex` and each chunk is
// uniquely owned by the pool — TODO confirm once the allocation logic that
// populates `available_chunks` is implemented.
unsafe impl Send for MemoryPool {}
unsafe impl Sync for MemoryPool {}
976
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::Array2;

    // Default config should always detect at least one core and some RAM.
    #[test]
    fn test_advanced_parallel_config_default() {
        let config = AdvancedParallelConfig::default();
        assert!(config.hardware.cpu_cores > 0);
        assert!(config.memory.system_ram > 0);
    }

    #[test]
    fn test_memory_bandwidth_detection() {
        let bandwidth = AdvancedParallelConfig::detect_memory_bandwidth();
        assert!(bandwidth > 0.0);
        // Detection clamps measurements to 200 GB/s, so this bound holds.
        assert!(bandwidth < 1000.0);
    }

    #[test]
    fn test_cachesize_detection() {
        let cachesizes = AdvancedParallelConfig::detect_cachesizes();
        assert!(cachesizes.l1data > 0);
        // NOTE(review): strict ordering assumes a conventional hierarchy;
        // unusual sysfs-reported topologies could violate it.
        assert!(cachesizes.l2_unified > cachesizes.l1data);
        assert!(cachesizes.l3_shared > cachesizes.l2_unified);
    }

    #[test]
    fn test_numa_detection() {
        let numa_nodes = AdvancedParallelConfig::detect_numa_nodes();
        assert!(numa_nodes > 0);
        // NOTE(review): heuristic upper bound; very large multi-socket
        // hosts could legitimately exceed 16 nodes.
        assert!(numa_nodes <= 16);
    }

    #[test]
    fn test_advanced_parallel_processor_creation() {
        let processor = AdvancedParallelProcessor::<f64>::new();
        assert!(processor.config.hardware.cpu_cores > 0);
    }

    // A 100-element matrix is below every threshold, so the plain CPU
    // strategy must be selected.
    #[test]
    fn test_strategy_selection() {
        let processor = AdvancedParallelProcessor::<f64>::new();
        let smalldata = Array2::<f64>::zeros((10, 10));
        let strategy = processor
            .select_optimal_strategy(&smalldata.view())
            .expect("Operation failed");

        assert!(matches!(strategy, ParallelStrategy::CpuOptimal));
    }

    #[test]
    fn test_performance_monitor() {
        let monitor = PerformanceMonitor::new();
        let metrics = monitor.get_current_metrics();
        assert_eq!(metrics.completed_tasks, 0);
    }

    #[test]
    fn test_memory_manager() {
        let config = MemoryConfig::default();
        let manager = MemoryManager::new(&config);
        assert_eq!(manager.allocated_memory.load(Ordering::Relaxed), 0);
    }
}
1043
impl MemoryManager {
    /// Create a manager with zeroed counters and no pools.
    ///
    /// Only `enable_gc` is read from the config here; pool-sizing fields are
    /// not yet consumed.
    fn new(config: &MemoryConfig) -> Self {
        Self {
            allocated_memory: AtomicUsize::new(0),
            peak_memory: AtomicUsize::new(0),
            memory_pools: RwLock::new(HashMap::new()),
            gc_enabled: AtomicBool::new(config.enable_gc),
        }
    }

    /// Snapshot of the current memory counters.
    ///
    /// NOTE(review): allocation/deallocation totals and the fragmentation
    /// ratio are placeholders (hard-coded to zero) until pool accounting is
    /// wired up.
    fn get_usage_stats(&self) -> MemoryUsageStats {
        MemoryUsageStats {
            current_allocated: self.allocated_memory.load(Ordering::Acquire),
            peak_allocated: self.peak_memory.load(Ordering::Acquire),
            total_allocations: 0,
            total_deallocations: 0,
            fragmentation_ratio: 0.0,
        }
    }
}
1064
1065impl ThreadPool {
1066 fn new(config: &AdvancedParallelConfig) -> Self {
1067 let num_workers = config.hardware.cpu_cores;
1068 let work_queue = Arc::new(Mutex::new(VecDeque::new()));
1069 let shutdown = Arc::new(AtomicBool::new(false));
1070 let active_workers = Arc::new(AtomicUsize::new(0));
1071
1072 let workers = (0..num_workers)
1073 .map(|id| Worker::new(id, work_queue.clone(), shutdown.clone()))
1074 .collect();
1075
1076 Self {
1077 workers,
1078 work_queue,
1079 shutdown,
1080 active_workers,
1081 }
1082 }
1083}
1084
1085impl Worker {
1086 fn new(
1087 _id: usize,
1088 _work_queue: Arc<Mutex<VecDeque<Task>>>,
1089 _shutdown: Arc<AtomicBool>,
1090 ) -> Self {
1091 Self {
1092 id: _id,
1093 thread: None, local_queue: VecDeque::new(),
1095 numa_node: None,
1096 }
1097 }
1098}
1099
impl GpuContext {
    /// Create a GPU context descriptor for the configured device.
    ///
    /// NOTE(review): no GPU runtime is initialized here — this only records
    /// configuration values, and the error path is currently never taken.
    fn new(config: &GpuConfig) -> Result<Self, String> {
        // Default memory budget when no explicit limit is configured.
        #[cfg(target_pointer_width = "32")]
        let default_gpu_memory = 256 * 1024 * 1024; // 256 MiB
        #[cfg(target_pointer_width = "64")]
        let default_gpu_memory = 1024 * 1024 * 1024; // 1 GiB

        Ok(Self {
            device_id: config.preferred_device.unwrap_or(0),
            available_memory: config.gpu_memory_limit.unwrap_or(default_gpu_memory),
            stream_handles: Vec::new(),
            unified_memory_enabled: config.enable_unified_memory,
        })
    }
}