scirs2_core/memory_efficient/adaptive_chunking.rs

//! Adaptive chunking strategies for memory-efficient operations.
//!
//! This module provides algorithms that dynamically determine optimal
//! chunk sizes based on workload characteristics, memory constraints,
//! and data distribution patterns. Adaptive chunking can significantly
//! improve performance by balancing memory usage with processing efficiency.
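//!
//! A minimal usage sketch (illustrative only; the module path and the way the
//! `MemoryMappedArray` is constructed are assumptions, so the block is not
//! compiled as a doctest):
//!
//! ```ignore
//! use scirs2_core::memory_efficient::{AdaptiveChunking, AdaptiveChunkingBuilder};
//!
//! // `mmap` is a MemoryMappedArray<f64> created elsewhere.
//! let params = AdaptiveChunkingBuilder::new()
//!     .with_target_memory(64 * 1024 * 1024) // aim for ~64MB per chunk
//!     .with_min_chunksize(4096)
//!     .build();
//!
//! let result = mmap.adaptive_chunking(params)?;
//! println!("chosen strategy: {:?}", result.strategy);
//! ```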

use super::chunked::ChunkingStrategy;
use super::memmap::MemoryMappedArray;
use super::memmap_chunks::MemoryMappedChunks;
use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
// use ::ndarray::Dimension; // Currently unused
use std::time::Duration;

/// Beta 2: Workload types for optimized chunking strategies
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkloadType {
    /// Memory-intensive workloads that need smaller chunks
    MemoryIntensive,
    /// Compute-intensive workloads that can benefit from larger chunks
    ComputeIntensive,
    /// I/O-intensive workloads that benefit from throughput-optimized chunking
    IoIntensive,
    /// Balanced workloads with mixed requirements
    Balanced,
}

/// Parameters for configuring adaptive chunking behavior.
#[derive(Debug, Clone)]
pub struct AdaptiveChunkingParams {
    /// Target memory usage per chunk (in bytes)
    pub target_memory_usage: usize,

    /// Maximum chunk size (in elements)
    pub max_chunksize: usize,

    /// Minimum chunk size (in elements)
    pub min_chunksize: usize,

    /// Target processing time per chunk (for time-based adaptation)
    pub target_chunk_duration: Option<Duration>,

    /// Whether to consider data distribution (can be expensive to calculate)
    pub consider_distribution: bool,

    /// Whether to adjust for parallel processing
    pub optimize_for_parallel: bool,

    /// Number of worker threads to optimize for (when parallel is enabled)
    pub numworkers: Option<usize>,
}

impl Default for AdaptiveChunkingParams {
    fn default() -> Self {
        // Beta 2: Enhanced defaults based on system detection
        let available_memory = Self::detect_available_memory();
        let cpu_cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);

        // Target 1/8 of available memory per chunk, with reasonable bounds
        let target_memory = if let Some(mem) = available_memory {
            (mem / 8).clamp(16 * 1024 * 1024, 256 * 1024 * 1024) // 16MB to 256MB
        } else {
            64 * 1024 * 1024 // Default to 64MB
        };

        Self {
            target_memory_usage: target_memory,
            max_chunksize: usize::MAX,
            min_chunksize: 1024,
            target_chunk_duration: Some(Duration::from_millis(100)), // Beta 2: Default target 100ms per chunk
            consider_distribution: true,                             // Beta 2: Enable by default
            optimize_for_parallel: cpu_cores > 1,                    // Beta 2: Auto-detect
            numworkers: Some(cpu_cores),
        }
    }
}

impl AdaptiveChunkingParams {
    /// Beta 2: Detect available system memory
    fn detect_available_memory() -> Option<usize> {
        // Simplified memory detection - in a real implementation this would be more robust
        #[cfg(unix)]
        {
            if let Ok(output) = std::process::Command::new("sh")
                .args([
                    "-c",
                    "cat /proc/meminfo | grep MemAvailable | awk '{print $2}'",
                ])
                .output()
            {
                if let Ok(mem_str) = String::from_utf8(output.stdout) {
                    if let Ok(mem_kb) = mem_str.trim().parse::<usize>() {
                        return Some(mem_kb * 1024); // Convert from KB to bytes
                    }
                }
            }
        }
        None
    }

    /// Beta 2: Create optimized parameters for specific workload types
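    ///
    /// A short sketch of the intent (illustrative, not compiled as a doctest):
    ///
    /// ```ignore
    /// // I/O-bound pipelines get doubled target memory and a larger minimum size.
    /// let params = AdaptiveChunkingParams::for_workload(WorkloadType::IoIntensive);
    /// assert_eq!(params.min_chunksize, 64 * 1024);
    /// ```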
    pub fn for_workload(workload: WorkloadType) -> Self {
        let mut params = Self::default();

        match workload {
            WorkloadType::MemoryIntensive => {
                params.target_memory_usage /= 2; // Use smaller chunks
                params.consider_distribution = false; // Skip expensive analysis
            }
            WorkloadType::ComputeIntensive => {
                params.target_chunk_duration = Some(Duration::from_millis(500)); // Longer chunks
                params.optimize_for_parallel = true;
            }
            WorkloadType::IoIntensive => {
                params.target_memory_usage *= 2; // Larger chunks for I/O
                params.min_chunksize = 64 * 1024; // Larger minimum for I/O efficiency
            }
            WorkloadType::Balanced => {
                // Use defaults
            }
        }

        params
    }
}

/// Result of adaptive chunking analysis.
#[derive(Debug, Clone)]
pub struct AdaptiveChunkingResult {
    /// Recommended chunking strategy
    pub strategy: ChunkingStrategy,

    /// Estimated memory usage per chunk (in bytes)
    pub estimated_memory_per_chunk: usize,

    /// Factors that influenced the chunking decision
    pub decision_factors: Vec<String>,
}

/// Trait for adaptive chunking capabilities.
pub trait AdaptiveChunking<A: Clone + Copy + 'static + Send + Sync> {
    /// Calculate an optimal chunking strategy based on array characteristics.
    ///
    /// # Arguments
    ///
    /// * `params` - Parameters to guide the adaptive chunking process
    ///
    /// # Returns
    ///
    /// A result containing the recommended chunking strategy and metadata
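    ///
    /// # Example
    ///
    /// A hedged sketch (how the `MemoryMappedArray` is obtained is assumed,
    /// so the block is not compiled as a doctest):
    ///
    /// ```ignore
    /// let result = mmap.adaptive_chunking(AdaptiveChunkingParams::default())?;
    /// for factor in &result.decision_factors {
    ///     println!("decision: {factor}");
    /// }
    /// ```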
    fn adaptive_chunking(
        &self,
        params: AdaptiveChunkingParams,
    ) -> CoreResult<AdaptiveChunkingResult>;

    /// Process chunks using an automatically determined optimal chunking strategy.
    ///
    /// # Arguments
    ///
    /// * `params` - Parameters to guide the adaptive chunking process
    /// * `f` - Function to process each chunk
    ///
    /// # Returns
    ///
    /// A vector of results, one for each chunk
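    ///
    /// For instance, summing each chunk of a `MemoryMappedArray<f64>` (a sketch;
    /// the array construction is assumed and the block is not compiled):
    ///
    /// ```ignore
    /// let sums = mmap.process_chunks_adaptive(
    ///     AdaptiveChunkingParams::default(),
    ///     |chunk, _chunk_idx| chunk.iter().sum::<f64>(),
    /// )?;
    /// ```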
    fn process_chunks_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R;

    /// Process chunks mutably using an automatically determined optimal chunking strategy.
    ///
    /// # Arguments
    ///
    /// * `params` - Parameters to guide the adaptive chunking process
    /// * `f` - Function to process each chunk
    fn process_chunks_mut_adaptive<F>(
        &mut self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<()>
    where
        F: Fn(&mut [A], usize);

    /// Process chunks in parallel using an automatically determined optimal chunking strategy.
    ///
    /// # Arguments
    ///
    /// * `params` - Parameters to guide the adaptive chunking process
    /// * `f` - Function to process each chunk
    ///
    /// # Returns
    ///
    /// A vector of results, one for each chunk
    #[cfg(feature = "parallel")]
    fn process_chunks_parallel_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
        A: Send + Sync;
}

impl<A: Clone + Copy + 'static + Send + Sync> AdaptiveChunking<A> for MemoryMappedArray<A> {
    fn adaptive_chunking(
        &self,
        params: AdaptiveChunkingParams,
    ) -> CoreResult<AdaptiveChunkingResult> {
        // Get total number of elements in the array
        let total_elements = self.size;

        // Calculate element size
        let elementsize = std::mem::size_of::<A>();

        // Prevent division by zero for zero-sized types
        if elementsize == 0 {
            return Err(CoreError::InvalidArgument(
                ErrorContext::new("Cannot chunk zero-sized type".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        // Calculate initial chunk size based on target memory usage
        // Use checked division to prevent arithmetic overflow
        let mut chunksize = params
            .target_memory_usage
            .checked_div(elementsize)
            .ok_or_else(|| {
                CoreError::ComputationError(
                    ErrorContext::new("Arithmetic overflow in chunk size calculation".to_string())
                        .with_location(ErrorLocation::new(file!(), line!())),
                )
            })?;

        // Apply min/max constraints
        chunksize = chunksize.clamp(params.min_chunksize, params.max_chunksize);

        // Ensure we don't exceed total elements
        chunksize = chunksize.min(total_elements);

        // Consider dimensionality-specific adjustments
        let (chunksize, decision_factors) = self.optimize_for_dimensionality(chunksize, &params)?;

        // Factor in parallel processing if requested
        let (chunksize, decision_factors) = if params.optimize_for_parallel {
            let (parallel_chunksize, parallel_factors) =
                self.optimize_for_parallel_processing(chunksize, decision_factors, &params);
            // Re-apply dimensionality optimization after parallel adjustment
            let (final_chunksize, mut final_factors) =
                self.optimize_for_dimensionality(parallel_chunksize, &params)?;
            final_factors.extend(parallel_factors);
            (final_chunksize, final_factors)
        } else {
            (chunksize, decision_factors)
        };

        // Create final chunking strategy
        let strategy = ChunkingStrategy::Fixed(chunksize);

        // Calculate estimated memory per chunk using checked multiplication
        let estimated_memory = chunksize.checked_mul(elementsize).ok_or_else(|| {
            CoreError::ComputationError(
                ErrorContext::new("Arithmetic overflow in memory estimation".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            )
        })?;

        Ok(AdaptiveChunkingResult {
            strategy,
            estimated_memory_per_chunk: estimated_memory,
            decision_factors,
        })
    }

    fn process_chunks_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R,
    {
        // Determine optimal chunking strategy
        let adaptive_result = self.adaptive_chunking(params)?;

        // Use determined strategy to process chunks - wrap with Ok
        Ok(self.process_chunks(adaptive_result.strategy, f))
    }

    fn process_chunks_mut_adaptive<F>(
        &mut self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<()>
    where
        F: Fn(&mut [A], usize),
    {
        // Determine optimal chunking strategy
        let adaptive_result = self.adaptive_chunking(params)?;

        // Use determined strategy to process chunks - wrap with Ok
        self.process_chunks_mut(adaptive_result.strategy, f);
        Ok(())
    }

    #[cfg(feature = "parallel")]
    fn process_chunks_parallel_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
        A: Send + Sync,
    {
        // Make sure parameters are optimized for parallel processing
        let mut parallel_params = params;
        parallel_params.optimize_for_parallel = true;

        // Set default number of workers if not specified
        if parallel_params.numworkers.is_none() {
            parallel_params.numworkers = Some(rayon::current_num_threads());
        }

        // Determine optimal chunking strategy for parallel processing
        let adaptive_result = self.adaptive_chunking(parallel_params)?;

        // Use determined strategy to process chunks in parallel
        use super::memmap_chunks::MemoryMappedChunksParallel;
        Ok(self.process_chunks_parallel(adaptive_result.strategy, f))
    }
}

impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedArray<A> {
    /// Optimize chunking based on array dimensionality.
    fn optimize_for_dimensionality(
        &self,
        initial_chunksize: usize,
        params: &AdaptiveChunkingParams,
    ) -> CoreResult<(usize, Vec<String>)> {
        let mut decision_factors = Vec::new();
        let mut chunksize = initial_chunksize;

        match self.shape.len() {
            1 => {
                // For 1D arrays, we can use the initial chunk size directly
                decision_factors.push("1D array: Using direct chunking".to_string());
            }
            2 => {
                // For 2D arrays, try to align with rows when possible
                let row_length = self.shape[1];

                if chunksize >= row_length {
                    // If chunk size is larger than row length, adjust to be a multiple
                    if chunksize % row_length != 0 {
                        // Adjust to a multiple of row length for better cache behavior using checked arithmetic
                        let newsize = (chunksize / row_length)
                            .checked_mul(row_length)
                            .unwrap_or(chunksize); // Fallback to original size on overflow
                        if newsize >= params.min_chunksize {
                            chunksize = newsize;
                            decision_factors.push(format!(
                                "2D array: Adjusted chunk size to {chunksize} (multiple of row length {row_length})"
                            ));
                        }
                    }
                } else {
                    // If chunk size is smaller than row length, round up to row length
                    // to ensure we process complete rows
                    if row_length <= params.max_chunksize {
                        chunksize = row_length;
                        decision_factors.push(format!(
                            "2D array: Adjusted chunk size to row length {row_length}"
                        ));
                    } else {
                        // Row length exceeds max chunk size, keep original chunk size
                        decision_factors.push(format!(
                            "2D array: Row length {row_length} exceeds max chunk size, keeping chunk size {chunksize}"
                        ));
                    }
                }
            }
            3 => {
                // For 3D arrays, try to align with planes or rows using checked arithmetic
                let planesize = self.shape[1].checked_mul(self.shape[2]).unwrap_or_else(|| {
                    decision_factors.push(
                        "3D array: Overflow in plane size calculation, using row alignment"
                            .to_string(),
                    );
                    self.shape[2] // Fallback to row-based chunking
                });
                let row_length = self.shape[2];

                if chunksize >= planesize && chunksize % planesize != 0 {
                    // Adjust to a multiple of plane size for better cache behavior using checked arithmetic
                    let newsize = (chunksize / planesize)
                        .checked_mul(planesize)
                        .unwrap_or(chunksize); // Fallback to original size on overflow
                    if newsize >= params.min_chunksize {
                        chunksize = newsize;
                        decision_factors.push(format!(
                            "3D array: Adjusted chunk size to {chunksize} (multiple of plane size {planesize})"
                        ));
                    }
                } else if chunksize >= row_length && chunksize % row_length != 0 {
                    // Adjust to a multiple of row length using checked arithmetic
                    let newsize = (chunksize / row_length)
                        .checked_mul(row_length)
                        .unwrap_or(chunksize); // Fallback to original size on overflow
                    if newsize >= params.min_chunksize {
                        chunksize = newsize;
                        decision_factors.push(format!(
                            "3D array: Adjusted chunk size to {chunksize} (multiple of row length {row_length})"
                        ));
                    }
                }
            }
            n => {
                decision_factors.push(format!("{n}D array: Using default chunking strategy"));
            }
        }

        Ok((chunksize, decision_factors))
    }

    /// Optimize chunking for parallel processing.
    fn optimize_for_parallel_processing(
        &self,
        initial_chunksize: usize,
        mut decision_factors: Vec<String>,
        params: &AdaptiveChunkingParams,
    ) -> (usize, Vec<String>) {
        let mut chunksize = initial_chunksize;

        if let Some(numworkers) = params.numworkers {
            let total_elements = self.size;

            // Ideally, we want at least numworkers * 2 chunks for good load balancing
            // Use checked arithmetic to prevent overflow
            let target_num_chunks = numworkers.checked_mul(2).unwrap_or(numworkers);
            let ideal_chunksize = if target_num_chunks > 0 {
                total_elements / target_num_chunks
            } else {
                total_elements // Fallback for edge cases
            };

            if ideal_chunksize >= params.min_chunksize && ideal_chunksize <= params.max_chunksize {
                // Use the ideal chunk size for parallel processing
                chunksize = ideal_chunksize;
                decision_factors.push(format!(
                    "Parallel optimization: Adjusted chunk size to {chunksize} for {numworkers} workers"
                ));
            } else if ideal_chunksize < params.min_chunksize {
                // If ideal size is too small, use minimum size
                chunksize = params.min_chunksize;
                let actual_chunks = total_elements / chunksize
                    + if total_elements % chunksize != 0 {
                        1
                    } else {
                        0
                    };
                decision_factors.push(format!(
                    "Parallel optimization: Using minimum chunk size {chunksize}, resulting in {actual_chunks} chunks for {numworkers} workers"
                ));
            }
        } else {
            decision_factors.push(
                "Parallel optimization requested but no worker count specified, using default chunking".to_string()
            );
        }

        (chunksize, decision_factors)
    }
}

/// Builder for creating adaptive chunking parameters with a fluent API.
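///
/// A typical construction, shown as a sketch (not compiled as a doctest):
///
/// ```ignore
/// let params = AdaptiveChunkingBuilder::new()
///     .with_target_memory(8 * 1024 * 1024) // 8MB per chunk
///     .with_min_chunksize(1024)
///     .optimize_for_parallel(true)
///     .with_numworkers(8)
///     .build();
/// ```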
#[derive(Debug, Clone)]
pub struct AdaptiveChunkingBuilder {
    params: AdaptiveChunkingParams,
}

impl AdaptiveChunkingBuilder {
    /// Create a new builder with default parameters.
    pub fn new() -> Self {
        Self {
            params: AdaptiveChunkingParams::default(),
        }
    }

    /// Set the target memory usage per chunk.
    pub const fn with_target_memory(mut self, bytes: usize) -> Self {
        self.params.target_memory_usage = bytes;
        self
    }

    /// Set the maximum chunk size.
    pub const fn with_max_chunksize(mut self, size: usize) -> Self {
        self.params.max_chunksize = size;
        self
    }

    /// Set the minimum chunk size.
    pub const fn with_min_chunksize(mut self, size: usize) -> Self {
        self.params.min_chunksize = size;
        self
    }

    /// Set the target chunk processing duration.
    pub fn with_target_duration(mut self, duration: Duration) -> Self {
        self.params.target_chunk_duration = Some(duration);
        self
    }

    /// Enable consideration of data distribution.
    pub const fn consider_distribution(mut self, enable: bool) -> Self {
        self.params.consider_distribution = enable;
        self
    }

    /// Enable optimization for parallel processing.
    pub const fn optimize_for_parallel(mut self, enable: bool) -> Self {
        self.params.optimize_for_parallel = enable;
        self
    }

    /// Set the number of worker threads to optimize for.
    pub fn with_numworkers(mut self, workers: usize) -> Self {
        self.params.numworkers = Some(workers);
        self
    }

    /// Build the parameters.
    pub fn build(self) -> AdaptiveChunkingParams {
        self.params
    }
}

impl Default for AdaptiveChunkingBuilder {
    fn default() -> Self {
        Self::new()
    }
}

/// Beta 2: Advanced adaptive chunking algorithms and load balancing
pub mod beta2_enhancements {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    /// Performance metrics collector for adaptive optimization
    #[derive(Debug, Clone, Default)]
    #[allow(dead_code)]
    pub struct ChunkingPerformanceMetrics {
        pub chunk_processing_times: Vec<Duration>,
        pub memory_usage_per_chunk: Vec<usize>,
        pub throughput_mbps: Vec<f64>,
        pub cpu_utilization: Vec<f64>,
    }

    /// Beta 2: Dynamic load balancer for heterogeneous computing environments
    #[allow(dead_code)]
    pub struct DynamicLoadBalancer {
        worker_performance: Vec<f64>,         // Relative performance scores
        current_loads: Arc<Vec<AtomicUsize>>, // Current load per worker
        target_efficiency: f64,               // Target CPU utilization (0.0 to 1.0)
    }

    #[allow(dead_code)]
    impl DynamicLoadBalancer {
        /// Create a new load balancer for the specified number of workers
        pub fn new(numworkers: usize) -> Self {
            Self {
                worker_performance: vec![1.0; numworkers], // Start with equal performance
                current_loads: Arc::new((0..numworkers).map(|_| AtomicUsize::new(0)).collect()),
                target_efficiency: 0.85, // Target 85% CPU utilization
            }
        }

        /// Calculate optimal chunk distribution based on worker performance
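        ///
        /// With the default (equal) performance scores the split is even, and the
        /// last worker absorbs any rounding remainder. Sketch (not compiled):
        ///
        /// ```ignore
        /// let balancer = DynamicLoadBalancer::new(4);
        /// assert_eq!(balancer.distribute_work(100), vec![25, 25, 25, 25]);
        /// ```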
        pub fn distribute_work(&self, totalwork: usize) -> Vec<usize> {
            let total_performance: f64 = self.worker_performance.iter().sum();
            let mut distribution = Vec::new();
            let mut remaining_work = totalwork;

            // Distribute work proportionally to performance, except for the last worker
            for (i, &performance) in self.worker_performance.iter().enumerate() {
                if i == self.worker_performance.len() - 1 {
                    // Give all remaining work to the last worker
                    distribution.push(remaining_work);
                } else {
                    let work_share = (totalwork as f64 * performance / total_performance) as usize;
                    distribution.push(work_share);
                    remaining_work = remaining_work.saturating_sub(work_share);
                }
            }

            distribution
        }

        /// Update worker performance metrics based on observed execution times
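        ///
        /// Because the score is an exponential moving average with `alpha = 0.1`,
        /// a single observation nudges rather than replaces it. Sketch (not compiled):
        ///
        /// ```ignore
        /// let mut balancer = DynamicLoadBalancer::new(2);
        /// // Worker 0 processed 1000 items in 100ms => 10_000 items/s, so the
        /// // score moves from 1.0 to 0.9 * 1.0 + 0.1 * 10_000.0 = 1000.9.
        /// balancer.update_performance(0, 1000, Duration::from_millis(100));
        /// ```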
        pub fn update_performance(
            &mut self,
            workerid: usize,
            work_amount: usize,
            execution_time: Duration,
        ) {
            if workerid < self.worker_performance.len() {
                // Calculate performance as work/time (higher is better)
                let performance = work_amount as f64 / execution_time.as_secs_f64();

                // Exponential moving average to adapt to changing conditions
                let alpha = 0.1; // Learning rate
                self.worker_performance[workerid] =
                    (1.0 - alpha) * self.worker_performance[workerid] + alpha * performance;
            }
        }
    }

    /// Beta 2: Intelligent chunk size predictor using historical data
    #[allow(dead_code)]
    pub struct ChunkSizePredictor {
        historical_metrics: Vec<ChunkingPerformanceMetrics>,
        workload_characteristics: Vec<(WorkloadType, usize)>, // (workload_type, optimal_chunksize)
    }

    #[allow(dead_code)]
    impl ChunkSizePredictor {
        pub fn new() -> Self {
            Self {
                historical_metrics: Vec::new(),
                workload_characteristics: Vec::new(),
            }
        }

        /// Predict optimal chunk size based on workload characteristics and history
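        ///
        /// A worked sketch: with no history, 1 GiB available and 80 MiB of data,
        /// the prediction is 0.4 * (mem / 4) + 0.4 * (data / 8) + 0.2 * 64 KiB,
        /// clamped to the 1 KiB..256 MiB range (not compiled as a doctest):
        ///
        /// ```ignore
        /// let predictor = ChunkSizePredictor::new();
        /// let size = predictor.predict_chunk_size(
        ///     WorkloadType::Balanced,
        ///     1024 * 1024 * 1024, // 1 GiB available
        ///     80 * 1024 * 1024,   // 80 MiB of data
        /// );
        /// assert!((1024..=256 * 1024 * 1024).contains(&size));
        /// ```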
        pub fn predict_chunk_size(
            &self,
            workload: WorkloadType,
            memory_available: usize,
            data_size: usize,
        ) -> usize {
            // Start with base predictions from historical data
            let historical_prediction = self.get_historical_prediction(workload);

            // Apply memory constraints
            let memory_constrained = (memory_available / 4).max(1024); // Use 1/4 of available memory

            // Apply data size constraints
            let data_constrained = (data_size / 8).max(1024); // At least 8 chunks

            // Combine predictions with weighting
            let base_prediction = historical_prediction.unwrap_or(64 * 1024); // 64KB default
            let memory_weight = 0.4;
            let data_weight = 0.4;
            let historical_weight = 0.2;

            let predicted_size = (memory_weight * memory_constrained as f64
                + data_weight * data_constrained as f64
                + historical_weight * base_prediction as f64)
                as usize;

            // Ensure reasonable bounds
            predicted_size.clamp(1024, 256 * 1024 * 1024) // 1KB to 256MB
        }

        fn get_historical_prediction(&self, workload: WorkloadType) -> Option<usize> {
            // Find the most recent matching workload
            self.workload_characteristics
                .iter()
                .rev() // Start from most recent
                .find(|(wl, _)| *wl == workload)
                .map(|(_, size)| *size)
        }

        /// Record performance metrics for future predictions
        pub fn record_performance(
            &mut self,
            workload: WorkloadType,
            chunk_size: usize,
            metrics: ChunkingPerformanceMetrics,
        ) {
            self.historical_metrics.push(metrics);
            self.workload_characteristics.push((workload, chunk_size));

            // Keep only the last 100 entries to prevent unbounded growth
            if self.historical_metrics.len() > 100 {
                self.historical_metrics.remove(0);
                self.workload_characteristics.remove(0);
            }
        }
    }

    /// Beta 2: NUMA-aware chunking for large multi-socket systems
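    ///
    /// Sketch: a 1 GiB buffer on a 2-node system yields four chunks (two per
    /// node), each already a multiple of the 64-byte cache line (not compiled):
    ///
    /// ```ignore
    /// let strategy = numa_aware_chunking(1024 * 1024 * 1024, 2);
    /// // => ChunkingStrategy::Fixed(268_435_456)
    /// ```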
    #[allow(dead_code)]
    pub fn numa_aware_chunking(data_size: usize, num_numanodes: usize) -> ChunkingStrategy {
        if num_numanodes <= 1 {
            return ChunkingStrategy::Auto;
        }

        // Try to align chunks with NUMA boundaries
        let base_chunk_size = data_size / (num_numanodes * 2); // 2 chunks per NUMA node
        let aligned_chunk_size = align_to_cache_line(base_chunk_size);

        ChunkingStrategy::Fixed(aligned_chunk_size)
    }

    /// Align size to cache line boundaries for better performance
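    ///
    /// For example, `align_to_cache_line(100)` rounds up to `128`, the next
    /// multiple of 64.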
    fn align_to_cache_line(size: usize) -> usize {
        const CACHE_LINE_SIZE: usize = 64; // Common cache line size
        size.div_ceil(CACHE_LINE_SIZE) * CACHE_LINE_SIZE
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ::ndarray::Array2;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    #[test]
    fn test_adaptive_chunking_1d() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("Failed to create temp dir");
        let file_path = dir.path().join("test_adaptive_1d.bin");

        // Create a test array and save it to a file
        let data: Vec<f64> = (0..100_000).map(|i| i as f64).collect();
        let mut file = File::create(&file_path).expect("Failed to create test file");
        for val in &data {
            file.write_all(&val.to_ne_bytes())
                .expect("Failed to write test data");
        }
        drop(file);

        // Create a memory-mapped array
        let mmap = MemoryMappedArray::<f64>::path(&file_path, &[100_000])
            .expect("Failed to create memory-mapped array");

        // Create adaptive chunking parameters
        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(1024 * 1024) // 1MB chunks
            .with_min_chunksize(1000)
            .with_max_chunksize(50000)
            .optimize_for_parallel(false) // Disable parallel optimization for this test
            .build();

        // Calculate adaptive chunking
        let result = mmap
            .adaptive_chunking(params)
            .expect("Adaptive chunking failed");

        // Verify results
        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                // The chunk size should be close to 1MB / 8 bytes = 131072 elements,
                // but capped at our max of 50000
                assert_eq!(chunksize, 50000);
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        // Verify that the estimated memory per chunk is reasonable
        assert!(result.estimated_memory_per_chunk > 0);

        // The decision factors should mention that it's a 1D array
        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("1D array")));
    }

    #[test]
    fn test_adaptive_chunking_2d() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("Failed to create temp dir");
        let file_path = dir.path().join("test_adaptive_2d.bin");

        // Create dimensions that will test row alignment
        let rows = 1000;
        let cols = 120;

        // Create a test 2D array and save it to a file
        let data = Array2::<f64>::from_shape_fn((rows, cols), |(i, j)| (i * cols + j) as f64);
        let mut file = File::create(&file_path).expect("Failed to create test file");
        for val in data.iter() {
            file.write_all(&val.to_ne_bytes())
                .expect("Failed to write test data");
        }
        drop(file);

        // Create a memory-mapped array
        let mmap = MemoryMappedArray::<f64>::path(&file_path, &[rows, cols])
            .expect("Failed to create memory-mapped array");

        // Create adaptive chunking parameters
        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(100 * 1024) // 100KB chunks
            .with_min_chunksize(1000)
            .with_max_chunksize(50000)
            .build();

        // Calculate adaptive chunking
        let result = mmap
            .adaptive_chunking(params)
            .expect("Adaptive chunking failed");

        // Verify results
        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                // The chunk size should be adjusted to be a multiple of the row length (120)
                assert_eq!(
                    chunksize % cols,
                    0,
                    "Chunk size should be a multiple of row length"
                );
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        // The decision factors should mention that it's a 2D array
        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("2D array")));
    }

    #[test]
    fn test_adaptive_chunking_parallel() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("Failed to create temp dir");
        let file_path = dir.path().join("test_adaptive_parallel.bin");

        // Create a large test array
        let data: Vec<f64> = (0..1_000_000).map(|i| i as f64).collect();
        let mut file = File::create(&file_path).expect("Failed to create test file");
        for val in &data {
            file.write_all(&val.to_ne_bytes())
                .expect("Failed to write test data");
        }
        drop(file);

        // Create a memory-mapped array
        let mmap = MemoryMappedArray::<f64>::path(&file_path, &[1_000_000])
            .expect("Failed to create memory-mapped array");

        // Create adaptive chunking parameters optimized for parallel processing
        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(10 * 1024 * 1024) // 10MB chunks
            .optimize_for_parallel(true)
            .with_numworkers(4)
            .build();

        // Calculate adaptive chunking
        let result = mmap
            .adaptive_chunking(params)
            .expect("Adaptive chunking failed");

        // Verify results
        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                // With 4 workers and a target of 8 chunks (2 * workers), each chunk should
                // handle ~125,000 elements, but it might be adjusted based on other factors
                assert!(chunksize > 0, "Chunk size should be positive");
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        // The decision factors should mention parallel optimization
        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("Parallel optimization")));
    }
}