scirs2_core/memory_efficient/chunked.rs

use super::validation;
use crate::error::{CoreError, ErrorContext, ErrorLocation};
use ::ndarray::{Array, ArrayBase, Data, Dimension};
use std::marker::PhantomData;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

/// Advanced memory pattern optimizer for chunked operations
#[derive(Debug)]
pub struct MemoryPatternOptimizer {
    /// CPU cache line size detected at runtime
    pub cache_line_size: usize,
    /// L1 cache size
    pub l1_cache_size: usize,
    /// L2 cache size
    pub l2_cache_size: usize,
    /// L3 cache size
    pub l3_cache_size: usize,
    /// NUMA nodes information
    pub numa_nodes: Vec<NumaNodeInfo>,
    /// Memory bandwidth measurement
    pub memorybandwidth: AtomicUsize, // MB/s
    /// Chunk processing times for adaptation
    pub processing_times: Vec<Duration>,
    /// Access pattern statistics
    pub access_pattern_stats: AccessPatternStats,
}

/// NUMA node information for memory optimization
#[derive(Debug, Clone)]
pub struct NumaNodeInfo {
    pub nodeid: usize,
    pub available_memory: usize,
    pub cpu_cores: Vec<usize>,
    pub memorybandwidth: usize, // MB/s
}

/// Statistics for memory access patterns
#[derive(Debug, Clone, Default)]
pub struct AccessPatternStats {
    pub sequential_access_ratio: f64,
    pub random_access_ratio: f64,
    pub strided_access_ratio: f64,
    pub cache_hit_ratio: f64,
    pub memorybandwidth_utilization: f64,
    pub last_updated: Option<Instant>,
}

/// Enhanced memory-aware chunking strategy
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AdvancedChunkingStrategy {
    /// Cache-line aligned chunking for optimal cache usage
    CacheLineAligned,
    /// NUMA-aware chunking that considers node boundaries
    NumaAware,
    /// Bandwidth-optimized chunking for maximum throughput
    BandwidthOptimized,
    /// Latency-optimized chunking for minimum processing time
    LatencyOptimized,
    /// Adaptive strategy that learns from access patterns
    Adaptive,
    /// Power-aware chunking for mobile/embedded systems
    PowerAware,
}

impl MemoryPatternOptimizer {
    /// Create a new memory pattern optimizer with system detection
    pub fn new() -> Self {
        Self {
            cache_line_size: Self::detect_cache_line_size(),
            l1_cache_size: Self::detect_l1_cache_size(),
            l2_cache_size: Self::detect_l2_cache_size(),
            l3_cache_size: Self::detect_l3_cache_size(),
            numa_nodes: Self::detect_numa_topology(),
            memorybandwidth: AtomicUsize::new(0),
            processing_times: Vec::new(),
            access_pattern_stats: AccessPatternStats::default(),
        }
    }

    /// Detect cache line size for the current system
    fn detect_cache_line_size() -> usize {
        // Compile-time per-architecture defaults rather than true runtime detection
        #[cfg(target_arch = "x86_64")]
        {
            64 // Most x86_64 systems
        }
        #[cfg(target_arch = "aarch64")]
        {
            128 // Most ARM64 systems
        }
        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
        {
            64 // Safe default
        }
    }

    /// Detect L1 cache size
    fn detect_l1_cache_size() -> usize {
        // Simplified detection - in practice would use system introspection
        32 * 1024 // 32KB typical
    }

    /// Detect L2 cache size
    fn detect_l2_cache_size() -> usize {
        256 * 1024 // 256KB typical
    }

    /// Detect L3 cache size
    fn detect_l3_cache_size() -> usize {
        8 * 1024 * 1024 // 8MB typical
    }

    /// Detect NUMA topology
    fn detect_numa_topology() -> Vec<NumaNodeInfo> {
        // Simplified NUMA detection
        let cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);

        vec![NumaNodeInfo {
            nodeid: 0,
            available_memory: 4 * 1024 * 1024 * 1024, // 4GB default
            cpu_cores: (0..cores).collect(),
            memorybandwidth: 25000, // 25GB/s typical
        }]
    }

    /// Calculate optimal chunk size for cache-aware processing
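    ///
    /// A rough sketch of the sizing rule with the illustrative defaults used by
    /// this module (256 KB L2, 64-byte cache lines) and 8-byte elements; the
    /// doctest is ignored only because the crate path to this type is not
    /// assumed here.
    ///
    /// ```ignore
    /// let opt = MemoryPatternOptimizer::new();
    /// // Half of L2 (128 KB) / 8 bytes = 16_384 elements, already a multiple
    /// // of the 8 elements that fit in one 64-byte cache line.
    /// let chunk = opt.calculate_cache_aware_chunk_size::<u64>(1_000_000);
    /// assert_eq!(chunk % 8, 0);
    /// ```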
    pub fn calculate_cache_aware_chunk_size<T>(&self, _totalelements: usize) -> usize {
        // Guard against zero-sized types so the divisions below stay well-defined
        let element_size = std::mem::size_of::<T>().max(1);

        // Target L2 cache for working set
        let target_cache_size = self.l2_cache_size / 2; // Leave room for other data
        let max_elements_in_cache = target_cache_size / element_size;

        // Align to cache line boundaries (at least one element per line)
        let cache_line_elements = (self.cache_line_size / element_size).max(1);
        let aligned_chunk_size =
            (max_elements_in_cache / cache_line_elements) * cache_line_elements;

        // Ensure minimum chunk size
        std::cmp::max(aligned_chunk_size, cache_line_elements)
    }

    /// Calculate NUMA-aware chunk distribution
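    ///
    /// A minimal sketch of the returned `(start, end)` ranges on a machine that
    /// reports a single NUMA node (the fallback detected by this module); the
    /// doctest is ignored because the crate path is not assumed here.
    ///
    /// ```ignore
    /// let opt = MemoryPatternOptimizer::new();
    /// let ranges = opt.threads(100, 4);
    /// assert_eq!(ranges, vec![(0, 25), (25, 50), (50, 75), (75, 100)]);
    /// ```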
    pub fn threads(&self, total_elements: usize, numthreads: usize) -> Vec<(usize, usize)> {
        let mut chunks = Vec::new();
        // Guard against a zero thread count
        let numthreads = numthreads.max(1);

        if self.numa_nodes.len() <= 1 {
            // No NUMA, simple equal distribution
            let chunk_size = total_elements / numthreads;
            for i in 0..numthreads {
                let start = i * chunk_size;
                let end = if i == numthreads - 1 {
                    total_elements
                } else {
                    (i + 1) * chunk_size
                };
                chunks.push((start, end));
            }
        } else {
            // NUMA-aware distribution; keep at least one thread per node
            let threads_per_node = (numthreads / self.numa_nodes.len()).max(1);
            let elements_per_node = total_elements / self.numa_nodes.len();

            for (nodeidx, _node) in self.numa_nodes.iter().enumerate() {
                let node_start = nodeidx * elements_per_node;
                let node_end = if nodeidx == self.numa_nodes.len() - 1 {
                    total_elements
                } else {
                    (nodeidx + 1) * elements_per_node
                };

                let node_elements = node_end - node_start;
                let node_chunk_size = node_elements / threads_per_node;

                for thread_idx in 0..threads_per_node {
                    let start = node_start + thread_idx * node_chunk_size;
                    let end = if thread_idx == threads_per_node - 1 {
                        node_end
                    } else {
                        node_start + (thread_idx + 1) * node_chunk_size
                    };
                    chunks.push((start, end));
                }
            }
        }

        chunks
    }

    /// Adaptive chunk size calculation based on performance history
    pub fn elements(&self, totalelements: usize) -> usize {
        if self.processing_times.is_empty() {
            // No history, use cache-aware default
            return self.calculate_cache_aware_chunk_size::<u64>(totalelements);
        }

        // Analyze processing time trends
        let recent_times: Vec<_> = self.processing_times.iter().rev().take(10).collect();
        let avg_time =
            recent_times.iter().map(|t| t.as_nanos()).sum::<u128>() / recent_times.len() as u128;

        // Target 50-100ms per chunk for good responsiveness
        let target_time_ns = 75_000_000; // 75ms

        if avg_time > target_time_ns {
            // Chunks taking too long, reduce size
            let current_default = self.calculate_cache_aware_chunk_size::<u64>(totalelements);
            current_default * 3 / 4
        } else if avg_time < target_time_ns / 2 {
            // Chunks too small, increase size
            let current_default = self.calculate_cache_aware_chunk_size::<u64>(totalelements);
            current_default * 5 / 4
        } else {
            // Good size
            self.calculate_cache_aware_chunk_size::<u64>(totalelements)
        }
    }

    /// Record processing time for adaptive optimization
    pub fn record_processing_time(&mut self, duration: Duration) {
        self.processing_times.push(duration);

        // Keep only recent measurements
        if self.processing_times.len() > 100 {
            self.processing_times.drain(0..50);
        }
    }

    /// Update access pattern statistics
    pub fn update_access_pattern_stats(&mut self, pattern: &AccessPatternStats) {
        self.access_pattern_stats = pattern.clone();
        self.access_pattern_stats.last_updated = Some(Instant::now());
    }

    /// Get memory bandwidth estimation
    pub fn get_memorybandwidth(&self) -> usize {
        self.memorybandwidth.load(Ordering::Relaxed)
    }

    /// Calculate bandwidth-optimized chunk size
    pub fn calculatebandwidth_optimized_chunk_size<T>(&self, totalelements: usize) -> usize {
        let element_size = std::mem::size_of::<T>().max(1);
        let bandwidth_mbps = self.get_memorybandwidth();

        if bandwidth_mbps == 0 {
            return self.calculate_cache_aware_chunk_size::<T>(totalelements);
        }

        // Target chunk size that can be processed in 100ms at current bandwidth
        let target_bytes_per_100ms = (bandwidth_mbps * 100) / 1000; // MB
        let target_elements = (target_bytes_per_100ms * 1024 * 1024) / element_size;

        // Ensure reasonable bounds and keep the clamp range valid for small inputs
        let min_chunk = (self.cache_line_size / element_size).max(1);
        let max_chunk = (totalelements / 4).max(min_chunk); // Don't make chunks too large

        target_elements.clamp(min_chunk, max_chunk)
    }
}

impl Clone for MemoryPatternOptimizer {
    fn clone(&self) -> Self {
        Self {
            cache_line_size: self.cache_line_size,
            l1_cache_size: self.l1_cache_size,
            l2_cache_size: self.l2_cache_size,
            l3_cache_size: self.l3_cache_size,
            numa_nodes: self.numa_nodes.clone(),
            memorybandwidth: AtomicUsize::new(self.memorybandwidth.load(Ordering::Relaxed)),
            processing_times: self.processing_times.clone(),
            access_pattern_stats: self.access_pattern_stats.clone(),
        }
    }
}

impl Default for MemoryPatternOptimizer {
    fn default() -> Self {
        Self::new()
    }
}

/// Optimal chunk size in bytes for memory-efficient operations
/// Chosen as 16 MB which is a good trade-off between memory usage and performance
pub const OPTIMAL_CHUNK_SIZE: usize = 16 * 1024 * 1024;

/// Strategy for chunking arrays for memory-efficient processing
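///
/// A short sketch of how the size-based variants resolve for a 1_000_000-element
/// `f64` array (8-byte elements); the numbers follow directly from the
/// definitions below, and the doctest is ignored because the crate path is not
/// assumed here.
///
/// ```ignore
/// // Fixed: 1_000 elements per chunk -> 1_000 chunks.
/// let fixed = ChunkingStrategy::Fixed(1_000);
/// // FixedBytes: 80_000 bytes / 8 bytes per element -> 10_000 elements per chunk.
/// let fixed_bytes = ChunkingStrategy::FixedBytes(80_000);
/// // NumChunks: 8 chunks -> ceil(1_000_000 / 8) = 125_000 elements per chunk.
/// let by_count = ChunkingStrategy::NumChunks(8);
/// ```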
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ChunkingStrategy {
    /// Automatically determine chunk sizes based on available memory and array dimensions
    Auto,
    /// Use a specific chunk size in elements
    Fixed(usize),
    /// Use a specific chunk size in bytes
    FixedBytes(usize),
    /// Process the array in a specific number of chunks
    NumChunks(usize),
    /// Advanced memory-pattern optimized strategies
    Advanced(AdvancedChunkingStrategy),
}

/// A chunked array that provides memory-efficient processing for large datasets
#[derive(Debug)]
pub struct ChunkedArray<A, D>
where
    A: Clone,
    D: Dimension,
{
    /// The underlying array data
    pub data: Array<A, D>,
    /// The chunking strategy
    pub strategy: ChunkingStrategy,
    /// The computed chunk size in elements
    #[allow(dead_code)]
    chunk_size: usize,
    /// The number of chunks
    num_chunks: usize,
    /// Memory pattern optimizer for advanced strategies
    optimizer: Option<MemoryPatternOptimizer>,
    /// Phantom data for type parameters
    phantom: PhantomData<A>,
}

impl<A, D> ChunkedArray<A, D>
where
    A: Clone,
    D: Dimension,
{
    /// Create a new chunked array with the given data and chunking strategy
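    ///
    /// A minimal usage sketch; the `use ndarray::Array1` import and crate path
    /// are assumptions, so the doctest is ignored.
    ///
    /// ```ignore
    /// use ndarray::Array1;
    ///
    /// let data = Array1::from_vec((0..10_000).map(|x| x as f64).collect());
    /// let chunked = ChunkedArray::new(data, ChunkingStrategy::Fixed(1_024));
    /// // ceil(10_000 / 1_024) = 10 chunks of up to 1_024 elements each.
    /// assert_eq!(chunked.num_chunks(), 10);
    /// assert_eq!(chunked.chunk_size(), 1_024);
    /// ```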
    pub fn new<S: Data<Elem = A>>(data: ArrayBase<S, D>, strategy: ChunkingStrategy) -> Self {
        let owned_data = data.to_owned();
        let total_elements = data.len();
        // Guard against zero-sized types so the divisions below stay well-defined
        let elem_size = mem::size_of::<A>().max(1);

        // Determine if we need the optimizer
        let mut optimizer = if matches!(strategy, ChunkingStrategy::Advanced(_)) {
            Some(MemoryPatternOptimizer::new())
        } else {
            None
        };

        // Calculate chunk size based on strategy
        let (chunk_size, num_chunks) = match strategy {
            ChunkingStrategy::Auto => {
                // Default to optimal chunk size in bytes, converted to elements
                let chunk_size_bytes = OPTIMAL_CHUNK_SIZE;
                let chunk_size = (chunk_size_bytes / elem_size).max(1);
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::Fixed(size) => {
                let size = size.max(1);
                let num_chunks = total_elements.div_ceil(size);
                (size, num_chunks)
            }
            ChunkingStrategy::FixedBytes(bytes) => {
                let elements = bytes / elem_size;
                let chunk_size = if elements == 0 { 1 } else { elements };
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::NumChunks(n) => {
                let num_chunks = if n == 0 { 1 } else { n };
                let chunk_size = total_elements.div_ceil(num_chunks);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::Advanced(advanced_strategy) => {
                let opt = optimizer
                    .as_mut()
                    .expect("optimizer is always created for Advanced strategies");
                let chunk_size = match advanced_strategy {
                    AdvancedChunkingStrategy::CacheLineAligned => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::NumaAware => {
                        // Use default chunk size, NUMA distribution handled in processing
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::BandwidthOptimized => {
                        opt.calculatebandwidth_optimized_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::LatencyOptimized => {
                        // Smaller chunks for better latency
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements) / 2
                    }
                    AdvancedChunkingStrategy::Adaptive => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::PowerAware => {
                        // Smaller chunks to reduce power consumption
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements) / 4
                    }
                }
                .max(1); // never allow a zero-sized chunk
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
        };

        Self {
            data: owned_data,
            strategy,
            chunk_size,
            num_chunks,
            // Reuse the optimizer built above (if any) instead of discarding it
            optimizer: Some(optimizer.unwrap_or_else(MemoryPatternOptimizer::new)),
            phantom: PhantomData,
        }
    }

    /// Create a chunked array with advanced memory optimization
    pub fn with_memory_optimization<S: Data<Elem = A>>(
        data: ArrayBase<S, D>,
        strategy: AdvancedChunkingStrategy,
    ) -> Self {
        Self::new(data, ChunkingStrategy::Advanced(strategy))
    }

    /// Get access to the memory pattern optimizer (if available)
    pub fn optimizer(&self) -> Option<&MemoryPatternOptimizer> {
        self.optimizer.as_ref()
    }

    /// Get mutable access to the memory pattern optimizer (if available)
    pub fn optimizer_mut(&mut self) -> Option<&mut MemoryPatternOptimizer> {
        self.optimizer.as_mut()
    }

    /// Record processing time for adaptive optimization
    pub fn record_processing_time(&mut self, duration: Duration) {
        if let Some(ref mut optimizer) = self.optimizer {
            optimizer.record_processing_time(duration);
        }
    }

    /// Update access pattern statistics
    pub fn update_access_pattern_stats(&mut self, stats: &AccessPatternStats) {
        if let Some(ref mut optimizer) = self.optimizer {
            optimizer.update_access_pattern_stats(stats);
        }
    }

    /// Apply a function to each chunk of the array and collect the results
    ///
    /// Returns a 1D array where each element is the result of applying the function to a chunk
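    ///
    /// A minimal sketch of mapping a per-chunk reduction; the `ndarray` import
    /// and crate path are assumptions, so the doctest is ignored.
    ///
    /// ```ignore
    /// use ndarray::Array1;
    ///
    /// let data = Array1::from_vec(vec![1.0f64; 4_000]);
    /// let chunked = ChunkedArray::new(data, ChunkingStrategy::Fixed(1_000));
    /// // One partial sum per chunk: [1000.0, 1000.0, 1000.0, 1000.0].
    /// let partial_sums = chunked.map(|chunk| chunk.sum());
    /// assert_eq!(partial_sums.len(), 4);
    /// ```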
    pub fn map<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        // Get chunks and apply the function to each
        let chunks = self.get_chunks();
        let results: Vec<B> = chunks.iter().map(f).collect();

        // Return results as a 1D array
        Array::from_vec(results)
    }

    /// Apply a function to each chunk of the array in parallel and collect the results
    ///
    /// Returns a 1D array where each element is the result of applying the function to a chunk
    pub fn par_map<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        #[cfg(feature = "parallel")]
        {
            use crate::parallel_ops::*;
            use std::sync::Arc;

            // Get chunks and wrap in Arc for thread-safe sharing
            let chunks = self.get_chunks();
            let chunks_arc = Arc::new(chunks);

            // Process chunks in parallel using index-based iteration
            let num_chunks = chunks_arc.len();
            let results: Vec<B> = (0..num_chunks)
                .into_par_iter()
                .map(move |i| {
                    let chunks_ref = Arc::clone(&chunks_arc);
                    f(&chunks_ref[i])
                })
                .collect();

            // Return results as a 1D array
            Array::from_vec(results)
        }

        #[cfg(not(feature = "parallel"))]
        {
            // Fall back to sequential processing
            self.map(f)
        }
    }

    /// Apply a function to each chunk with performance monitoring and adaptive optimization
    pub fn map_withmonitoring<F, B>(&mut self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let start_time = Instant::now();
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        for chunk in chunks {
            let chunk_start = Instant::now();
            let result = f(&chunk);
            let chunk_duration = chunk_start.elapsed();

            // Record processing time for adaptive optimization
            if let Some(ref mut optimizer) = self.optimizer {
                optimizer.record_processing_time(chunk_duration);
            }

            results.push(result);
        }

        let total_duration = start_time.elapsed();
        self.record_processing_time(total_duration);

        Array::from_vec(results)
    }

    /// Apply a function to each chunk with NUMA-aware processing
    pub fn map_numa_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        #[cfg(feature = "parallel")]
        {
            if self.optimizer.is_some() {
                use crate::parallel_ops::*;

                // Calculate NUMA-aware chunk distribution
                let num_threads = crate::parallel_ops::get_num_threads();
                let chunk_size = self.data.len() / num_threads.max(1);
                let numa_chunks: Vec<_> = (0..num_threads)
                    .map(|i| {
                        let start = i * chunk_size;
                        let end = if i == num_threads - 1 {
                            self.data.len()
                        } else {
                            (i + 1) * chunk_size
                        };
                        start..end
                    })
                    .collect();
                let chunks = self.get_chunks();
                if chunks.is_empty() {
                    return Array::from_vec(Vec::new());
                }

                // Map each NUMA-aligned range to the chunk containing its start offset
                let chunk_elements = self.chunk_size.max(1);
                let results: Vec<B> = numa_chunks
                    .into_par_iter()
                    .map(|range| {
                        let chunk_idx = (range.start / chunk_elements).min(chunks.len() - 1);
                        f(&chunks[chunk_idx])
                    })
                    .collect();

                return Array::from_vec(results);
            }
        }

        // Fallback to regular parallel processing
        self.par_map(f)
    }

    /// Apply a function to each chunk with cache-optimized processing
    pub fn map_cache_optimized<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        if let Some(ref optimizer) = self.optimizer {
            // Process chunks in cache-friendly order
            let cache_aware_chunk_size =
                optimizer.calculate_cache_aware_chunk_size::<A>(self.data.len());

            if self.chunk_size > cache_aware_chunk_size {
                // Chunks exceed the cache-friendly size; a full implementation would
                // split them into sub-chunks here. For now each chunk is processed whole.
                for chunk in chunks {
                    results.push(f(&chunk));
                }
            } else {
                // Chunks are already cache-friendly, process normally
                for chunk in chunks {
                    results.push(f(&chunk));
                }
            }
        } else {
            // No optimizer available, process normally
            for chunk in chunks {
                results.push(f(&chunk));
            }
        }

        Array::from_vec(results)
    }

    /// Apply a function to each chunk with bandwidth-aware processing
    pub fn mapbandwidth_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        if let Some(ref optimizer) = self.optimizer {
            let bandwidth = optimizer.get_memorybandwidth();

            if bandwidth > 0 {
                // Optimal chunk size for the current bandwidth; reserved for future
                // sub-chunking and unused in the current pacing-only implementation.
                let _bandwidth_chunk_size =
                    optimizer.calculatebandwidth_optimized_chunk_size::<A>(self.data.len());

                // Process with bandwidth considerations
                for chunk in chunks {
                    let chunk_start = Instant::now();
                    let result = f(&chunk);
                    let processing_time = chunk_start.elapsed();

                    // Add a small delay if processing finished faster than the memory
                    // bandwidth can sustain, to smooth out bandwidth utilization
                    let expected_time_ms = (chunk.len() * std::mem::size_of::<A>()) as f64
                        / (bandwidth as f64 * 1000.0);
                    let expected_duration = Duration::from_millis(expected_time_ms as u64);

                    if processing_time < expected_duration {
                        std::thread::sleep(expected_duration - processing_time);
                    }

                    results.push(result);
                }
            } else {
                // No bandwidth info, process normally
                for chunk in chunks {
                    results.push(f(&chunk));
                }
            }
        } else {
            // No optimizer available, process normally
            for chunk in chunks {
                results.push(f(&chunk));
            }
        }

        Array::from_vec(results)
    }

    /// Apply a function to each chunk with power-aware processing (for mobile/embedded)
    pub fn map_power_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        // Power-aware processing uses smaller chunks and includes rest periods
        for (i, chunk) in chunks.iter().enumerate() {
            let result = f(chunk);
            results.push(result);

            // Add a small rest period every few chunks to reduce power consumption
            if i % 4 == 3 {
                std::thread::sleep(Duration::from_millis(1));
            }
        }

        Array::from_vec(results)
    }

    /// Measure and update memory bandwidth for the optimizer
    pub fn measure_memorybandwidth(&mut self) -> Option<usize> {
        if let Some(ref mut optimizer) = self.optimizer {
            let start_time = Instant::now();
            let chunk_size = 1024 * 1024; // 1MB test chunk
            let test_data = vec![0u8; chunk_size];

            // Perform a memory-intensive operation to measure bandwidth
            let mut sum = 0u64;
            for &byte in &test_data {
                sum += byte as u64;
            }
            let elapsed = start_time.elapsed();

            // Prevent the compiler from optimizing the test away
            std::hint::black_box(sum);

            // bytes * 1000 / ns == MB/s; guard against a zero elapsed time
            let bandwidth_mbps =
                ((chunk_size as u128 * 1000) / elapsed.as_nanos().max(1)) as usize;

            optimizer
                .memorybandwidth
                .store(bandwidth_mbps, std::sync::atomic::Ordering::Relaxed);

            Some(bandwidth_mbps)
        } else {
            None
        }
    }

    /// Apply a function using the best available optimization strategy
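    ///
    /// A minimal sketch of the monitored, adaptive workflow; the `ndarray`
    /// import and crate path are assumptions, so the doctest is ignored.
    ///
    /// ```ignore
    /// use ndarray::Array1;
    ///
    /// let data = Array1::from_vec(vec![2.0f64; 100_000]);
    /// let mut chunked = ChunkedArray::with_memory_optimization(
    ///     data,
    ///     AdvancedChunkingStrategy::Adaptive,
    /// );
    /// // Dispatches to the adaptive path and records per-chunk timings.
    /// let partial_sums = chunked.map_optimized(|chunk| chunk.sum());
    /// assert_eq!(partial_sums.len(), chunked.num_chunks());
    /// ```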
    pub fn map_optimized<F, B>(&mut self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        if self.optimizer.is_some() {
            // Measure bandwidth if it has not been measured yet
            let bandwidth_known = self
                .optimizer
                .as_ref()
                .map(|opt| opt.get_memorybandwidth() > 0)
                .unwrap_or(false);
            if !bandwidth_known {
                self.measure_memorybandwidth();
            }

            // Choose optimization strategy based on data characteristics
            let data_size = self.data.len() * std::mem::size_of::<A>();

            match self.strategy {
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::NumaAware) => {
                    self.map_numa_aware(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::CacheLineAligned) => {
                    self.map_cache_optimized(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::BandwidthOptimized) => {
                    self.mapbandwidth_aware(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::PowerAware) => {
                    self.map_power_aware(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::Adaptive) => {
                    // Use monitoring for adaptive optimization
                    self.map_withmonitoring(f)
                }
                _ => {
                    // Auto-select based on data size and system characteristics
                    if data_size > 100 * 1024 * 1024 {
                        // Large data - use NUMA-aware processing
                        self.map_numa_aware(f)
                    } else if data_size > 10 * 1024 * 1024 {
                        // Medium data - use cache optimization
                        self.map_cache_optimized(f)
                    } else {
                        // Small data - use regular parallel processing
                        self.par_map(f)
                    }
                }
            }
        } else {
            // No optimizer available, use regular parallel processing
            self.par_map(f)
        }
    }

    /// Update access pattern statistics based on processing performance
    pub fn update_access_pattern_statistics(&mut self, processingtimes: &[Duration]) {
        if let Some(ref mut optimizer) = self.optimizer {
            // Calculate access pattern statistics from the processing times
            let avg_time = if !processingtimes.is_empty() {
                processingtimes.iter().map(|d| d.as_nanos()).sum::<u128>()
                    / processingtimes.len() as u128
            } else {
                0
            };

            // Estimate cache hit ratio based on performance consistency
            let time_variance = if processingtimes.len() > 1 && avg_time > 0 {
                let variance = processingtimes
                    .iter()
                    .map(|d| {
                        let diff = d.as_nanos() as i128 - avg_time as i128;
                        (diff * diff) as u128
                    })
                    .sum::<u128>()
                    / (processingtimes.len() - 1) as u128;
                variance as f64 / avg_time as f64
            } else {
                0.0
            };

            let cache_hit_ratio = (1.0 - time_variance.min(1.0)).max(0.0);

            let stats = AccessPatternStats {
                sequential_access_ratio: 0.8, // Assume mostly sequential for chunked processing
                random_access_ratio: 0.1,
                strided_access_ratio: 0.1,
                cache_hit_ratio,
                memorybandwidth_utilization: 0.7, // Conservative estimate
                last_updated: Some(Instant::now()),
            };

            optimizer.update_access_pattern_stats(&stats);
        }
    }

    /// Get the number of chunks
    pub fn num_chunks(&self) -> usize {
        self.num_chunks
    }

    /// Get the chunk size in elements
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }

    /// Get chunks of the array as a vector of owned array chunks
    pub fn get_chunks(&self) -> Vec<Array<A, D>>
    where
        D: Clone,
    {
        let mut result = Vec::with_capacity(self.num_chunks);

        // Special handling for 1D arrays
        if self.data.ndim() == 1 {
            if let Some(slice) = self.data.as_slice() {
                for i in 0..self.num_chunks {
                    let start = i * self.chunk_size;
                    let end = ((i + 1) * self.chunk_size).min(slice.len());
                    let chunk_slice = &slice[start..end];

                    // Create a new array from the chunk slice
                    // We need to handle the dimension conversion carefully
                    let chunk_1d = Array::from_vec(chunk_slice.to_vec());

                    // Try to convert to the original dimension type
                    // For 1D arrays, this should work directly
                    if let Ok(reshaped) = chunk_1d.into_dimensionality::<D>() {
                        result.push(reshaped);
                    } else {
                        // Fallback: return the whole array if conversion fails
                        return vec![self.data.clone()];
                    }
                }
                return result;
            }
        }

        // For multi-dimensional arrays or if slicing fails, return the whole array as a single chunk
        result.push(self.data.clone());
        result
    }
}

/// Perform an operation on an array in a chunk-wise manner to reduce memory usage
///
/// # Arguments
///
/// * `array` - The input array
/// * `op` - The operation to apply to each chunk
/// * `strategy` - The chunking strategy
///
/// # Returns
///
/// The result array after applying the operation to all chunks
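///
/// # Examples
///
/// A minimal sketch (ignored doctest; the `use` path is an assumption):
///
/// ```ignore
/// use ndarray::Array1;
///
/// let input = Array1::from_vec((0..10_000).map(|x| x as f64).collect());
/// // Element-wise doubling, applied chunk-wise under the hood.
/// let doubled = chunk_wise_op(&input, |chunk| chunk.mapv(|x| x * 2.0), ChunkingStrategy::Auto)
///     .expect("chunk-wise op");
/// assert_eq!(doubled[1], 2.0);
/// ```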
#[allow(dead_code)]
pub fn chunk_wise_op<A, F, B, S, D>(
    array: &ArrayBase<S, D>,
    op: F,
    strategy: ChunkingStrategy,
) -> Result<Array<B, D>, CoreError>
where
    A: Clone,
    S: Data<Elem = A>,
    F: Fn(&ArrayBase<S, D>) -> Array<B, D>,
    B: Clone,
    D: Dimension + Clone,
{
    validation::check_not_empty(array)?;

    // If the array is small, just apply the operation directly
    if array.len() <= 1000 {
        return Ok(op(array));
    }

    let _chunked = ChunkedArray::new(array.to_owned(), strategy);

    // For now, we'll use a simple implementation that processes the whole array.
    // In a real implementation, we would process each chunk separately and combine the results.
    let result = op(array);

    // Verify the result has the expected shape
    if result.raw_dim() != array.raw_dim() {
        return Err(CoreError::ValidationError(
            ErrorContext::new(format!(
                "Operation changed shape from {:?} to {:?}",
                array.shape(),
                result.shape()
            ))
            .with_location(ErrorLocation::new(file!(), line!())),
        ));
    }

    Ok(result)
}

/// Perform a binary operation on two arrays in a chunk-wise manner
///
/// # Arguments
///
/// * `lhs` - The left-hand side array
/// * `rhs` - The right-hand side array
/// * `op` - The binary operation to apply to each pair of chunks
/// * `strategy` - The chunking strategy
///
/// # Returns
///
/// The result array after applying the binary operation to all chunk pairs
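///
/// # Examples
///
/// A minimal sketch (ignored doctest; the `use` path is an assumption):
///
/// ```ignore
/// use ndarray::Array1;
///
/// let a = Array1::from_vec(vec![1.0f64; 5_000]);
/// let b = Array1::from_vec(vec![2.0f64; 5_000]);
/// // Element-wise addition of matching chunks.
/// let sum = chunk_wise_binary_op(&a, &b, |x, y| x + y, ChunkingStrategy::Auto)
///     .expect("chunk-wise binary op");
/// assert_eq!(sum[0], 3.0);
/// ```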
#[allow(dead_code)]
pub fn chunk_wise_binary_op<A, B, F, C, S1, S2, D>(
    lhs: &ArrayBase<S1, D>,
    rhs: &ArrayBase<S2, D>,
    op: F,
    strategy: ChunkingStrategy,
) -> Result<Array<C, D>, CoreError>
where
    A: Clone,
    B: Clone,
    S1: Data<Elem = A>,
    S2: Data<Elem = B>,
    F: Fn(&ArrayBase<S1, D>, &ArrayBase<S2, D>) -> Array<C, D>,
    C: Clone,
    D: Dimension + Clone,
{
    validation::checkshapes_match(lhs.shape(), rhs.shape())?;
    validation::check_not_empty(lhs)?;

    // If the arrays are small, just apply the operation directly
    if lhs.len() <= 1000 {
        return Ok(op(lhs, rhs));
    }

    // Create chunked arrays for both inputs
    let _chunked_lhs = ChunkedArray::new(lhs.to_owned(), strategy);
    let _chunked_rhs = ChunkedArray::new(rhs.to_owned(), strategy);

    // For now, we'll use a simple implementation that processes the whole arrays.
    // In a real implementation, we would process each chunk pair separately and combine the results.
    let result = op(lhs, rhs);

    // Verify the result has the expected shape
    if result.shape() != lhs.shape() {
        return Err(CoreError::ValidationError(
            ErrorContext::new(format!(
                "Binary operation changed shape from {:?} to {:?}",
                lhs.shape(),
                result.shape()
            ))
            .with_location(ErrorLocation::new(file!(), line!())),
        ));
    }

    Ok(result)
}

/// Perform a reduction operation on an array in a chunk-wise manner
///
/// # Arguments
///
/// * `array` - The input array
/// * `chunk_op` - The reduction operation to apply to each chunk
/// * `combine` - The function to combine the results from each chunk
/// * `strategy` - The chunking strategy
///
/// # Returns
///
/// The result of applying the reduction operation to all chunks
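///
/// # Examples
///
/// A minimal sketch (ignored doctest; the `use` path is an assumption):
///
/// ```ignore
/// use ndarray::Array1;
///
/// let data = Array1::from_vec(vec![1.0f64; 10_000]);
/// let total = chunk_wise_reduce(
///     &data,
///     |chunk| chunk.sum(),                   // per-chunk partial sum
///     |partials| partials.into_iter().sum(), // combine the partial sums
///     ChunkingStrategy::Auto,
/// )
/// .expect("chunk-wise reduce");
/// assert_eq!(total, 10_000.0);
/// ```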
#[allow(dead_code)]
pub fn chunk_wise_reduce<A, F, G, B, S, D>(
    array: &ArrayBase<S, D>,
    chunk_op: F,
    combine: G,
    strategy: ChunkingStrategy,
) -> Result<B, CoreError>
where
    A: Clone,
    S: Data<Elem = A>,
    F: Fn(&ArrayBase<S, D>) -> B + Sync + Send,
    G: Fn(Vec<B>) -> B,
    B: Clone + Send + Sync,
    D: Dimension + Clone,
{
    validation::check_not_empty(array)?;

    // If the array is small, just apply the operation directly
    if array.len() <= 1000 {
        return Ok(chunk_op(array));
    }

    let _chunked = ChunkedArray::new(array.to_owned(), strategy);

    // For now, we'll use a simple implementation for the initial version.
    // In a real implementation, we would process each chunk separately and
    // combine the results with `combine`, using Rayon for parallel execution.
    let _ = &combine;

    // Process the whole array directly for now
    let result = chunk_op(array);
    Ok(result)
}