use std::collections::HashMap;
use std::time::Duration;

#[cfg(feature = "parallel")]
use rayon::prelude::*;

/// Strategy for selecting chunk sizes when splitting work across threads.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkStrategy {
    /// Always use a fixed chunk size.
    Fixed(usize),
    /// Adapt the chunk size to the data size and thread count.
    Adaptive,
    /// Size chunks to fit within CPU cache levels.
    CacheOptimized,
    /// Favor larger chunks to reduce memory traffic.
    MemoryOptimized,
    /// Use many small chunks for dynamic scheduling.
    Dynamic,
    /// Balance chunk counts for work-stealing schedulers.
    WorkStealingBalanced,
    /// Align chunks with NUMA node boundaries.
    NumaAware,
    /// Block sizes suited to dense linear algebra kernels.
    LinearAlgebra,
    /// Small, load-balanced chunks for sparse matrix operations.
    SparseMatrix,
    /// Power-of-two chunks for FFT-style signal processing.
    SignalProcessing,
    /// Tile-shaped chunks for image processing.
    ImageProcessing,
    /// Large independent chunks for Monte Carlo sampling.
    MonteCarlo,
    /// Moderate chunks for iterative solvers.
    IterativeSolver,
    /// Chunk sizes that also suit GPU offloading.
    GpuAware,
    /// User-provided function of `(data_size, thread_count)` returning a chunk size.
    Custom(fn(usize, usize) -> usize),
}

/// Configuration controlling how data is chunked for parallel execution.
#[derive(Debug, Clone)]
pub struct ChunkConfig {
    pub strategy: ChunkStrategy,
    pub min_chunk_size: usize,
    pub max_chunk_size: usize,
    pub prefer_work_stealing: bool,
    pub memory_pattern: MemoryPattern,
    pub compute_intensity: ComputeIntensity,
    pub enable_monitoring: bool,
    pub load_balance_factor: f64,
    pub cache_awareness: CacheAwareness,
    pub numa_strategy: NumaStrategy,
    pub gpu_settings: Option<GpuChunkSettings>,
}

/// Expected memory access pattern of the workload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryPattern {
    Sequential,
    Random,
    Strided(usize),
    BlockWise,
    Sparse,
    CacheFriendly,
}

/// How compute-heavy the workload is relative to its memory traffic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComputeIntensity {
    MemoryBound,
    Balanced,
    ComputeIntensive,
    CpuBound,
}

/// Which cache level chunk sizes should be fitted to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheAwareness {
    None,
    L1,
    L2,
    L3,
    Full,
}

/// NUMA placement policy for chunked work.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NumaStrategy {
    Ignore,
    LocalPreferred,
    StrictLocal,
    Interleave,
    Custom,
}

/// Settings used when chunks may be offloaded to a GPU.
#[derive(Debug, Clone)]
pub struct GpuChunkSettings {
    pub gpu_memory_ratio: f64,
    pub gpu_min_chunk: usize,
    pub overlap_compute: bool,
    pub gpu_bandwidth: Option<u64>,
    pub transfer_bandwidth: Option<u64>,
}

impl Default for ChunkConfig {
    fn default() -> Self {
        Self {
            strategy: ChunkStrategy::Adaptive,
            min_chunk_size: 64,
            max_chunk_size: 8192,
            prefer_work_stealing: true,
            memory_pattern: MemoryPattern::Sequential,
            compute_intensity: ComputeIntensity::Balanced,
            enable_monitoring: false,
            load_balance_factor: 0.1,
            cache_awareness: CacheAwareness::L2,
            numa_strategy: NumaStrategy::LocalPreferred,
            gpu_settings: None,
        }
    }
}

impl Default for GpuChunkSettings {
    fn default() -> Self {
        Self {
            gpu_memory_ratio: 0.8,
            gpu_min_chunk: 4096,
            overlap_compute: true,
            gpu_bandwidth: None,
            transfer_bandwidth: None,
        }
    }
}

impl ChunkConfig {
    /// Preset for compute-heavy workloads: small chunks, L1-cache aware.
    pub fn compute_intensive() -> Self {
        Self {
            strategy: ChunkStrategy::Adaptive,
            min_chunk_size: 32,
            max_chunk_size: 1024,
            prefer_work_stealing: true,
            memory_pattern: MemoryPattern::Sequential,
            compute_intensity: ComputeIntensity::ComputeIntensive,
            enable_monitoring: false,
            load_balance_factor: 0.05,
            cache_awareness: CacheAwareness::L1,
            numa_strategy: NumaStrategy::LocalPreferred,
            gpu_settings: None,
        }
    }

    /// Preset for memory-bound workloads: large chunks, NUMA-local placement.
    pub fn memory_intensive() -> Self {
        Self {
            strategy: ChunkStrategy::MemoryOptimized,
            min_chunk_size: 256,
            max_chunk_size: 16384,
            prefer_work_stealing: false,
            memory_pattern: MemoryPattern::Sequential,
            compute_intensity: ComputeIntensity::MemoryBound,
            enable_monitoring: false,
            load_balance_factor: 0.2,
            cache_awareness: CacheAwareness::L3,
            numa_strategy: NumaStrategy::StrictLocal,
            gpu_settings: None,
        }
    }

    /// Preset that keeps chunks within cache-friendly bounds.
    pub fn cache_friendly() -> Self {
        Self {
            strategy: ChunkStrategy::CacheOptimized,
            min_chunk_size: 64,
            max_chunk_size: 4096,
            prefer_work_stealing: true,
            memory_pattern: MemoryPattern::CacheFriendly,
            compute_intensity: ComputeIntensity::Balanced,
            enable_monitoring: false,
            load_balance_factor: 0.1,
            cache_awareness: CacheAwareness::Full,
            numa_strategy: NumaStrategy::LocalPreferred,
            gpu_settings: None,
        }
    }

    /// Preset for dense linear algebra: block-wise access, L3-sized blocks.
    pub fn linear_algebra() -> Self {
        Self {
            strategy: ChunkStrategy::LinearAlgebra,
            min_chunk_size: 256,
            max_chunk_size: 8192,
            prefer_work_stealing: false,
            memory_pattern: MemoryPattern::BlockWise,
            compute_intensity: ComputeIntensity::ComputeIntensive,
            enable_monitoring: false,
            load_balance_factor: 0.1,
            cache_awareness: CacheAwareness::L3,
            numa_strategy: NumaStrategy::LocalPreferred,
            gpu_settings: None,
        }
    }

    /// Preset for sparse matrix operations with irregular load balance.
    pub fn sparse_matrix() -> Self {
        Self {
            strategy: ChunkStrategy::SparseMatrix,
            min_chunk_size: 128,
            max_chunk_size: 4096,
            prefer_work_stealing: true,
            memory_pattern: MemoryPattern::Sparse,
            compute_intensity: ComputeIntensity::MemoryBound,
            enable_monitoring: true,
            load_balance_factor: 0.3,
            cache_awareness: CacheAwareness::L2,
            numa_strategy: NumaStrategy::LocalPreferred,
            gpu_settings: None,
        }
    }

    /// Preset for signal processing with power-of-two friendly chunks.
    pub fn signal_processing() -> Self {
        Self {
            strategy: ChunkStrategy::SignalProcessing,
            min_chunk_size: 512,
            max_chunk_size: 16384,
            prefer_work_stealing: false,
            memory_pattern: MemoryPattern::Sequential,
            compute_intensity: ComputeIntensity::ComputeIntensive,
            enable_monitoring: false,
            load_balance_factor: 0.05,
            cache_awareness: CacheAwareness::L2,
            numa_strategy: NumaStrategy::LocalPreferred,
            gpu_settings: None,
        }
    }

    /// Preset for image processing with block-wise (tile) access.
    pub fn image_processing() -> Self {
        Self {
            strategy: ChunkStrategy::ImageProcessing,
            min_chunk_size: 1024,
            max_chunk_size: 32768,
            prefer_work_stealing: true,
            memory_pattern: MemoryPattern::BlockWise,
            compute_intensity: ComputeIntensity::Balanced,
            enable_monitoring: false,
            load_balance_factor: 0.15,
            cache_awareness: CacheAwareness::L3,
            numa_strategy: NumaStrategy::LocalPreferred,
            gpu_settings: None,
        }
    }

    /// Preset for Monte Carlo simulations: large independent chunks.
    pub fn monte_carlo() -> Self {
        Self {
            strategy: ChunkStrategy::MonteCarlo,
            min_chunk_size: 1024,
            max_chunk_size: 65536,
            prefer_work_stealing: true,
            memory_pattern: MemoryPattern::Random,
            compute_intensity: ComputeIntensity::ComputeIntensive,
            enable_monitoring: true,
            load_balance_factor: 0.2,
            cache_awareness: CacheAwareness::L1,
            numa_strategy: NumaStrategy::Interleave,
            gpu_settings: None,
        }
    }

    /// Preset for iterative solvers that make repeated passes over the data.
    pub fn iterative_solver() -> Self {
        Self {
            strategy: ChunkStrategy::IterativeSolver,
            min_chunk_size: 256,
            max_chunk_size: 8192,
            prefer_work_stealing: false,
            memory_pattern: MemoryPattern::Sequential,
            compute_intensity: ComputeIntensity::Balanced,
            enable_monitoring: true,
            load_balance_factor: 0.1,
            cache_awareness: CacheAwareness::L2,
            numa_strategy: NumaStrategy::StrictLocal,
            gpu_settings: None,
        }
    }

    /// Preset for hybrid CPU/GPU execution with large, transfer-friendly chunks.
    pub fn gpu_hybrid() -> Self {
        Self {
            strategy: ChunkStrategy::GpuAware,
            min_chunk_size: 4096,
            max_chunk_size: 131072,
            prefer_work_stealing: true,
            memory_pattern: MemoryPattern::Sequential,
            compute_intensity: ComputeIntensity::ComputeIntensive,
            enable_monitoring: true,
            load_balance_factor: 0.2,
            cache_awareness: CacheAwareness::L3,
            numa_strategy: NumaStrategy::LocalPreferred,
            gpu_settings: Some(GpuChunkSettings::default()),
        }
    }

    /// Enable performance monitoring for this configuration.
    pub fn with_monitoring(mut self) -> Self {
        self.enable_monitoring = true;
        self
    }

    /// Override the NUMA placement strategy.
    pub fn with_numa_strategy(mut self, strategy: NumaStrategy) -> Self {
        self.numa_strategy = strategy;
        self
    }

    /// Override the expected memory access pattern.
    pub fn with_memory_pattern(mut self, pattern: MemoryPattern) -> Self {
        self.memory_pattern = pattern;
        self
    }

    /// Override the expected compute intensity.
    pub fn with_compute_intensity(mut self, intensity: ComputeIntensity) -> Self {
        self.compute_intensity = intensity;
        self
    }

    /// Attach GPU chunking settings.
    pub fn with_gpu_settings(mut self, settings: GpuChunkSettings) -> Self {
        self.gpu_settings = Some(settings);
        self
    }
}

/// Utilities for computing chunk sizes and running chunked parallel operations.
pub struct ChunkingUtils;

impl ChunkingUtils {
    /// Compute an appropriate chunk size for `data_size` elements under `config`.
    pub fn optimal_chunk_size(data_size: usize, config: &ChunkConfig) -> usize {
        let thread_count = Self::thread_count();
        let cpu_info = Self::get_cpu_info();

        let chunk_size = match config.strategy {
            ChunkStrategy::Fixed(size) => size,
            ChunkStrategy::Adaptive => Self::adaptive_chunk_size(data_size, thread_count),
            ChunkStrategy::CacheOptimized => {
                Self::cache_optimized_chunk_size(data_size, thread_count)
            }
            ChunkStrategy::MemoryOptimized => {
                Self::memory_optimized_chunk_size(data_size, thread_count)
            }
            ChunkStrategy::Dynamic => Self::dynamic_chunk_size(data_size, thread_count),
            ChunkStrategy::WorkStealingBalanced => {
                Self::work_stealing_chunk_size(data_size, thread_count)
            }
            ChunkStrategy::NumaAware => {
                Self::numa_aware_chunk_size(data_size, thread_count, &cpu_info)
            }
            ChunkStrategy::LinearAlgebra => {
                Self::linear_algebra_chunk_size(data_size, thread_count, &cpu_info)
            }
            ChunkStrategy::SparseMatrix => Self::sparse_matrix_chunk_size(data_size, thread_count),
            ChunkStrategy::SignalProcessing => {
                Self::signal_processing_chunk_size(data_size, thread_count)
            }
            ChunkStrategy::ImageProcessing => {
                Self::image_processing_chunk_size(data_size, thread_count)
            }
            ChunkStrategy::MonteCarlo => Self::monte_carlo_chunk_size(data_size, thread_count),
            ChunkStrategy::IterativeSolver => {
                Self::iterative_solver_chunk_size(data_size, thread_count)
            }
            ChunkStrategy::GpuAware => Self::gpu_aware_chunk_size(data_size, thread_count, config),
            ChunkStrategy::Custom(func) => func(data_size, thread_count),
        };

        // Apply successive adjustments, then clamp to the configured bounds.
        let cache_adjusted =
            Self::apply_cache_awareness(chunk_size, config.cache_awareness, &cpu_info);

        let pattern_adjusted = Self::apply_memory_pattern(cache_adjusted, config.memory_pattern);

        let intensity_adjusted =
            Self::apply_compute_intensity(pattern_adjusted, config.compute_intensity);

        intensity_adjusted
            .max(config.min_chunk_size)
            .min(config.max_chunk_size)
    }

    fn adaptive_chunk_size(data_size: usize, thread_count: usize) -> usize {
        if data_size < 1000 {
            data_size
        } else if data_size < 10000 {
            (data_size / thread_count).max(64)
        } else {
            let base_chunk = data_size / (thread_count * 4);
            base_chunk.max(256).min(8192)
        }
    }

    fn cache_optimized_chunk_size(data_size: usize, thread_count: usize) -> usize {
        const CACHE_LINE_SIZE: usize = 64;
        const L2_CACHE_SIZE: usize = 256 * 1024;

        // Target roughly a quarter of L2, expressed in f32 elements.
        let cache_friendly_size = L2_CACHE_SIZE / std::mem::size_of::<f32>() / 4;
        let target_chunk = (data_size / thread_count).min(cache_friendly_size);
        let aligned_chunk = (target_chunk / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;

        aligned_chunk.max(CACHE_LINE_SIZE)
    }

    fn memory_optimized_chunk_size(data_size: usize, thread_count: usize) -> usize {
        let target_chunk = data_size / thread_count;

        target_chunk.max(1024).min(32768)
    }

    fn dynamic_chunk_size(data_size: usize, thread_count: usize) -> usize {
        // Many small chunks so the scheduler can rebalance dynamically.
        let base_chunk = data_size / (thread_count * 8);
        base_chunk.max(128).min(2048)
    }

    fn thread_count() -> usize {
        #[cfg(feature = "parallel")]
        {
            rayon::current_num_threads().max(1)
        }
        #[cfg(not(feature = "parallel"))]
        {
            1
        }
    }

    fn get_cpu_info() -> CpuInfo {
        // Conservative defaults; real hardware detection could replace these values.
        CpuInfo {
            l1_cache_size: 32 * 1024,
            l2_cache_size: 256 * 1024,
            l3_cache_size: 8 * 1024 * 1024,
            cache_line_size: 64,
            numa_nodes: 1,
            cores_per_numa: Self::thread_count(),
        }
    }

    fn work_stealing_chunk_size(data_size: usize, thread_count: usize) -> usize {
        // Aim for several chunks per thread so idle workers can steal.
        let target_chunks = thread_count * 16;
        (data_size / target_chunks).max(32)
    }

    fn numa_aware_chunk_size(data_size: usize, thread_count: usize, cpu_info: &CpuInfo) -> usize {
        // At least one chunk per NUMA node to avoid a zero divisor.
        let chunks_per_numa = (thread_count / cpu_info.numa_nodes).max(1);
        let chunk_size = data_size / (cpu_info.numa_nodes * chunks_per_numa);

        // Align to cache line boundaries.
        let aligned = (chunk_size / cpu_info.cache_line_size) * cpu_info.cache_line_size;
        aligned.max(cpu_info.cache_line_size)
    }

    fn linear_algebra_chunk_size(
        data_size: usize,
        thread_count: usize,
        cpu_info: &CpuInfo,
    ) -> usize {
        // Pick a block size whose working set fits in a fraction of L3.
        let cache_size = cpu_info.l3_cache_size / 4;
        let elements_per_cache = cache_size / std::mem::size_of::<f64>();

        let sqrt_elements = (elements_per_cache as f64).sqrt() as usize;
        let block_size = sqrt_elements.max(64).min(512);

        ((data_size / thread_count) / block_size) * block_size
    }

    fn sparse_matrix_chunk_size(data_size: usize, thread_count: usize) -> usize {
        let base_chunk = data_size / (thread_count * 8);
        base_chunk.max(128).min(2048)
    }

    fn signal_processing_chunk_size(data_size: usize, thread_count: usize) -> usize {
        // Round down to a power of two, which suits FFT-style kernels.
        let target_chunk = data_size / thread_count;
        let log2_chunk = (target_chunk as f64).log2().floor() as u32;
        let power_of_2_chunk = 2_usize.pow(log2_chunk);

        power_of_2_chunk.max(512).min(16384)
    }

    fn image_processing_chunk_size(data_size: usize, thread_count: usize) -> usize {
        let pixels_per_thread = data_size / thread_count;

        // Approximate a square tile per thread.
        let block_side = (pixels_per_thread as f64).sqrt() as usize;
        let block_size = block_side * block_side;

        block_size.max(1024).min(32768)
    }

    fn monte_carlo_chunk_size(data_size: usize, thread_count: usize) -> usize {
        let chunk_size = data_size / thread_count;
        chunk_size.max(4096).min(131072)
    }

    fn iterative_solver_chunk_size(data_size: usize, thread_count: usize) -> usize {
        let chunk_size = data_size / (thread_count * 2);
        chunk_size.max(256).min(8192)
    }

    fn gpu_aware_chunk_size(data_size: usize, thread_count: usize, config: &ChunkConfig) -> usize {
        if let Some(gpu_settings) = &config.gpu_settings {
            let cpu_chunk = data_size / thread_count;
            let gpu_preferred = gpu_settings.gpu_min_chunk;

            // Large inputs favor GPU-sized chunks; small inputs fall back to CPU chunks.
            if data_size > gpu_preferred * 4 {
                gpu_preferred.max(cpu_chunk / 2)
            } else {
                cpu_chunk
            }
        } else {
            Self::adaptive_chunk_size(data_size, thread_count)
        }
    }

    fn apply_cache_awareness(
        chunk_size: usize,
        awareness: CacheAwareness,
        cpu_info: &CpuInfo,
    ) -> usize {
        match awareness {
            CacheAwareness::None => chunk_size,
            CacheAwareness::L1 => {
                let l1_elements = cpu_info.l1_cache_size / std::mem::size_of::<f64>();
                chunk_size.min(l1_elements / 2)
            }
            CacheAwareness::L2 => {
                let l2_elements = cpu_info.l2_cache_size / std::mem::size_of::<f64>();
                chunk_size.min(l2_elements / 2)
            }
            CacheAwareness::L3 => {
                let l3_elements = cpu_info.l3_cache_size / std::mem::size_of::<f64>();
                chunk_size.min(l3_elements / 4)
            }
            CacheAwareness::Full => {
                // Most conservative: keep chunks well within L1.
                let l1_elements = cpu_info.l1_cache_size / std::mem::size_of::<f64>();
                chunk_size.min(l1_elements / 4)
            }
        }
    }

    fn apply_memory_pattern(chunk_size: usize, pattern: MemoryPattern) -> usize {
        match pattern {
            MemoryPattern::Sequential => chunk_size,
            // Random access suffers from cache misses; prefer smaller chunks.
            MemoryPattern::Random => chunk_size / 2,
            MemoryPattern::Strided(stride) => {
                // Round to a multiple of the stride.
                ((chunk_size / stride) * stride).max(stride)
            }
            MemoryPattern::BlockWise => {
                // Round to a multiple of a fixed block size.
                let block_size = 64;
                ((chunk_size / block_size) * block_size).max(block_size)
            }
            MemoryPattern::Sparse => chunk_size / 4,
            MemoryPattern::CacheFriendly => chunk_size,
        }
    }

    fn apply_compute_intensity(chunk_size: usize, intensity: ComputeIntensity) -> usize {
        match intensity {
            // Memory-bound work benefits from larger chunks (better prefetching).
            ComputeIntensity::MemoryBound => chunk_size * 2,
            ComputeIntensity::Balanced => chunk_size,
            // Compute-heavy work benefits from smaller chunks (better load balance).
            ComputeIntensity::ComputeIntensive => chunk_size / 2,
            ComputeIntensity::CpuBound => chunk_size / 4,
        }
    }

    /// Map `map_fn` over `data` using chunking informed by `config`.
    pub fn chunked_map<T, R, F>(data: &[T], config: &ChunkConfig, map_fn: F) -> Vec<R>
    where
        T: Sync,
        R: Send,
        F: Fn(&T) -> R + Sync,
    {
        let chunk_size = Self::optimal_chunk_size(data.len(), config);

        #[cfg(feature = "parallel")]
        {
            if config.prefer_work_stealing {
                data.par_iter()
                    .with_min_len(chunk_size)
                    .map(|x| map_fn(x))
                    .collect()
            } else {
                data.par_chunks(chunk_size)
                    .map(|chunk| chunk.iter().map(|x| map_fn(x)).collect::<Vec<_>>())
                    .flatten()
                    .collect()
            }
        }
        #[cfg(not(feature = "parallel"))]
        {
            // The chunk size only influences the parallel path.
            let _ = chunk_size;
            data.iter().map(|x| map_fn(x)).collect()
        }
    }

    /// Element-wise map over two equally sized slices.
    ///
    /// Panics if the slices differ in length.
    pub fn chunked_zip_map<T, U, R, F>(
        data_a: &[T],
        data_b: &[U],
        config: &ChunkConfig,
        map_fn: F,
    ) -> Vec<R>
    where
        T: Sync,
        U: Sync,
        R: Send,
        F: Fn(&T, &U) -> R + Sync,
    {
        assert_eq!(
            data_a.len(),
            data_b.len(),
            "Arrays must have the same length"
        );

        let chunk_size = Self::optimal_chunk_size(data_a.len(), config);

        #[cfg(feature = "parallel")]
        {
            if config.prefer_work_stealing {
                data_a
                    .par_iter()
                    .with_min_len(chunk_size)
                    .zip(data_b.par_iter())
                    .map(|(a, b)| map_fn(a, b))
                    .collect()
            } else {
                data_a
                    .par_chunks(chunk_size)
                    .zip(data_b.par_chunks(chunk_size))
                    .map(|(chunk_a, chunk_b)| {
                        chunk_a
                            .iter()
                            .zip(chunk_b.iter())
                            .map(|(a, b)| map_fn(a, b))
                            .collect::<Vec<_>>()
                    })
                    .flatten()
                    .collect()
            }
        }
        #[cfg(not(feature = "parallel"))]
        {
            let _ = chunk_size;
            data_a
                .iter()
                .zip(data_b.iter())
                .map(|(a, b)| map_fn(a, b))
                .collect()
        }
    }

    /// Reduce `data` with `reduce_fn`, starting each chunk from `identity`.
    ///
    /// `reduce_fn` should be associative and `identity` a true identity element,
    /// since chunk results may be combined in any order.
    pub fn chunked_reduce<T, F>(data: &[T], config: &ChunkConfig, identity: T, reduce_fn: F) -> T
    where
        T: Clone + Send + Sync,
        F: Fn(T, T) -> T + Sync,
    {
        let chunk_size = Self::optimal_chunk_size(data.len(), config);

        #[cfg(feature = "parallel")]
        {
            data.par_chunks(chunk_size)
                .map(|chunk| {
                    chunk
                        .iter()
                        .cloned()
                        .fold(identity.clone(), |acc, x| reduce_fn(acc, x))
                })
                .reduce(|| identity.clone(), |a, b| reduce_fn(a, b))
        }
        #[cfg(not(feature = "parallel"))]
        {
            let _ = chunk_size;
            data.iter().cloned().fold(identity, reduce_fn)
        }
    }
}

/// Extension trait adding chunked operations directly on slices.
pub trait ParallelSliceExt<T> {
    fn chunked_map<R, F>(&self, config: &ChunkConfig, map_fn: F) -> Vec<R>
    where
        T: Sync,
        R: Send,
        F: Fn(&T) -> R + Sync;

    fn chunked_reduce<F>(&self, config: &ChunkConfig, identity: T, reduce_fn: F) -> T
    where
        T: Clone + Send + Sync,
        F: Fn(T, T) -> T + Sync;
}

impl<T> ParallelSliceExt<T> for [T] {
    fn chunked_map<R, F>(&self, config: &ChunkConfig, map_fn: F) -> Vec<R>
    where
        T: Sync,
        R: Send,
        F: Fn(&T) -> R + Sync,
    {
        ChunkingUtils::chunked_map(self, config, map_fn)
    }

    fn chunked_reduce<F>(&self, config: &ChunkConfig, identity: T, reduce_fn: F) -> T
    where
        T: Clone + Send + Sync,
        F: Fn(T, T) -> T + Sync,
    {
        ChunkingUtils::chunked_reduce(self, config, identity, reduce_fn)
    }
}

/// Basic CPU topology and cache information used for chunk sizing.
#[derive(Debug, Clone)]
pub struct CpuInfo {
    pub l1_cache_size: usize,
    pub l2_cache_size: usize,
    pub l3_cache_size: usize,
    pub cache_line_size: usize,
    pub numa_nodes: usize,
    pub cores_per_numa: usize,
}

/// Records chunking performance measurements and tracks the best chunk sizes.
#[derive(Debug, Clone)]
pub struct ChunkPerformanceMonitor {
    measurements: Vec<ChunkMeasurement>,
    optimal_sizes: HashMap<String, usize>,
}

/// A single timed observation of a chunked operation.
#[derive(Debug, Clone)]
pub struct ChunkMeasurement {
    pub chunk_size: usize,
    pub data_size: usize,
    pub execution_time: Duration,
    pub throughput: f64,
    pub operation_type: String,
}

impl ChunkPerformanceMonitor {
    pub fn new() -> Self {
        Self {
            measurements: Vec::new(),
            optimal_sizes: HashMap::new(),
        }
    }

    /// Record a measurement and update the tracked optimal chunk sizes.
    pub fn record_measurement(&mut self, measurement: ChunkMeasurement) {
        self.measurements.push(measurement);

        // Keep a bounded history of the most recent measurements.
        if self.measurements.len() > 1000 {
            self.measurements.remove(0);
        }

        self.update_optimal_sizes();
    }

    fn update_optimal_sizes(&mut self) {
        for measurement in &self.measurements {
            let key = format!(
                "{}_{}k",
                measurement.operation_type,
                measurement.data_size / 1000
            );

            let current_optimal = self
                .optimal_sizes
                .get(&key)
                .unwrap_or(&measurement.chunk_size);

            // `>=` so that the first measurement recorded for a key is registered.
            if measurement.throughput >= self.get_throughput_for_size(*current_optimal, &key) {
                self.optimal_sizes.insert(key, measurement.chunk_size);
            }
        }
    }

    fn get_throughput_for_size(&self, chunk_size: usize, operation_key: &str) -> f64 {
        self.measurements
            .iter()
            .filter(|m| {
                m.chunk_size == chunk_size
                    && format!("{}_{}k", m.operation_type, m.data_size / 1000) == operation_key
            })
            .map(|m| m.throughput)
            .fold(0.0, f64::max)
    }

    /// Best known chunk size for an operation type and data size, if recorded.
    pub fn get_optimal_size(&self, operation_type: &str, data_size: usize) -> Option<usize> {
        let key = format!("{}_{}k", operation_type, data_size / 1000);
        self.optimal_sizes.get(&key).copied()
    }

    /// Summary statistics over all recorded measurements.
    pub fn get_statistics(&self) -> ChunkStatistics {
        if self.measurements.is_empty() {
            return ChunkStatistics::default();
        }

        let throughputs: Vec<f64> = self.measurements.iter().map(|m| m.throughput).collect();
        let avg_throughput = throughputs.iter().sum::<f64>() / throughputs.len() as f64;
        let max_throughput = throughputs.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
        let min_throughput = throughputs.iter().fold(f64::INFINITY, |a, &b| a.min(b));

        ChunkStatistics {
            total_measurements: self.measurements.len(),
            avg_throughput,
            max_throughput,
            min_throughput,
            optimal_operations: self.optimal_sizes.len(),
        }
    }
}

impl Default for ChunkPerformanceMonitor {
    fn default() -> Self {
        Self::new()
    }
}

/// Aggregate statistics reported by `ChunkPerformanceMonitor`.
#[derive(Debug, Clone, Default)]
pub struct ChunkStatistics {
    pub total_measurements: usize,
    pub avg_throughput: f64,
    pub max_throughput: f64,
    pub min_throughput: f64,
    pub optimal_operations: usize,
}

/// Helpers for choosing block and chunk shapes for matrix and array operations.
pub struct MatrixChunking;

impl MatrixChunking {
    /// Block sizes for a cache-blocked multiply of (rows_a x cols_a) * (cols_a x cols_b).
    pub fn matrix_multiply_chunks(
        rows_a: usize,
        cols_a: usize,
        cols_b: usize,
    ) -> (usize, usize, usize) {
        // Size blocks so that three of them fit in a typical L2 cache.
        let cache_size = 256 * 1024;
        let element_size = std::mem::size_of::<f64>();
        let cache_elements = cache_size / element_size;

        let block_size = ((cache_elements / 3) as f64).sqrt() as usize;
        let block_size = block_size.max(64).min(512);

        (
            block_size.min(rows_a),
            block_size.min(cols_a),
            block_size.min(cols_b),
        )
    }

    /// Chunk shape for a 2-D array split across `thread_count` threads.
    pub fn array_2d_chunks(rows: usize, cols: usize, thread_count: usize) -> (usize, usize) {
        let total_elements = rows * cols;
        let elements_per_thread = total_elements / thread_count;

        // Split along the longer dimension.
        if rows >= cols {
            let chunk_rows = (elements_per_thread / cols).max(1);
            (chunk_rows, cols)
        } else {
            let chunk_cols = (elements_per_thread / rows).max(1);
            (rows, chunk_cols)
        }
    }

    /// Chunk shape for a 3-D array split across `thread_count` threads.
    pub fn array_3d_chunks(
        depth: usize,
        rows: usize,
        cols: usize,
        thread_count: usize,
    ) -> (usize, usize, usize) {
        let total_elements = depth * rows * cols;
        let elements_per_thread = total_elements / thread_count;

        // Split along the largest dimension.
        if depth >= rows && depth >= cols {
            let chunk_depth = (elements_per_thread / (rows * cols)).max(1);
            (chunk_depth, rows, cols)
        } else if rows >= cols {
            let chunk_rows = (elements_per_thread / (depth * cols)).max(1);
            (depth, chunk_rows, cols)
        } else {
            let chunk_cols = (elements_per_thread / (depth * rows)).max(1);
            (depth, rows, chunk_cols)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_chunk_size_calculation() {
        let config = ChunkConfig::default();

        let small_chunk = ChunkingUtils::optimal_chunk_size(100, &config);
        assert!(small_chunk >= config.min_chunk_size);

        let large_chunk = ChunkingUtils::optimal_chunk_size(100000, &config);
        assert!(large_chunk >= config.min_chunk_size);
        assert!(large_chunk <= config.max_chunk_size);
    }

    #[test]
    fn test_chunked_map() {
        let data: Vec<i32> = (0..1000).collect();
        let config = ChunkConfig::compute_intensive();

        let result = data.chunked_map(&config, |&x| x * x);
        let expected: Vec<i32> = (0..1000).map(|x| x * x).collect();

        assert_eq!(result, expected);
    }

    #[test]
    fn test_chunked_zip_map() {
        let data_a: Vec<i32> = (0..1000).collect();
        let data_b: Vec<i32> = (1000..2000).collect();
        let config = ChunkConfig::memory_intensive();

        let result = ChunkingUtils::chunked_zip_map(&data_a, &data_b, &config, |&a, &b| a + b);
        let expected: Vec<i32> = (0..1000).map(|x| x + (x + 1000)).collect();

        assert_eq!(result, expected);
    }

    #[test]
    fn test_chunked_reduce() {
        let data: Vec<i32> = (1..=100).collect();
        let config = ChunkConfig::cache_friendly();

        let result = data.chunked_reduce(&config, 0, |a, b| a + b);
        let expected = (1..=100).sum::<i32>();

        assert_eq!(result, expected);
    }
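
    // Additional sanity check: the chunked helpers are expected to handle empty
    // slices by returning an empty result or the identity value.
    #[test]
    fn test_empty_input() {
        let data: Vec<i32> = Vec::new();
        let config = ChunkConfig::default();

        let mapped = data.chunked_map(&config, |&x| x * 2);
        assert!(mapped.is_empty());

        let reduced = data.chunked_reduce(&config, 0, |a, b| a + b);
        assert_eq!(reduced, 0);
    }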

    #[test]
    fn test_specialized_configs() {
        let linear_algebra = ChunkConfig::linear_algebra();
        assert_eq!(linear_algebra.strategy, ChunkStrategy::LinearAlgebra);
        assert_eq!(linear_algebra.memory_pattern, MemoryPattern::BlockWise);

        let sparse_matrix = ChunkConfig::sparse_matrix();
        assert_eq!(sparse_matrix.strategy, ChunkStrategy::SparseMatrix);
        assert_eq!(sparse_matrix.memory_pattern, MemoryPattern::Sparse);

        let monte_carlo = ChunkConfig::monte_carlo();
        assert_eq!(monte_carlo.strategy, ChunkStrategy::MonteCarlo);
        assert_eq!(monte_carlo.memory_pattern, MemoryPattern::Random);
    }
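
    // Additional check: each with_* builder should override only the field it
    // names and leave the rest of the configuration untouched.
    #[test]
    fn test_builder_methods() {
        let config = ChunkConfig::default()
            .with_monitoring()
            .with_numa_strategy(NumaStrategy::Interleave)
            .with_memory_pattern(MemoryPattern::Strided(4))
            .with_compute_intensity(ComputeIntensity::CpuBound)
            .with_gpu_settings(GpuChunkSettings::default());

        assert!(config.enable_monitoring);
        assert_eq!(config.numa_strategy, NumaStrategy::Interleave);
        assert_eq!(config.memory_pattern, MemoryPattern::Strided(4));
        assert_eq!(config.compute_intensity, ComputeIntensity::CpuBound);
        assert!(config.gpu_settings.is_some());
        // The strategy from the default preset is preserved.
        assert_eq!(config.strategy, ChunkStrategy::Adaptive);
    }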

    #[test]
    fn test_matrix_chunking() {
        let (row_chunk, col_chunk_a, col_chunk_b) =
            MatrixChunking::matrix_multiply_chunks(1000, 800, 600);
        assert!(row_chunk > 0 && row_chunk <= 1000);
        assert!(col_chunk_a > 0 && col_chunk_a <= 800);
        assert!(col_chunk_b > 0 && col_chunk_b <= 600);

        let (chunk_rows, chunk_cols) = MatrixChunking::array_2d_chunks(100, 200, 4);
        assert!(chunk_rows > 0 && chunk_rows <= 100);
        assert!(chunk_cols > 0 && chunk_cols <= 200);
    }
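
    // Additional check: 3-D chunking splits along the largest dimension and
    // never exceeds the original extents.
    #[test]
    fn test_array_3d_chunks() {
        let (chunk_depth, chunk_rows, chunk_cols) = MatrixChunking::array_3d_chunks(64, 32, 16, 4);
        assert!(chunk_depth > 0 && chunk_depth <= 64);
        assert_eq!(chunk_rows, 32);
        assert_eq!(chunk_cols, 16);
    }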

    #[test]
    fn test_performance_monitor() {
        let mut monitor = ChunkPerformanceMonitor::new();

        let measurement = ChunkMeasurement {
            chunk_size: 1024,
            data_size: 10000,
            execution_time: Duration::from_millis(10),
            throughput: 1000.0,
            operation_type: "matrix_multiply".to_string(),
        };

        monitor.record_measurement(measurement);
        let stats = monitor.get_statistics();
        assert_eq!(stats.total_measurements, 1);
        assert_eq!(stats.max_throughput, 1000.0);
    }
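
    // Additional check: after recording a measurement, the monitor should report
    // the measured chunk size as optimal for that operation and data size.
    #[test]
    fn test_optimal_size_tracking() {
        let mut monitor = ChunkPerformanceMonitor::new();

        monitor.record_measurement(ChunkMeasurement {
            chunk_size: 2048,
            data_size: 50000,
            execution_time: Duration::from_millis(5),
            throughput: 2000.0,
            operation_type: "vector_add".to_string(),
        });

        assert_eq!(monitor.get_optimal_size("vector_add", 50000), Some(2048));
        assert_eq!(monitor.get_optimal_size("vector_add", 1000), None);
    }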

    #[test]
    fn test_cache_awareness() {
        let cpu_info = CpuInfo {
            l1_cache_size: 32 * 1024,
            l2_cache_size: 256 * 1024,
            l3_cache_size: 8 * 1024 * 1024,
            cache_line_size: 64,
            numa_nodes: 1,
            cores_per_numa: 4,
        };

        let chunk_size = 10000;
        let l1_adjusted =
            ChunkingUtils::apply_cache_awareness(chunk_size, CacheAwareness::L1, &cpu_info);
        let l3_adjusted =
            ChunkingUtils::apply_cache_awareness(chunk_size, CacheAwareness::L3, &cpu_info);

        assert!(l1_adjusted <= chunk_size);
        assert!(l3_adjusted <= chunk_size);
        assert!(l1_adjusted <= l3_adjusted);
    }

    #[test]
    fn test_memory_pattern_adjustment() {
        let base_size = 1000;

        let random_adjusted = ChunkingUtils::apply_memory_pattern(base_size, MemoryPattern::Random);
        let sparse_adjusted = ChunkingUtils::apply_memory_pattern(base_size, MemoryPattern::Sparse);
        let sequential_adjusted =
            ChunkingUtils::apply_memory_pattern(base_size, MemoryPattern::Sequential);

        assert!(random_adjusted <= base_size);
        assert!(sparse_adjusted <= base_size);
        assert_eq!(sequential_adjusted, base_size);
        assert!(sparse_adjusted < random_adjusted);
    }

    #[test]
    fn test_compute_intensity_adjustment() {
        let base_size = 1000;

        let memory_bound =
            ChunkingUtils::apply_compute_intensity(base_size, ComputeIntensity::MemoryBound);
        let compute_intensive =
            ChunkingUtils::apply_compute_intensity(base_size, ComputeIntensity::ComputeIntensive);
        let cpu_bound =
            ChunkingUtils::apply_compute_intensity(base_size, ComputeIntensity::CpuBound);

        assert!(memory_bound >= base_size);
        assert!(compute_intensive <= base_size);
        assert!(cpu_bound <= base_size);
        assert!(cpu_bound <= compute_intensive);
    }

    #[test]
    fn test_gpu_chunk_settings() {
        let gpu_settings = GpuChunkSettings::default();
        assert_eq!(gpu_settings.gpu_memory_ratio, 0.8);
        assert_eq!(gpu_settings.gpu_min_chunk, 4096);
        assert!(gpu_settings.overlap_compute);

        let config = ChunkConfig::gpu_hybrid();
        assert!(config.gpu_settings.is_some());
        assert_eq!(config.strategy, ChunkStrategy::GpuAware);
    }
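
    // Additional check: Fixed and Custom strategies feed directly into the
    // chunk-size calculation and are still clamped to the configured bounds.
    #[test]
    fn test_fixed_and_custom_strategies() {
        let fixed_config = ChunkConfig {
            strategy: ChunkStrategy::Fixed(100_000),
            ..ChunkConfig::default()
        };
        let fixed_chunk = ChunkingUtils::optimal_chunk_size(1_000_000, &fixed_config);
        assert!(fixed_chunk <= fixed_config.max_chunk_size);

        // Illustrative custom sizing function: half the data per chunk.
        fn halve(data_size: usize, _threads: usize) -> usize {
            data_size / 2
        }
        let custom_config = ChunkConfig {
            strategy: ChunkStrategy::Custom(halve),
            ..ChunkConfig::default()
        };
        let custom_chunk = ChunkingUtils::optimal_chunk_size(1000, &custom_config);
        assert!(custom_chunk >= custom_config.min_chunk_size);
        assert!(custom_chunk <= custom_config.max_chunk_size);
    }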
}