1use super::chunked::ChunkingStrategy;
9use super::memmap::MemoryMappedArray;
10use super::memmap_chunks::MemoryMappedChunks;
11use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
12use std::time::Duration;
14
/// Workload categories used to pre-tune [`AdaptiveChunkingParams`]
/// via [`AdaptiveChunkingParams::for_workload`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkloadType {
    /// Memory-bound processing: tuned toward a smaller memory budget.
    MemoryIntensive,
    /// CPU-bound processing: tuned toward longer-running chunks and parallelism.
    ComputeIntensive,
    /// I/O-bound processing: tuned toward larger chunks to amortize latency.
    IoIntensive,
    /// Mixed workload: keeps the default parameters unchanged.
    Balanced,
}
27
/// Tuning parameters controlling adaptive chunk-size selection.
#[derive(Debug, Clone)]
pub struct AdaptiveChunkingParams {
    /// Target memory budget per chunk, in bytes.
    pub target_memory_usage: usize,

    /// Upper bound on the chunk size, in elements.
    pub max_chunksize: usize,

    /// Lower bound on the chunk size, in elements.
    pub min_chunksize: usize,

    /// Desired processing duration per chunk, if any.
    /// NOTE(review): not read by the chunking logic visible in this file —
    /// confirm intended consumer.
    pub target_chunk_duration: Option<Duration>,

    /// Whether to take data distribution into account.
    /// NOTE(review): not read by the chunking logic visible in this file —
    /// confirm intended consumer.
    pub consider_distribution: bool,

    /// Whether to adjust chunk sizes for parallel processing.
    pub optimize_for_parallel: bool,

    /// Number of parallel workers to size chunks for, if known.
    pub numworkers: Option<usize>,
}
52
53impl Default for AdaptiveChunkingParams {
54 fn default() -> Self {
55 let available_memory = Self::detect_available_memory();
57 let cpu_cores = std::thread::available_parallelism()
58 .map(|n| n.get())
59 .unwrap_or(1);
60
61 let target_memory = if let Some(mem) = available_memory {
63 (mem / 8).clamp(16 * 1024 * 1024, 256 * 1024 * 1024) } else {
65 64 * 1024 * 1024 };
67
68 Self {
69 target_memory_usage: target_memory,
70 max_chunksize: usize::MAX,
71 min_chunksize: 1024,
72 target_chunk_duration: Some(Duration::from_millis(100)), consider_distribution: true, optimize_for_parallel: cpu_cores > 1, numworkers: Some(cpu_cores),
76 }
77 }
78}
79
80impl AdaptiveChunkingParams {
81 fn detect_available_memory() -> Option<usize> {
83 #[cfg(unix)]
85 {
86 if let Ok(output) = std::process::Command::new("sh")
87 .args([
88 "-c",
89 "cat /proc/meminfo | grep MemAvailable | awk '{print $2}'",
90 ])
91 .output()
92 {
93 if let Ok(mem_str) = String::from_utf8(output.stdout) {
94 if let Ok(mem_kb) = mem_str.trim().parse::<usize>() {
95 return Some(mem_kb * 1024); }
97 }
98 }
99 }
100 None
101 }
102
103 pub fn for_workload(workload: WorkloadType) -> Self {
105 let mut params = Self::default();
106
107 match workload {
108 WorkloadType::MemoryIntensive => {
109 params.target_memory_usage /= 2; params.consider_distribution = false; }
112 WorkloadType::ComputeIntensive => {
113 params.target_chunk_duration = Some(Duration::from_millis(500)); params.optimize_for_parallel = true;
115 }
116 WorkloadType::IoIntensive => {
117 params.target_memory_usage *= 2; params.min_chunksize = 64 * 1024; }
120 WorkloadType::Balanced => {
121 }
123 }
124
125 params
126 }
127}
128
/// Outcome of an adaptive chunking analysis.
#[derive(Debug, Clone)]
pub struct AdaptiveChunkingResult {
    /// The recommended chunking strategy (a fixed element count).
    pub strategy: ChunkingStrategy,

