use super::adaptive_feedback::SharedPredictor;
use super::chunked::ChunkingStrategy;
use super::memmap::MemoryMappedArray;
use super::memmap_chunks::MemoryMappedChunks;
use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
use std::time::Duration;

/// Workload categories used to pre-tune chunking parameters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkloadType {
    /// Dominated by memory traffic; favors smaller chunks.
    MemoryIntensive,
    /// Dominated by computation; favors longer-running chunks.
    ComputeIntensive,
    /// Dominated by I/O throughput; favors larger chunks.
    IoIntensive,
    /// No single dominant cost; keeps the default parameters.
    Balanced,
}

/// Parameters controlling adaptive chunk-size selection.
#[derive(Clone)]
pub struct AdaptiveChunkingParams {
    /// Target memory usage per chunk, in bytes.
    pub target_memory_usage: usize,
    /// Upper bound on the chunk size, in elements.
    pub max_chunksize: usize,
    /// Lower bound on the chunk size, in elements.
    pub min_chunksize: usize,
    /// Optional target processing duration per chunk.
    pub target_chunk_duration: Option<Duration>,
    /// Whether to take the data distribution into account when sizing chunks.
    pub consider_distribution: bool,
    /// Whether to optimize the chunk size for parallel processing.
    pub optimize_for_parallel: bool,
    /// Number of worker threads to plan for, if known.
    pub numworkers: Option<usize>,
    /// Optional shared predictor for feedback-driven sizing.
    pub predictor: Option<SharedPredictor>,
}

impl std::fmt::Debug for AdaptiveChunkingParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AdaptiveChunkingParams")
            .field("target_memory_usage", &self.target_memory_usage)
            .field("max_chunksize", &self.max_chunksize)
            .field("min_chunksize", &self.min_chunksize)
            .field("target_chunk_duration", &self.target_chunk_duration)
            .field("consider_distribution", &self.consider_distribution)
            .field("optimize_for_parallel", &self.optimize_for_parallel)
            .field("numworkers", &self.numworkers)
            .field(
                "predictor",
                &self.predictor.as_ref().map(|_| "Some(SharedPredictor)"),
            )
            .finish()
    }
}

impl Default for AdaptiveChunkingParams {
    fn default() -> Self {
        let available_memory = Self::detect_available_memory();
        let cpu_cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
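        // Sizing heuristic (worked example): with 1 GiB available this targets
        // 1 GiB / 8 = 128 MiB per chunk, clamped to [16 MiB, 256 MiB]; when
        // detection fails it falls back to a 64 MiB target.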
        let target_memory = if let Some(mem) = available_memory {
            (mem / 8).clamp(16 * 1024 * 1024, 256 * 1024 * 1024)
        } else {
            64 * 1024 * 1024
        };

        Self {
            target_memory_usage: target_memory,
            max_chunksize: usize::MAX,
            min_chunksize: 1024,
            target_chunk_duration: Some(Duration::from_millis(100)),
            consider_distribution: true,
            optimize_for_parallel: cpu_cores > 1,
            numworkers: Some(cpu_cores),
            predictor: None,
        }
    }
}

impl AdaptiveChunkingParams {
    /// Probe the platform for currently available memory, if supported.
    fn detect_available_memory() -> Option<usize> {
        use super::platform_memory::PlatformMemoryInfo;

        PlatformMemoryInfo::detect().map(|info| info.available_memory)
    }
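    /// Derive parameters pre-tuned for a given workload type.
    ///
    /// A minimal usage sketch (mirrors the match arms below):
    ///
    /// ```rust,ignore
    /// let params = AdaptiveChunkingParams::for_workload(WorkloadType::IoIntensive);
    /// // Relative to the defaults: memory target doubled, minimum chunk
    /// // size raised to 64 * 1024 elements.
    /// ```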
    pub fn for_workload(workload: WorkloadType) -> Self {
        let mut params = Self::default();

        match workload {
            WorkloadType::MemoryIntensive => {
                params.target_memory_usage /= 2;
                params.consider_distribution = false;
            }
            WorkloadType::ComputeIntensive => {
                params.target_chunk_duration = Some(Duration::from_millis(500));
                params.optimize_for_parallel = true;
            }
            WorkloadType::IoIntensive => {
                params.target_memory_usage *= 2;
                params.min_chunksize = 64 * 1024;
            }
            WorkloadType::Balanced => {
                // Keep the defaults.
            }
        }

        params
    }
}

/// Result of an adaptive chunking analysis.
#[derive(Debug, Clone)]
pub struct AdaptiveChunkingResult {
    /// The chunking strategy selected for this array.
    pub strategy: ChunkingStrategy,
    /// Estimated memory footprint per chunk, in bytes.
    pub estimated_memory_per_chunk: usize,
    /// Human-readable notes explaining how the size was chosen.
    pub decision_factors: Vec<String>,
}

/// Adaptive chunking operations for memory-mapped arrays.
pub trait AdaptiveChunking<A: Clone + Copy + 'static + Send + Sync> {
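    /// Compute a chunking strategy adapted to this array and `params`.
    ///
    /// A minimal usage sketch (illustrative; `mmap` stands for any value
    /// implementing this trait):
    ///
    /// ```rust,ignore
    /// let result = mmap.adaptive_chunking(AdaptiveChunkingParams::default())?;
    /// if let ChunkingStrategy::Fixed(n) = result.strategy {
    ///     println!("chosen chunk size: {n} elements");
    /// }
    /// ```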
    fn adaptive_chunking(
        &self,
        params: AdaptiveChunkingParams,
    ) -> CoreResult<AdaptiveChunkingResult>;

    /// Process all chunks with `f` using an adaptively chosen strategy.
    fn process_chunks_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R;

    /// Process all chunks mutably with `f` using an adaptively chosen strategy.
    fn process_chunks_mut_adaptive<F>(
        &mut self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<()>
    where
        F: Fn(&mut [A], usize);

    /// Process chunks in parallel; the parameters are forced to parallel mode.
    #[cfg(feature = "parallel")]
    fn process_chunks_parallel_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
        A: Send + Sync;
}

impl<A: Clone + Copy + 'static + Send + Sync> AdaptiveChunking<A> for MemoryMappedArray<A> {
    fn adaptive_chunking(
        &self,
        params: AdaptiveChunkingParams,
    ) -> CoreResult<AdaptiveChunkingResult> {
        let total_elements = self.size;
        let elementsize = std::mem::size_of::<A>();

        // Zero-sized types have no memory footprint to chunk by.
        if elementsize == 0 {
            return Err(CoreError::InvalidArgument(
                ErrorContext::new("Cannot chunk zero-sized type".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }
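        // Worked example: a 64 MiB memory target over f64 elements (8 bytes)
        // yields 64 MiB / 8 = 8_388_608 elements before the clamps below.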
        let mut chunksize = params
            .target_memory_usage
            .checked_div(elementsize)
            .ok_or_else(|| {
                CoreError::ComputationError(
                    ErrorContext::new("Arithmetic overflow in chunk size calculation".to_string())
                        .with_location(ErrorLocation::new(file!(), line!())),
                )
            })?;

        chunksize = chunksize.clamp(params.min_chunksize, params.max_chunksize);
        chunksize = chunksize.min(total_elements);

        let (chunksize, decision_factors) = self.optimize_for_dimensionality(chunksize, &params)?;

        // When parallel processing is requested, re-balance for the worker
        // count, then re-align to the array's dimensionality.
        let (chunksize, decision_factors) = if params.optimize_for_parallel {
            let (parallel_chunksize, parallel_factors) =
                self.optimize_for_parallel_processing(chunksize, decision_factors, &params);
            let (final_chunksize, mut final_factors) =
                self.optimize_for_dimensionality(parallel_chunksize, &params)?;
            final_factors.extend(parallel_factors);
            (final_chunksize, final_factors)
        } else {
            (chunksize, decision_factors)
        };

        let strategy = ChunkingStrategy::Fixed(chunksize);

        let estimated_memory = chunksize.checked_mul(elementsize).ok_or_else(|| {
            CoreError::ComputationError(
                ErrorContext::new("Arithmetic overflow in memory estimation".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            )
        })?;

        Ok(AdaptiveChunkingResult {
            strategy,
            estimated_memory_per_chunk: estimated_memory,
            decision_factors,
        })
    }

    fn process_chunks_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R,
    {
        let adaptive_result = self.adaptive_chunking(params)?;

        Ok(self.process_chunks(adaptive_result.strategy, f))
    }

    fn process_chunks_mut_adaptive<F>(
        &mut self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<()>
    where
        F: Fn(&mut [A], usize),
    {
        let adaptive_result = self.adaptive_chunking(params)?;

        self.process_chunks_mut(adaptive_result.strategy, f);
        Ok(())
    }

    #[cfg(feature = "parallel")]
    fn process_chunks_parallel_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
        A: Send + Sync,
    {
        // Force parallel-oriented parameters, defaulting the worker count to
        // rayon's current thread pool size.
        let mut parallel_params = params;
        parallel_params.optimize_for_parallel = true;

        if parallel_params.numworkers.is_none() {
            parallel_params.numworkers = Some(rayon::current_num_threads());
        }

        let adaptive_result = self.adaptive_chunking(parallel_params)?;

        use super::memmap_chunks::MemoryMappedChunksParallel;
        Ok(self.process_chunks_parallel(adaptive_result.strategy, f))
    }
}

impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedArray<A> {
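    /// Align the chunk size to the array's dimensionality so chunks fall on
    /// row or plane boundaries where possible.
    ///
    /// Worked example: for a 2D array with row length 120 and an initial
    /// chunk size of 1_050, the size is rounded down to (1_050 / 120) * 120
    /// = 960, provided 960 still satisfies `params.min_chunksize`.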
    fn optimize_for_dimensionality(
        &self,
        initial_chunksize: usize,
        params: &AdaptiveChunkingParams,
    ) -> CoreResult<(usize, Vec<String>)> {
        let mut decision_factors = Vec::new();
        let mut chunksize = initial_chunksize;

        match self.shape.len() {
            1 => {
                decision_factors.push("1D array: Using direct chunking".to_string());
            }
            2 => {
                let row_length = self.shape[1];

                if chunksize >= row_length {
                    // Round down to a whole number of rows.
                    if chunksize % row_length != 0 {
                        let newsize = (chunksize / row_length)
                            .checked_mul(row_length)
                            .unwrap_or(chunksize);
                        if newsize >= params.min_chunksize {
                            chunksize = newsize;
                            decision_factors.push(format!(
                                "2D array: Adjusted chunk size to {chunksize} (multiple of row length {row_length})"
                            ));
                        }
                    }
                } else if row_length <= params.max_chunksize {
                    chunksize = row_length;
                    decision_factors.push(format!(
                        "2D array: Adjusted chunk size to row length {row_length}"
                    ));
                } else {
                    decision_factors.push(format!(
                        "2D array: Row length {row_length} exceeds max chunk size, keeping chunk size {chunksize}"
                    ));
                }
            }
            3 => {
                let planesize = self.shape[1].checked_mul(self.shape[2]).unwrap_or_else(|| {
                    decision_factors.push(
                        "3D array: Overflow in plane size calculation, using row alignment"
                            .to_string(),
                    );
                    self.shape[2]
                });
                let row_length = self.shape[2];

                if chunksize >= planesize && chunksize % planesize != 0 {
                    // Round down to a whole number of planes.
                    let newsize = (chunksize / planesize)
                        .checked_mul(planesize)
                        .unwrap_or(chunksize);
                    if newsize >= params.min_chunksize {
                        chunksize = newsize;
                        decision_factors.push(format!(
                            "3D array: Adjusted chunk size to {chunksize} (multiple of plane size {planesize})"
                        ));
                    }
                } else if chunksize >= row_length && chunksize % row_length != 0 {
                    // Fall back to whole-row alignment.
                    let newsize = (chunksize / row_length)
                        .checked_mul(row_length)
                        .unwrap_or(chunksize);
                    if newsize >= params.min_chunksize {
                        chunksize = newsize;
                        decision_factors.push(format!(
                            "3D array: Adjusted chunk size to {chunksize} (multiple of row length {row_length})"
                        ));
                    }
                }
            }
            n => {
                decision_factors.push(format!("{n}D array: Using default chunking strategy"));
            }
        }

        Ok((chunksize, decision_factors))
    }
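    /// Bias the chunk size toward roughly two chunks per worker.
    ///
    /// Worked example: 1_000_000 elements on 4 workers targets 8 chunks of
    /// 125_000 elements each, subject to the min/max bounds in `params`.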
    fn optimize_for_parallel_processing(
        &self,
        initial_chunksize: usize,
        mut decision_factors: Vec<String>,
        params: &AdaptiveChunkingParams,
    ) -> (usize, Vec<String>) {
        let mut chunksize = initial_chunksize;

        if let Some(numworkers) = params.numworkers {
            let total_elements = self.size;

            // Aim for two chunks per worker to smooth out load imbalance.
            let target_num_chunks = numworkers.checked_mul(2).unwrap_or(numworkers);
            let ideal_chunksize = if target_num_chunks > 0 {
                total_elements / target_num_chunks
            } else {
                total_elements
            };

            if ideal_chunksize >= params.min_chunksize && ideal_chunksize <= params.max_chunksize {
                chunksize = ideal_chunksize;
                decision_factors.push(format!(
                    "Parallel optimization: Adjusted chunk size to {chunksize} for {numworkers} workers"
                ));
            } else if ideal_chunksize < params.min_chunksize {
                chunksize = params.min_chunksize;
                let actual_chunks = total_elements / chunksize
                    + if total_elements % chunksize != 0 { 1 } else { 0 };
                decision_factors.push(format!(
                    "Parallel optimization: Using minimum chunk size {chunksize}, resulting in {actual_chunks} chunks for {numworkers} workers"
                ));
            }
        } else {
            decision_factors.push(
                "Parallel optimization requested but no worker count specified, using default chunking".to_string()
            );
        }

        (chunksize, decision_factors)
    }
}
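/// Fluent builder for [`AdaptiveChunkingParams`].
///
/// A minimal usage sketch (uses only this module's API):
///
/// ```rust,ignore
/// let params = AdaptiveChunkingBuilder::new()
///     .with_target_memory(64 * 1024 * 1024)
///     .optimize_for_parallel(true)
///     .with_numworkers(8)
///     .build();
/// ```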
#[derive(Debug, Clone)]
pub struct AdaptiveChunkingBuilder {
    params: AdaptiveChunkingParams,
}

impl AdaptiveChunkingBuilder {
    pub fn new() -> Self {
        Self {
            params: AdaptiveChunkingParams::default(),
        }
    }

    pub const fn with_target_memory(mut self, bytes: usize) -> Self {
        self.params.target_memory_usage = bytes;
        self
    }

    pub const fn with_max_chunksize(mut self, size: usize) -> Self {
        self.params.max_chunksize = size;
        self
    }

    pub const fn with_min_chunksize(mut self, size: usize) -> Self {
        self.params.min_chunksize = size;
        self
    }

    pub fn with_target_duration(mut self, duration: Duration) -> Self {
        self.params.target_chunk_duration = Some(duration);
        self
    }

    pub const fn consider_distribution(mut self, enable: bool) -> Self {
        self.params.consider_distribution = enable;
        self
    }

    pub const fn optimize_for_parallel(mut self, enable: bool) -> Self {
        self.params.optimize_for_parallel = enable;
        self
    }

    pub fn with_numworkers(mut self, workers: usize) -> Self {
        self.params.numworkers = Some(workers);
        self
    }

    pub fn build(self) -> AdaptiveChunkingParams {
        self.params
    }
}

impl Default for AdaptiveChunkingBuilder {
    fn default() -> Self {
        Self::new()
    }
}

/// Experimental enhancements to adaptive chunking (beta 2).
pub mod beta2_enhancements {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    /// Performance metrics collected while processing chunks.
    #[derive(Debug, Clone, Default)]
    #[allow(dead_code)]
    pub struct ChunkingPerformanceMetrics {
        pub chunk_processing_times: Vec<Duration>,
        pub memory_usage_per_chunk: Vec<usize>,
        pub throughput_mbps: Vec<f64>,
        pub cpu_utilization: Vec<f64>,
    }

    /// Load balancer that adapts work distribution to observed worker throughput.
    #[allow(dead_code)]
    pub struct DynamicLoadBalancer {
        /// Relative performance estimate per worker (1.0 = baseline).
        worker_performance: Vec<f64>,
        /// Outstanding work items per worker.
        current_loads: Arc<Vec<AtomicUsize>>,
        /// Desired worker utilization efficiency.
        target_efficiency: f64,
    }

    #[allow(dead_code)]
    impl DynamicLoadBalancer {
        pub fn new(numworkers: usize) -> Self {
            Self {
                worker_performance: vec![1.0; numworkers],
                current_loads: Arc::new((0..numworkers).map(|_| AtomicUsize::new(0)).collect()),
                target_efficiency: 0.85,
            }
        }
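        /// Split `totalwork` across workers in proportion to their measured
        /// performance; the last worker absorbs any rounding remainder.
        ///
        /// Worked example: performances [1.0, 1.0, 2.0] and totalwork = 400
        /// yield shares of 100, 100, and 200 (the remainder).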
        pub fn distribute_work(&self, totalwork: usize) -> Vec<usize> {
            let total_performance: f64 = self.worker_performance.iter().sum();
            let mut distribution = Vec::new();
            let mut remaining_work = totalwork;

            for (i, &performance) in self.worker_performance.iter().enumerate() {
                if i == self.worker_performance.len() - 1 {
                    // The last worker takes whatever remains after rounding.
                    distribution.push(remaining_work);
                } else {
                    let work_share = (totalwork as f64 * performance / total_performance) as usize;
                    distribution.push(work_share);
                    remaining_work = remaining_work.saturating_sub(work_share);
                }
            }

            distribution
        }
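        /// Update a worker's performance estimate with an exponentially
        /// weighted moving average (alpha = 0.1).
        ///
        /// Worked example: old estimate 1.0, measured throughput 2.0
        /// => 0.9 * 1.0 + 0.1 * 2.0 = 1.1.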
        pub fn update_performance(
            &mut self,
            workerid: usize,
            work_amount: usize,
            execution_time: Duration,
        ) {
            if workerid < self.worker_performance.len() {
                let performance = work_amount as f64 / execution_time.as_secs_f64();

                let alpha = 0.1;
                self.worker_performance[workerid] =
                    (1.0 - alpha) * self.worker_performance[workerid] + alpha * performance;
            }
        }
    }

    /// Predicts chunk sizes from recorded workload history.
    #[allow(dead_code)]
    pub struct ChunkSizePredictor {
        historical_metrics: Vec<ChunkingPerformanceMetrics>,
        /// Past (workload, chunk size) observations, newest last.
        workload_characteristics: Vec<(WorkloadType, usize)>,
    }

    #[allow(dead_code)]
    impl ChunkSizePredictor {
        pub fn new() -> Self {
            Self {
                historical_metrics: Vec::new(),
                workload_characteristics: Vec::new(),
            }
        }
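        /// Blend memory, data-size, and historical signals into a predicted
        /// chunk size (weights 0.4 / 0.4 / 0.2), clamped to [1 KiB, 256 MiB].
        ///
        /// Worked example: 8 MiB of available memory and an 8 MiB input with
        /// no history give 0.4 * 2 MiB + 0.4 * 1 MiB + 0.2 * 64 KiB, roughly
        /// 1.21 MiB.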
        pub fn predict_chunk_size(
            &self,
            workload: WorkloadType,
            memory_available: usize,
            data_size: usize,
        ) -> usize {
            let historical_prediction = self.get_historical_prediction(workload);

            // Candidate sizes from each signal, floored at 1 KiB.
            let memory_constrained = (memory_available / 4).max(1024);
            let data_constrained = (data_size / 8).max(1024);
            let base_prediction = historical_prediction.unwrap_or(64 * 1024);

            let memory_weight = 0.4;
            let data_weight = 0.4;
            let historical_weight = 0.2;

            let predicted_size = (memory_weight * memory_constrained as f64
                + data_weight * data_constrained as f64
                + historical_weight * base_prediction as f64)
                as usize;

            predicted_size.clamp(1024, 256 * 1024 * 1024)
        }

        fn get_historical_prediction(&self, workload: WorkloadType) -> Option<usize> {
            // Prefer the most recent observation for this workload type.
            self.workload_characteristics
                .iter()
                .rev()
                .find(|(wl, _)| *wl == workload)
                .map(|(_, size)| *size)
        }

        /// Record observed performance, keeping a bounded history of the
        /// 100 most recent entries.
        pub fn record_performance(
            &mut self,
            workload: WorkloadType,
            chunk_size: usize,
            metrics: ChunkingPerformanceMetrics,
        ) {
            self.historical_metrics.push(metrics);
            self.workload_characteristics.push((workload, chunk_size));

            if self.historical_metrics.len() > 100 {
                self.historical_metrics.remove(0);
                self.workload_characteristics.remove(0);
            }
        }
    }
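    /// Pick a fixed chunking strategy that spreads work across NUMA nodes
    /// (two chunks per node), aligned to the cache line size.
    ///
    /// Worked example: a data size of 1_000_000 over 4 nodes gives a base
    /// chunk of 1_000_000 / 8 = 125_000, rounded up to 125_056 (a multiple
    /// of 64).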
    #[allow(dead_code)]
    pub fn numa_aware_chunking(data_size: usize, num_numanodes: usize) -> ChunkingStrategy {
        if num_numanodes <= 1 {
            return ChunkingStrategy::Auto;
        }

        // Two chunks per NUMA node, aligned to the cache line size.
        let base_chunk_size = data_size / (num_numanodes * 2);
        let aligned_chunk_size = align_to_cache_line(base_chunk_size);

        ChunkingStrategy::Fixed(aligned_chunk_size)
    }

    /// Round `size` up to the next multiple of the cache line size (64 bytes).
    fn align_to_cache_line(size: usize) -> usize {
        const CACHE_LINE_SIZE: usize = 64;
        size.div_ceil(CACHE_LINE_SIZE) * CACHE_LINE_SIZE
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ::ndarray::Array2;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    #[test]
    fn test_adaptive_chunking_1d() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_adaptive_1d.bin");

        // Write 100_000 f64 values to a temporary file.
        let data: Vec<f64> = (0..100_000).map(|i| i as f64).collect();
        let mut file = File::create(&file_path).expect("Operation failed");
        for val in &data {
            file.write_all(&val.to_ne_bytes())
                .expect("Operation failed");
        }
        drop(file);

        let mmap =
            MemoryMappedArray::<f64>::path(&file_path, &[100_000]).expect("Operation failed");

        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(1024 * 1024)
            .with_min_chunksize(1000)
            .with_max_chunksize(50000)
            .optimize_for_parallel(false)
            .build();

        let result = mmap.adaptive_chunking(params).expect("Operation failed");

        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                // 1 MiB / 8 bytes = 131_072 elements, clamped to the 50_000 max.
                assert_eq!(chunksize, 50000);
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        assert!(result.estimated_memory_per_chunk > 0);

        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("1D array")));
    }

    #[test]
    fn test_adaptive_chunking_2d() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_adaptive_2d.bin");

        let rows = 1000;
        let cols = 120;

        let data = Array2::<f64>::from_shape_fn((rows, cols), |(i, j)| (i * cols + j) as f64);
        let mut file = File::create(&file_path).expect("Operation failed");
        for val in data.iter() {
            file.write_all(&val.to_ne_bytes())
                .expect("Operation failed");
        }
        drop(file);

        let mmap =
            MemoryMappedArray::<f64>::path(&file_path, &[rows, cols]).expect("Operation failed");

        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(100 * 1024)
            .with_min_chunksize(1000)
            .with_max_chunksize(50000)
            .build();

        let result = mmap.adaptive_chunking(params).expect("Operation failed");

        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                assert_eq!(
                    chunksize % cols,
                    0,
                    "Chunk size should be a multiple of row length"
                );
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("2D array")));
    }

    #[test]
    fn test_adaptive_chunking_parallel() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_adaptive_parallel.bin");

        let data: Vec<f64> = (0..1_000_000).map(|i| i as f64).collect();
        let mut file = File::create(&file_path).expect("Operation failed");
        for val in &data {
            file.write_all(&val.to_ne_bytes())
                .expect("Operation failed");
        }
        drop(file);

        let mmap =
            MemoryMappedArray::<f64>::path(&file_path, &[1_000_000]).expect("Operation failed");

        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(10 * 1024 * 1024)
            .optimize_for_parallel(true)
            .with_numworkers(4)
            .build();

        let result = mmap.adaptive_chunking(params).expect("Operation failed");

        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                assert!(chunksize > 0, "Chunk size should be positive");
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("Parallel optimization")));
    }
}