use super::validation;
use crate::error::{CoreError, ErrorContext, ErrorLocation};
use ::ndarray::{Array, ArrayBase, Data, Dimension};
use std::marker::PhantomData;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

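/// Derives cache-, NUMA-, and bandwidth-aware chunk sizes from detected
/// hardware characteristics and observed processing times.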
#[derive(Debug)]
pub struct MemoryPatternOptimizer {
    pub cache_line_size: usize,
    pub l1_cache_size: usize,
    pub l2_cache_size: usize,
    pub l3_cache_size: usize,
    pub numa_nodes: Vec<NumaNodeInfo>,
    pub memorybandwidth: AtomicUsize,
    pub processing_times: Vec<Duration>,
    pub access_pattern_stats: AccessPatternStats,
}

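/// Topology information for a single NUMA node.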
#[derive(Debug, Clone)]
pub struct NumaNodeInfo {
    pub nodeid: usize,
    pub available_memory: usize,
    pub cpu_cores: Vec<usize>,
    pub memorybandwidth: usize,
}

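/// Aggregated statistics about how memory is being accessed (sequential,
/// random, and strided access ratios, cache hit rate, and bandwidth
/// utilization), plus when they were last refreshed.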
#[derive(Debug, Clone, Default)]
pub struct AccessPatternStats {
    pub sequential_access_ratio: f64,
    pub random_access_ratio: f64,
    pub strided_access_ratio: f64,
    pub cache_hit_ratio: f64,
    pub memorybandwidth_utilization: f64,
    pub last_updated: Option<Instant>,
}

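/// Hardware-aware chunking strategies used by `MemoryPatternOptimizer` when a
/// `ChunkingStrategy::Advanced` strategy is selected.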
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AdvancedChunkingStrategy {
    /// Align chunk boundaries to cache lines and size them for the L2 cache.
    CacheLineAligned,
    /// Distribute chunks with the NUMA topology in mind.
    NumaAware,
    /// Size chunks from the measured memory bandwidth.
    BandwidthOptimized,
    /// Use smaller chunks to reduce per-chunk latency.
    LatencyOptimized,
    /// Adapt the chunk size from recorded processing times.
    Adaptive,
    /// Use smaller chunks and pause periodically to limit power draw.
    PowerAware,
    /// Large fixed chunks for datasets that comfortably fit in memory.
    LargeData,
    /// Stream huge datasets under an explicit memory budget.
    HugeDataStreaming {
        /// Upper bound on the working set, in megabytes.
        max_memory_mb: usize,
    },
    /// Pick a large chunk size from the detected available memory.
    AdaptiveLarge,
}

impl MemoryPatternOptimizer {
    /// Creates an optimizer populated with detected (or conservatively
    /// assumed) cache sizes and NUMA topology.
    pub fn new() -> Self {
        Self {
            cache_line_size: Self::detect_cache_line_size(),
            l1_cache_size: Self::detect_l1_cache_size(),
            l2_cache_size: Self::detect_l2_cache_size(),
            l3_cache_size: Self::detect_l3_cache_size(),
            numa_nodes: Self::detect_numa_topology(),
            memorybandwidth: AtomicUsize::new(0),
            processing_times: Vec::new(),
            access_pattern_stats: AccessPatternStats::default(),
        }
    }

    fn detect_cache_line_size() -> usize {
        #[cfg(target_arch = "x86_64")]
        {
            64
        }
        #[cfg(target_arch = "aarch64")]
        {
            128
        }
        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
        {
            64
        }
    }

    fn detect_l1_cache_size() -> usize {
        // Conservative default: 32 KiB L1 data cache.
        32 * 1024
    }

    fn detect_l2_cache_size() -> usize {
        // Conservative default: 256 KiB L2 cache.
        256 * 1024
    }

    fn detect_l3_cache_size() -> usize {
        // Conservative default: 8 MiB shared L3 cache.
        8 * 1024 * 1024
    }

    fn detect_numa_topology() -> Vec<NumaNodeInfo> {
        let cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);

        // Fall back to a single NUMA node with placeholder capacity figures.
        vec![NumaNodeInfo {
            nodeid: 0,
            available_memory: 4 * 1024 * 1024 * 1024, // assume 4 GiB
            cpu_cores: (0..cores).collect(),
            memorybandwidth: 25_000, // assume ~25 GB/s
        }]
    }

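    /// Chunk size (in elements of `T`) that targets roughly half of the L2
    /// cache, rounded down to a whole number of cache lines and clamped so it
    /// does not exceed the total element count.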
    pub fn calculate_cache_aware_chunk_size<T>(&self, totalelements: usize) -> usize {
        // Guard against zero-sized types.
        let element_size = std::mem::size_of::<T>().max(1);

        // Use half of the L2 cache as the working-set budget.
        let target_cache_size = self.l2_cache_size / 2;
        let max_elements_in_cache = target_cache_size / element_size;

        // Round down to a whole number of cache lines (at least one element per line).
        let cache_line_elements = (self.cache_line_size / element_size).max(1);
        let aligned_chunk_size =
            (max_elements_in_cache / cache_line_elements) * cache_line_elements;

        std::cmp::max(aligned_chunk_size, cache_line_elements).min(totalelements.max(1))
    }

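    /// Splits `total_elements` into per-thread `(start, end)` ranges, grouping
    /// threads by NUMA node when more than one node is known.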
    pub fn threads(&self, total_elements: usize, numthreads: usize) -> Vec<(usize, usize)> {
        let mut chunks = Vec::new();
        let numthreads = numthreads.max(1);

        if self.numa_nodes.len() <= 1 {
            // Single node: split evenly, giving the last thread any remainder.
            let chunk_size = total_elements / numthreads;
            for i in 0..numthreads {
                let start = i * chunk_size;
                let end = if i == numthreads - 1 {
                    total_elements
                } else {
                    (i + 1) * chunk_size
                };
                chunks.push((start, end));
            }
        } else {
            // Multiple nodes: divide elements per node, then per thread within a node.
            let threads_per_node = (numthreads / self.numa_nodes.len()).max(1);
            let elements_per_node = total_elements / self.numa_nodes.len();

            for (nodeidx, _node) in self.numa_nodes.iter().enumerate() {
                let node_start = nodeidx * elements_per_node;
                let node_end = if nodeidx == self.numa_nodes.len() - 1 {
                    total_elements
                } else {
                    (nodeidx + 1) * elements_per_node
                };

                let node_elements = node_end - node_start;
                let node_chunk_size = node_elements / threads_per_node;

                for thread_idx in 0..threads_per_node {
                    let start = node_start + thread_idx * node_chunk_size;
                    let end = if thread_idx == threads_per_node - 1 {
                        node_end
                    } else {
                        node_start + (thread_idx + 1) * node_chunk_size
                    };
                    chunks.push((start, end));
                }
            }
        }

        chunks
    }

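    /// Adaptive chunk size: starts from the cache-aware size and scales it
    /// down or up based on recently recorded chunk processing times.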
    pub fn elements(&self, totalelements: usize) -> usize {
        if self.processing_times.is_empty() {
            return self.calculate_cache_aware_chunk_size::<u64>(totalelements);
        }

        // Average the ten most recent measurements.
        let recent_times: Vec<_> = self.processing_times.iter().rev().take(10).collect();
        let avg_time =
            recent_times.iter().map(|t| t.as_nanos()).sum::<u128>() / recent_times.len() as u128;

        // Aim for roughly 75 ms of work per chunk.
        let target_time_ns = 75_000_000u128;
        if avg_time > target_time_ns {
            // Chunks are taking too long: shrink them.
            let current_default = self.calculate_cache_aware_chunk_size::<u64>(totalelements);
            current_default * 3 / 4
        } else if avg_time < target_time_ns / 2 {
            // Chunks finish quickly: grow them.
            let current_default = self.calculate_cache_aware_chunk_size::<u64>(totalelements);
            current_default * 5 / 4
        } else {
            self.calculate_cache_aware_chunk_size::<u64>(totalelements)
        }
    }

    pub fn record_processing_time(&mut self, duration: Duration) {
        self.processing_times.push(duration);

        // Keep a bounded history: once 100 samples accumulate, drop the oldest 50.
        if self.processing_times.len() > 100 {
            self.processing_times.drain(0..50);
        }
    }

    pub fn update_access_pattern_stats(&mut self, pattern: &AccessPatternStats) {
        self.access_pattern_stats = pattern.clone();
        self.access_pattern_stats.last_updated = Some(Instant::now());
    }

    pub fn get_memorybandwidth(&self) -> usize {
        self.memorybandwidth.load(Ordering::Relaxed)
    }

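    /// Chunk size (in elements of `T`) sized so that one chunk corresponds to
    /// roughly 100 ms of traffic at the measured memory bandwidth; falls back
    /// to the cache-aware size when no bandwidth measurement is available.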
    pub fn calculatebandwidth_optimized_chunk_size<T>(&self, totalelements: usize) -> usize {
        let element_size = std::mem::size_of::<T>().max(1);
        let bandwidth_mbps = self.get_memorybandwidth();

        if bandwidth_mbps == 0 {
            return self.calculate_cache_aware_chunk_size::<T>(totalelements);
        }

        // Megabytes that can move in ~100 ms at the measured bandwidth.
        let target_bytes_per_100ms = (bandwidth_mbps * 100) / 1000;
        let target_elements = (target_bytes_per_100ms * 1024 * 1024) / element_size;

        // Clamp between one cache line and a quarter of the data, keeping the
        // bounds ordered so `clamp` cannot panic.
        let min_chunk = (self.cache_line_size / element_size).max(1);
        let max_chunk = (totalelements / 4).max(min_chunk);
        target_elements.clamp(min_chunk, max_chunk)
    }
}

impl Clone for MemoryPatternOptimizer {
    fn clone(&self) -> Self {
        Self {
            cache_line_size: self.cache_line_size,
            l1_cache_size: self.l1_cache_size,
            l2_cache_size: self.l2_cache_size,
            l3_cache_size: self.l3_cache_size,
            numa_nodes: self.numa_nodes.clone(),
            // `AtomicUsize` is not `Clone`; copy the current value into a new atomic.
            memorybandwidth: AtomicUsize::new(self.memorybandwidth.load(Ordering::Relaxed)),
            processing_times: self.processing_times.clone(),
            access_pattern_stats: self.access_pattern_stats.clone(),
        }
    }
}

impl Default for MemoryPatternOptimizer {
    fn default() -> Self {
        Self::new()
    }
}

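/// Default chunk size in bytes (16 MiB) used by `ChunkingStrategy::Auto`.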
pub const OPTIMAL_CHUNK_SIZE: usize = 16 * 1024 * 1024;

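/// How an array should be split into chunks for processing.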
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ChunkingStrategy {
    /// Use `OPTIMAL_CHUNK_SIZE` bytes per chunk.
    Auto,
    /// Fixed number of elements per chunk.
    Fixed(usize),
    /// Fixed number of bytes per chunk.
    FixedBytes(usize),
    /// Fixed number of chunks.
    NumChunks(usize),
    /// Hardware-aware strategy handled by `MemoryPatternOptimizer`.
    Advanced(AdvancedChunkingStrategy),
}

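/// An owned array paired with a chunking strategy, exposing chunk-wise `map`
/// variants that take cache, NUMA, bandwidth, and power characteristics into
/// account.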
#[derive(Debug)]
pub struct ChunkedArray<A, D>
where
    A: Clone,
    D: Dimension,
{
    /// The owned array data.
    pub data: Array<A, D>,
    /// Strategy used to derive the chunk layout.
    pub strategy: ChunkingStrategy,
    /// Chunk size in elements.
    #[allow(dead_code)]
    chunk_size: usize,
    /// Number of chunks the data is split into.
    num_chunks: usize,
    /// Optimizer backing the hardware-aware `map_*` variants.
    optimizer: Option<MemoryPatternOptimizer>,
    phantom: PhantomData<A>,
}

impl<A, D> ChunkedArray<A, D>
where
    A: Clone,
    D: Dimension,
{
    /// Creates a chunked view over `data`, computing the chunk size and chunk
    /// count according to `strategy`.
    pub fn new<S: Data<Elem = A>>(data: ArrayBase<S, D>, strategy: ChunkingStrategy) -> Self {
        let owned_data = data.to_owned();
        let total_elements = data.len();
        // Guard against zero-sized types when converting bytes to elements.
        let elem_size = mem::size_of::<A>().max(1);

        let mut optimizer = if matches!(strategy, ChunkingStrategy::Advanced(_)) {
            Some(MemoryPatternOptimizer::new())
        } else {
            None
        };

        let (chunk_size, num_chunks) = match strategy {
            ChunkingStrategy::Auto => {
                let chunk_size_bytes = OPTIMAL_CHUNK_SIZE;
                let chunk_size = chunk_size_bytes / elem_size;
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::Fixed(size) => {
                let size = size.max(1);
                let num_chunks = total_elements.div_ceil(size);
                (size, num_chunks)
            }
            ChunkingStrategy::FixedBytes(bytes) => {
                let elements = bytes / elem_size;
                let chunk_size = if elements == 0 { 1 } else { elements };
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::NumChunks(n) => {
                let num_chunks = if n == 0 { 1 } else { n };
                let chunk_size = total_elements.div_ceil(num_chunks);
                (chunk_size, num_chunks)
            }
            ChunkingStrategy::Advanced(advanced_strategy) => {
                let opt = optimizer
                    .as_mut()
                    .expect("optimizer is always created for advanced strategies");
                let chunk_size = match advanced_strategy {
                    AdvancedChunkingStrategy::CacheLineAligned => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::NumaAware => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::BandwidthOptimized => {
                        opt.calculatebandwidth_optimized_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::LatencyOptimized => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements) / 2
                    }
                    AdvancedChunkingStrategy::Adaptive => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements)
                    }
                    AdvancedChunkingStrategy::PowerAware => {
                        opt.calculate_cache_aware_chunk_size::<A>(total_elements) / 4
                    }
                    AdvancedChunkingStrategy::LargeData => {
                        // 512 MiB chunks for large in-memory datasets.
                        let target_bytes = 512 * 1024 * 1024;
                        (target_bytes / elem_size).max(1).min(total_elements)
                    }
                    AdvancedChunkingStrategy::HugeDataStreaming { max_memory_mb } => {
                        let target_bytes = max_memory_mb * 1024 * 1024;
                        (target_bytes / elem_size).max(1).min(total_elements)
                    }
                    AdvancedChunkingStrategy::AdaptiveLarge => {
                        use super::platform_memory::PlatformMemoryInfo;

                        let available_mem = PlatformMemoryInfo::detect()
                            .map(|info| info.available_memory)
                            .unwrap_or(512 * 1024 * 1024);

                        // Use a quarter of available memory, bounded to [256 MiB, 2 GiB].
                        let target_bytes =
                            (available_mem / 4).clamp(256 * 1024 * 1024, 2 * 1024 * 1024 * 1024);
                        (target_bytes / elem_size).max(1).min(total_elements)
                    }
                };
                // Guard against degenerate chunk sizes before deriving the chunk count.
                let chunk_size = chunk_size.max(1);
                let num_chunks = total_elements.div_ceil(chunk_size);
                (chunk_size, num_chunks)
            }
        };

        Self {
            data: owned_data,
            strategy,
            chunk_size,
            num_chunks,
            // Reuse the optimizer built for advanced strategies; otherwise create
            // one so the optimized `map_*` variants remain available.
            optimizer: optimizer.or_else(|| Some(MemoryPatternOptimizer::new())),
            phantom: PhantomData,
        }
    }

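    /// Convenience constructor that wraps `strategy` in
    /// `ChunkingStrategy::Advanced`.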
    pub fn with_memory_optimization<S: Data<Elem = A>>(
        data: ArrayBase<S, D>,
        strategy: AdvancedChunkingStrategy,
    ) -> Self {
        Self::new(data, ChunkingStrategy::Advanced(strategy))
    }

    /// Shared access to the underlying memory-pattern optimizer, if any.
    pub fn optimizer(&self) -> Option<&MemoryPatternOptimizer> {
        self.optimizer.as_ref()
    }

    /// Mutable access to the underlying memory-pattern optimizer, if any.
    pub fn optimizer_mut(&mut self) -> Option<&mut MemoryPatternOptimizer> {
        self.optimizer.as_mut()
    }

    /// Records a processing-time sample with the optimizer, if present.
    pub fn record_processing_time(&mut self, duration: Duration) {
        if let Some(ref mut optimizer) = self.optimizer {
            optimizer.record_processing_time(duration);
        }
    }

    /// Updates access-pattern statistics on the optimizer, if present.
    pub fn update_access_pattern_stats(&mut self, stats: &AccessPatternStats) {
        if let Some(ref mut optimizer) = self.optimizer {
            optimizer.update_access_pattern_stats(stats);
        }
    }

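    /// Applies `f` to each chunk sequentially and collects the results into a
    /// one-dimensional array (one element per chunk).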
    pub fn map<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let results: Vec<B> = chunks.iter().map(f).collect();

        Array::from_vec(results)
    }

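    /// Like `map`, but processes chunks in parallel when the `parallel`
    /// feature is enabled; otherwise falls back to the sequential version.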
    pub fn par_map<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        #[cfg(feature = "parallel")]
        {
            use crate::parallel_ops::*;
            use std::sync::Arc;

            let chunks = self.get_chunks();
            let chunks_arc = Arc::new(chunks);

            let num_chunks = chunks_arc.len();
            let results: Vec<B> = (0..num_chunks)
                .into_par_iter()
                .map(move |i| {
                    let chunks_ref = Arc::clone(&chunks_arc);
                    f(&chunks_ref[i])
                })
                .collect();

            Array::from_vec(results)
        }

        #[cfg(not(feature = "parallel"))]
        {
            self.map(f)
        }
    }

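    /// Sequential `map` that also records per-chunk and total processing
    /// times with the optimizer for later adaptive tuning.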
    pub fn map_withmonitoring<F, B>(&mut self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let start_time = Instant::now();
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        for chunk in chunks {
            let chunk_start = Instant::now();
            let result = f(&chunk);
            let chunk_duration = chunk_start.elapsed();

            if let Some(ref mut optimizer) = self.optimizer {
                optimizer.record_processing_time(chunk_duration);
            }

            results.push(result);
        }

        let total_duration = start_time.elapsed();
        self.record_processing_time(total_duration);

        Array::from_vec(results)
    }

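    /// Parallel `map` that divides the work into one range per worker thread
    /// so chunks tend to stay on the NUMA node that processes them.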
    pub fn map_numa_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        #[cfg(feature = "parallel")]
        {
            if self.optimizer.is_some() {
                use crate::parallel_ops::*;

                let num_threads = crate::parallel_ops::get_num_threads();
                let chunk_size = self.data.len() / num_threads.max(1);
                let numa_chunks: Vec<_> = (0..num_threads)
                    .map(|i| {
                        let start = i * chunk_size;
                        let end = if i == num_threads - 1 {
                            self.data.len()
                        } else {
                            (i + 1) * chunk_size
                        };
                        start..end
                    })
                    .collect();
                let chunks = self.get_chunks();

                if !chunks.is_empty() {
                    let results: Vec<B> = numa_chunks
                        .into_par_iter()
                        .enumerate()
                        .map(|(i, _range)| {
                            // Process the pre-computed chunk corresponding to this
                            // thread-local range, clamping the index so it stays in bounds.
                            f(&chunks[i.min(chunks.len() - 1)])
                        })
                        .collect();

                    return Array::from_vec(results);
                }
            }
        }

        self.par_map(f)
    }

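    /// Sequential `map` intended to split oversized chunks into cache-sized
    /// sub-chunks; sub-chunking is not implemented yet, so every chunk is
    /// currently processed whole.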
    pub fn map_cache_optimized<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        if let Some(ref optimizer) = self.optimizer {
            // Computed for comparison only: splitting chunks that exceed the
            // cache-aware size into sub-chunks is not implemented yet.
            let _cache_aware_chunk_size =
                optimizer.calculate_cache_aware_chunk_size::<A>(self.data.len());
        }

        for chunk in chunks {
            results.push(f(&chunk));
        }

        Array::from_vec(results)
    }

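    /// Sequential `map` that throttles chunk processing so it does not exceed
    /// the measured memory bandwidth; without a bandwidth measurement it
    /// behaves like a plain sequential map.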
    pub fn mapbandwidth_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        if let Some(ref optimizer) = self.optimizer {
            let bandwidth = optimizer.get_memorybandwidth();

            if bandwidth > 0 {
                // Computed for future re-chunking; not applied yet.
                let _bandwidth_chunk_size =
                    optimizer.calculatebandwidth_optimized_chunk_size::<A>(self.data.len());

                for chunk in chunks {
                    let chunk_start = Instant::now();
                    let result = f(&chunk);
                    let processing_time = chunk_start.elapsed();

                    // Approximate milliseconds needed to stream this chunk at the
                    // measured bandwidth (treating 1 MB as 10^6 bytes).
                    let expected_time_ms = (chunk.len() * std::mem::size_of::<A>()) as f64
                        / (bandwidth as f64 * 1000.0);
                    let expected_duration = Duration::from_millis(expected_time_ms as u64);

                    // If the chunk finished faster than the bandwidth budget allows,
                    // pause to avoid saturating the memory bus.
                    if processing_time < expected_duration {
                        std::thread::sleep(expected_duration - processing_time);
                    }

                    results.push(result);
                }
            } else {
                for chunk in chunks {
                    results.push(f(&chunk));
                }
            }
        } else {
            for chunk in chunks {
                results.push(f(&chunk));
            }
        }

        Array::from_vec(results)
    }

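    /// Sequential `map` that inserts a short pause after every fourth chunk
    /// to reduce sustained power draw.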
    pub fn map_power_aware<F, B>(&self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync,
        B: Clone,
    {
        let chunks = self.get_chunks();
        let mut results = Vec::with_capacity(chunks.len());

        for (i, chunk) in chunks.iter().enumerate() {
            let result = f(chunk);
            results.push(result);

            // Brief pause after every fourth chunk to limit sustained power draw.
            if i % 4 == 3 {
                std::thread::sleep(Duration::from_millis(1));
            }
        }

        Array::from_vec(results)
    }

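    /// Runs a small read benchmark (1 MiB) to obtain a rough estimate of
    /// memory bandwidth (MB/s), stores it in the optimizer, and returns it.
    /// Returns `None` when no optimizer is attached.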
    pub fn measure_memorybandwidth(&mut self) -> Option<usize> {
        if let Some(ref mut optimizer) = self.optimizer {
            let start_time = Instant::now();
            let chunk_size = 1024 * 1024; // 1 MiB test buffer
            let test_data = vec![0u8; chunk_size];

            // Touch every byte so the read is not optimized away.
            let mut sum = 0u64;
            for &byte in &test_data {
                sum += byte as u64;
            }

            let elapsed = start_time.elapsed();
            // bytes * 1000 / ns approximates MB/s.
            let bandwidth_mbps = if elapsed.as_nanos() > 0 {
                (chunk_size as u128 * 1000) / elapsed.as_nanos()
            } else {
                0
            } as usize;

            optimizer
                .memorybandwidth
                .store(bandwidth_mbps, std::sync::atomic::Ordering::Relaxed);

            std::hint::black_box(sum);

            Some(bandwidth_mbps)
        } else {
            None
        }
    }

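    /// Picks a mapping strategy automatically: honors an explicit advanced
    /// strategy, otherwise chooses between NUMA-aware, cache-optimized, and
    /// plain parallel mapping based on the data size.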
    pub fn map_optimized<F, B>(&mut self, f: F) -> Array<B, crate::ndarray::Ix1>
    where
        F: Fn(&Array<A, D>) -> B + Sync + Send,
        B: Clone + Send + Sync,
        A: Send + Sync,
    {
        if let Some(ref optimizer) = self.optimizer {
            // Lazily sample memory bandwidth the first time it is needed.
            if optimizer.get_memorybandwidth() == 0 {
                self.measure_memorybandwidth();
            }

            let data_size = self.data.len() * std::mem::size_of::<A>();

            match self.strategy {
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::NumaAware) => {
                    self.map_numa_aware(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::CacheLineAligned) => {
                    self.map_cache_optimized(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::BandwidthOptimized) => {
                    self.mapbandwidth_aware(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::PowerAware) => {
                    self.map_power_aware(f)
                }
                ChunkingStrategy::Advanced(AdvancedChunkingStrategy::Adaptive) => {
                    self.map_withmonitoring(f)
                }
                _ => {
                    // No explicit advanced strategy: pick by data size.
                    if data_size > 100 * 1024 * 1024 {
                        // > 100 MiB: spread work across NUMA nodes.
                        self.map_numa_aware(f)
                    } else if data_size > 10 * 1024 * 1024 {
                        // > 10 MiB: keep working sets cache-friendly.
                        self.map_cache_optimized(f)
                    } else {
                        self.par_map(f)
                    }
                }
            }
        } else {
            self.par_map(f)
        }
    }

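    /// Derives access-pattern statistics from a set of per-chunk processing
    /// times (using timing variance as a rough proxy for cache behavior) and
    /// stores them in the optimizer.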
    pub fn update_access_pattern_statistics(&mut self, processingtimes: &[Duration]) {
        if let Some(ref mut optimizer) = self.optimizer {
            let avg_time = if !processingtimes.is_empty() {
                processingtimes.iter().map(|d| d.as_nanos()).sum::<u128>()
                    / processingtimes.len() as u128
            } else {
                0
            };

            // Relative variance of the processing times; high variance is treated
            // as a sign of poor cache behavior.
            let time_variance = if processingtimes.len() > 1 && avg_time > 0 {
                let variance = processingtimes
                    .iter()
                    .map(|d| {
                        let diff = d.as_nanos() as i128 - avg_time as i128;
                        (diff * diff) as u128
                    })
                    .sum::<u128>()
                    / (processingtimes.len() - 1) as u128;
                variance as f64 / avg_time as f64
            } else {
                0.0
            };

            let cache_hit_ratio = (1.0 - time_variance.min(1.0)).max(0.0);

            // Access ratios and bandwidth utilization are heuristic defaults;
            // they are not measured directly here.
            let stats = AccessPatternStats {
                sequential_access_ratio: 0.8,
                random_access_ratio: 0.1,
                strided_access_ratio: 0.1,
                cache_hit_ratio,
                memorybandwidth_utilization: 0.7,
                last_updated: Some(Instant::now()),
            };

            optimizer.update_access_pattern_stats(&stats);
        }
    }

    /// Number of chunks the data is split into.
    pub fn num_chunks(&self) -> usize {
        self.num_chunks
    }

    /// Chunk size in elements.
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }

    /// Materializes the chunks as owned arrays. One-dimensional, contiguous
    /// data is sliced into `num_chunks` pieces; any other layout falls back to
    /// a single chunk containing the whole array.
    pub fn get_chunks(&self) -> Vec<Array<A, D>>
    where
        D: Clone,
    {
        let mut result = Vec::with_capacity(self.num_chunks);

        if self.data.ndim() == 1 {
            if let Some(slice) = self.data.as_slice() {
                for i in 0..self.num_chunks {
                    let start = i * self.chunk_size;
                    let end = ((i + 1) * self.chunk_size).min(slice.len());
                    let chunk_slice = &slice[start..end];

                    let chunk_1d = Array::from_vec(chunk_slice.to_vec());

                    if let Ok(reshaped) = chunk_1d.into_dimensionality::<D>() {
                        result.push(reshaped);
                    } else {
                        // Dimensionality conversion failed: fall back to one chunk.
                        return vec![self.data.clone()];
                    }
                }
                return result;
            }
        }

        // Non-1-D or non-contiguous data: treat the whole array as one chunk.
        result.push(self.data.clone());
        result
    }
}

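/// Applies `op` to `array`, validating that the input is non-empty and that
/// the operation preserves the array's shape. Arrays with at most 1000
/// elements skip the chunking machinery entirely.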
#[allow(dead_code)]
pub fn chunk_wise_op<A, F, B, S, D>(
    array: &ArrayBase<S, D>,
    op: F,
    strategy: ChunkingStrategy,
) -> Result<Array<B, D>, CoreError>
where
    A: Clone,
    S: Data<Elem = A>,
    F: Fn(&ArrayBase<S, D>) -> Array<B, D>,
    B: Clone,
    D: Dimension + Clone,
{
    validation::check_not_empty(array)?;

    // Small arrays are processed directly.
    if array.len() <= 1000 {
        return Ok(op(array));
    }

    // The chunked representation is prepared, but the operation is currently
    // applied to the whole array in one pass; only the shape is validated.
    let _chunked = ChunkedArray::new(array.to_owned(), strategy);

    let result = op(array);

    if result.raw_dim() != array.raw_dim() {
        return Err(CoreError::ValidationError(
            ErrorContext::new(format!(
                "Operation changed shape from {:?} to {:?}",
                array.shape(),
                result.shape()
            ))
            .with_location(ErrorLocation::new(file!(), line!())),
        ));
    }

    Ok(result)
}

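/// Applies a binary `op` to `lhs` and `rhs`, validating that their shapes
/// match, that `lhs` is non-empty, and that the result keeps the input shape.
/// Arrays with at most 1000 elements skip the chunking machinery entirely.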
#[allow(dead_code)]
pub fn chunk_wise_binary_op<A, B, F, C, S1, S2, D>(
    lhs: &ArrayBase<S1, D>,
    rhs: &ArrayBase<S2, D>,
    op: F,
    strategy: ChunkingStrategy,
) -> Result<Array<C, D>, CoreError>
where
    A: Clone,
    B: Clone,
    S1: Data<Elem = A>,
    S2: Data<Elem = B>,
    F: Fn(&ArrayBase<S1, D>, &ArrayBase<S2, D>) -> Array<C, D>,
    C: Clone,
    D: Dimension + Clone,
{
    validation::checkshapes_match(lhs.shape(), rhs.shape())?;
    validation::check_not_empty(lhs)?;

    // Small arrays are processed directly.
    if lhs.len() <= 1000 {
        return Ok(op(lhs, rhs));
    }

    // Chunked representations are prepared, but the operation is currently
    // applied to the full arrays in one pass; only the shape is validated.
    let _chunked_lhs = ChunkedArray::new(lhs.to_owned(), strategy);
    let _chunked_rhs = ChunkedArray::new(rhs.to_owned(), strategy);

    let result = op(lhs, rhs);

    if result.shape() != lhs.shape() {
        return Err(CoreError::ValidationError(
            ErrorContext::new(format!(
                "Binary operation changed shape from {:?} to {:?}",
                lhs.shape(),
                result.shape()
            ))
            .with_location(ErrorLocation::new(file!(), line!())),
        ));
    }

    Ok(result)
}

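/// Reduces `array` by applying `chunk_op` and combining the per-chunk results
/// with `combine`. The current implementation treats the whole array as a
/// single chunk, so `combine` receives a one-element vector.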
#[allow(dead_code)]
pub fn chunk_wise_reduce<A, F, G, B, S, D>(
    array: &ArrayBase<S, D>,
    chunk_op: F,
    combine: G,
    strategy: ChunkingStrategy,
) -> Result<B, CoreError>
where
    A: Clone,
    S: Data<Elem = A>,
    F: Fn(&ArrayBase<S, D>) -> B + Sync + Send,
    G: Fn(Vec<B>) -> B,
    B: Clone + Send + Sync,
    D: Dimension + Clone,
{
    validation::check_not_empty(array)?;

    // Small arrays are reduced directly.
    if array.len() <= 1000 {
        return Ok(chunk_op(array));
    }

    let _chunked = ChunkedArray::new(array.to_owned(), strategy);

    // The whole array is currently reduced as a single chunk; `combine` is
    // applied to the singleton result for consistency with the contract.
    let result = chunk_op(array);
    Ok(combine(vec![result]))
}
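
#[cfg(test)]
mod chunking_usage_tests {
    // A minimal usage sketch (not an exhaustive test suite): it exercises the
    // `Fixed` strategy end to end, relying only on the `ndarray` types already
    // imported at the top of this module.
    use super::*;

    #[test]
    fn fixed_strategy_splits_into_expected_chunks() {
        // 10_000 elements split into chunks of 1_024 elements -> 10 chunks,
        // the last of which holds the 784-element remainder.
        let data = ::ndarray::Array::from_vec((0..10_000u64).collect::<Vec<_>>());
        let chunked = ChunkedArray::new(data, ChunkingStrategy::Fixed(1_024));

        assert_eq!(chunked.chunk_size(), 1_024);
        assert_eq!(chunked.num_chunks(), 10);

        // Map each chunk to its length; the result has one entry per chunk.
        let lengths = chunked.map(|chunk| chunk.len());
        assert_eq!(lengths.len(), 10);
        assert_eq!(lengths[9], 10_000 - 9 * 1_024);
    }
}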