    /// Estimated memory footprint of one chunk, in bytes.
    pub estimated_memory_per_chunk: usize,

    /// Human-readable explanations of how the chunk size was chosen.
    pub decision_factors: Vec<String>,
}
141
/// Adaptive chunking support for arrays of `Copy` elements.
pub trait AdaptiveChunking<A: Clone + Copy + 'static + Send + Sync> {
    /// Computes a chunking strategy tuned to `params`, the element size,
    /// and the array's shape.
    ///
    /// # Errors
    /// Fails for zero-sized element types or on arithmetic overflow during
    /// size calculations.
    fn adaptive_chunking(
        &self,
        params: AdaptiveChunkingParams,
    ) -> CoreResult<AdaptiveChunkingResult>;

    /// Computes an adaptive strategy, then maps `f(chunk, chunk_index)` over
    /// each chunk, returning the per-chunk results.
    fn process_chunks_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R;

    /// Computes an adaptive strategy, then applies `f(chunk, chunk_index)`
    /// to each chunk mutably, in place.
    fn process_chunks_mut_adaptive<F>(
        &mut self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<()>
    where
        F: Fn(&mut [A], usize);

    /// Parallel variant of [`Self::process_chunks_adaptive`].
    /// Only available with the `parallel` feature.
    #[cfg(feature = "parallel")]
    fn process_chunks_parallel_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
        A: Send + Sync;
}
211
impl<A: Clone + Copy + 'static + Send + Sync> AdaptiveChunking<A> for MemoryMappedArray<A> {
    // Derives a fixed chunk size from the memory budget, clamps it to the
    // caller's bounds and the array length, then aligns it to the array's
    // dimensional structure (and, optionally, to the worker count).
    fn adaptive_chunking(
        &self,
        params: AdaptiveChunkingParams,
    ) -> CoreResult<AdaptiveChunkingResult> {
        // Total number of elements in the mapped array.
        let total_elements = self.size;

        let elementsize = std::mem::size_of::<A>();

        // Zero-sized types cannot be chunked by a memory budget.
        if elementsize == 0 {
            return Err(CoreError::InvalidArgument(
                ErrorContext::new("Cannot chunk zero-sized type".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        // Elements per chunk that fit the memory budget. `checked_div` only
        // fails for a zero divisor, which the guard above rules out, so this
        // error path is purely defensive.
        let mut chunksize = params
            .target_memory_usage
            .checked_div(elementsize)
            .ok_or_else(|| {
                CoreError::ComputationError(
                    ErrorContext::new("Arithmetic overflow in chunk size calculation".to_string())
                        .with_location(ErrorLocation::new(file!(), line!())),
                )
            })?;

        // Respect the caller's explicit bounds, then never exceed the array.
        chunksize = chunksize.clamp(params.min_chunksize, params.max_chunksize);

        chunksize = chunksize.min(total_elements);

        // Align to row/plane boundaries, collecting decision factors.
        let (chunksize, decision_factors) = self.optimize_for_dimensionality(chunksize, &params)?;

        // Optionally re-balance for parallel workers, then re-align to the
        // array's dimensionality, since the parallel adjustment may have
        // broken row/plane alignment.
        // NOTE(review): `parallel_factors` already contains the factors from
        // the first dimensionality pass, so dimensionality messages can
        // appear twice in the final list — confirm whether that is intended.
        let (chunksize, decision_factors) = if params.optimize_for_parallel {
            let (parallel_chunksize, parallel_factors) =
                self.optimize_for_parallel_processing(chunksize, decision_factors, &params);
            let (final_chunksize, mut final_factors) =
                self.optimize_for_dimensionality(parallel_chunksize, &params)?;
            final_factors.extend(parallel_factors);
            (final_chunksize, final_factors)
        } else {
            (chunksize, decision_factors)
        };

        let strategy = ChunkingStrategy::Fixed(chunksize);

        // Estimated bytes per chunk; checked to surface (rather than wrap on)
        // overflow for enormous chunk sizes.
        let estimated_memory = chunksize.checked_mul(elementsize).ok_or_else(|| {
            CoreError::ComputationError(
                ErrorContext::new("Arithmetic overflow in memory estimation".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            )
        })?;

        Ok(AdaptiveChunkingResult {
            strategy,
            estimated_memory_per_chunk: estimated_memory,
            decision_factors,
        })
    }

    // Computes the strategy, then delegates to `MemoryMappedChunks::process_chunks`.
    fn process_chunks_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R,
    {
        let adaptive_result = self.adaptive_chunking(params)?;

        Ok(self.process_chunks(adaptive_result.strategy, f))
    }

    // Computes the strategy, then delegates to
    // `MemoryMappedChunks::process_chunks_mut` for in-place mutation.
    fn process_chunks_mut_adaptive<F>(
        &mut self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<()>
    where
        F: Fn(&mut [A], usize),
    {
        let adaptive_result = self.adaptive_chunking(params)?;

        self.process_chunks_mut(adaptive_result.strategy, f);
        Ok(())
    }

    // Parallel variant: forces parallel optimization on and fills in the
    // worker count from rayon's pool when the caller did not specify one.
    #[cfg(feature = "parallel")]
    fn process_chunks_parallel_adaptive<F, R>(
        &self,
        params: AdaptiveChunkingParams,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
        A: Send + Sync,
    {
        let mut parallel_params = params;
        parallel_params.optimize_for_parallel = true;

        if parallel_params.numworkers.is_none() {
            parallel_params.numworkers = Some(rayon::current_num_threads());
        }

        let adaptive_result = self.adaptive_chunking(parallel_params)?;

        use super::memmap_chunks::MemoryMappedChunksParallel;
        Ok(self.process_chunks_parallel(adaptive_result.strategy, f))
    }
}
342
343impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedArray<A> {
344 fn optimize_for_dimensionality(
346 &self,
347 initial_chunksize: usize,
348 params: &AdaptiveChunkingParams,
349 ) -> CoreResult<(usize, Vec<String>)> {
350 let mut decision_factors = Vec::new();
351 let mut chunksize = initial_chunksize;
352
353 match self.shape.len() {
354 1 => {
355 decision_factors.push("1D array: Using direct chunking".to_string());
357 }
358 2 => {
359 let row_length = self.shape[1];
361
362 if chunksize >= row_length {
363 if chunksize % row_length != 0 {
365 let newsize = (chunksize / row_length)
367 .checked_mul(row_length)
368 .unwrap_or(chunksize); if newsize >= params.min_chunksize {
370 chunksize = newsize;
371 decision_factors.push(format!(
372 "2D array: Adjusted chunk size to {chunksize} (multiple of row length {row_length})"
373 ));
374 }
375 }
376 } else {
377 if row_length <= params.max_chunksize {
380 chunksize = row_length;
381 decision_factors.push(format!(
382 "2D array: Adjusted chunk size to row length {row_length}"
383 ));
384 } else {
385 decision_factors.push(format!(
387 "2D array: Row length {row_length} exceeds max chunk size, keeping chunk size {chunksize}"
388 ));
389 }
390 }
391 }
392 3 => {
393 let planesize = self.shape[1].checked_mul(self.shape[2]).unwrap_or_else(|| {
395 decision_factors.push(
396 "3D array: Overflow in plane size calculation, using row alignment"
397 .to_string(),
398 );
399 self.shape[2] });
401 let row_length = self.shape[2];
402
403 if chunksize >= planesize && chunksize % planesize != 0 {
404 let newsize = (chunksize / planesize)
406 .checked_mul(planesize)
407 .unwrap_or(chunksize); if newsize >= params.min_chunksize {
409 chunksize = newsize;
410 decision_factors.push(format!(
411 "3D array: Adjusted chunk size to {chunksize} (multiple of plane size {planesize})"
412 ));
413 }
414 } else if chunksize >= row_length && chunksize % row_length != 0 {
415 let newsize = (chunksize / row_length)
417 .checked_mul(row_length)
418 .unwrap_or(chunksize); if newsize >= params.min_chunksize {
420 chunksize = newsize;
421 decision_factors.push(format!(
422 "3D array: Adjusted chunk size to {chunksize} (multiple of row length {row_length})"
423 ));
424 }
425 }
426 }
427 n => {
428 decision_factors.push(format!("{n}D array: Using default chunking strategy"));
429 }
430 }
431
432 Ok((chunksize, decision_factors))
433 }
434
435 fn optimize_for_parallel_processing(
437 &self,
438 initial_chunksize: usize,
439 mut decision_factors: Vec<String>,
440 params: &AdaptiveChunkingParams,
441 ) -> (usize, Vec<String>) {
442 let mut chunksize = initial_chunksize;
443
444 if let Some(numworkers) = params.numworkers {
445 let total_elements = self.size;
446
447 let target_num_chunks = numworkers.checked_mul(2).unwrap_or(numworkers);
450 let ideal_chunksize = if target_num_chunks > 0 {
451 total_elements / target_num_chunks
452 } else {
453 total_elements };
455
456 if ideal_chunksize >= params.min_chunksize && ideal_chunksize <= params.max_chunksize {
457 chunksize = ideal_chunksize;
459 decision_factors.push(format!(
460 "Parallel optimization: Adjusted chunk size to {chunksize} for {numworkers} workers"
461 ));
462 } else if ideal_chunksize < params.min_chunksize {
463 chunksize = params.min_chunksize;
465 let actual_chunks = total_elements / chunksize
466 + if total_elements % chunksize != 0 {
467 1
468 } else {
469 0
470 };
471 decision_factors.push(format!(
472 "Parallel optimization: Using minimum chunk size {chunksize}, resulting in {actual_chunks} chunks for {numworkers} workers"
473 ));
474 }
475 } else {
476 decision_factors.push(
477 "Parallel optimization requested but no worker count specified, using default chunking".to_string()
478 );
479 }
480
481 (chunksize, decision_factors)
482 }
483}
484
/// Fluent builder for [`AdaptiveChunkingParams`].
#[derive(Debug, Clone)]
pub struct AdaptiveChunkingBuilder {
    // Parameters accumulated by the setter methods and returned by `build`.
    params: AdaptiveChunkingParams,
}
490
491impl AdaptiveChunkingBuilder {
492 pub fn new() -> Self {
494 Self {
495 params: AdaptiveChunkingParams::default(),
496 }
497 }
498
499 pub const fn with_target_memory(mut self, bytes: usize) -> Self {
501 self.params.target_memory_usage = bytes;
502 self
503 }
504
505 pub const fn with_max_chunksize(mut self, size: usize) -> Self {
507 self.params.max_chunksize = size;
508 self
509 }
510
511 pub const fn with_min_chunksize(mut self, size: usize) -> Self {
513 self.params.min_chunksize = size;
514 self
515 }
516
517 pub fn with_target_duration(mut self, duration: Duration) -> Self {
519 self.params.target_chunk_duration = Some(duration);
520 self
521 }
522
523 pub const fn consider_distribution(mut self, enable: bool) -> Self {
525 self.params.consider_distribution = enable;
526 self
527 }
528
529 pub const fn optimize_for_parallel(mut self, enable: bool) -> Self {
531 self.params.optimize_for_parallel = enable;
532 self
533 }
534
535 pub fn with_numworkers(mut self, workers: usize) -> Self {
537 self.params.numworkers = Some(workers);
538 self
539 }
540
541 pub fn build(self) -> AdaptiveChunkingParams {
543 self.params
544 }
545}
546
547impl Default for AdaptiveChunkingBuilder {
548 fn default() -> Self {
549 Self::new()
550 }
551}
552
553pub mod beta2_enhancements {
555 use super::*;
556 use std::sync::atomic::AtomicUsize;
557 use std::sync::Arc;
558
559 #[derive(Debug, Clone, Default)]
561 #[allow(dead_code)]
562 pub struct ChunkingPerformanceMetrics {
563 pub chunk_processing_times: Vec<Duration>,
564 pub memory_usage_per_chunk: Vec<usize>,
565 pub throughput_mbps: Vec<f64>,
566 pub cpu_utilization: Vec<f64>,
567 }
568
569 #[allow(dead_code)]
571 pub struct DynamicLoadBalancer {
572 worker_performance: Vec<f64>, current_loads: Arc<Vec<AtomicUsize>>, target_efficiency: f64, }
576
577 #[allow(dead_code)]
578 impl DynamicLoadBalancer {
579 pub fn new(numworkers: usize) -> Self {
581 Self {
582 worker_performance: vec![1.0; numworkers], current_loads: Arc::new((0..numworkers).map(|_| AtomicUsize::new(0)).collect()),
584 target_efficiency: 0.85, }
586 }
587
588 pub fn distribute_work(&self, totalwork: usize) -> Vec<usize> {
590 let total_performance: f64 = self.worker_performance.iter().sum();
591 let mut distribution = Vec::new();
592 let mut remaining_work = totalwork;
593
594 for (i, &performance) in self.worker_performance.iter().enumerate() {
596 if i == self.worker_performance.len() - 1 {
597 distribution.push(remaining_work);
599 } else {
600 let work_share = (totalwork as f64 * performance / total_performance) as usize;
601 distribution.push(work_share);
602 remaining_work = remaining_work.saturating_sub(work_share);
603 }
604 }
605
606 distribution
607 }
608
609 pub fn update_performance(
611 &mut self,
612 workerid: usize,
613 work_amount: usize,
614 execution_time: Duration,
615 ) {
616 if workerid < self.worker_performance.len() {
617 let performance = work_amount as f64 / execution_time.as_secs_f64();
619
620 let alpha = 0.1; self.worker_performance[workerid] =
623 (1.0 - alpha) * self.worker_performance[workerid] + alpha * performance;
624 }
625 }
626 }
627
628 #[allow(dead_code)]
630 pub struct ChunkSizePredictor {
631 historical_metrics: Vec<ChunkingPerformanceMetrics>,
632 workload_characteristics: Vec<(WorkloadType, usize)>, }
634
635 #[allow(dead_code)]
636 impl ChunkSizePredictor {
637 pub fn new() -> Self {
638 Self {
639 historical_metrics: Vec::new(),
640 workload_characteristics: Vec::new(),
641 }
642 }
643
644 pub fn predict_chunk_size(
646 &self,
647 workload: WorkloadType,
648 memory_available: usize,
649 data_size: usize,
650 ) -> usize {
651 let historical_prediction = self.get_historical_prediction(workload);
653
654 let memory_constrained = (memory_available / 4).max(1024); let data_constrained = (data_size / 8).max(1024); let base_prediction = historical_prediction.unwrap_or(64 * 1024); let memory_weight = 0.4;
663 let data_weight = 0.4;
664 let historical_weight = 0.2;
665
666 let predicted_size = (memory_weight * memory_constrained as f64
667 + data_weight * data_constrained as f64
668 + historical_weight * base_prediction as f64)
669 as usize;
670
671 predicted_size.clamp(1024, 256 * 1024 * 1024) }
674
675 fn get_historical_prediction(&self, workload: WorkloadType) -> Option<usize> {
676 self.workload_characteristics
678 .iter()
679 .rev() .find(|(wl, _)| *wl == workload)
681 .map(|(_, size)| *size)
682 }
683
684 pub fn record_performance(
686 &mut self,
687 workload: WorkloadType,
688 chunk_size: usize,
689 metrics: ChunkingPerformanceMetrics,
690 ) {
691 self.historical_metrics.push(metrics);
692 self.workload_characteristics.push((workload, chunk_size));
693
694 if self.historical_metrics.len() > 100 {
696 self.historical_metrics.remove(0);
697 self.workload_characteristics.remove(0);
698 }
699 }
700 }
701
702 #[allow(dead_code)]
704 pub fn numa_aware_chunking(data_size: usize, num_numanodes: usize) -> ChunkingStrategy {
705 if num_numanodes <= 1 {
706 return ChunkingStrategy::Auto;
707 }
708
709 let base_chunk_size = data_size / (num_numanodes * 2); let aligned_chunk_size = align_to_cache_line(base_chunk_size);
712
713 ChunkingStrategy::Fixed(aligned_chunk_size)
714 }
715
716 fn align_to_cache_line(size: usize) -> usize {
718 const CACHE_LINE_SIZE: usize = 64; size.div_ceil(CACHE_LINE_SIZE) * CACHE_LINE_SIZE
720 }
721}
722
#[cfg(test)]
mod tests {
    use super::*;
    use ::ndarray::Array2;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    // 1D case: a 1 MiB budget over f64s allows 131072 elements, which the
    // builder's max of 50000 clamps down to.
    #[test]
    fn test_adaptive_chunking_1d() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_adaptive_1d.bin");

        // Write 100k f64s in native byte order as the backing file.
        let data: Vec<f64> = (0..100_000).map(|i| i as f64).collect();
        let mut file = File::create(&file_path).expect("Operation failed");
        for val in &data {
            file.write_all(&val.to_ne_bytes())
                .expect("Operation failed");
        }
        drop(file);

        let mmap =
            MemoryMappedArray::<f64>::path(&file_path, &[100_000]).expect("Operation failed");

        // Disable parallel optimization so the result is deterministic
        // across machines with different core counts.
        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(1024 * 1024)
            .with_min_chunksize(1000)
            .with_max_chunksize(50000)
            .optimize_for_parallel(false)
            .build();

        let result = mmap.adaptive_chunking(params).expect("Operation failed");

        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                // 1 MiB / 8 bytes = 131072 elements, clamped to the max.
                assert_eq!(chunksize, 50000);
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        assert!(result.estimated_memory_per_chunk > 0);

        // The decision trail should mention the 1D path.
        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("1D array")));
    }

    // 2D case: the chosen chunk size must land on a row boundary.
    #[test]
    fn test_adaptive_chunking_2d() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_adaptive_2d.bin");

        let rows = 1000;
        let cols = 120;

        // Write a rows x cols f64 matrix in row-major order.
        let data = Array2::<f64>::from_shape_fn((rows, cols), |(i, j)| (i * cols + j) as f64);
        let mut file = File::create(&file_path).expect("Operation failed");
        for val in data.iter() {
            file.write_all(&val.to_ne_bytes())
                .expect("Operation failed");
        }
        drop(file);

        let mmap =
            MemoryMappedArray::<f64>::path(&file_path, &[rows, cols]).expect("Operation failed");

        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(100 * 1024)
            .with_min_chunksize(1000)
            .with_max_chunksize(50000)
            .build();

        let result = mmap.adaptive_chunking(params).expect("Operation failed");

        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                // Dimensionality alignment rounds to whole rows.
                assert_eq!(
                    chunksize % cols,
                    0,
                    "Chunk size should be a multiple of row length"
                );
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("2D array")));
    }

    // Parallel case: with 4 workers the decision trail should mention the
    // parallel-optimization pass.
    #[test]
    fn test_adaptive_chunking_parallel() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_adaptive_parallel.bin");

        // Write 1M f64s as the backing file.
        let data: Vec<f64> = (0..1_000_000).map(|i| i as f64).collect();
        let mut file = File::create(&file_path).expect("Operation failed");
        for val in &data {
            file.write_all(&val.to_ne_bytes())
                .expect("Operation failed");
        }
        drop(file);

        let mmap =
            MemoryMappedArray::<f64>::path(&file_path, &[1_000_000]).expect("Operation failed");

        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(10 * 1024 * 1024)
            .optimize_for_parallel(true)
            .with_numworkers(4)
            .build();

        let result = mmap.adaptive_chunking(params).expect("Operation failed");

        match result.strategy {
            ChunkingStrategy::Fixed(chunksize) => {
                assert!(chunksize > 0, "Chunk size should be positive");
            }
            _ => panic!("Expected fixed chunking strategy"),
        }

        assert!(result
            .decision_factors
            .iter()
            .any(|s| s.contains("Parallel optimization")));
    }
}