1use scirs2_core::ndarray::{
7 Array, ArrayView, ArrayViewMut, Dimension, IxDyn, Slice, SliceInfoElem,
8};
9use scirs2_core::numeric::{Float, FromPrimitive, Zero};
10use scirs2_core::parallel_ops::*;
12use std::fmt::Debug;
13use std::fs::File;
14use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
15use std::path::Path;
16
17use crate::error::{NdimageError, NdimageResult};
18use crate::filters::BorderMode;
19
20#[derive(Debug, Clone)]
22pub struct StreamConfig {
23 pub chunk_size: usize,
25 pub overlap: Vec<usize>,
27 pub use_mmap: bool,
29 pub cache_chunks: usize,
31 pub temp_dir: Option<String>,
33}
34
35impl Default for StreamConfig {
36 fn default() -> Self {
37 Self {
38 chunk_size: 128 * 1024 * 1024, overlap: vec![],
40 use_mmap: true,
41 cache_chunks: 4,
42 temp_dir: None,
43 }
44 }
45}
46
47pub trait StreamableOp<T, D>: Send + Sync
49where
50 T: Float + FromPrimitive + Debug + Clone,
51 D: Dimension,
52{
53 fn apply_chunk(&self, chunk: &ArrayView<T, D>) -> NdimageResult<Array<T, D>>;
55
56 fn required_overlap(&self) -> Vec<usize>;
58
59 fn merge_overlap(
61 &self,
62 output: &mut ArrayViewMut<T, D>,
63 new_chunk: &ArrayView<T, D>,
64 overlap_info: &OverlapInfo,
65 ) -> NdimageResult<()>;
66}
67
68#[derive(Debug, Clone)]
70pub struct OverlapInfo {
71 pub dimension: usize,
73 pub output_start: usize,
75 pub output_end: usize,
77 pub overlap_size: usize,
79}
80
81pub struct StreamProcessor<T> {
83 config: StreamConfig,
84 phantom: std::marker::PhantomData<T>,
85}
86
87impl<T> StreamProcessor<T>
88where
89 T: Float + FromPrimitive + Debug + Clone + Send + Sync + 'static,
90{
91 pub fn new(config: StreamConfig) -> Self {
92 Self {
93 config,
94 phantom: std::marker::PhantomData,
95 }
96 }
97
98 pub fn process_file<D, Op>(
100 &self,
101 input_path: &Path,
102 output_path: &Path,
103 shape: &[usize],
104 op: Op,
105 ) -> NdimageResult<()>
106 where
107 D: Dimension,
108 Op: StreamableOp<T, D>,
109 {
110 let element_size = std::mem::size_of::<T>();
111 let total_elements: usize = shape.iter().product();
112 let total_size = total_elements * element_size;
113
114 let chunk_dims = self.calculate_chunk_dimensions(shape, element_size)?;
116
117 let mut input_file = BufReader::new(File::open(input_path)?);
119 let mut output_file = BufWriter::new(File::create(output_path)?);
120
121 for chunk_info in self.chunk_iterator(shape, &chunk_dims) {
123 let chunk = self.read_chunk(&mut input_file, &chunk_info, shape)?;
125
126 let chunk_d = chunk.into_dimensionality::<D>().map_err(|_| {
128 NdimageError::ComputationError("Failed to convert chunk dimension".to_string())
129 })?;
130 let result = op.apply_chunk(&chunk_d.view())?;
131
132 self.write_chunk(&mut output_file, &result.view().into_dyn(), &chunk_info)?;
134 }
135
136 Ok(())
137 }
138
139 pub fn process_in_memory<D, Op>(
141 &self,
142 input: &ArrayView<T, D>,
143 op: Op,
144 ) -> NdimageResult<Array<T, D>>
145 where
146 D: Dimension,
147 Op: StreamableOp<T, D>,
148 {
149 let shape = input.shape();
150 let element_size = std::mem::size_of::<T>();
151
152 let chunk_dims = self.calculate_chunk_dimensions(shape, element_size)?;
154
155 let mut output = Array::zeros(input.raw_dim());
157
158 if is_parallel_enabled() {
160 let chunks: Vec<_> = self.chunk_iterator(shape, &chunk_dims).collect();
161
162 chunks.par_iter().try_for_each(|chunk_info| {
163 let chunk = self.extract_chunk(input, chunk_info)?;
164 let _result = op.apply_chunk(&chunk.view())?;
165
166 Ok::<(), NdimageError>(())
169 })?;
170 } else {
171 for chunk_info in self.chunk_iterator(shape, &chunk_dims) {
173 let chunk = self.extract_chunk(input, &chunk_info)?;
174 let result = op.apply_chunk(&chunk.view())?;
175 self.insert_chunk(&mut output.view_mut(), &result.view(), &chunk_info)?;
176 }
177 }
178
179 Ok(output)
180 }
181
182 fn calculate_chunk_dimensions(
184 &self,
185 shape: &[usize],
186 element_size: usize,
187 ) -> NdimageResult<Vec<usize>> {
188 let ndim = shape.len();
189 let mut chunk_dims = shape.to_vec();
190
191 let mut current_size = shape.iter().product::<usize>() * element_size;
193
194 while current_size > self.config.chunk_size && chunk_dims.iter().any(|&d| d > 1) {
195 let (max_idx_, _) = chunk_dims
197 .iter()
198 .enumerate()
199 .filter(|(_, &d)| d > 1)
200 .max_by_key(|(_, &d)| d)
201 .expect("Operation failed");
202
203 chunk_dims[max_idx_] /= 2;
204 current_size = chunk_dims.iter().product::<usize>() * element_size;
205 }
206
207 if !self.config.overlap.is_empty() {
209 for (i, &overlap) in self.config.overlap.iter().enumerate() {
210 if i < ndim {
211 chunk_dims[i] = chunk_dims[i].saturating_add(overlap * 2);
212 }
213 }
214 }
215
216 Ok(chunk_dims)
217 }
218
219 fn chunk_iterator<'a>(&'a self, shape: &'a [usize], chunkdims: &'a [usize]) -> ChunkIterator {
221 ChunkIterator::new(shape, chunkdims, &self.config.overlap)
222 }
223
224 fn extract_chunk<D>(
226 &self,
227 array: &ArrayView<T, D>,
228 chunk_info: &ChunkInfo,
229 ) -> NdimageResult<Array<T, D>>
230 where
231 D: Dimension,
232 {
233 let slices: Vec<_> = chunk_info
234 .ranges
235 .iter()
236 .map(|r| SliceInfoElem::Slice {
237 start: r.start as isize,
238 end: Some(r.end as isize),
239 step: 1,
240 })
241 .collect();
242
243 let array_dyn = array.view().into_dyn();
244 Ok(array_dyn
245 .slice(slices.as_slice())
246 .to_owned()
247 .into_dimensionality::<D>()
248 .map_err(|_| {
249 NdimageError::ComputationError(
250 "Failed to convert chunk back to original dimension".to_string(),
251 )
252 })?)
253 }
254
255 fn insert_chunk<D>(
257 &self,
258 output: &mut ArrayViewMut<T, D>,
259 chunk: &ArrayView<T, D>,
260 chunk_info: &ChunkInfo,
261 ) -> NdimageResult<()>
262 where
263 D: Dimension,
264 {
265 let slices: Vec<_> = chunk_info
266 .output_ranges
267 .iter()
268 .map(|r| SliceInfoElem::Slice {
269 start: r.start as isize,
270 end: Some(r.end as isize),
271 step: 1,
272 })
273 .collect();
274
275 let mut output_dyn = output.view_mut().into_dyn();
276 let mut output_slice = output_dyn.slice_mut(slices.as_slice());
277 output_slice.assign(&chunk.view().into_dyn());
278
279 Ok(())
280 }
281
282 fn read_chunk(
284 &self,
285 file: &mut BufReader<File>,
286 chunk_info: &ChunkInfo,
287 shape: &[usize],
288 ) -> NdimageResult<Array<T, IxDyn>> {
289 let element_size = std::mem::size_of::<T>();
290 let chunk_elements: usize = chunk_info.ranges.iter().map(|r| r.end - r.start).product();
291
292 let offset = self.calculate_file_offset(&chunk_info.ranges, shape, element_size);
294 file.seek(SeekFrom::Start(offset as u64))?;
295
296 let mut buffer = vec![T::zero(); chunk_elements];
298 let byte_buffer = unsafe {
299 std::slice::from_raw_parts_mut(
300 buffer.as_mut_ptr() as *mut u8,
301 chunk_elements * element_size,
302 )
303 };
304 file.read_exact(byte_buffer)?;
305
306 let chunkshape: Vec<_> = chunk_info.ranges.iter().map(|r| r.end - r.start).collect();
308 Ok(Array::from_shape_vec(IxDyn(&chunkshape), buffer)?)
309 }
310
311 fn write_chunk(
313 &self,
314 file: &mut BufWriter<File>,
315 chunk: &ArrayView<T, IxDyn>,
316 _chunk_info: &ChunkInfo,
317 ) -> NdimageResult<()> {
318 let element_size = std::mem::size_of::<T>();
319
320 let slice = chunk
322 .as_slice()
323 .ok_or_else(|| NdimageError::InvalidInput("Chunk is not contiguous".into()))?;
324
325 let byte_slice = unsafe {
326 std::slice::from_raw_parts(slice.as_ptr() as *const u8, slice.len() * element_size)
327 };
328
329 file.write_all(byte_slice)?;
330 Ok(())
331 }
332
333 fn calculate_file_offset(
335 &self,
336 ranges: &[std::ops::Range<usize>],
337 shape: &[usize],
338 element_size: usize,
339 ) -> usize {
340 let mut offset = 0;
341 let mut stride = element_size;
342
343 for (i, range) in ranges.iter().enumerate().rev() {
344 offset += range.start * stride;
345 if i > 0 {
346 stride *= shape[i];
347 }
348 }
349
350 offset
351 }
352}
353
354#[derive(Debug, Clone)]
356struct ChunkInfo {
357 ranges: Vec<std::ops::Range<usize>>,
359 output_ranges: Vec<std::ops::Range<usize>>,
361}
362
363struct ChunkIterator {
365 shape: Vec<usize>,
366 chunk_dims: Vec<usize>,
367 overlap: Vec<usize>,
368 current: Vec<usize>,
369 done: bool,
370}
371
372impl ChunkIterator {
373 fn new(shape: &[usize], chunkdims: &[usize], overlap: &[usize]) -> Self {
374 Self {
375 shape: shape.to_vec(),
376 chunk_dims: chunkdims.to_vec(),
377 overlap: overlap.to_vec(),
378 current: vec![0; shape.len()],
379 done: false,
380 }
381 }
382}
383
384impl Iterator for ChunkIterator {
385 type Item = ChunkInfo;
386
387 fn next(&mut self) -> Option<Self::Item> {
388 if self.done {
389 return None;
390 }
391
392 let mut ranges = Vec::new();
393 let mut output_ranges = Vec::new();
394
395 for i in 0..self.shape.len() {
396 let overlap = self.overlap.get(i).copied().unwrap_or(0);
397 let start = self.current[i].saturating_sub(overlap);
398 let end = (self.current[i] + self.chunk_dims[i]).min(self.shape[i]);
399
400 ranges.push(start..end);
401
402 let output_start = if self.current[i] == 0 { 0 } else { overlap };
404 let output_end = if self.current[i] + self.chunk_dims[i] >= self.shape[i] {
405 end - start
406 } else {
407 end - start - overlap
408 };
409
410 output_ranges.push(output_start..output_end);
411 }
412
413 let chunk_info = ChunkInfo {
414 ranges,
415 output_ranges,
416 };
417
418 let mut carry = true;
420 for i in (0..self.shape.len()).rev() {
421 if carry {
422 self.current[i] += self.chunk_dims[i] - self.overlap.get(i).copied().unwrap_or(0);
423 if self.current[i] < self.shape[i] {
424 carry = false;
425 } else {
426 self.current[i] = 0;
427 }
428 }
429 }
430
431 if carry {
432 self.done = true;
433 }
434
435 Some(chunk_info)
436 }
437}
438
439pub struct StreamingGaussianFilter<T> {
441 sigma: Vec<T>,
442 truncate: Option<T>,
443}
444
445impl<T: Float + FromPrimitive + Debug + Clone> StreamingGaussianFilter<T> {
446 pub fn new(sigma: Vec<T>, truncate: Option<T>) -> Self {
447 Self { sigma, truncate }
448 }
449}
450
451impl<T, D> StreamableOp<T, D> for StreamingGaussianFilter<T>
452where
453 T: Float + FromPrimitive + Debug + Clone + Send + Sync + 'static,
454 D: Dimension + 'static,
455{
456 fn apply_chunk(&self, chunk: &ArrayView<T, D>) -> NdimageResult<Array<T, D>> {
457 let chunk_f64 = chunk.mapv(|x| x.to_f64().unwrap_or(0.0));
459
460 let sigma = self
462 .sigma
463 .first()
464 .map(|s| s.to_f64().unwrap_or(1.0))
465 .unwrap_or(1.0);
466
467 let truncate = self.truncate.and_then(|t| t.to_f64());
468
469 let result_f64 = crate::filters::gaussian_filter(
470 &chunk_f64,
471 sigma,
472 Some(BorderMode::Reflect),
473 truncate,
474 )?;
475
476 Ok(result_f64.mapv(|x| T::from_f64(x).unwrap_or_else(|| T::zero())))
478 }
479
480 fn required_overlap(&self) -> Vec<usize> {
481 self.sigma
483 .iter()
484 .map(|&s| {
485 let truncate = self
486 .truncate
487 .unwrap_or(T::from_f64(4.0).expect("Operation failed"));
488 (truncate * s).to_usize().unwrap_or(4)
489 })
490 .collect()
491 }
492
493 fn merge_overlap(
494 &self,
495 output: &mut ArrayViewMut<T, D>,
496 new_chunk: &ArrayView<T, D>,
497 overlap_info: &OverlapInfo,
498 ) -> NdimageResult<()> {
499 let dim = overlap_info.dimension;
500 let overlap_size = overlap_info.overlap_size;
501
502 if overlap_size == 0 {
504 output.assign(new_chunk);
505 return Ok(());
506 }
507
508 let outputshape = output.shape().to_vec();
513 let chunkshape = new_chunk.shape().to_vec();
514
515 if outputshape != chunkshape {
517 return Err(NdimageError::DimensionError(
518 "Output and _chunk shapes must match for overlap merging".to_string(),
519 ));
520 }
521
522 let mut flat_idx = 0;
525
526 for (output_pixel, &chunk_pixel) in output.iter_mut().zip(new_chunk.iter()) {
527 let mut coord_in_dim = flat_idx;
529 for d in (dim + 1..outputshape.len()).rev() {
530 coord_in_dim /= outputshape[d];
531 }
532 coord_in_dim %= outputshape[dim];
533
534 if coord_in_dim < overlap_size {
535 let distance_from_edge = coord_in_dim;
537 let weight = T::from_f64(distance_from_edge as f64 / overlap_size as f64)
538 .expect("Operation failed");
539 *output_pixel = *output_pixel * (T::one() - weight) + chunk_pixel * weight;
540 } else if coord_in_dim >= outputshape[dim] - overlap_size {
541 let distance_from_end = outputshape[dim] - 1 - coord_in_dim;
543 let weight = T::from_f64(distance_from_end as f64 / overlap_size as f64)
544 .expect("Operation failed");
545 *output_pixel = *output_pixel * (T::one() - weight) + chunk_pixel * weight;
546 } else {
547 *output_pixel = chunk_pixel;
549 }
550
551 flat_idx += 1;
552 }
553
554 Ok(())
555 }
556}
557
558#[allow(dead_code)]
560pub fn stream_process_file<T, D, Op>(
561 input_path: &Path,
562 output_path: &Path,
563 shape: &[usize],
564 op: Op,
565 config: Option<StreamConfig>,
566) -> NdimageResult<()>
567where
568 T: Float + FromPrimitive + Debug + Clone + Send + Sync + 'static,
569 D: Dimension,
570 Op: StreamableOp<T, D>,
571{
572 let config = config.unwrap_or_default();
573 let processor = StreamProcessor::<T>::new(config);
574 processor.process_file::<D, Op>(input_path, output_path, shape, op)
575}
576
577#[cfg(test)]
578mod tests {
579 use super::*;
580 use scirs2_core::ndarray::arr2;
581
582 #[test]
583 fn test_chunk_iterator() {
584 let shape = vec![100, 100];
585 let chunk_dims = vec![30, 30];
586 let overlap = vec![5, 5];
587
588 let mut count = 0;
589 for chunk in ChunkIterator::new(&shape, &chunk_dims, &overlap) {
590 assert!(!chunk.ranges.is_empty());
591 count += 1;
592 }
593
594 assert!(count > 1);
596 }
597
598 #[test]
599 fn test_streaming_processor() {
600 let config = StreamConfig {
601 chunk_size: 1024,
602 overlap: vec![2, 2],
603 ..Default::default()
604 };
605
606 let processor = StreamProcessor::<f64>::new(config);
607 let input = arr2(&[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]);
608
609 let op = StreamingGaussianFilter::new(vec![1.0, 1.0], None);
610 let result = processor
611 .process_in_memory(&input.view(), op)
612 .expect("Operation failed");
613
614 assert_eq!(result.shape(), input.shape());
615 }
616}
617
618#[allow(dead_code)]
624pub struct AdaptiveStreamProcessor<T> {
625 base_config: StreamConfig,
626 performance_monitor: PerformanceMonitor,
627 memory_manager: MemoryManager,
628 _phantom: std::marker::PhantomData<T>,
629}
630
631impl<T> AdaptiveStreamProcessor<T>
632where
633 T: Float + FromPrimitive + Debug + Clone + Send + Sync + 'static,
634{
635 pub fn new(_baseconfig: StreamConfig) -> Self {
636 Self {
637 base_config: _baseconfig,
638 performance_monitor: PerformanceMonitor::new(),
639 memory_manager: MemoryManager::new(),
640 _phantom: std::marker::PhantomData,
641 }
642 }
643
644 pub fn process_adaptive<D, Op>(
646 &mut self,
647 input: &ArrayView<T, D>,
648 op: Op,
649 ) -> NdimageResult<Array<T, D>>
650 where
651 D: Dimension,
652 Op: StreamableOp<T, D> + AdaptiveOperation<T, D>,
653 {
654 let shape = input.shape();
655 let mut current_config = self.base_config.clone();
656
657 let complexity = op.estimate_complexity(shape);
659 current_config.chunk_size = self.memory_manager.calculate_optimal_chunk_size(
660 std::mem::size_of::<T>(),
661 shape,
662 complexity,
663 );
664
665 let mut output = Array::zeros(input.raw_dim());
667 let mut chunk_times = Vec::new();
668
669 for (chunk_idx, chunk_info) in self.chunk_iterator(shape, ¤t_config).enumerate() {
670 let start_time = std::time::Instant::now();
671
672 let chunk = self.extract_chunk_with_bounds(input, &chunk_info)?;
674
675 let result = op.apply_chunk(&chunk.view())?;
677
678 self.merge_chunk_result(&mut output.view_mut(), &result.view(), &chunk_info)?;
680
681 let chunk_time = start_time.elapsed();
683 chunk_times.push(chunk_time);
684
685 if chunk_idx > 0 && chunk_idx % 10 == 0 {
687 let avg_time =
688 chunk_times.iter().sum::<std::time::Duration>() / chunk_times.len() as u32;
689 let new_config = self
690 .performance_monitor
691 .adjust_config(¤t_config, avg_time);
692
693 if new_config.chunk_size != current_config.chunk_size {
694 current_config = new_config;
695 chunk_times.clear(); }
697 }
698 }
699
700 Ok(output)
701 }
702
703 #[cfg(feature = "gpu")]
705 pub fn process_gpu_accelerated<D, Op>(
706 &mut self,
707 input: &ArrayView<T, D>,
708 op: Op,
709 ) -> NdimageResult<Array<T, D>>
710 where
711 D: Dimension,
712 Op: StreamableOp<T, D> + GpuStreamableOp<T, D>,
713 {
714 #[cfg(feature = "gpu")]
716 if let Ok(device_manager) = crate::backend::device_detection::get_device_manager() {
717 if let Ok(dm) = device_manager.lock() {
718 if let Some((backend, device_id)) =
719 dm.get_best_device(input.len() * std::mem::size_of::<T>())
720 {
721 return self.process_gpu_chunks(input, op, backend);
722 }
723 }
724 }
725
726 let result = op.apply_chunk(&input)?;
729 Ok(result)
730 }
731
732 fn extract_chunk_with_bounds<D>(
734 &self,
735 input: &ArrayView<T, D>,
736 chunk_info: &ChunkInfo,
737 ) -> NdimageResult<Array<T, D>>
738 where
739 D: Dimension,
740 {
741 let mut slice_info = Vec::new();
742 for range in &chunk_info.ranges {
743 slice_info.push(Slice::from(range.clone()));
744 }
745
746 let chunk_view =
748 input.slice_each_axis(|ax| Slice::from(chunk_info.ranges[ax.axis.index()].clone()));
749 Ok(chunk_view.to_owned())
750 }
751
752 fn merge_chunk_result<D>(
754 &self,
755 output: &mut ArrayViewMut<T, D>,
756 result: &ArrayView<T, D>,
757 chunk_info: &ChunkInfo,
758 ) -> NdimageResult<()>
759 where
760 D: Dimension,
761 {
762 let mut result_slice_info = Vec::new();
767
768 for (i, output_range) in chunk_info.output_ranges.iter().enumerate() {
769 let input_range = &chunk_info.ranges[i];
770
771 let offset_start = output_range.start - input_range.start;
773 let offset_end = offset_start + (output_range.end - output_range.start);
774
775 result_slice_info.push(offset_start..offset_end);
776 }
777
778 let result_slice = result.slice_each_axis(|ax| {
780 let range = &result_slice_info[ax.axis.index()];
781 Slice::from(range.start..range.end)
782 });
783
784 let mut output_slice = output.slice_each_axis_mut(|ax| {
785 let range = &chunk_info.output_ranges[ax.axis.index()];
786 Slice::from(range.start..range.end)
787 });
788
789 if result_slice.shape() != output_slice.shape() {
791 return Err(NdimageError::DimensionError(format!(
792 "Shape mismatch in chunk merging: result slice {:?} vs output slice {:?}",
793 result_slice.shape(),
794 output_slice.shape()
795 )));
796 }
797
798 output_slice.assign(&result_slice);
801
802 Ok(())
803 }
804
805 fn chunk_iterator(
807 &self,
808 shape: &[usize],
809 config: &StreamConfig,
810 ) -> impl Iterator<Item = ChunkInfo> {
811 let element_size = std::mem::size_of::<T>();
812 let chunk_dims = self.calculate_optimal_chunk_dimensions(shape, element_size, config);
813 ChunkIterator::new(shape, &chunk_dims, &config.overlap)
814 }
815
816 fn calculate_optimal_chunk_dimensions(
818 &self,
819 shape: &[usize],
820 element_size: usize,
821 config: &StreamConfig,
822 ) -> Vec<usize> {
823 let target_elements = config.chunk_size / element_size;
824 let ndim = shape.len();
825
826 let base_size = (target_elements as f64).powf(1.0 / ndim as f64) as usize;
828
829 shape
830 .iter()
831 .map(|&dim_size| base_size.min(dim_size))
832 .collect()
833 }
834
835 #[cfg(feature = "gpu")]
836 fn process_gpu_chunks<D, Op>(
837 &mut self,
838 input: &ArrayView<T, D>,
839 op: Op,
840 gpu_backend: crate::backend::Backend,
841 ) -> NdimageResult<Array<T, D>>
842 where
843 D: Dimension,
844 Op: GpuStreamableOp<T, D>,
845 {
846 use crate::backend::{Backend, GpuContext};
847
848 let gpucontext: Box<dyn GpuContext> = match gpu_backend {
850 #[cfg(feature = "cuda")]
851 Backend::Cuda => {
852 use crate::backend::CudaContext;
853 Box::new(CudaContext::new(None)?)
854 }
855 #[cfg(feature = "opencl")]
856 Backend::OpenCL => {
857 use crate::backend::OpenCLContext;
858 Box::new(OpenCLContext::new(None)?)
859 }
860 _ => {
861 return Err(crate::error::NdimageError::GpuNotAvailable(
862 "Unsupported GPU backend".to_string(),
863 ))?
864 }
865 };
866
867 let required_overlap = op.required_overlap();
869 let overlap = if required_overlap.is_empty() {
870 vec![0; input.ndim()]
871 } else {
872 required_overlap
873 };
874
875 let chunk_dims = self.calculate_optimal_chunk_dimensions(
877 input.shape(),
878 std::mem::size_of::<T>(),
879 &self.base_config,
880 );
881
882 let mut output = Array::<T, D>::zeros(input.raw_dim());
884
885 let chunk_iter = ChunkIterator::new(input.shape(), &chunk_dims, &overlap);
887
888 for chunk_info in chunk_iter {
890 let chunk_view = input.slice_each_axis(|ax| {
892 let range = &chunk_info.ranges[ax.axis.index()];
893 Slice::from(range.start..range.end)
894 });
895
896 if !op.is_gpu_suitable(chunk_view.shape()) {
898 let chunk_result = op.apply_chunk(&chunk_view)?;
900
901 let mut output_slice = output.slice_each_axis_mut(|ax| {
903 let range = &chunk_info.output_ranges[ax.axis.index()];
904 Slice::from(range.start..range.end)
905 });
906
907 let result_slice = chunk_result.slice_each_axis(|ax| {
909 let input_range = &chunk_info.ranges[ax.axis.index()];
910 let output_range = &chunk_info.output_ranges[ax.axis.index()];
911 let offset = output_range.start - input_range.start;
912 let size = output_range.end - output_range.start;
913 Slice::from(offset..offset + size)
914 });
915
916 output_slice.assign(&result_slice);
917 continue;
918 }
919
920 let chunk_result = op.apply_chunk_gpu(&chunk_view, gpucontext.as_ref())?;
922
923 if overlap.iter().any(|&x| x > 0) {
925 let overlap_info = OverlapInfo {
926 dimension: 0, output_start: chunk_info.output_ranges[0].start,
928 output_end: chunk_info.output_ranges[0].end,
929 overlap_size: overlap[0],
930 };
931
932 let mut output_slice = output.slice_each_axis_mut(|ax| {
933 let range = &chunk_info.output_ranges[ax.axis.index()];
934 Slice::from(range.start..range.end)
935 });
936
937 let result_slice = chunk_result.slice_each_axis(|ax| {
939 let input_range = &chunk_info.ranges[ax.axis.index()];
940 let output_range = &chunk_info.output_ranges[ax.axis.index()];
941 let offset = output_range.start - input_range.start;
942 let size = output_range.end - output_range.start;
943 Slice::from(offset..offset + size)
944 });
945
946 op.merge_overlap(&mut output_slice, &result_slice, &overlap_info)?;
947 } else {
948 let mut output_slice = output.slice_each_axis_mut(|ax| {
950 let range = &chunk_info.output_ranges[ax.axis.index()];
951 Slice::from(range.start..range.end)
952 });
953
954 output_slice.assign(&chunk_result);
956 }
957 }
958
959 Ok(output)
960 }
961}
962
963#[allow(dead_code)]
965struct PerformanceMonitor {
966 history: Vec<PerformanceMetrics>,
967}
968
969impl PerformanceMonitor {
970 fn new() -> Self {
971 Self {
972 history: Vec::new(),
973 }
974 }
975
976 fn adjust_config(
977 &mut self,
978 current: &StreamConfig,
979 avg_time: std::time::Duration,
980 ) -> StreamConfig {
981 let mut new_config = current.clone();
982
983 if avg_time.as_millis() < 100 {
986 new_config.chunk_size = (current.chunk_size as f64 * 1.2) as usize;
987 } else if avg_time.as_millis() > 1000 {
988 new_config.chunk_size = (current.chunk_size as f64 * 0.8) as usize;
989 }
990
991 new_config.chunk_size = new_config.chunk_size.max(64 * 1024); self.history.push(PerformanceMetrics {
995 chunk_size: current.chunk_size,
996 processing_time: avg_time,
997 timestamp: std::time::Instant::now(),
998 });
999
1000 new_config
1001 }
1002}
1003
1004#[derive(Debug, Clone)]
1006#[allow(dead_code)]
1007struct PerformanceMetrics {
1008 chunk_size: usize,
1009 processing_time: std::time::Duration,
1010 timestamp: std::time::Instant,
1011}
1012
1013#[allow(dead_code)]
1015struct MemoryManager {
1016 available_memory: usize,
1017 cache_sizes: [usize; 3], }
1019
1020impl MemoryManager {
1021 fn new() -> Self {
1022 Self {
1023 available_memory: Self::detect_available_memory(),
1024 cache_sizes: Self::detect_cache_sizes(),
1025 }
1026 }
1027
1028 fn calculate_optimal_chunk_size(
1029 &self,
1030 element_size: usize,
1031 shape: &[usize],
1032 complexity: OperationComplexity,
1033 ) -> usize {
1034 let total_elements: usize = shape.iter().product();
1035 let _total_bytes = total_elements * element_size;
1036
1037 let memory_fraction = match complexity {
1039 OperationComplexity::Low => 0.1,
1040 OperationComplexity::Medium => 0.05,
1041 OperationComplexity::High => 0.02,
1042 };
1043
1044 let target_size = (self.available_memory as f64 * memory_fraction) as usize;
1045
1046 if complexity == OperationComplexity::Low {
1048 target_size.min(self.cache_sizes[2])
1049 } else {
1050 target_size
1051 }
1052 .max(64 * 1024) }
1054
1055 fn detect_available_memory() -> usize {
1056 1_000_000_000 }
1060
1061 fn detect_cache_sizes() -> [usize; 3] {
1062 [32 * 1024, 256 * 1024, 8 * 1024 * 1024] }
1065}
1066
1067#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1069pub enum OperationComplexity {
1070 Low, Medium, High, }
1074
1075#[allow(dead_code)]
1077pub trait AdaptiveOperation<T, D>: StreamableOp<T, D>
1078where
1079 T: Float + FromPrimitive + Debug + Clone,
1080 D: Dimension,
1081{
1082 fn estimate_complexity(&self, shape: &[usize]) -> OperationComplexity;
1084
1085 fn suggest_overlap(&self, _chunkdims: &[usize]) -> Vec<usize> {
1087 self.required_overlap()
1088 }
1089
1090 fn supports_parallel_chunks(&self) -> bool {
1092 true
1093 }
1094}
1095
1096#[cfg(feature = "gpu")]
1098#[allow(dead_code)]
1099pub trait GpuStreamableOp<T, D>: StreamableOp<T, D>
1100where
1101 T: Float + FromPrimitive + Debug + Clone,
1102 D: Dimension,
1103{
1104 fn apply_chunk_gpu(
1106 &self,
1107 chunk: &ArrayView<T, D>,
1108 gpucontext: &dyn crate::backend::GpuContext,
1109 ) -> NdimageResult<Array<T, D>>;
1110
1111 fn is_gpu_suitable(&self, chunkshape: &[usize]) -> bool;
1113
1114 fn gpu_memory_requirement(&self, chunkshape: &[usize]) -> usize;
1116}
1117
1118#[cfg(feature = "gpu")]
1120#[allow(dead_code)]
1121pub struct GpuContext {
1122 device_id: u32,
1124 memory_pool: Option<*mut u8>,
1125}
1126
1127#[cfg(feature = "gpu")]
1128impl GpuContext {
1129 pub fn new() -> NdimageResult<Self> {
1130 Ok(Self {
1133 device_id: 0, memory_pool: None,
1135 })
1136 }
1137
1138 pub fn device_id(&self) -> u32 {
1139 self.device_id
1140 }
1141
1142 pub fn allocate_memory(&mut self, size: usize) -> NdimageResult<*mut u8> {
1143 Ok(std::ptr::null_mut())
1146 }
1147
1148 pub fn free_memory(&mut self, ptr: *mut u8) -> NdimageResult<()> {
1149 Ok(())
1152 }
1153}
1154
1155#[cfg(feature = "compression")]
1158struct GzipWriteAdapter {
1159 file: File,
1160 buffer: Vec<u8>,
1161 level: u8,
1162}
1163
1164#[cfg(feature = "compression")]
1165impl GzipWriteAdapter {
1166 fn new(file: File, level: u8) -> Self {
1167 Self {
1168 file,
1169 buffer: Vec::new(),
1170 level,
1171 }
1172 }
1173}
1174
1175#[cfg(feature = "compression")]
1176impl Write for GzipWriteAdapter {
1177 fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
1178 self.buffer.extend_from_slice(data);
1179 Ok(data.len())
1180 }
1181 fn flush(&mut self) -> std::io::Result<()> {
1182 if !self.buffer.is_empty() {
1183 let compressed = oxiarc_deflate::gzip_compress(&self.buffer, self.level)
1184 .map_err(|e| std::io::Error::other(e.to_string()))?;
1185 self.file.write_all(&compressed)?;
1186 self.file.flush()?;
1187 self.buffer.clear();
1188 }
1189 Ok(())
1190 }
1191}
1192
1193#[cfg(feature = "compression")]
1194impl Drop for GzipWriteAdapter {
1195 fn drop(&mut self) {
1196 let _ = self.flush();
1197 }
1198}
1199
1200#[cfg(feature = "compression")]
1203struct Lz4WriteAdapter {
1204 file: File,
1205 buffer: Vec<u8>,
1206}
1207
1208#[cfg(feature = "compression")]
1209impl Lz4WriteAdapter {
1210 fn new(file: File) -> Self {
1211 Self {
1212 file,
1213 buffer: Vec::new(),
1214 }
1215 }
1216}
1217
1218#[cfg(feature = "compression")]
1219impl Write for Lz4WriteAdapter {
1220 fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
1221 self.buffer.extend_from_slice(data);
1222 Ok(data.len())
1223 }
1224 fn flush(&mut self) -> std::io::Result<()> {
1225 if !self.buffer.is_empty() {
1226 let compressed = oxiarc_lz4::compress(&self.buffer)
1227 .map_err(|e| std::io::Error::other(e.to_string()))?;
1228 self.file.write_all(&compressed)?;
1229 self.file.flush()?;
1230 self.buffer.clear();
1231 }
1232 Ok(())
1233 }
1234}
1235
1236#[cfg(feature = "compression")]
1237impl Drop for Lz4WriteAdapter {
1238 fn drop(&mut self) {
1239 let _ = self.flush();
1240 }
1241}
1242
1243#[cfg(feature = "compression")]
1246struct ZstdWriteAdapter {
1247 file: File,
1248 buffer: Vec<u8>,
1249 level: i32,
1250}
1251
1252#[cfg(feature = "compression")]
1253impl ZstdWriteAdapter {
1254 fn new(file: File, level: i32) -> Self {
1255 Self {
1256 file,
1257 buffer: Vec::new(),
1258 level,
1259 }
1260 }
1261}
1262
1263#[cfg(feature = "compression")]
1264impl Write for ZstdWriteAdapter {
1265 fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
1266 self.buffer.extend_from_slice(data);
1267 Ok(data.len())
1268 }
1269 fn flush(&mut self) -> std::io::Result<()> {
1270 if !self.buffer.is_empty() {
1271 let compressed = oxiarc_zstd::compress_with_level(&self.buffer, self.level)
1272 .map_err(|e| std::io::Error::other(e.to_string()))?;
1273 self.file.write_all(&compressed)?;
1274 self.file.flush()?;
1275 self.buffer.clear();
1276 }
1277 Ok(())
1278 }
1279}
1280
1281#[cfg(feature = "compression")]
1282impl Drop for ZstdWriteAdapter {
1283 fn drop(&mut self) {
1284 let _ = self.flush();
1285 }
1286}
1287
1288#[allow(dead_code)]
1290pub fn stream_process_file_compressed<T>(
1291 input_path: &std::path::Path,
1292 output_path: &std::path::Path,
1293 shape: &[usize],
1294 compression: CompressionType,
1295 config: StreamConfig,
1296) -> NdimageResult<()>
1297where
1298 T: Float + FromPrimitive + Debug + Clone + Send + Sync + 'static,
1299{
1300 let input_file = File::open(input_path)
1302 .map_err(|e| NdimageError::ComputationError(format!("Failed to open input file: {}", e)))?;
1303
1304 let mut input_reader: Box<dyn Read> = match compression {
1305 CompressionType::None => Box::new(BufReader::new(input_file)),
1306 CompressionType::Gzip => {
1307 #[cfg(feature = "compression")]
1308 {
1309 let mut compressed_data = Vec::new();
1311 std::io::Read::read_to_end(&mut BufReader::new(input_file), &mut compressed_data)
1312 .map_err(|e| {
1313 NdimageError::ComputationError(format!("Failed to read input: {}", e))
1314 })?;
1315 let decompressed =
1316 oxiarc_deflate::gzip_decompress(&compressed_data).map_err(|e| {
1317 NdimageError::ComputationError(format!("Failed to decompress gzip: {}", e))
1318 })?;
1319 Box::new(std::io::Cursor::new(decompressed))
1320 }
1321 #[cfg(not(feature = "compression"))]
1322 return Err(NdimageError::InvalidInput(
1323 "Gzip compression support not enabled".into(),
1324 ));
1325 }
1326 CompressionType::Lz4 => {
1327 #[cfg(feature = "compression")]
1328 {
1329 let mut compressed_data = Vec::new();
1331 std::io::Read::read_to_end(&mut BufReader::new(input_file), &mut compressed_data)
1332 .map_err(|e| {
1333 NdimageError::ComputationError(format!("Failed to read input: {}", e))
1334 })?;
1335 let decompressed = oxiarc_lz4::decompress(&compressed_data, 256 * 1024 * 1024)
1336 .map_err(|e| {
1337 NdimageError::ComputationError(format!("Failed to decompress LZ4: {}", e))
1338 })?;
1339 Box::new(std::io::Cursor::new(decompressed))
1340 }
1341 #[cfg(not(feature = "compression"))]
1342 return Err(NdimageError::InvalidInput(
1343 "LZ4 compression support not enabled".into(),
1344 ));
1345 }
1346 CompressionType::Zstd => {
1347 #[cfg(feature = "compression")]
1348 {
1349 let mut compressed_data = Vec::new();
1351 std::io::Read::read_to_end(&mut BufReader::new(input_file), &mut compressed_data)
1352 .map_err(|e| {
1353 NdimageError::ComputationError(format!("Failed to read input: {}", e))
1354 })?;
1355 let decompressed = oxiarc_zstd::decompress(&compressed_data).map_err(|e| {
1356 NdimageError::ComputationError(format!("Failed to decompress Zstd: {}", e))
1357 })?;
1358 Box::new(std::io::Cursor::new(decompressed))
1359 }
1360 #[cfg(not(feature = "compression"))]
1361 return Err(NdimageError::InvalidInput(
1362 "Zstd compression support not enabled".into(),
1363 ));
1364 }
1365 };
1366
1367 let output_file = File::create(output_path).map_err(|e| {
1369 NdimageError::ComputationError(format!("Failed to create output file: {}", e))
1370 })?;
1371
1372 let mut output_writer: Box<dyn Write> = match compression {
1373 CompressionType::None => Box::new(BufWriter::new(output_file)),
1374 CompressionType::Gzip => {
1375 #[cfg(feature = "compression")]
1376 {
1377 Box::new(GzipWriteAdapter::new(output_file, 6))
1379 }
1380 #[cfg(not(feature = "compression"))]
1381 return Err(NdimageError::InvalidInput(
1382 "Gzip compression support not enabled".into(),
1383 ));
1384 }
1385 CompressionType::Lz4 => {
1386 #[cfg(feature = "compression")]
1387 {
1388 Box::new(Lz4WriteAdapter::new(output_file))
1390 }
1391 #[cfg(not(feature = "compression"))]
1392 return Err(NdimageError::InvalidInput(
1393 "LZ4 compression support not enabled".into(),
1394 ));
1395 }
1396 CompressionType::Zstd => {
1397 #[cfg(feature = "compression")]
1398 {
1399 Box::new(ZstdWriteAdapter::new(output_file, 3))
1401 }
1402 #[cfg(not(feature = "compression"))]
1403 return Err(NdimageError::InvalidInput(
1404 "Zstd compression support not enabled".into(),
1405 ));
1406 }
1407 };
1408
1409 let element_size = std::mem::size_of::<T>();
1411 let total_elements: usize = shape.iter().product();
1412 let chunk_elements = config.chunk_size / element_size;
1413
1414 let mut elements_processed = 0;
1416 while elements_processed < total_elements {
1417 let chunk_size = (total_elements - elements_processed).min(chunk_elements);
1418
1419 let mut chunk_data = vec![0u8; chunk_size * element_size];
1421 input_reader
1422 .read_exact(&mut chunk_data)
1423 .map_err(|e| NdimageError::ComputationError(format!("Failed to read chunk: {}", e)))?;
1424
1425 output_writer
1433 .write_all(&chunk_data)
1434 .map_err(|e| NdimageError::ComputationError(format!("Failed to write chunk: {}", e)))?;
1435
1436 elements_processed += chunk_size;
1437 }
1438
1439 output_writer
1441 .flush()
1442 .map_err(|e| NdimageError::ComputationError(format!("Failed to flush output: {}", e)))?;
1443
1444 Ok(())
1445}
1446
1447#[derive(Debug, Clone, Copy)]
1449#[allow(dead_code)]
1450pub enum CompressionType {
1451 None,
1452 Lz4,
1453 Zstd,
1454 Gzip,
1455}
1456
1457#[allow(dead_code)]
1459pub trait BatchStreamableOp<T, D>: Send + Sync
1460where
1461 T: Float + FromPrimitive + Debug + Clone,
1462 D: Dimension,
1463{
1464 fn apply_batch(&self, chunks: &[ArrayView<T, D>]) -> NdimageResult<Vec<Array<T, D>>>;
1466
1467 fn required_batch_overlap(&self) -> Vec<usize>;
1469}