// scirs2_core/memory_efficient/chunked.rs

use super::validation;
use crate::error::{CoreError, ErrorContext, ErrorLocation};
use ::ndarray::{Array, ArrayBase, Data, Dimension};
use std::marker::PhantomData;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

/// Advanced memory pattern optimizer for chunked operations
#[derive(Debug)]
pub struct MemoryPatternOptimizer {
    /// CPU cache line size for the current architecture
    pub cache_line_size: usize,
    /// L1 cache size
    pub l1_cache_size: usize,
    /// L2 cache size
    pub l2_cache_size: usize,
    /// L3 cache size
    pub l3_cache_size: usize,
    /// NUMA node information
    pub numa_nodes: Vec<NumaNodeInfo>,
    /// Measured memory bandwidth in MB/s
    pub memorybandwidth: AtomicUsize,
    /// Chunk processing times for adaptation
    pub processing_times: Vec<Duration>,
    /// Access pattern statistics
    pub access_pattern_stats: AccessPatternStats,
}

/// NUMA node information for memory optimization
#[derive(Debug, Clone)]
pub struct NumaNodeInfo {
    pub nodeid: usize,
    pub available_memory: usize,
    pub cpu_cores: Vec<usize>,
    pub memorybandwidth: usize, // MB/s
}

/// Statistics for memory access patterns
#[derive(Debug, Clone, Default)]
pub struct AccessPatternStats {
    pub sequential_access_ratio: f64,
    pub random_access_ratio: f64,
    pub strided_access_ratio: f64,
    pub cache_hit_ratio: f64,
    pub memorybandwidth_utilization: f64,
    pub last_updated: Option<Instant>,
}

/// Enhanced memory-aware chunking strategy
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AdvancedChunkingStrategy {
    /// Cache-line aligned chunking for optimal cache usage
    CacheLineAligned,
    /// NUMA-aware chunking that considers node boundaries
    NumaAware,
    /// Bandwidth-optimized chunking for maximum throughput
    BandwidthOptimized,
    /// Latency-optimized chunking for minimum processing time
    LatencyOptimized,
    /// Adaptive strategy that learns from access patterns
    Adaptive,
    /// Power-aware chunking for mobile/embedded systems
    PowerAware,
    /// Large data chunking (256MB-2GB chunks) for GB-scale datasets
    LargeData,
    /// Huge data streaming with minimal memory footprint
    HugeDataStreaming {
        /// Maximum memory usage in MB
        max_memory_mb: usize,
    },
    /// Adaptive large chunking that auto-adjusts based on available memory
    AdaptiveLarge,
}

impl MemoryPatternOptimizer {
    /// Create a new memory pattern optimizer with system detection
    pub fn new() -> Self {
        Self {
            cache_line_size: Self::detect_cache_line_size(),
            l1_cache_size: Self::detect_l1_cache_size(),
            l2_cache_size: Self::detect_l2_cache_size(),
            l3_cache_size: Self::detect_l3_cache_size(),
            numa_nodes: Self::detect_numa_topology(),
            memorybandwidth: AtomicUsize::new(0),
            processing_times: Vec::new(),
            access_pattern_stats: AccessPatternStats::default(),
        }
    }

    /// Detect cache line size for the current system
    fn detect_cache_line_size() -> usize {
        // Per-architecture compile-time defaults; true runtime detection
        // (e.g. via CPUID or sysconf) is not implemented here
        #[cfg(target_arch = "x86_64")]
        {
            64 // Most x86_64 systems
        }
        #[cfg(target_arch = "aarch64")]
        {
            128 // Most ARM64 systems
        }
        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
        {
            64 // Safe default
        }
    }

    /// Detect L1 cache size
    fn detect_l1_cache_size() -> usize {
        // Simplified detection - in practice this would use system introspection
        32 * 1024 // 32KB typical
    }

    /// Detect L2 cache size
    fn detect_l2_cache_size() -> usize {
        256 * 1024 // 256KB typical
    }

    /// Detect L3 cache size
    fn detect_l3_cache_size() -> usize {
        8 * 1024 * 1024 // 8MB typical
    }

    /// Detect NUMA topology
    fn detect_numa_topology() -> Vec<NumaNodeInfo> {
        // Simplified NUMA detection: report a single node with all cores
        let cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);

        vec![NumaNodeInfo {
            nodeid: 0,
            available_memory: 4 * 1024 * 1024 * 1024, // 4GB default
            cpu_cores: (0..cores).collect(),
            memorybandwidth: 25000, // 25GB/s typical
        }]
    }

    /// Calculate optimal chunk size for cache-aware processing
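    ///
    /// # Example
    ///
    /// A minimal sketch (`ignore`d; assumes the defaults detected by `new`):
    ///
    /// ```ignore
    /// let opt = MemoryPatternOptimizer::new();
    /// // With a 256 KB L2 cache, f64 chunks target 128 KB = 16_384 elements,
    /// // aligned down to a whole number of 64-byte cache lines.
    /// let chunk = opt.calculate_cache_aware_chunk_size::<f64>(1_000_000);
    /// assert_eq!(chunk % 8, 0); // 8 f64 values per 64-byte cache line
    /// ```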
    pub fn calculate_cache_aware_chunk_size<T>(&self, total_elements: usize) -> usize {
        // Guard against zero-sized types so the divisions below are safe
        let element_size = std::mem::size_of::<T>().max(1);

        // Target half of L2 for the working set, leaving room for other data
        let target_cache_size = self.l2_cache_size / 2;
        let max_elements_in_cache = target_cache_size / element_size;

        // Align to cache line boundaries (at least one element per line)
        let cache_line_elements = (self.cache_line_size / element_size).max(1);
        let aligned_chunk_size =
            (max_elements_in_cache / cache_line_elements) * cache_line_elements;

        // Ensure a minimum chunk size, without exceeding the data itself
        std::cmp::max(aligned_chunk_size, cache_line_elements).min(total_elements.max(1))
    }

    /// Calculate NUMA-aware chunk distribution
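    ///
    /// Returns `(start, end)` element ranges, one per thread.
    ///
    /// # Example
    ///
    /// A minimal sketch (`ignore`d):
    ///
    /// ```ignore
    /// let opt = MemoryPatternOptimizer::new();
    /// // 10 elements over 3 threads on a single node -> [(0, 3), (3, 6), (6, 10)]
    /// let ranges = opt.calculate_numa_aware_distribution(10, 3);
    /// assert_eq!(ranges, vec![(0, 3), (3, 6), (6, 10)]);
    /// ```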
    pub fn calculate_numa_aware_distribution(
        &self,
        total_elements: usize,
        num_threads: usize,
    ) -> Vec<(usize, usize)> {
        let mut chunks = Vec::new();
        let num_threads = num_threads.max(1);

        if self.numa_nodes.len() <= 1 {
            // No NUMA, simple equal distribution
            let chunk_size = total_elements / num_threads;
            for i in 0..num_threads {
                let start = i * chunk_size;
                let end = if i == num_threads - 1 {
                    total_elements
                } else {
                    (i + 1) * chunk_size
                };
                chunks.push((start, end));
            }
        } else {
            // NUMA-aware distribution (at least one thread per node)
            let threads_per_node = (num_threads / self.numa_nodes.len()).max(1);
            let elements_per_node = total_elements / self.numa_nodes.len();

            for (node_idx, _node) in self.numa_nodes.iter().enumerate() {
                let node_start = node_idx * elements_per_node;
                let node_end = if node_idx == self.numa_nodes.len() - 1 {
                    total_elements
                } else {
                    (node_idx + 1) * elements_per_node
                };

                let node_elements = node_end - node_start;
                let node_chunk_size = node_elements / threads_per_node;

                for thread_idx in 0..threads_per_node {
                    let start = node_start + thread_idx * node_chunk_size;
                    let end = if thread_idx == threads_per_node - 1 {
                        node_end
                    } else {
                        node_start + (thread_idx + 1) * node_chunk_size
                    };
                    chunks.push((start, end));
                }
            }
        }

        chunks
    }

    /// Adaptive chunk size calculation based on performance history
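    ///
    /// Chunks averaging over 75 ms shrink to 3/4 of the cache-aware default;
    /// chunks averaging under 37.5 ms grow to 5/4 of it.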
    pub fn calculate_adaptive_chunk_size(&self, total_elements: usize) -> usize {
        if self.processing_times.is_empty() {
            // No history, use the cache-aware default
            return self.calculate_cache_aware_chunk_size::<u64>(total_elements);
        }

        // Analyze recent processing time trends
        let recent_times: Vec<_> = self.processing_times.iter().rev().take(10).collect();
        let avg_time =
            recent_times.iter().map(|t| t.as_nanos()).sum::<u128>() / recent_times.len() as u128;

        // Target 50-100ms per chunk for good responsiveness
        let target_time_ns = 75_000_000; // 75ms

        let current_default = self.calculate_cache_aware_chunk_size::<u64>(total_elements);
        if avg_time > target_time_ns {
            // Chunks are taking too long; reduce the size
            current_default * 3 / 4
        } else if avg_time < target_time_ns / 2 {
            // Chunks are too small; increase the size
            current_default * 5 / 4
        } else {
            // Size is good
            current_default
        }
    }

    /// Record processing time for adaptive optimization
    pub fn record_processing_time(&mut self, duration: Duration) {
        self.processing_times.push(duration);

        // Keep only recent measurements
        if self.processing_times.len() > 100 {
            self.processing_times.drain(0..50);
        }
    }

    /// Update access pattern statistics
    pub fn update_access_pattern_stats(&mut self, pattern: &AccessPatternStats) {
        self.access_pattern_stats = pattern.clone();
        self.access_pattern_stats.last_updated = Some(Instant::now());
    }

    /// Get the memory bandwidth estimate in MB/s
    pub fn get_memorybandwidth(&self) -> usize {
        self.memorybandwidth.load(Ordering::Relaxed)
    }

    /// Calculate bandwidth-optimized chunk size
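    ///
    /// Targets the number of elements that can be streamed in roughly 100 ms.
    /// For example, at 10_000 MB/s the target is ~1000 MB, i.e.
    /// `1000 * 1024 * 1024 / 8` = 131_072_000 `f64` elements before clamping.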
    pub fn calculatebandwidth_optimized_chunk_size<T>(&self, total_elements: usize) -> usize {
        let element_size = std::mem::size_of::<T>().max(1);
        let bandwidth_mbps = self.get_memorybandwidth();

        if bandwidth_mbps == 0 {
            return self.calculate_cache_aware_chunk_size::<T>(total_elements);
        }

        // Target a chunk that can be processed in 100ms at the current bandwidth
        let target_bytes_per_100ms = (bandwidth_mbps * 100) / 1000; // MB
        let target_elements = (target_bytes_per_100ms * 1024 * 1024) / element_size;

        // Ensure reasonable bounds (min <= max, so clamp cannot panic)
        let min_chunk = (self.cache_line_size / element_size).max(1);
        let max_chunk = (total_elements / 4).max(min_chunk); // don't make chunks too large

        target_elements.clamp(min_chunk, max_chunk)
    }
}

impl Clone for MemoryPatternOptimizer {
    fn clone(&self) -> Self {
        Self {
            cache_line_size: self.cache_line_size,
            l1_cache_size: self.l1_cache_size,
            l2_cache_size: self.l2_cache_size,
            l3_cache_size: self.l3_cache_size,
            numa_nodes: self.numa_nodes.clone(),
            memorybandwidth: AtomicUsize::new(self.memorybandwidth.load(Ordering::Relaxed)),
            processing_times: self.processing_times.clone(),
            access_pattern_stats: self.access_pattern_stats.clone(),
        }
    }
}

impl Default for MemoryPatternOptimizer {
    fn default() -> Self {
        Self::new()
    }
}

/// Optimal chunk size in bytes for memory-efficient operations
///
/// Chosen as 16 MB, a good trade-off between memory usage and performance.
pub const OPTIMAL_CHUNK_SIZE: usize = 16 * 1024 * 1024;

/// Strategy for chunking arrays for memory-efficient processing
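///
/// # Example
///
/// A minimal sketch of strategy selection (`ignore`d; assumes these types are
/// re-exported from `scirs2_core::memory_efficient`):
///
/// ```ignore
/// // 16 MB chunks (OPTIMAL_CHUNK_SIZE) converted to elements:
/// let auto = ChunkingStrategy::Auto;
/// // Exactly 1024 elements per chunk:
/// let fixed = ChunkingStrategy::Fixed(1024);
/// // Stream with at most 64 MB of working memory per chunk:
/// let huge = ChunkingStrategy::Advanced(
///     AdvancedChunkingStrategy::HugeDataStreaming { max_memory_mb: 64 },
/// );
/// ```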
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ChunkingStrategy {
    /// Automatically determine chunk sizes based on available memory and array dimensions
    Auto,
    /// Use a specific chunk size in elements
    Fixed(usize),
    /// Use a specific chunk size in bytes
    FixedBytes(usize),
    /// Process the array in a specific number of chunks
    NumChunks(usize),
    /// Advanced memory-pattern optimized strategies
    Advanced(AdvancedChunkingStrategy),
}

/// A chunked array that provides memory-efficient processing for large datasets
#[derive(Debug)]
pub struct ChunkedArray<A, D>
where
    A: Clone,
    D: Dimension,
{
    /// The underlying array data
    pub data: Array<A, D>,
    /// The chunking strategy
    pub strategy: ChunkingStrategy,
    /// The computed chunk size in elements
    #[allow(dead_code)]
    chunk_size: usize,
    /// The number of chunks
    num_chunks: usize,
    /// Memory pattern optimizer for advanced strategies
    optimizer: Option<MemoryPatternOptimizer>,
    /// Phantom data for type parameters
    phantom: PhantomData<A>,
}

impl<A, D> ChunkedArray<A, D>
where
    A: Clone,
    D: Dimension,
{
    /// Create a new chunked array with the given data and chunking strategy
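    ///
    /// # Example
    ///
    /// A minimal sketch (`ignore`d; assumes re-exports from
    /// `scirs2_core::memory_efficient`):
    ///
    /// ```ignore
    /// use ndarray::Array1;
    /// let data = Array1::from_vec(vec![0.0f64; 10_000]);
    /// let chunked = ChunkedArray::new(data, ChunkingStrategy::NumChunks(4));
    /// assert_eq!(chunked.num_chunks(), 4);
    /// assert_eq!(chunked.chunk_size(), 2500);
    /// ```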
    pub fn new<S: Data<Elem = A>>(data: ArrayBase<S, D>, strategy: ChunkingStrategy) -> Self {
        let owned_data = data.to_owned();
        let total_elements = data.len();
        // Guard against zero-sized types so the divisions below are safe
        let elem_size = mem::size_of::<A>().max(1);

        // Determine if we need the optimizer
        let mut optimizer = if matches!(strategy, ChunkingStrategy::Advanced(_)) {
            Some(MemoryPatternOptimizer::new())
        } else {
            None
        };

        // Calculate chunk size based on strategy
        let (chunk_size, num_chunks) = match strategy {
            ChunkingStrategy::Auto => {
                // Default to optimal chunk size in bytes, converted to elements
                let chunk_size_bytes = OPTIMAL_CHUNK_SIZE;
                let chunk_size = (chunk_size_bytes / elem_size).max(1);
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::Fixed(size) => {
                let size = size.max(1);
                let num_chunks = total_elements.div_ceil(size);
                (size, num_chunks)
            }
            ChunkingStrategy::FixedBytes(bytes) => {
                let elements = bytes / elem_size;
                let chunk_size = if elements == 0 { 1 } else { elements };
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::NumChunks(n) => {
                let num_chunks = if n == 0 { 1 } else { n };
                let chunk_size = total_elements.div_ceil(num_chunks);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::Advanced(advanced_strategy) => {
                let opt = optimizer
                    .as_mut()
                    .expect("optimizer is always created for advanced strategies");
                let chunk_size = match advanced_strategy {
                    AdvancedChunkingStrategy::CacheLineAligned => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::NumaAware => {
                        // Use the default chunk size; NUMA distribution is
                        // handled during processing
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::BandwidthOptimized => {
                        opt.calculatebandwidth_optimized_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::LatencyOptimized => {
                        // Smaller chunks for better latency
                        (opt.calculate_cache_aware_chunk_size::<A>(total_elements) / 2).max(1)
                    }
                    AdvancedChunkingStrategy::Adaptive => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::PowerAware => {
                        // Smaller chunks to reduce power consumption
                        (opt.calculate_cache_aware_chunk_size::<A>(total_elements) / 4).max(1)
                    }
                    AdvancedChunkingStrategy::LargeData => {
                        // 256MB-2GB chunks for GB-scale datasets;
                        // use 512MB as the default for large data
                        let target_bytes = 512 * 1024 * 1024;
                        (target_bytes / elem_size).max(1).min(total_elements)
                    }
                    AdvancedChunkingStrategy::HugeDataStreaming { max_memory_mb } => {
                        // Minimal memory footprint - use the specified limit
                        let target_bytes = max_memory_mb * 1024 * 1024;
                        (target_bytes / elem_size).max(1).min(total_elements)
                    }
                    AdvancedChunkingStrategy::AdaptiveLarge => {
                        // Auto-adjust based on available memory
                        use super::platform_memory::PlatformMemoryInfo;

                        let available_mem = PlatformMemoryInfo::detect()
                            .map(|info| info.available_memory)
                            .unwrap_or(512 * 1024 * 1024);

                        // Use 25% of available memory for chunks, bounded to 256MB-2GB
                        let target_bytes =
                            (available_mem / 4).clamp(256 * 1024 * 1024, 2 * 1024 * 1024 * 1024);

                        (target_bytes / elem_size).max(1).min(total_elements)
                    }
                };
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
        };

        Self {
            data: owned_data,
            strategy,
            chunk_size,
            num_chunks,
            // Reuse the optimizer created above (preserving its state) and
            // fall back to a fresh one for non-advanced strategies
            optimizer: optimizer.or_else(|| Some(MemoryPatternOptimizer::new())),
            phantom: PhantomData,
        }
    }

    /// Create a chunked array with advanced memory optimization
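    ///
    /// # Example
    ///
    /// A minimal sketch (`ignore`d):
    ///
    /// ```ignore
    /// use ndarray::Array1;
    /// let data = Array1::from_vec(vec![0u8; 1 << 20]);
    /// let chunked = ChunkedArray::with_memory_optimization(
    ///     data,
    ///     AdvancedChunkingStrategy::CacheLineAligned,
    /// );
    /// assert!(chunked.optimizer().is_some());
    /// ```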
    pub fn with_memory_optimization<S: Data<Elem = A>>(
        data: ArrayBase<S, D>,
        strategy: AdvancedChunkingStrategy,
    ) -> Self {
        Self::new(data, ChunkingStrategy::Advanced(strategy))
    }

    /// Get access to the memory pattern optimizer (if available)
    pub fn optimizer(&self) -> Option<&MemoryPatternOptimizer> {
        self.optimizer.as_ref()
    }

    /// Get mutable access to the memory pattern optimizer (if available)
    pub fn optimizer_mut(&mut self) -> Option<&mut MemoryPatternOptimizer> {
        self.optimizer.as_mut()
    }

    /// Record processing time for adaptive optimization
    pub fn record_processing_time(&mut self, duration: Duration) {
        if let Some(ref mut optimizer) = self.optimizer {
            optimizer.record_processing_time(duration);
        }
    }

    /// Update access pattern statistics
    pub fn update_access_pattern_stats(&mut self, stats: &AccessPatternStats) {
        if let Some(ref mut optimizer) = self.optimizer {
            optimizer.update_access_pattern_stats(stats);
        }
    }

    /// Apply a function to each chunk of the array and collect the results
    ///
    /// Returns a 1D array where each element is the result of applying the function to a chunk.
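    ///
    /// # Example
    ///
    /// A minimal sketch (`ignore`d; assumes re-exports from
    /// `scirs2_core::memory_efficient`):
    ///
    /// ```ignore
    /// use ndarray::Array1;
    /// let data = Array1::from_vec((0..4096).map(|x| x as f64).collect());
    /// let chunked = ChunkedArray::new(data, ChunkingStrategy::Fixed(1024));
    /// // One sum per 1024-element chunk -> a length-4 result array
    /// let sums = chunked.map(|chunk| chunk.sum());
    /// assert_eq!(sums.len(), 4);
    /// ```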
    pub fn map<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        // Get chunks and apply the function to each
        let chunks = self.get_chunks();
        let results: Vec<B> = chunks.iter().map(f).collect();

        // Return results as a 1D array
        Array::from_vec(results)
    }

    /// Apply a function to each chunk of the array in parallel and collect the results
    ///
    /// Returns a 1D array where each element is the result of applying the function to a chunk.
    pub fn par_map<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        #[cfg(feature = "parallel")]
        {
            use crate::parallel_ops::*;
            use std::sync::Arc;

            // Get chunks and wrap them in an Arc for thread-safe sharing
            let chunks = self.get_chunks();
            let chunks_arc = Arc::new(chunks);

            // Process chunks in parallel using index-based iteration
            let num_chunks = chunks_arc.len();
            let results: Vec<B> = (0..num_chunks)
                .into_par_iter()
                .map(|i| f(&chunks_arc[i]))
                .collect();

            // Return results as a 1D array
            Array::from_vec(results)
        }

        #[cfg(not(feature = "parallel"))]
        {
            // Fall back to sequential processing
            self.map(f)
        }
    }

    /// Apply a function to each chunk with performance monitoring and adaptive optimization
    pub fn map_withmonitoring<F, B>(&mut self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let start_time = Instant::now();
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        for chunk in chunks {
            let chunk_start = Instant::now();
            let result = f(&chunk);
            let chunk_duration = chunk_start.elapsed();

            // Record processing time for adaptive optimization
            if let Some(ref mut optimizer) = self.optimizer {
                optimizer.record_processing_time(chunk_duration);
            }

            results.push(result);
        }

        let total_duration = start_time.elapsed();
        self.record_processing_time(total_duration);

        Array::from_vec(results)
    }

    /// Apply a function to each chunk with NUMA-aware processing
    pub fn map_numa_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        #[cfg(feature = "parallel")]
        {
            if self.optimizer.is_some() {
                use crate::parallel_ops::*;

                // Process each chunk in parallel across the thread pool.
                // True NUMA placement (pinning threads to the node that owns
                // each range) is not implemented here.
                let chunks = self.get_chunks();
                let results: Vec<B> = (0..chunks.len())
                    .into_par_iter()
                    .map(|i| f(&chunks[i]))
                    .collect();

                return Array::from_vec(results);
            }
        }

        // Fall back to regular parallel processing
        self.par_map(f)
    }

    /// Apply a function to each chunk with cache-optimized processing
    pub fn map_cache_optimized<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        if let Some(ref optimizer) = self.optimizer {
            // Process chunks in a cache-friendly order
            let cache_aware_chunk_size =
                optimizer.calculate_cache_aware_chunk_size::<A>(self.data.len());

            // If the current chunks are larger than the cache-aware size, process sub-chunks
            if self.chunk_size > cache_aware_chunk_size {
                for chunk in chunks {
                    // For now, just process the whole chunk; a full implementation
                    // would break it down into cache-friendly sub-chunks
                    results.push(f(&chunk));
                }
            } else {
                // Chunks are already cache-friendly, process normally
                for chunk in chunks {
                    results.push(f(&chunk));
                }
            }
        } else {
            // No optimizer available, process normally
            for chunk in chunks {
                results.push(f(&chunk));
            }
        }

        Array::from_vec(results)
    }

    /// Apply a function to each chunk with bandwidth-aware processing
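    ///
    /// If a chunk finishes faster than the memory bus can stream it at the
    /// measured bandwidth, the method sleeps for the remainder. For example,
    /// an 8 MB chunk at 8000 MB/s is expected to take about 1 ms.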
    pub fn mapbandwidth_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        if let Some(ref optimizer) = self.optimizer {
            let bandwidth = optimizer.get_memorybandwidth();

            if bandwidth > 0 {
                // Process with bandwidth considerations
                for chunk in chunks {
                    let chunk_start = Instant::now();
                    let result = f(&chunk);
                    let processing_time = chunk_start.elapsed();

                    // Add a small delay if we processed faster than the memory bus
                    // can stream the chunk: expected_ms = bytes / (MB/s * 1000)
                    let expected_time_ms = (chunk.len() * std::mem::size_of::<A>()) as f64
                        / (bandwidth as f64 * 1000.0);
                    let expected_duration = Duration::from_millis(expected_time_ms as u64);

                    if processing_time < expected_duration {
                        std::thread::sleep(expected_duration - processing_time);
                    }

                    results.push(result);
                }
            } else {
                // No bandwidth info, process normally
                for chunk in chunks {
                    results.push(f(&chunk));
                }
            }
        } else {
            // No optimizer available, process normally
            for chunk in chunks {
                results.push(f(&chunk));
            }
        }

        Array::from_vec(results)
    }

    /// Apply a function to each chunk with power-aware processing (for mobile/embedded)
    pub fn map_power_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        // Power-aware processing uses smaller chunks and includes rest periods
        for (i, chunk) in chunks.iter().enumerate() {
            let result = f(chunk);
            results.push(result);

            // Add a small rest period every few chunks to reduce power consumption
            if i % 4 == 3 {
                std::thread::sleep(Duration::from_millis(1));
            }
        }

        Array::from_vec(results)
    }

    /// Measure and update memory bandwidth for the optimizer
    pub fn measure_memorybandwidth(&mut self) -> Option<usize> {
        if let Some(ref mut optimizer) = self.optimizer {
            let start_time = Instant::now();
            let chunk_size = 1024 * 1024; // 1MB test buffer
            let test_data = vec![0u8; chunk_size];

            // Perform a memory-intensive read to measure bandwidth
            let mut sum = 0u64;
            for &byte in &test_data {
                sum += byte as u64;
            }
            // Prevent the optimizer from removing the read loop
            std::hint::black_box(sum);

            // bytes per nanosecond * 1000 == MB/s
            let elapsed_ns = start_time.elapsed().as_nanos();
            let bandwidth_mbps = if elapsed_ns > 0 {
                ((chunk_size as u128 * 1000) / elapsed_ns) as usize
            } else {
                0
            };

            optimizer
                .memorybandwidth
                .store(bandwidth_mbps, std::sync::atomic::Ordering::Relaxed);

            Some(bandwidth_mbps)
        } else {
            None
        }
    }

    /// Apply a function using the best available optimization strategy
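    ///
    /// # Example
    ///
    /// A minimal sketch (`ignore`d):
    ///
    /// ```ignore
    /// use ndarray::Array1;
    /// let data = Array1::from_vec(vec![1.0f64; 100_000]);
    /// let mut chunked = ChunkedArray::new(
    ///     data,
    ///     ChunkingStrategy::Advanced(AdvancedChunkingStrategy::Adaptive),
    /// );
    /// // Dispatches to the monitored path because the strategy is Adaptive
    /// let sums = chunked.map_optimized(|chunk| chunk.sum());
    /// ```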
    pub fn map_optimized<F, B>(&mut self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        if self.optimizer.is_none() {
            // No optimizer available, use regular parallel processing
            return self.par_map(f);
        }

        // Measure bandwidth if not already done, checking via a short-lived
        // borrow so the &mut self call below is allowed
        let needs_measurement = self
            .optimizer
            .as_ref()
            .map(|opt| opt.get_memorybandwidth() == 0)
            .unwrap_or(false);
        if needs_measurement {
            self.measure_memorybandwidth();
        }

        // Choose optimization strategy based on data characteristics
        let data_size = self.data.len() * std::mem::size_of::<A>();

        match self.strategy {
            ChunkingStrategy::Advanced(AdvancedChunkingStrategy::NumaAware) => {
                self.map_numa_aware(f)
            }
            ChunkingStrategy::Advanced(AdvancedChunkingStrategy::CacheLineAligned) => {
                self.map_cache_optimized(f)
            }
            ChunkingStrategy::Advanced(AdvancedChunkingStrategy::BandwidthOptimized) => {
                self.mapbandwidth_aware(f)
            }
            ChunkingStrategy::Advanced(AdvancedChunkingStrategy::PowerAware) => {
                self.map_power_aware(f)
            }
            ChunkingStrategy::Advanced(AdvancedChunkingStrategy::Adaptive) => {
                // Use monitoring for adaptive optimization
                self.map_withmonitoring(f)
            }
            _ => {
                // Auto-select based on data size and system characteristics
                if data_size > 100 * 1024 * 1024 {
                    // Large data - use NUMA-aware processing
                    self.map_numa_aware(f)
                } else if data_size > 10 * 1024 * 1024 {
                    // Medium data - use cache optimization
                    self.map_cache_optimized(f)
                } else {
                    // Small data - use regular parallel processing
                    self.par_map(f)
                }
            }
        }
    }

    /// Update access pattern statistics based on processing performance
    pub fn update_access_pattern_statistics(&mut self, processing_times: &[Duration]) {
        if let Some(ref mut optimizer) = self.optimizer {
            // Calculate access pattern statistics from the processing times
            let avg_time = if !processing_times.is_empty() {
                processing_times.iter().map(|d| d.as_nanos()).sum::<u128>()
                    / processing_times.len() as u128
            } else {
                0
            };

            // Estimate cache hit ratio from performance consistency, using the
            // coefficient of variation (std dev / mean) so the measure is
            // dimensionless
            let relative_deviation = if processing_times.len() > 1 && avg_time > 0 {
                let variance = processing_times
                    .iter()
                    .map(|d| {
                        let diff = d.as_nanos() as i128 - avg_time as i128;
                        (diff * diff) as u128
                    })
                    .sum::<u128>()
                    / (processing_times.len() - 1) as u128;
                (variance as f64).sqrt() / avg_time as f64
            } else {
                0.0
            };

            let cache_hit_ratio = (1.0 - relative_deviation.min(1.0)).max(0.0);

            let stats = AccessPatternStats {
                sequential_access_ratio: 0.8, // Assume mostly sequential for chunked processing
                random_access_ratio: 0.1,
                strided_access_ratio: 0.1,
                cache_hit_ratio,
                memorybandwidth_utilization: 0.7, // Conservative estimate
                last_updated: Some(Instant::now()),
            };

            optimizer.update_access_pattern_stats(&stats);
        }
    }

    /// Get the number of chunks
    pub fn num_chunks(&self) -> usize {
        self.num_chunks
    }

    /// Get the chunk size in elements
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }

    /// Get chunks of the array as a vector of owned array chunks
    ///
    /// Only 1D arrays are actually split; multi-dimensional arrays are
    /// currently returned as a single chunk.
    pub fn get_chunks(&self) -> Vec<Array<A, D>>
    where
        D: Clone,
    {
        let mut result = Vec::with_capacity(self.num_chunks);

        // Special handling for 1D arrays
        if self.data.ndim() == 1 {
            if let Some(slice) = self.data.as_slice() {
                for i in 0..self.num_chunks {
                    let start = i * self.chunk_size;
                    let end = ((i + 1) * self.chunk_size).min(slice.len());
                    let chunk_slice = &slice[start..end];

                    // Create a new array from the chunk slice, handling the
                    // dimension conversion carefully
                    let chunk_1d = Array::from_vec(chunk_slice.to_vec());

                    // Try to convert to the original dimension type;
                    // for 1D arrays this should work directly
                    if let Ok(reshaped) = chunk_1d.into_dimensionality::<D>() {
                        result.push(reshaped);
                    } else {
                        // Fallback: return the whole array if conversion fails
                        return vec![self.data.clone()];
                    }
                }
                return result;
            }
        }

        // For multi-dimensional arrays, or if slicing fails, return the whole array as a single chunk
        result.push(self.data.clone());
        result
    }
}

/// Perform an operation on an array in a chunk-wise manner to reduce memory usage
///
/// # Arguments
///
/// * `array` - The input array
/// * `op` - The operation to apply to each chunk
/// * `strategy` - The chunking strategy
///
/// # Returns
///
/// The result array after applying the operation to all chunks
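///
/// # Example
///
/// A minimal sketch (`ignore`d; assumes the function is re-exported from
/// `scirs2_core::memory_efficient`):
///
/// ```ignore
/// use ndarray::Array1;
/// let a = Array1::from_vec((0..10_000).map(|x| x as f64).collect());
/// let doubled =
///     chunk_wise_op(&a, |chunk| chunk.mapv(|x| x * 2.0), ChunkingStrategy::Auto).unwrap();
/// assert_eq!(doubled[3], 6.0);
/// ```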
#[allow(dead_code)]
pub fn chunk_wise_op<A, F, B, S, D>(
    array: &ArrayBase<S, D>,
    op: F,
    strategy: ChunkingStrategy,
) -> Result<Array<B, D>, CoreError>
where
    A: Clone,
    S: Data<Elem = A>,
    F: Fn(&ArrayBase<S, D>) -> Array<B, D>,
    B: Clone,
    D: Dimension + Clone,
{
    validation::check_not_empty(array)?;

    // If the array is small, just apply the operation directly
    if array.len() <= 1000 {
        return Ok(op(array));
    }

    let _chunked = ChunkedArray::new(array.to_owned(), strategy);

    // For now, apply the operation to the whole array; a full implementation
    // would process each chunk separately and combine the results
    let result = op(array);

    // Verify the result has the expected shape
    if result.raw_dim() != array.raw_dim() {
        return Err(CoreError::ValidationError(
            ErrorContext::new(format!(
                "Operation changed shape from {:?} to {:?}",
                array.shape(),
                result.shape()
            ))
            .with_location(ErrorLocation::new(file!(), line!())),
        ));
    }

    Ok(result)
}

/// Perform a binary operation on two arrays in a chunk-wise manner
///
/// # Arguments
///
/// * `lhs` - The left-hand side array
/// * `rhs` - The right-hand side array
/// * `op` - The binary operation to apply to each pair of chunks
/// * `strategy` - The chunking strategy
///
/// # Returns
///
/// The result array after applying the binary operation to all chunk pairs
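///
/// # Example
///
/// A minimal sketch (`ignore`d):
///
/// ```ignore
/// use ndarray::Array1;
/// let a = Array1::from_vec(vec![1.0f64; 2000]);
/// let b = Array1::from_vec(vec![2.0f64; 2000]);
/// let sum = chunk_wise_binary_op(&a, &b, |x, y| x + y, ChunkingStrategy::Auto).unwrap();
/// assert_eq!(sum[0], 3.0);
/// ```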
#[allow(dead_code)]
pub fn chunk_wise_binary_op<A, B, F, C, S1, S2, D>(
    lhs: &ArrayBase<S1, D>,
    rhs: &ArrayBase<S2, D>,
    op: F,
    strategy: ChunkingStrategy,
) -> Result<Array<C, D>, CoreError>
where
    A: Clone,
    B: Clone,
    S1: Data<Elem = A>,
    S2: Data<Elem = B>,
    F: Fn(&ArrayBase<S1, D>, &ArrayBase<S2, D>) -> Array<C, D>,
    C: Clone,
    D: Dimension + Clone,
{
    validation::checkshapes_match(lhs.shape(), rhs.shape())?;
    validation::check_not_empty(lhs)?;

    // If the arrays are small, just apply the operation directly
    if lhs.len() <= 1000 {
        return Ok(op(lhs, rhs));
    }

    // Create chunked arrays for both inputs
    let _chunked_lhs = ChunkedArray::new(lhs.to_owned(), strategy);
    let _chunked_rhs = ChunkedArray::new(rhs.to_owned(), strategy);

    // For now, apply the operation to the whole arrays; a full implementation
    // would process each chunk pair separately and combine the results
    let result = op(lhs, rhs);

    // Verify the result has the expected shape
    if result.shape() != lhs.shape() {
        return Err(CoreError::ValidationError(
            ErrorContext::new(format!(
                "Binary operation changed shape from {:?} to {:?}",
                lhs.shape(),
                result.shape()
            ))
            .with_location(ErrorLocation::new(file!(), line!())),
        ));
    }

    Ok(result)
}

/// Perform a reduction operation on an array in a chunk-wise manner
///
/// # Arguments
///
/// * `array` - The input array
/// * `chunk_op` - The reduction operation to apply to each chunk
/// * `_combine` - The function to combine the results from each chunk
///   (unused in the current placeholder implementation)
/// * `strategy` - The chunking strategy
///
/// # Returns
///
/// The result of applying the reduction operation to all chunks
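///
/// # Example
///
/// A minimal sketch (`ignore`d):
///
/// ```ignore
/// use ndarray::Array1;
/// let a = Array1::from_vec(vec![1.0f64; 5000]);
/// let total = chunk_wise_reduce(
///     &a,
///     |chunk| chunk.sum(),
///     |partials| partials.into_iter().sum(),
///     ChunkingStrategy::Auto,
/// )
/// .unwrap();
/// assert_eq!(total, 5000.0);
/// ```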
#[allow(dead_code)]
pub fn chunk_wise_reduce<A, F, G, B, S, D>(
    array: &ArrayBase<S, D>,
    chunk_op: F,
    _combine: G,
    strategy: ChunkingStrategy,
) -> Result<B, CoreError>
where
    A: Clone,
    S: Data<Elem = A>,
    F: Fn(&ArrayBase<S, D>) -> B + Sync + Send,
    G: Fn(Vec<B>) -> B,
    B: Clone + Send + Sync,
    D: Dimension + Clone,
{
    validation::check_not_empty(array)?;

    // If the array is small, just apply the operation directly
    if array.len() <= 1000 {
        return Ok(chunk_op(array));
    }

    let _chunked = ChunkedArray::new(array.to_owned(), strategy);

    // For now, process the whole array directly; a full implementation would
    // reduce each chunk separately and combine the partial results, using
    // Rayon for parallel execution
    let result = chunk_op(array);
    Ok(result)
}