scirs2_core/memory_efficient/adaptive_chunking.rs

//! Adaptive chunking strategies for memory-efficient operations.
//!
//! This module provides algorithms that dynamically determine optimal
//! chunk sizes based on workload characteristics, memory constraints,
//! and data distribution patterns. Adaptive chunking can significantly
//! improve performance by balancing memory usage with processing efficiency.

use super::adaptive_feedback::SharedPredictor;
use super::chunked::ChunkingStrategy;
use super::memmap::MemoryMappedArray;
use super::memmap_chunks::MemoryMappedChunks;
use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
// use ::ndarray::Dimension; // Currently unused
use std::time::Duration;

/// Beta 2: Workload types for optimized chunking strategies
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkloadType {
    /// Memory-intensive workloads that need smaller chunks
    MemoryIntensive,
    /// Compute-intensive workloads that can benefit from larger chunks
    ComputeIntensive,
    /// I/O-intensive workloads that need chunking optimized for throughput
    IoIntensive,
    /// Balanced workloads with mixed requirements
    Balanced,
}

/// Parameters for configuring adaptive chunking behavior.
#[derive(Clone)]
pub struct AdaptiveChunkingParams {
    /// Target memory usage per chunk (in bytes)
    pub target_memory_usage: usize,

    /// Maximum chunk size (in elements)
    pub max_chunksize: usize,

    /// Minimum chunk size (in elements)
    pub min_chunksize: usize,

    /// Target processing time per chunk (for time-based adaptation)
    pub target_chunk_duration: Option<Duration>,

    /// Whether to consider data distribution (can be expensive to calculate)
    pub consider_distribution: bool,

    /// Whether to adjust for parallel processing
    pub optimize_for_parallel: bool,

    /// Number of worker threads to optimize for (when parallel is enabled)
    pub numworkers: Option<usize>,

    /// Optional chunk size predictor for adaptive feedback
    pub predictor: Option<SharedPredictor>,
}

impl std::fmt::Debug for AdaptiveChunkingParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AdaptiveChunkingParams")
            .field("target_memory_usage", &self.target_memory_usage)
            .field("max_chunksize", &self.max_chunksize)
            .field("min_chunksize", &self.min_chunksize)
            .field("target_chunk_duration", &self.target_chunk_duration)
            .field("consider_distribution", &self.consider_distribution)
            .field("optimize_for_parallel", &self.optimize_for_parallel)
            .field("numworkers", &self.numworkers)
            .field(
                "predictor",
                &self.predictor.as_ref().map(|_| "Some(SharedPredictor)"),
            )
            .finish()
    }
}

impl Default for AdaptiveChunkingParams {
    fn default() -> Self {
        // Beta 2: Enhanced defaults based on system detection
        let available_memory = Self::detect_available_memory();
        let cpu_cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);

        // Target 1/8 of available memory per chunk, with reasonable bounds
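        // e.g. with 16 GiB available, the raw 2 GiB target is clamped down to the 256 MiB cap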
        let target_memory = if let Some(mem) = available_memory {
            (mem / 8).clamp(16 * 1024 * 1024, 256 * 1024 * 1024) // 16MB to 256MB
        } else {
            64 * 1024 * 1024 // Default to 64MB
        };

        Self {
            target_memory_usage: target_memory,
            max_chunksize: usize::MAX,
            min_chunksize: 1024,
            target_chunk_duration: Some(Duration::from_millis(100)), // Beta 2: Default target 100ms per chunk
            consider_distribution: true,                             // Beta 2: Enable by default
            optimize_for_parallel: cpu_cores > 1,                    // Beta 2: Auto-detect
            numworkers: Some(cpu_cores),
            predictor: None, // No predictor by default - user can add if desired
        }
    }
}

impl AdaptiveChunkingParams {
    /// Beta 2: Detect available system memory using cross-platform detection
    fn detect_available_memory() -> Option<usize> {
        use super::platform_memory::PlatformMemoryInfo;

        PlatformMemoryInfo::detect().map(|info| info.available_memory)
    }

    /// Beta 2: Create optimized parameters for specific workload types
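    ///
    /// # Example
    ///
    /// A minimal sketch of picking a profile (the exact tuning values are
    /// internal and may change):
    ///
    /// ```ignore
    /// let params = AdaptiveChunkingParams::for_workload(WorkloadType::IoIntensive);
    /// assert!(params.min_chunksize >= 64 * 1024); // the I/O profile raises the minimum
    /// ```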
    pub fn for_workload(workload: WorkloadType) -> Self {
        let mut params = Self::default();

        match workload {
            WorkloadType::MemoryIntensive => {
                params.target_memory_usage /= 2; // Use smaller chunks
                params.consider_distribution = false; // Skip expensive analysis
            }
            WorkloadType::ComputeIntensive => {
                params.target_chunk_duration = Some(Duration::from_millis(500)); // Longer chunks
                params.optimize_for_parallel = true;
            }
            WorkloadType::IoIntensive => {
                params.target_memory_usage *= 2; // Larger chunks for I/O
                params.min_chunksize = 64 * 1024; // Larger minimum for I/O efficiency
            }
            WorkloadType::Balanced => {
                // Use defaults
            }
        }

        params
    }
}

/// Result of adaptive chunking analysis.
#[derive(Debug, Clone)]
pub struct AdaptiveChunkingResult {
    /// Recommended chunking strategy
    pub strategy: ChunkingStrategy,

    /// Estimated memory usage per chunk (in bytes)
    pub estimated_memory_per_chunk: usize,

    /// Factors that influenced the chunking decision
    pub decision_factors: Vec<String>,
}

/// Trait for adaptive chunking capabilities.
pub trait AdaptiveChunking<A: Clone + Copy + 'static + Send + Sync> {
    /// Calculate an optimal chunking strategy based on array characteristics.
    ///
    /// # Arguments
    ///
    /// * `params` - Parameters to guide the adaptive chunking process
    ///
    /// # Returns
    ///
    /// A result containing the recommended chunking strategy and metadata
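    ///
    /// # Example
    ///
    /// A sketch, assuming `mmap` is an existing `MemoryMappedArray<f64>`:
    ///
    /// ```ignore
    /// let result = mmap.adaptive_chunking(AdaptiveChunkingParams::default())?;
    /// println!("strategy: {:?}", result.strategy);
    /// for factor in &result.decision_factors {
    ///     println!("- {factor}");
    /// }
    /// ```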
    fn adaptive_chunking(
        &self,
        params: AdaptiveChunkingParams,
    ) -> CoreResult<AdaptiveChunkingResult>;

    /// Process chunks using an automatically determined optimal chunking strategy.
    ///
    /// # Arguments
    ///
    /// * `params` - Parameters to guide the adaptive chunking process
    /// * `f` - Function to process each chunk
    ///
    /// # Returns
    ///
    /// A vector of results, one for each chunk
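    ///
    /// # Example
    ///
    /// A sketch that sums each chunk (the second closure argument is the
    /// chunk index):
    ///
    /// ```ignore
    /// let sums = mmap.process_chunks_adaptive(
    ///     AdaptiveChunkingParams::default(),
    ///     |chunk: &[f64], _idx| chunk.iter().copied().sum::<f64>(),
    /// )?;
    /// ```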
    fn process_chunks_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R;

    /// Process chunks mutably using an automatically determined optimal chunking strategy.
    ///
    /// # Arguments
    ///
    /// * `params` - Parameters to guide the adaptive chunking process
    /// * `f` - Function to process each chunk
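    ///
    /// # Example
    ///
    /// A sketch that doubles every element in place:
    ///
    /// ```ignore
    /// mmap.process_chunks_mut_adaptive(
    ///     AdaptiveChunkingParams::default(),
    ///     |chunk: &mut [f64], _idx| chunk.iter_mut().for_each(|v| *v *= 2.0),
    /// )?;
    /// ```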
    fn process_chunks_mut_adaptive<F>(
        &mut self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<()>
    where
        F: Fn(&mut [A], usize);

    /// Process chunks in parallel using an automatically determined optimal chunking strategy.
    ///
    /// # Arguments
    ///
    /// * `params` - Parameters to guide the adaptive chunking process
    /// * `f` - Function to process each chunk
    ///
    /// # Returns
    ///
    /// A vector of results, one for each chunk
    #[cfg(feature = "parallel")]
    fn process_chunks_parallel_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
        A: Send + Sync;
}

impl<A: Clone + Copy + 'static + Send + Sync> AdaptiveChunking<A> for MemoryMappedArray<A> {
    fn adaptive_chunking(
        &self,
        params: AdaptiveChunkingParams,
    ) -> CoreResult<AdaptiveChunkingResult> {
        // Get total number of elements in the array
        let total_elements = self.size;

        // Calculate element size
        let elementsize = std::mem::size_of::<A>();

        // Prevent division by zero for zero-sized types
        if elementsize == 0 {
            return Err(CoreError::InvalidArgument(
                ErrorContext::new("Cannot chunk zero-sized type".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        // Calculate initial chunk size based on target memory usage
        // Checked division is a defensive guard (a zero divisor is already rejected above)
        let mut chunksize = params
            .target_memory_usage
            .checked_div(elementsize)
            .ok_or_else(|| {
                CoreError::ComputationError(
                    ErrorContext::new("Arithmetic overflow in chunk size calculation".to_string())
                        .with_location(ErrorLocation::new(file!(), line!())),
                )
            })?;

        // Apply min/max constraints
        chunksize = chunksize.clamp(params.min_chunksize, params.max_chunksize);

        // Ensure we don't exceed total elements
        chunksize = chunksize.min(total_elements);

        // Consider dimensionality-specific adjustments
        let (chunksize, decision_factors) = self.optimize_for_dimensionality(chunksize, &params)?;

        // Factor in parallel processing if requested
        let (chunksize, decision_factors) = if params.optimize_for_parallel {
            let (parallel_chunksize, parallel_factors) =
                self.optimize_for_parallel_processing(chunksize, decision_factors, &params);
            // Re-apply dimensionality optimization after parallel adjustment
            let (final_chunksize, mut final_factors) =
                self.optimize_for_dimensionality(parallel_chunksize, &params)?;
            final_factors.extend(parallel_factors);
            (final_chunksize, final_factors)
        } else {
            (chunksize, decision_factors)
        };

        // Create final chunking strategy
        let strategy = ChunkingStrategy::Fixed(chunksize);

        // Calculate estimated memory per chunk using checked multiplication
        let estimated_memory = chunksize.checked_mul(elementsize).ok_or_else(|| {
            CoreError::ComputationError(
                ErrorContext::new("Arithmetic overflow in memory estimation".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            )
        })?;

        Ok(AdaptiveChunkingResult {
            strategy,
            estimated_memory_per_chunk: estimated_memory,
            decision_factors,
        })
    }

    fn process_chunks_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R,
    {
        // Determine optimal chunking strategy
        let adaptive_result = self.adaptive_chunking(params)?;

        // Process chunks with the determined strategy; the call itself is infallible, so wrap it in Ok
        Ok(self.process_chunks(adaptive_result.strategy, f))
    }

    fn process_chunks_mut_adaptive<F>(
        &mut self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<()>
    where
        F: Fn(&mut [A], usize),
    {
        // Determine optimal chunking strategy
        let adaptive_result = self.adaptive_chunking(params)?;

        // Process chunks in place with the determined strategy, then report success
        self.process_chunks_mut(adaptive_result.strategy, f);
        Ok(())
    }

    #[cfg(feature = "parallel")]
    fn process_chunks_parallel_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
        A: Send + Sync,
    {
        // Make sure parameters are optimized for parallel processing
        let mut parallel_params = params;
        parallel_params.optimize_for_parallel = true;

        // Set default number of workers if not specified
        if parallel_params.numworkers.is_none() {
            parallel_params.numworkers = Some(rayon::current_num_threads());
        }

        // Determine optimal chunking strategy for parallel processing
        let adaptive_result = self.adaptive_chunking(parallel_params)?;

        // Use determined strategy to process chunks in parallel
        use super::memmap_chunks::MemoryMappedChunksParallel;
        Ok(self.process_chunks_parallel(adaptive_result.strategy, f))
    }
}

impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedArray<A> {
    /// Optimize chunking based on array dimensionality.
    fn optimize_for_dimensionality(
        &self,
        initial_chunksize: usize,
        params: &AdaptiveChunkingParams,
    ) -> CoreResult<(usize, Vec<String>)> {
        let mut decision_factors = Vec::new();
        let mut chunksize = initial_chunksize;

        match self.shape.len() {
            1 => {
                // For 1D arrays, we can use the initial chunk size directly
                decision_factors.push("1D array: Using direct chunking".to_string());
            }
            2 => {
                // For 2D arrays, try to align with rows when possible
                let row_length = self.shape[1];

                if chunksize >= row_length {
                    // If chunk size is larger than row length, adjust to be a multiple
                    if chunksize % row_length != 0 {
                        // Adjust to a multiple of row length (checked arithmetic) for better cache behavior
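                        // e.g. a 10_000-element chunk over 120-element rows becomes 9_960 (83 full rows)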
                        let newsize = (chunksize / row_length)
                            .checked_mul(row_length)
                            .unwrap_or(chunksize); // Fallback to original size on overflow
                        if newsize >= params.min_chunksize {
                            chunksize = newsize;
                            decision_factors.push(format!(
                                "2D array: Adjusted chunk size to {chunksize} (multiple of row length {row_length})"
                            ));
                        }
                    }
                } else {
                    // If chunk size is smaller than row length, round up to row length
                    // to ensure we process complete rows
                    if row_length <= params.max_chunksize {
                        chunksize = row_length;
                        decision_factors.push(format!(
                            "2D array: Adjusted chunk size to row length {row_length}"
                        ));
                    } else {
                        // Row length exceeds max chunk size, keep original chunk size
                        decision_factors.push(format!(
                            "2D array: Row length {row_length} exceeds max chunk size, keeping chunk size {chunksize}"
                        ));
                    }
                }
            }
            3 => {
                // For 3D arrays, try to align with planes or rows using checked arithmetic
                let planesize = self.shape[1].checked_mul(self.shape[2]).unwrap_or_else(|| {
                    decision_factors.push(
                        "3D array: Overflow in plane size calculation, using row alignment"
                            .to_string(),
                    );
                    self.shape[2] // Fallback to row-based chunking
                });
                let row_length = self.shape[2];

                if chunksize >= planesize && chunksize % planesize != 0 {
                    // Adjust to a multiple of plane size for better cache behavior using checked arithmetic
                    let newsize = (chunksize / planesize)
                        .checked_mul(planesize)
                        .unwrap_or(chunksize); // Fallback to original size on overflow
                    if newsize >= params.min_chunksize {
                        chunksize = newsize;
                        decision_factors.push(format!(
                            "3D array: Adjusted chunk size to {chunksize} (multiple of plane size {planesize})"
                        ));
                    }
                } else if chunksize >= row_length && chunksize % row_length != 0 {
                    // Adjust to a multiple of row length using checked arithmetic
                    let newsize = (chunksize / row_length)
                        .checked_mul(row_length)
                        .unwrap_or(chunksize); // Fallback to original size on overflow
                    if newsize >= params.min_chunksize {
                        chunksize = newsize;
                        decision_factors.push(format!(
                            "3D array: Adjusted chunk size to {chunksize} (multiple of row length {row_length})"
                        ));
                    }
                }
            }
            n => {
                decision_factors.push(format!("{n}D array: Using default chunking strategy"));
            }
        }

        Ok((chunksize, decision_factors))
    }

    /// Optimize chunking for parallel processing.
    fn optimize_for_parallel_processing(
        &self,
        initial_chunksize: usize,
        mut decision_factors: Vec<String>,
        params: &AdaptiveChunkingParams,
    ) -> (usize, Vec<String>) {
        let mut chunksize = initial_chunksize;

        if let Some(numworkers) = params.numworkers {
            let total_elements = self.size;

            // Ideally, we want at least numworkers * 2 chunks for good load balancing
            // Use checked arithmetic to prevent overflow
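            // e.g. 1_000_000 elements with 4 workers -> 8 target chunks -> ideal chunk size 125_000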
            let target_num_chunks = numworkers.checked_mul(2).unwrap_or(numworkers);
            let ideal_chunksize = if target_num_chunks > 0 {
                total_elements / target_num_chunks
            } else {
                total_elements // Fallback for edge cases
            };

            if ideal_chunksize >= params.min_chunksize && ideal_chunksize <= params.max_chunksize {
                // Use the ideal chunk size for parallel processing
                chunksize = ideal_chunksize;
                decision_factors.push(format!(
                    "Parallel optimization: Adjusted chunk size to {chunksize} for {numworkers} workers"
                ));
            } else if ideal_chunksize < params.min_chunksize {
                // If ideal size is too small, use minimum size
                chunksize = params.min_chunksize;
                let actual_chunks = total_elements / chunksize
                    + if total_elements % chunksize != 0 {
                        1
                    } else {
                        0
                    };
                decision_factors.push(format!(
                    "Parallel optimization: Using minimum chunk size {chunksize}, resulting in {actual_chunks} chunks for {numworkers} workers"
                ));
            }
        } else {
            decision_factors.push(
                "Parallel optimization requested but no worker count specified, using default chunking".to_string()
            );
        }

        (chunksize, decision_factors)
    }
}

/// Builder for creating adaptive chunking parameters with a fluent API.
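///
/// # Example
///
/// A minimal sketch of the fluent API:
///
/// ```ignore
/// let params = AdaptiveChunkingBuilder::new()
///     .with_target_memory(8 * 1024 * 1024) // 8 MB per chunk
///     .with_min_chunksize(4096)
///     .optimize_for_parallel(true)
///     .with_numworkers(8)
///     .build();
/// ```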
#[derive(Debug, Clone)]
pub struct AdaptiveChunkingBuilder {
    params: AdaptiveChunkingParams,
}

impl AdaptiveChunkingBuilder {
    /// Create a new builder with default parameters.
    pub fn new() -> Self {
        Self {
            params: AdaptiveChunkingParams::default(),
        }
    }

    /// Set the target memory usage per chunk.
    pub const fn with_target_memory(mut self, bytes: usize) -> Self {
        self.params.target_memory_usage = bytes;
        self
    }

    /// Set the maximum chunk size.
    pub const fn with_max_chunksize(mut self, size: usize) -> Self {
        self.params.max_chunksize = size;
        self
    }

    /// Set the minimum chunk size.
    pub const fn with_min_chunksize(mut self, size: usize) -> Self {
        self.params.min_chunksize = size;
        self
    }

    /// Set the target chunk processing duration.
    pub fn with_target_duration(mut self, duration: Duration) -> Self {
        self.params.target_chunk_duration = Some(duration);
        self
    }

    /// Enable or disable consideration of data distribution.
    pub const fn consider_distribution(mut self, enable: bool) -> Self {
        self.params.consider_distribution = enable;
        self
    }

    /// Enable or disable optimization for parallel processing.
    pub const fn optimize_for_parallel(mut self, enable: bool) -> Self {
        self.params.optimize_for_parallel = enable;
        self
    }

    /// Set the number of worker threads to optimize for.
    pub fn with_numworkers(mut self, workers: usize) -> Self {
        self.params.numworkers = Some(workers);
        self
    }

    /// Build the parameters.
    pub fn build(self) -> AdaptiveChunkingParams {
        self.params
    }
}

impl Default for AdaptiveChunkingBuilder {
    fn default() -> Self {
        Self::new()
    }
}

/// Beta 2: Advanced adaptive chunking algorithms and load balancing
pub mod beta2_enhancements {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    /// Performance metrics collector for adaptive optimization
    #[derive(Debug, Clone, Default)]
    #[allow(dead_code)]
    pub struct ChunkingPerformanceMetrics {
        pub chunk_processing_times: Vec<Duration>,
        pub memory_usage_per_chunk: Vec<usize>,
        pub throughput_mbps: Vec<f64>,
        pub cpu_utilization: Vec<f64>,
    }

    /// Beta 2: Dynamic load balancer for heterogeneous computing environments
    #[allow(dead_code)]
    pub struct DynamicLoadBalancer {
        worker_performance: Vec<f64>,         // Relative performance scores
        current_loads: Arc<Vec<AtomicUsize>>, // Current load per worker
        target_efficiency: f64,               // Target CPU utilization (0.0 to 1.0)
    }

    #[allow(dead_code)]
    impl DynamicLoadBalancer {
        /// Create a new load balancer for the specified number of workers
        pub fn new(numworkers: usize) -> Self {
            Self {
                worker_performance: vec![1.0; numworkers], // Start with equal performance
                current_loads: Arc::new((0..numworkers).map(|_| AtomicUsize::new(0)).collect()),
                target_efficiency: 0.85, // Target 85% CPU utilization
            }
        }

        /// Calculate optimal chunk distribution based on worker performance
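        ///
        /// A worked sketch: three workers with equal scores splitting 400
        /// units of work get 133 / 133 / 134; the last worker absorbs the
        /// rounding remainder, so the shares always sum to the total.
        ///
        /// ```ignore
        /// let balancer = DynamicLoadBalancer::new(3);
        /// let shares = balancer.distribute_work(400);
        /// assert_eq!(shares.iter().sum::<usize>(), 400);
        /// ```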
        pub fn distribute_work(&self, totalwork: usize) -> Vec<usize> {
            let total_performance: f64 = self.worker_performance.iter().sum();
            let mut distribution = Vec::new();
            let mut remaining_work = totalwork;

            // Distribute work proportionally to performance, except for the last worker
            for (i, &performance) in self.worker_performance.iter().enumerate() {
                if i == self.worker_performance.len() - 1 {
                    // Give all remaining work to the last worker
                    distribution.push(remaining_work);
                } else {
                    let work_share = (totalwork as f64 * performance / total_performance) as usize;
                    distribution.push(work_share);
                    remaining_work = remaining_work.saturating_sub(work_share);
                }
            }

            distribution
        }

        /// Update worker performance metrics based on observed execution times
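        ///
        /// A worked sketch of the moving average with `alpha = 0.1`: a prior
        /// score of 1.0 and an observed rate of 2000 units/s yield
        /// `0.9 * 1.0 + 0.1 * 2000.0 = 200.9`.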
        pub fn update_performance(
            &mut self,
            workerid: usize,
            work_amount: usize,
            execution_time: Duration,
        ) {
            if workerid < self.worker_performance.len() {
                // Calculate performance as work/time (higher is better)
                let performance = work_amount as f64 / execution_time.as_secs_f64();

                // Exponential moving average to adapt to changing conditions
                let alpha = 0.1; // Learning rate
                self.worker_performance[workerid] =
                    (1.0 - alpha) * self.worker_performance[workerid] + alpha * performance;
            }
        }
    }

    /// Beta 2: Intelligent chunk size predictor using historical data
    #[allow(dead_code)]
    pub struct ChunkSizePredictor {
        historical_metrics: Vec<ChunkingPerformanceMetrics>,
        workload_characteristics: Vec<(WorkloadType, usize)>, // (workload_type, optimal_chunksize)
    }

    #[allow(dead_code)]
    impl ChunkSizePredictor {
        pub fn new() -> Self {
            Self {
                historical_metrics: Vec::new(),
                workload_characteristics: Vec::new(),
            }
        }

        /// Predict optimal chunk size based on workload characteristics and history
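        ///
        /// A worked sketch with no history: `memory_available` of 8 MiB and
        /// `data_size` of 16 MiB give constraints of 2 MiB (memory / 4) and
        /// 2 MiB (data / 8); blended with the 64 KiB default at weights
        /// 0.4 / 0.4 / 0.2, the prediction comes out near 1.6 MiB.
        ///
        /// ```ignore
        /// let predictor = ChunkSizePredictor::new();
        /// let size = predictor.predict_chunk_size(WorkloadType::Balanced, 8 << 20, 16 << 20);
        /// assert!((1024..=256 * 1024 * 1024).contains(&size));
        /// ```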
        pub fn predict_chunk_size(
            &self,
            workload: WorkloadType,
            memory_available: usize,
            data_size: usize,
        ) -> usize {
            // Start with base predictions from historical data
            let historical_prediction = self.get_historical_prediction(workload);

            // Apply memory constraints
            let memory_constrained = (memory_available / 4).max(1024); // Use 1/4 of available memory

            // Apply data size constraints
            let data_constrained = (data_size / 8).max(1024); // At least 8 chunks

            // Combine predictions with weighting
            let base_prediction = historical_prediction.unwrap_or(64 * 1024); // 64KB default
            let memory_weight = 0.4;
            let data_weight = 0.4;
            let historical_weight = 0.2;

            let predicted_size = (memory_weight * memory_constrained as f64
                + data_weight * data_constrained as f64
                + historical_weight * base_prediction as f64)
                as usize;

            // Ensure reasonable bounds
            predicted_size.clamp(1024, 256 * 1024 * 1024) // 1KB to 256MB
        }

        fn get_historical_prediction(&self, workload: WorkloadType) -> Option<usize> {
            // Find the most recent matching workload
            self.workload_characteristics
                .iter()
                .rev() // Start from most recent
                .find(|(wl, _)| *wl == workload)
                .map(|(_, size)| *size)
        }

        /// Record performance metrics for future predictions
        pub fn record_performance(
            &mut self,
            workload: WorkloadType,
            chunk_size: usize,
            metrics: ChunkingPerformanceMetrics,
        ) {
            self.historical_metrics.push(metrics);
            self.workload_characteristics.push((workload, chunk_size));

            // Keep only the last 100 entries to prevent unbounded growth
            if self.historical_metrics.len() > 100 {
                self.historical_metrics.remove(0);
                self.workload_characteristics.remove(0);
            }
        }
    }

    /// Beta 2: NUMA-aware chunking for large multi-socket systems
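    ///
    /// A worked sketch: 1 GiB of data across 2 NUMA nodes targets 4 chunks
    /// (2 per node), so the base chunk is 256 MiB, which is already a
    /// multiple of the 64-byte cache line.
    ///
    /// ```ignore
    /// let strategy = numa_aware_chunking(1 << 30, 2);
    /// // => ChunkingStrategy::Fixed(268_435_456)
    /// ```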
    #[allow(dead_code)]
    pub fn numa_aware_chunking(data_size: usize, num_numanodes: usize) -> ChunkingStrategy {
        if num_numanodes <= 1 {
            return ChunkingStrategy::Auto;
        }

        // Try to align chunks with NUMA boundaries
        let base_chunk_size = data_size / (num_numanodes * 2); // 2 chunks per NUMA node
        let aligned_chunk_size = align_to_cache_line(base_chunk_size);

        ChunkingStrategy::Fixed(aligned_chunk_size)
    }

    /// Align size to cache line boundaries for better performance
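    ///
    /// E.g. `align_to_cache_line(100)` rounds up to 128 (2 × 64).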
    fn align_to_cache_line(size: usize) -> usize {
        const CACHE_LINE_SIZE: usize = 64; // Common cache line size
        size.div_ceil(CACHE_LINE_SIZE) * CACHE_LINE_SIZE
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ::ndarray::Array2;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    #[test]
    fn test_adaptive_chunking_1d() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_adaptive_1d.bin");

        // Create a test array and save it to a file
        let data: Vec<f64> = (0..100_000).map(|i| i as f64).collect();
        let mut file = File::create(&file_path).expect("Operation failed");
        for val in &data {
            file.write_all(&val.to_ne_bytes())
                .expect("Operation failed");
        }
        drop(file);

        // Create a memory-mapped array
        let mmap =
            MemoryMappedArray::<f64>::path(&file_path, &[100_000]).expect("Operation failed");

        // Create adaptive chunking parameters
        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(1024 * 1024) // 1MB chunks
            .with_min_chunksize(1000)
            .with_max_chunksize(50000)
            .optimize_for_parallel(false) // Disable parallel optimization for this test
            .build();

        // Calculate adaptive chunking
        let result = mmap.adaptive_chunking(params).expect("Operation failed");

        // Verify results
        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                // The chunk size should be close to 1MB / 8 bytes = 131072 elements,
                // but capped at our max of 50000
                assert_eq!(chunksize, 50000);
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        // Verify that the estimated memory per chunk is reasonable
        assert!(result.estimated_memory_per_chunk > 0);

        // The decision factors should mention that it's a 1D array
        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("1D array")));
    }

    #[test]
    fn test_adaptive_chunking_2d() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_adaptive_2d.bin");

        // Create dimensions that will test row alignment
        let rows = 1000;
        let cols = 120;

        // Create a test 2D array and save it to a file
        let data = Array2::<f64>::from_shape_fn((rows, cols), |(i, j)| (i * cols + j) as f64);
        let mut file = File::create(&file_path).expect("Operation failed");
        for val in data.iter() {
            file.write_all(&val.to_ne_bytes())
                .expect("Operation failed");
        }
        drop(file);

        // Create a memory-mapped array
        let mmap =
            MemoryMappedArray::<f64>::path(&file_path, &[rows, cols]).expect("Operation failed");

        // Create adaptive chunking parameters
        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(100 * 1024) // 100KB chunks
            .with_min_chunksize(1000)
            .with_max_chunksize(50000)
            .build();

        // Calculate adaptive chunking
        let result = mmap.adaptive_chunking(params).expect("Operation failed");

        // Verify results
        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                // The chunk size should be adjusted to be a multiple of the row length (120)
                assert_eq!(
                    chunksize % cols,
                    0,
                    "Chunk size should be a multiple of row length"
                );
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        // The decision factors should mention that it's a 2D array
        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("2D array")));
    }

    #[test]
    fn test_adaptive_chunking_parallel() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_adaptive_parallel.bin");

        // Create a large test array
        let data: Vec<f64> = (0..1_000_000).map(|i| i as f64).collect();
        let mut file = File::create(&file_path).expect("Operation failed");
        for val in &data {
            file.write_all(&val.to_ne_bytes())
                .expect("Operation failed");
        }
        drop(file);

        // Create a memory-mapped array
        let mmap =
            MemoryMappedArray::<f64>::path(&file_path, &[1_000_000]).expect("Operation failed");

        // Create adaptive chunking parameters optimized for parallel processing
        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(10 * 1024 * 1024) // 10MB chunks
            .optimize_for_parallel(true)
            .with_numworkers(4)
            .build();

        // Calculate adaptive chunking
        let result = mmap.adaptive_chunking(params).expect("Operation failed");

        // Verify results
        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                // With 4 workers and a target of 8 chunks (2 * workers), each chunk should handle ~125,000 elements,
                // but it might be adjusted based on other factors
                assert!(chunksize > 0, "Chunk size should be positive");
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        // The decision factors should mention parallel optimization
        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("Parallel optimization")));
    }
}