use super::validation;
use crate::error::{CoreError, ErrorContext, ErrorLocation};
use ::ndarray::{Array, ArrayBase, Data, Dimension};
use std::marker::PhantomData;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

/// Suggests memory-friendly chunk sizes from detected cache/NUMA characteristics and recent timings.
#[derive(Debug)]
pub struct MemoryPatternOptimizer {
    /// Cache line size in bytes.
    pub cache_line_size: usize,
    /// L1 data cache size in bytes.
    pub l1_cache_size: usize,
    /// L2 cache size in bytes.
    pub l2_cache_size: usize,
    /// L3 cache size in bytes.
    pub l3_cache_size: usize,
    /// Detected NUMA node topology.
    pub numa_nodes: Vec<NumaNodeInfo>,
    /// Most recent memory bandwidth estimate, in MB/s.
    pub memorybandwidth: AtomicUsize,
    /// Recent per-chunk processing times used for adaptive sizing.
    pub processing_times: Vec<Duration>,
    /// Statistics about observed access patterns.
    pub access_pattern_stats: AccessPatternStats,
}

/// Description of a single NUMA node.
#[derive(Debug, Clone)]
pub struct NumaNodeInfo {
    /// NUMA node identifier.
    pub nodeid: usize,
    /// Memory available on this node, in bytes.
    pub available_memory: usize,
    /// CPU cores attached to this node.
    pub cpu_cores: Vec<usize>,
    /// Approximate memory bandwidth of this node, in MB/s.
    pub memorybandwidth: usize,
}

#[derive(Debug, Clone, Default)]
pub struct AccessPatternStats {
    pub sequential_access_ratio: f64,
    pub random_access_ratio: f64,
    pub strided_access_ratio: f64,
    pub cache_hit_ratio: f64,
    pub memorybandwidth_utilization: f64,
    pub last_updated: Option<Instant>,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AdvancedChunkingStrategy {
    /// Align chunk boundaries to cache lines.
    CacheLineAligned,
    /// Partition work across NUMA nodes.
    NumaAware,
    /// Size chunks to target the measured memory bandwidth.
    BandwidthOptimized,
    /// Use smaller chunks to reduce per-chunk latency.
    LatencyOptimized,
    /// Adjust chunk size based on observed processing times.
    Adaptive,
    /// Use smaller chunks with pauses to limit power draw.
    PowerAware,
}

impl MemoryPatternOptimizer {
    pub fn new() -> Self {
        Self {
            cache_line_size: Self::detect_cache_line_size(),
            l1_cache_size: Self::detect_l1_cache_size(),
            l2_cache_size: Self::detect_l2_cache_size(),
            l3_cache_size: Self::detect_l3_cache_size(),
            numa_nodes: Self::detect_numa_topology(),
            memorybandwidth: AtomicUsize::new(0),
            processing_times: Vec::new(),
            access_pattern_stats: AccessPatternStats::default(),
        }
    }

    fn detect_cache_line_size() -> usize {
        // Typical cache line sizes for the supported architectures.
        #[cfg(target_arch = "x86_64")]
        {
            64
        }
        #[cfg(target_arch = "aarch64")]
        {
            128
        }
        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
        {
            64
        }
    }

    fn detect_l1_cache_size() -> usize {
        // Conservative default: 32 KiB L1 data cache.
        32 * 1024
    }

    fn detect_l2_cache_size() -> usize {
        // Conservative default: 256 KiB L2 cache.
        256 * 1024
    }

    fn detect_l3_cache_size() -> usize {
        // Conservative default: 8 MiB L3 cache.
        8 * 1024 * 1024
    }

    fn detect_numa_topology() -> Vec<NumaNodeInfo> {
        let cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);

        // Fall back to a single NUMA node with assumed capacity figures.
        vec![NumaNodeInfo {
            nodeid: 0,
            available_memory: 4 * 1024 * 1024 * 1024, // assume 4 GiB available
            cpu_cores: (0..cores).collect(),
            memorybandwidth: 25000, // assume ~25 GB/s, expressed in MB/s
        }]
    }

    pub fn calculate_cache_aware_chunk_size<T>(&self, totalelements: usize) -> usize {
        let element_size = std::mem::size_of::<T>().max(1);

        // Target half of L2 so the working set leaves room for other data.
        let target_cache_size = self.l2_cache_size / 2;
        let max_elements_in_cache = target_cache_size / element_size;

        // Round down to a whole number of cache lines, but never return zero.
        let cache_line_elements = (self.cache_line_size / element_size).max(1);
        let aligned_chunk_size =
            (max_elements_in_cache / cache_line_elements) * cache_line_elements;

        std::cmp::max(aligned_chunk_size, cache_line_elements)
    }

    pub fn threads(&self, total_elements: usize, numthreads: usize) -> Vec<(usize, usize)> {
        let mut chunks = Vec::new();
        let numthreads = numthreads.max(1);

        if self.numa_nodes.len() <= 1 {
            // Single NUMA node: split the range evenly across threads.
            let chunk_size = total_elements / numthreads;
            for i in 0..numthreads {
                let start = i * chunk_size;
                let end = if i == numthreads - 1 {
                    total_elements
                } else {
                    (i + 1) * chunk_size
                };
                chunks.push((start, end));
            }
        } else {
            // Multiple NUMA nodes: give each node a contiguous slice of the
            // data, then split that slice across the node's share of threads.
            let threads_per_node = (numthreads / self.numa_nodes.len()).max(1);
            let elements_per_node = total_elements / self.numa_nodes.len();

            for (nodeidx, _node) in self.numa_nodes.iter().enumerate() {
                let node_start = nodeidx * elements_per_node;
                let node_end = if nodeidx == self.numa_nodes.len() - 1 {
                    total_elements
                } else {
                    (nodeidx + 1) * elements_per_node
                };

                let node_elements = node_end - node_start;
                let node_chunk_size = node_elements / threads_per_node;

                for thread_idx in 0..threads_per_node {
                    let start = node_start + thread_idx * node_chunk_size;
                    let end = if thread_idx == threads_per_node - 1 {
                        node_end
                    } else {
                        node_start + (thread_idx + 1) * node_chunk_size
                    };
                    chunks.push((start, end));
                }
            }
        }

        chunks
    }

    pub fn elements(&self, totalelements: usize) -> usize {
        if self.processing_times.is_empty() {
            return self.calculate_cache_aware_chunk_size::<u64>(totalelements);
        }

        // Average the most recent measurements to smooth out noise.
        let recent_times: Vec<_> = self.processing_times.iter().rev().take(10).collect();
        let avg_time =
            recent_times.iter().map(|t| t.as_nanos()).sum::<u128>() / recent_times.len() as u128;

        // Target roughly 75 ms per chunk: shrink chunks when we run slower than
        // the target and grow them when we finish well under it.
        let target_time_ns = 75_000_000;
        if avg_time > target_time_ns {
            let current_default = self.calculate_cache_aware_chunk_size::<u64>(totalelements);
            current_default * 3 / 4
        } else if avg_time < target_time_ns / 2 {
            let current_default = self.calculate_cache_aware_chunk_size::<u64>(totalelements);
            current_default * 5 / 4
        } else {
            self.calculate_cache_aware_chunk_size::<u64>(totalelements)
        }
    }

    pub fn record_processing_time(&mut self, duration: Duration) {
        self.processing_times.push(duration);

        // Keep the history bounded: once it exceeds 100 entries, drop the oldest half.
        if self.processing_times.len() > 100 {
            self.processing_times.drain(0..50);
        }
    }

    pub fn update_access_pattern_stats(&mut self, pattern: &AccessPatternStats) {
        self.access_pattern_stats = pattern.clone();
        self.access_pattern_stats.last_updated = Some(Instant::now());
    }

    pub fn get_memorybandwidth(&self) -> usize {
        self.memorybandwidth.load(Ordering::Relaxed)
    }

    pub fn calculatebandwidth_optimized_chunk_size<T>(&self, totalelements: usize) -> usize {
        let element_size = std::mem::size_of::<T>().max(1);
        let bandwidth_mbps = self.get_memorybandwidth();

        if bandwidth_mbps == 0 {
            return self.calculate_cache_aware_chunk_size::<T>(totalelements);
        }

        // Aim for roughly 100 ms of memory traffic per chunk at the measured bandwidth.
        let target_mb_per_100ms = (bandwidth_mbps * 100) / 1000;
        let target_elements = (target_mb_per_100ms * 1024 * 1024) / element_size;

        // Never go below one cache line of elements or above a quarter of the data.
        let min_chunk = self.cache_line_size / element_size;
        let max_chunk = (totalelements / 4).max(min_chunk);
        target_elements.clamp(min_chunk, max_chunk)
    }
}

impl Clone for MemoryPatternOptimizer {
    fn clone(&self) -> Self {
        Self {
            cache_line_size: self.cache_line_size,
            l1_cache_size: self.l1_cache_size,
            l2_cache_size: self.l2_cache_size,
            l3_cache_size: self.l3_cache_size,
            numa_nodes: self.numa_nodes.clone(),
            memorybandwidth: AtomicUsize::new(self.memorybandwidth.load(Ordering::Relaxed)),
            processing_times: self.processing_times.clone(),
            access_pattern_stats: self.access_pattern_stats.clone(),
        }
    }
}

impl Default for MemoryPatternOptimizer {
    fn default() -> Self {
        Self::new()
    }
}

/// Default chunk size in bytes (16 MiB) used by [`ChunkingStrategy::Auto`].
pub const OPTIMAL_CHUNK_SIZE: usize = 16 * 1024 * 1024;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ChunkingStrategy {
    /// Pick a chunk size automatically based on [`OPTIMAL_CHUNK_SIZE`].
    Auto,
    /// Use a fixed number of elements per chunk.
    Fixed(usize),
    /// Use a fixed number of bytes per chunk.
    FixedBytes(usize),
    /// Split the data into a fixed number of chunks.
    NumChunks(usize),
    /// Use one of the memory-pattern-aware strategies.
    Advanced(AdvancedChunkingStrategy),
}

#[derive(Debug)]
pub struct ChunkedArray<A, D>
where
    A: Clone,
    D: Dimension,
{
    /// Owned copy of the data to be processed in chunks.
    pub data: Array<A, D>,
    /// Strategy used to determine chunk sizes.
    pub strategy: ChunkingStrategy,
    #[allow(dead_code)]
    chunk_size: usize,
    num_chunks: usize,
    optimizer: Option<MemoryPatternOptimizer>,
    phantom: PhantomData<A>,
}

impl<A, D> ChunkedArray<A, D>
where
    A: Clone,
    D: Dimension,
{
    pub fn new<S: Data<Elem = A>>(data: ArrayBase<S, D>, strategy: ChunkingStrategy) -> Self {
        let owned_data = data.to_owned();
        let total_elements = data.len();
        // Guard against zero-sized types so the divisions below cannot panic.
        let elem_size = mem::size_of::<A>().max(1);

        // Advanced strategies need an optimizer up front to compute chunk sizes.
        let mut optimizer = if matches!(strategy, ChunkingStrategy::Advanced(_)) {
            Some(MemoryPatternOptimizer::new())
        } else {
            None
        };

        let (chunk_size, num_chunks) = match strategy {
            ChunkingStrategy::Auto => {
                let chunk_size_bytes = OPTIMAL_CHUNK_SIZE;
                let chunk_size = (chunk_size_bytes / elem_size).max(1);
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::Fixed(size) => {
                let size = size.max(1);
                let num_chunks = total_elements.div_ceil(size);
                (size, num_chunks)
            }
            ChunkingStrategy::FixedBytes(bytes) => {
                let elements = bytes / elem_size;
                let chunk_size = if elements == 0 { 1 } else { elements };
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::NumChunks(n) => {
                let num_chunks = if n == 0 { 1 } else { n };
                let chunk_size = total_elements.div_ceil(num_chunks);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::Advanced(advanced_strategy) => {
                let opt = optimizer
                    .as_mut()
                    .expect("optimizer is always created for advanced strategies");
                let chunk_size = match advanced_strategy {
                    AdvancedChunkingStrategy::CacheLineAligned => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::NumaAware => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::BandwidthOptimized => {
                        opt.calculatebandwidth_optimized_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::LatencyOptimized => {
                        // Halve the cache-aware size to reduce per-chunk latency.
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements) / 2
                    }
                    AdvancedChunkingStrategy::Adaptive => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::PowerAware => {
                        // Quarter-size chunks leave room for idle pauses between chunks.
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements) / 4
                    }
                };
                let chunk_size = chunk_size.max(1);
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
        };

        Self {
            data: owned_data,
            strategy,
            chunk_size,
            num_chunks,
            // Reuse the optimizer built above when present; otherwise create a
            // fresh one so the monitoring-based methods always have one available.
            optimizer: Some(optimizer.unwrap_or_else(MemoryPatternOptimizer::new)),
            phantom: PhantomData,
        }
    }

    pub fn with_memory_optimization<S: Data<Elem = A>>(
        data: ArrayBase<S, D>,
        strategy: AdvancedChunkingStrategy,
    ) -> Self {
        Self::new(data, ChunkingStrategy::Advanced(strategy))
    }

    pub fn optimizer(&self) -> Option<&MemoryPatternOptimizer> {
        self.optimizer.as_ref()
    }

    pub fn optimizer_mut(&mut self) -> Option<&mut MemoryPatternOptimizer> {
        self.optimizer.as_mut()
    }

    pub fn record_processing_time(&mut self, duration: Duration) {
        if let Some(ref mut optimizer) = self.optimizer {
            optimizer.record_processing_time(duration);
        }
    }

    pub fn update_access_pattern_stats(&mut self, stats: &AccessPatternStats) {
        if let Some(ref mut optimizer) = self.optimizer {
            optimizer.update_access_pattern_stats(stats);
        }
    }

    pub fn map<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let results: Vec<B> = chunks.iter().map(f).collect();

        Array::from_vec(results)
    }

    pub fn par_map<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        #[cfg(feature = "parallel")]
        {
            use crate::parallel_ops::*;
            use std::sync::Arc;

            let chunks = self.get_chunks();
            let chunks_arc = Arc::new(chunks);

            let num_chunks = chunks_arc.len();
            let results: Vec<B> = (0..num_chunks)
                .into_par_iter()
                .map(move |i| {
                    let chunks_ref = Arc::clone(&chunks_arc);
                    f(&chunks_ref[i])
                })
                .collect();

            Array::from_vec(results)
        }

        #[cfg(not(feature = "parallel"))]
        {
            self.map(f)
        }
    }

    pub fn map_withmonitoring<F, B>(&mut self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let start_time = Instant::now();
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        for chunk in chunks {
            let chunk_start = Instant::now();
            let result = f(&chunk);
            let chunk_duration = chunk_start.elapsed();

            // Feed per-chunk timings back into the optimizer for adaptive sizing.
            if let Some(ref mut optimizer) = self.optimizer {
                optimizer.record_processing_time(chunk_duration);
            }

            results.push(result);
        }

        let total_duration = start_time.elapsed();
        self.record_processing_time(total_duration);

        Array::from_vec(results)
    }

    pub fn map_numa_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        #[cfg(feature = "parallel")]
        {
            if let Some(ref optimizer) = self.optimizer {
                use crate::parallel_ops::*;

                // Partition the element range across NUMA nodes and threads. The
                // ranges are advisory for now; the chunking strategy still decides
                // which chunks are handed to `f`, one result per chunk.
                let num_threads = crate::parallel_ops::get_num_threads();
                let _numa_ranges = optimizer.threads(self.data.len(), num_threads.max(1));

                let chunks = self.get_chunks();
                let results: Vec<B> = (0..chunks.len())
                    .into_par_iter()
                    .map(|i| f(&chunks[i]))
                    .collect();

                return Array::from_vec(results);
            }
        }

        self.par_map(f)
    }

    pub fn map_cache_optimized<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        // The cache-aware size is computed for reference, but every chunk is
        // processed whole: splitting a chunk further would change the number of
        // results returned to the caller.
        if let Some(ref optimizer) = self.optimizer {
            let _cache_aware_chunk_size =
                optimizer.calculate_cache_aware_chunk_size::<A>(self.data.len());
        }

        for chunk in chunks {
            results.push(f(&chunk));
        }

        Array::from_vec(results)
    }

    pub fn mapbandwidth_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        if let Some(ref optimizer) = self.optimizer {
            let bandwidth = optimizer.get_memorybandwidth();

            if bandwidth > 0 {
                let _bandwidth_chunk_size =
                    optimizer.calculatebandwidth_optimized_chunk_size::<A>(self.data.len());

                for chunk in chunks {
                    let chunk_start = Instant::now();
                    let result = f(&chunk);
                    let processing_time = chunk_start.elapsed();

                    // Estimate how long this chunk should take at the measured
                    // bandwidth (bytes / (MB/s * 1000) = milliseconds) and pace
                    // the loop so we stay within that bandwidth budget.
                    let expected_time_ms = (chunk.len() * std::mem::size_of::<A>()) as f64
                        / (bandwidth as f64 * 1000.0);
                    let expected_duration = Duration::from_millis(expected_time_ms as u64);

                    if processing_time < expected_duration {
                        std::thread::sleep(expected_duration - processing_time);
                    }

                    results.push(result);
                }
            } else {
                for chunk in chunks {
                    results.push(f(&chunk));
                }
            }
        } else {
            for chunk in chunks {
                results.push(f(&chunk));
            }
        }

        Array::from_vec(results)
    }

    pub fn map_power_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        for (i, chunk) in chunks.iter().enumerate() {
            let result = f(chunk);
            results.push(result);

            // Pause briefly after every fourth chunk to limit sustained power draw.
            if i % 4 == 3 {
                std::thread::sleep(Duration::from_millis(1));
            }
        }

        Array::from_vec(results)
    }

    pub fn measure_memorybandwidth(&mut self) -> Option<usize> {
        if let Some(ref mut optimizer) = self.optimizer {
            // Time a 1 MiB sequential read as a rough bandwidth probe.
            let start_time = Instant::now();
            let chunk_size = 1024 * 1024;
            let test_data = vec![0u8; chunk_size];

            let mut sum = 0u64;
            for &byte in &test_data {
                sum += byte as u64;
            }

            let elapsed = start_time.elapsed();
            // bytes * 1000 / nanoseconds approximates MB/s (1 MB = 10^6 bytes).
            let bandwidth_mbps = if elapsed.as_nanos() > 0 {
                (chunk_size as u128 * 1000) / elapsed.as_nanos()
            } else {
                0
            } as usize;

            optimizer
                .memorybandwidth
                .store(bandwidth_mbps, std::sync::atomic::Ordering::Relaxed);

            // Keep the summation from being optimized away.
            std::hint::black_box(sum);

            Some(bandwidth_mbps)
        } else {
            None
        }
    }

    pub fn map_optimized<F, B>(&mut self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        if self.optimizer.is_some() {
            // Measure bandwidth lazily so bandwidth-aware strategies have data to work with.
            if self.optimizer.as_ref().map(|o| o.get_memorybandwidth()) == Some(0) {
                self.measure_memorybandwidth();
            }

            let data_size = self.data.len() * std::mem::size_of::<A>();

            match self.strategy {
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::NumaAware) => {
                    self.map_numa_aware(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::CacheLineAligned) => {
                    self.map_cache_optimized(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::BandwidthOptimized) => {
                    self.mapbandwidth_aware(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::PowerAware) => {
                    self.map_power_aware(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::Adaptive) => {
                    self.map_withmonitoring(f)
                }
                _ => {
                    // Heuristic fallback based on the total data size.
                    if data_size > 100 * 1024 * 1024 {
                        self.map_numa_aware(f)
                    } else if data_size > 10 * 1024 * 1024 {
                        self.map_cache_optimized(f)
                    } else {
                        self.par_map(f)
                    }
                }
            }
        } else {
            self.par_map(f)
        }
    }

    pub fn update_access_pattern_statistics(&mut self, processingtimes: &[Duration]) {
        if let Some(ref mut optimizer) = self.optimizer {
            let avg_time = if !processingtimes.is_empty() {
                processingtimes.iter().map(|d| d.as_nanos()).sum::<u128>()
                    / processingtimes.len() as u128
            } else {
                0
            };

            // Use the relative variance of processing times as a rough proxy for
            // cache behaviour: stable timings suggest good cache locality.
            let time_variance = if processingtimes.len() > 1 && avg_time > 0 {
                let variance = processingtimes
                    .iter()
                    .map(|d| {
                        let diff = d.as_nanos() as i128 - avg_time as i128;
                        (diff * diff) as u128
                    })
                    .sum::<u128>()
                    / (processingtimes.len() - 1) as u128;
                variance as f64 / avg_time as f64
            } else {
                0.0
            };

            let cache_hit_ratio = (1.0 - time_variance.min(1.0)).max(0.0);

            // The access-ratio and bandwidth-utilization figures are heuristic
            // placeholders; only the cache hit ratio is derived from measurements.
            let stats = AccessPatternStats {
                sequential_access_ratio: 0.8,
                random_access_ratio: 0.1,
                strided_access_ratio: 0.1,
                cache_hit_ratio,
                memorybandwidth_utilization: 0.7,
                last_updated: Some(Instant::now()),
            };

            optimizer.update_access_pattern_stats(&stats);
        }
    }

    pub fn num_chunks(&self) -> usize {
        self.num_chunks
    }

    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }

    pub fn get_chunks(&self) -> Vec<Array<A, D>>
    where
        D: Clone,
    {
        let mut result = Vec::with_capacity(self.num_chunks);

        // 1-D arrays in standard layout can be sliced directly into chunks.
        if self.data.ndim() == 1 {
            if let Some(slice) = self.data.as_slice() {
                for i in 0..self.num_chunks {
                    let start = i * self.chunk_size;
                    let end = ((i + 1) * self.chunk_size).min(slice.len());
                    let chunk_slice = &slice[start..end];

                    let chunk_1d = Array::from_vec(chunk_slice.to_vec());

                    if let Ok(reshaped) = chunk_1d.into_dimensionality::<D>() {
                        result.push(reshaped);
                    } else {
                        // Dimensionality conversion failed; fall back to a single chunk.
                        return vec![self.data.clone()];
                    }
                }
                return result;
            }
        }

        // Higher-dimensional or non-contiguous data is returned as a single chunk.
        result.push(self.data.clone());
        result
    }
}

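/// Applies `op` to `array` under the given [`ChunkingStrategy`], verifying that
/// the result keeps the input shape.
///
/// A minimal usage sketch (fenced as `ignore` because the enclosing crate path
/// is not fixed in this file):
///
/// ```ignore
/// use ndarray::array;
///
/// let a = array![1.0_f64, 2.0, 3.0, 4.0];
/// let shifted = chunk_wise_op(&a, |x| x.mapv(|v| v + 1.0), ChunkingStrategy::Auto).unwrap();
/// assert_eq!(shifted, array![2.0, 3.0, 4.0, 5.0]);
/// ```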
#[allow(dead_code)]
pub fn chunk_wise_op<A, F, B, S, D>(
    array: &ArrayBase<S, D>,
    op: F,
    strategy: ChunkingStrategy,
) -> Result<Array<B, D>, CoreError>
where
    A: Clone,
    S: Data<Elem = A>,
    F: Fn(&ArrayBase<S, D>) -> Array<B, D>,
    B: Clone,
    D: Dimension + Clone,
{
    validation::check_not_empty(array)?;

    // Small arrays are processed directly; chunking overhead is not worth it.
    if array.len() <= 1000 {
        return Ok(op(array));
    }

    // True chunked execution is not wired in yet: the ChunkedArray is built to
    // exercise the chunking strategy, but `op` is applied to the whole array.
    let _chunked = ChunkedArray::new(array.to_owned(), strategy);

    let result = op(array);

    if result.raw_dim() != array.raw_dim() {
        return Err(CoreError::ValidationError(
            ErrorContext::new(format!(
                "Operation changed shape from {:?} to {:?}",
                array.shape(),
                result.shape()
            ))
            .with_location(ErrorLocation::new(file!(), line!())),
        ));
    }

    Ok(result)
}

/// Applies a binary `op` to `lhs` and `rhs`, verifying that the result keeps the shape of `lhs`.
#[allow(dead_code)]
pub fn chunk_wise_binary_op<A, B, F, C, S1, S2, D>(
    lhs: &ArrayBase<S1, D>,
    rhs: &ArrayBase<S2, D>,
    op: F,
    strategy: ChunkingStrategy,
) -> Result<Array<C, D>, CoreError>
where
    A: Clone,
    B: Clone,
    S1: Data<Elem = A>,
    S2: Data<Elem = B>,
    F: Fn(&ArrayBase<S1, D>, &ArrayBase<S2, D>) -> Array<C, D>,
    C: Clone,
    D: Dimension + Clone,
{
    validation::checkshapes_match(lhs.shape(), rhs.shape())?;
    validation::check_not_empty(lhs)?;

    // Small inputs are processed directly; chunking overhead is not worth it.
    if lhs.len() <= 1000 {
        return Ok(op(lhs, rhs));
    }

    // True chunked execution is not wired in yet: the chunked views are built to
    // exercise the strategy, but `op` is applied to the whole arrays.
    let _chunked_lhs = ChunkedArray::new(lhs.to_owned(), strategy);
    let _chunked_rhs = ChunkedArray::new(rhs.to_owned(), strategy);

    let result = op(lhs, rhs);

    if result.shape() != lhs.shape() {
        return Err(CoreError::ValidationError(
            ErrorContext::new(format!(
                "Binary operation changed shape from {:?} to {:?}",
                lhs.shape(),
                result.shape()
            ))
            .with_location(ErrorLocation::new(file!(), line!())),
        ));
    }

    Ok(result)
}

/// Reduces `array` with `chunk_op`, merging per-chunk results with `combine`.
#[allow(dead_code)]
pub fn chunk_wise_reduce<A, F, G, B, S, D>(
    array: &ArrayBase<S, D>,
    chunk_op: F,
    combine: G,
    strategy: ChunkingStrategy,
) -> Result<B, CoreError>
where
    A: Clone,
    S: Data<Elem = A>,
    F: Fn(&ArrayBase<S, D>) -> B + Sync + Send,
    G: Fn(Vec<B>) -> B,
    B: Clone + Send + Sync,
    D: Dimension + Clone,
{
    validation::check_not_empty(array)?;

    // Small arrays are reduced directly without chunking overhead.
    if array.len() <= 1000 {
        return Ok(chunk_op(array));
    }

    // Per-chunk reduction is not wired in yet: the array is reduced in a single
    // pass and `combine` merges the (currently single-element) result list.
    let _chunked = ChunkedArray::new(array.to_owned(), strategy);

    let result = combine(vec![chunk_op(array)]);
    Ok(result)
}
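
#[cfg(test)]
mod tests {
    // Minimal sanity checks sketching how the chunking API is used. They cover
    // only the chunk-size arithmetic and the whole-array fallback paths, and
    // assume the `validation` helpers accept non-empty, shape-consistent input.
    use super::*;
    use ::ndarray::Array1;

    #[test]
    fn fixed_strategy_splits_into_expected_chunks() {
        let data: Array1<f64> = Array1::from_vec((0..100).map(|x| x as f64).collect());
        let chunked = ChunkedArray::new(data, ChunkingStrategy::Fixed(25));

        assert_eq!(chunked.chunk_size(), 25);
        assert_eq!(chunked.num_chunks(), 4);

        // Summing each chunk yields one result per chunk, and the chunk sums
        // add up to the sum of the original data (0 + 1 + ... + 99 = 4950).
        let sums = chunked.map(|chunk| chunk.sum());
        assert_eq!(sums.len(), 4);
        assert_eq!(sums.sum(), 4950.0);
    }

    #[test]
    fn chunk_wise_op_preserves_shape_and_values() {
        let data = Array1::from_vec(vec![1.0_f64, 2.0, 3.0, 4.0]);
        let doubled = chunk_wise_op(&data, |a| a.mapv(|v| v * 2.0), ChunkingStrategy::Auto)
            .expect("operation should succeed");

        assert_eq!(doubled, Array1::from_vec(vec![2.0, 4.0, 6.0, 8.0]));
    }
}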