use scirs2_core::ndarray::{
Array, ArrayView, ArrayViewMut, Dimension, IxDyn, Slice, SliceInfoElem,
};
use scirs2_core::numeric::{Float, FromPrimitive, Zero};
use scirs2_core::parallel_ops::*;
use std::fmt::Debug;
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path;
use crate::error::{NdimageError, NdimageResult};
use crate::filters::BorderMode;
#[derive(Debug, Clone)]
pub struct StreamConfig {
pub chunk_size: usize,
pub overlap: Vec<usize>,
pub use_mmap: bool,
pub cache_chunks: usize,
pub temp_dir: Option<String>,
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
chunk_size: 128 * 1024 * 1024, overlap: vec![],
use_mmap: true,
cache_chunks: 4,
temp_dir: None,
}
}
}
pub trait StreamableOp<T, D>: Send + Sync
where
T: Float + FromPrimitive + Debug + Clone,
D: Dimension,
{
fn apply_chunk(&self, chunk: &ArrayView<T, D>) -> NdimageResult<Array<T, D>>;
fn required_overlap(&self) -> Vec<usize>;
fn merge_overlap(
&self,
output: &mut ArrayViewMut<T, D>,
new_chunk: &ArrayView<T, D>,
overlap_info: &OverlapInfo,
) -> NdimageResult<()>;
}
#[derive(Debug, Clone)]
pub struct OverlapInfo {
pub dimension: usize,
pub output_start: usize,
pub output_end: usize,
pub overlap_size: usize,
}
pub struct StreamProcessor<T> {
config: StreamConfig,
phantom: std::marker::PhantomData<T>,
}
impl<T> StreamProcessor<T>
where
T: Float + FromPrimitive + Debug + Clone + Send + Sync + 'static,
{
pub fn new(config: StreamConfig) -> Self {
Self {
config,
phantom: std::marker::PhantomData,
}
}
pub fn process_file<D, Op>(
&self,
input_path: &Path,
output_path: &Path,
shape: &[usize],
op: Op,
) -> NdimageResult<()>
where
D: Dimension,
Op: StreamableOp<T, D>,
{
let element_size = std::mem::size_of::<T>();
let total_elements: usize = shape.iter().product();
let total_size = total_elements * element_size;
let chunk_dims = self.calculate_chunk_dimensions(shape, element_size)?;
let mut input_file = BufReader::new(File::open(input_path)?);
let mut output_file = BufWriter::new(File::create(output_path)?);
for chunk_info in self.chunk_iterator(shape, &chunk_dims) {
let chunk = self.read_chunk(&mut input_file, &chunk_info, shape)?;
let chunk_d = chunk.into_dimensionality::<D>().map_err(|_| {
NdimageError::ComputationError("Failed to convert chunk dimension".to_string())
})?;
let result = op.apply_chunk(&chunk_d.view())?;
self.write_chunk(&mut output_file, &result.view().into_dyn(), &chunk_info)?;
}
Ok(())
}
pub fn process_in_memory<D, Op>(
&self,
input: &ArrayView<T, D>,
op: Op,
) -> NdimageResult<Array<T, D>>
where
D: Dimension,
Op: StreamableOp<T, D>,
{
let shape = input.shape();
let element_size = std::mem::size_of::<T>();
let chunk_dims = self.calculate_chunk_dimensions(shape, element_size)?;
let mut output = Array::zeros(input.raw_dim());
if is_parallel_enabled() {
let chunks: Vec<_> = self.chunk_iterator(shape, &chunk_dims).collect();
chunks.par_iter().try_for_each(|chunk_info| {
let chunk = self.extract_chunk(input, chunk_info)?;
let _result = op.apply_chunk(&chunk.view())?;
Ok::<(), NdimageError>(())
})?;
} else {
for chunk_info in self.chunk_iterator(shape, &chunk_dims) {
let chunk = self.extract_chunk(input, &chunk_info)?;
let result = op.apply_chunk(&chunk.view())?;
self.insert_chunk(&mut output.view_mut(), &result.view(), &chunk_info)?;
}
}
Ok(output)
}
fn calculate_chunk_dimensions(
&self,
shape: &[usize],
element_size: usize,
) -> NdimageResult<Vec<usize>> {
let ndim = shape.len();
let mut chunk_dims = shape.to_vec();
let mut current_size = shape.iter().product::<usize>() * element_size;
while current_size > self.config.chunk_size && chunk_dims.iter().any(|&d| d > 1) {
let (max_idx_, _) = chunk_dims
.iter()
.enumerate()
.filter(|(_, &d)| d > 1)
.max_by_key(|(_, &d)| d)
.expect("Operation failed");
chunk_dims[max_idx_] /= 2;
current_size = chunk_dims.iter().product::<usize>() * element_size;
}
if !self.config.overlap.is_empty() {
for (i, &overlap) in self.config.overlap.iter().enumerate() {
if i < ndim {
chunk_dims[i] = chunk_dims[i].saturating_add(overlap * 2);
}
}
}
Ok(chunk_dims)
}
fn chunk_iterator<'a>(&'a self, shape: &'a [usize], chunkdims: &'a [usize]) -> ChunkIterator {
ChunkIterator::new(shape, chunkdims, &self.config.overlap)
}
fn extract_chunk<D>(
&self,
array: &ArrayView<T, D>,
chunk_info: &ChunkInfo,
) -> NdimageResult<Array<T, D>>
where
D: Dimension,
{
let slices: Vec<_> = chunk_info
.ranges
.iter()
.map(|r| SliceInfoElem::Slice {
start: r.start as isize,
end: Some(r.end as isize),
step: 1,
})
.collect();
let array_dyn = array.view().into_dyn();
Ok(array_dyn
.slice(slices.as_slice())
.to_owned()
.into_dimensionality::<D>()
.map_err(|_| {
NdimageError::ComputationError(
"Failed to convert chunk back to original dimension".to_string(),
)
})?)
}
fn insert_chunk<D>(
&self,
output: &mut ArrayViewMut<T, D>,
chunk: &ArrayView<T, D>,
chunk_info: &ChunkInfo,
) -> NdimageResult<()>
where
D: Dimension,
{
let slices: Vec<_> = chunk_info
.output_ranges
.iter()
.map(|r| SliceInfoElem::Slice {
start: r.start as isize,
end: Some(r.end as isize),
step: 1,
})
.collect();
let mut output_dyn = output.view_mut().into_dyn();
let mut output_slice = output_dyn.slice_mut(slices.as_slice());
output_slice.assign(&chunk.view().into_dyn());
Ok(())
}
fn read_chunk(
&self,
file: &mut BufReader<File>,
chunk_info: &ChunkInfo,
shape: &[usize],
) -> NdimageResult<Array<T, IxDyn>> {
let element_size = std::mem::size_of::<T>();
let chunk_elements: usize = chunk_info.ranges.iter().map(|r| r.end - r.start).product();
let offset = self.calculate_file_offset(&chunk_info.ranges, shape, element_size);
file.seek(SeekFrom::Start(offset as u64))?;
let mut buffer = vec![T::zero(); chunk_elements];
let byte_buffer = unsafe {
std::slice::from_raw_parts_mut(
buffer.as_mut_ptr() as *mut u8,
chunk_elements * element_size,
)
};
file.read_exact(byte_buffer)?;
let chunkshape: Vec<_> = chunk_info.ranges.iter().map(|r| r.end - r.start).collect();
Ok(Array::from_shape_vec(IxDyn(&chunkshape), buffer)?)
}
fn write_chunk(
&self,
file: &mut BufWriter<File>,
chunk: &ArrayView<T, IxDyn>,
_chunk_info: &ChunkInfo,
) -> NdimageResult<()> {
let element_size = std::mem::size_of::<T>();
let slice = chunk
.as_slice()
.ok_or_else(|| NdimageError::InvalidInput("Chunk is not contiguous".into()))?;
let byte_slice = unsafe {
std::slice::from_raw_parts(slice.as_ptr() as *const u8, slice.len() * element_size)
};
file.write_all(byte_slice)?;
Ok(())
}
fn calculate_file_offset(
&self,
ranges: &[std::ops::Range<usize>],
shape: &[usize],
element_size: usize,
) -> usize {
let mut offset = 0;
let mut stride = element_size;
for (i, range) in ranges.iter().enumerate().rev() {
offset += range.start * stride;
if i > 0 {
stride *= shape[i];
}
}
offset
}
}
#[derive(Debug, Clone)]
struct ChunkInfo {
ranges: Vec<std::ops::Range<usize>>,
output_ranges: Vec<std::ops::Range<usize>>,
}
struct ChunkIterator {
shape: Vec<usize>,
chunk_dims: Vec<usize>,
overlap: Vec<usize>,
current: Vec<usize>,
done: bool,
}
impl ChunkIterator {
fn new(shape: &[usize], chunkdims: &[usize], overlap: &[usize]) -> Self {
Self {
shape: shape.to_vec(),
chunk_dims: chunkdims.to_vec(),
overlap: overlap.to_vec(),
current: vec![0; shape.len()],
done: false,
}
}
}
impl Iterator for ChunkIterator {
type Item = ChunkInfo;
fn next(&mut self) -> Option<Self::Item> {
if self.done {
return None;
}
let mut ranges = Vec::new();
let mut output_ranges = Vec::new();
for i in 0..self.shape.len() {
let overlap = self.overlap.get(i).copied().unwrap_or(0);
let start = self.current[i].saturating_sub(overlap);
let end = (self.current[i] + self.chunk_dims[i]).min(self.shape[i]);
ranges.push(start..end);
let output_start = if self.current[i] == 0 { 0 } else { overlap };
let output_end = if self.current[i] + self.chunk_dims[i] >= self.shape[i] {
end - start
} else {
end - start - overlap
};
output_ranges.push(output_start..output_end);
}
let chunk_info = ChunkInfo {
ranges,
output_ranges,
};
let mut carry = true;
for i in (0..self.shape.len()).rev() {
if carry {
self.current[i] += self.chunk_dims[i] - self.overlap.get(i).copied().unwrap_or(0);
if self.current[i] < self.shape[i] {
carry = false;
} else {
self.current[i] = 0;
}
}
}
if carry {
self.done = true;
}
Some(chunk_info)
}
}
pub struct StreamingGaussianFilter<T> {
sigma: Vec<T>,
truncate: Option<T>,
}
impl<T: Float + FromPrimitive + Debug + Clone> StreamingGaussianFilter<T> {
pub fn new(sigma: Vec<T>, truncate: Option<T>) -> Self {
Self { sigma, truncate }
}
}
impl<T, D> StreamableOp<T, D> for StreamingGaussianFilter<T>
where
T: Float + FromPrimitive + Debug + Clone + Send + Sync + 'static,
D: Dimension + 'static,
{
fn apply_chunk(&self, chunk: &ArrayView<T, D>) -> NdimageResult<Array<T, D>> {
let chunk_f64 = chunk.mapv(|x| x.to_f64().unwrap_or(0.0));
let sigma = self
.sigma
.first()
.map(|s| s.to_f64().unwrap_or(1.0))
.unwrap_or(1.0);
let truncate = self.truncate.and_then(|t| t.to_f64());
let result_f64 = crate::filters::gaussian_filter(
&chunk_f64,
sigma,
Some(BorderMode::Reflect),
truncate,
)?;
Ok(result_f64.mapv(|x| T::from_f64(x).unwrap_or_else(|| T::zero())))
}
fn required_overlap(&self) -> Vec<usize> {
self.sigma
.iter()
.map(|&s| {
let truncate = self
.truncate
.unwrap_or(T::from_f64(4.0).expect("Operation failed"));
(truncate * s).to_usize().unwrap_or(4)
})
.collect()
}
fn merge_overlap(
&self,
output: &mut ArrayViewMut<T, D>,
new_chunk: &ArrayView<T, D>,
overlap_info: &OverlapInfo,
) -> NdimageResult<()> {
let dim = overlap_info.dimension;
let overlap_size = overlap_info.overlap_size;
if overlap_size == 0 {
output.assign(new_chunk);
return Ok(());
}
let outputshape = output.shape().to_vec();
let chunkshape = new_chunk.shape().to_vec();
if outputshape != chunkshape {
return Err(NdimageError::DimensionError(
"Output and _chunk shapes must match for overlap merging".to_string(),
));
}
let mut flat_idx = 0;
for (output_pixel, &chunk_pixel) in output.iter_mut().zip(new_chunk.iter()) {
let mut coord_in_dim = flat_idx;
for d in (dim + 1..outputshape.len()).rev() {
coord_in_dim /= outputshape[d];
}
coord_in_dim %= outputshape[dim];
if coord_in_dim < overlap_size {
let distance_from_edge = coord_in_dim;
let weight = T::from_f64(distance_from_edge as f64 / overlap_size as f64)
.expect("Operation failed");
*output_pixel = *output_pixel * (T::one() - weight) + chunk_pixel * weight;
} else if coord_in_dim >= outputshape[dim] - overlap_size {
let distance_from_end = outputshape[dim] - 1 - coord_in_dim;
let weight = T::from_f64(distance_from_end as f64 / overlap_size as f64)
.expect("Operation failed");
*output_pixel = *output_pixel * (T::one() - weight) + chunk_pixel * weight;
} else {
*output_pixel = chunk_pixel;
}
flat_idx += 1;
}
Ok(())
}
}
#[allow(dead_code)]
pub fn stream_process_file<T, D, Op>(
input_path: &Path,
output_path: &Path,
shape: &[usize],
op: Op,
config: Option<StreamConfig>,
) -> NdimageResult<()>
where
T: Float + FromPrimitive + Debug + Clone + Send + Sync + 'static,
D: Dimension,
Op: StreamableOp<T, D>,
{
let config = config.unwrap_or_default();
let processor = StreamProcessor::<T>::new(config);
processor.process_file::<D, Op>(input_path, output_path, shape, op)
}
#[cfg(test)]
mod tests {
use super::*;
use scirs2_core::ndarray::arr2;
#[test]
fn test_chunk_iterator() {
let shape = vec![100, 100];
let chunk_dims = vec![30, 30];
let overlap = vec![5, 5];
let mut count = 0;
for chunk in ChunkIterator::new(&shape, &chunk_dims, &overlap) {
assert!(!chunk.ranges.is_empty());
count += 1;
}
assert!(count > 1);
}
#[test]
fn test_streaming_processor() {
let config = StreamConfig {
chunk_size: 1024,
overlap: vec![2, 2],
..Default::default()
};
let processor = StreamProcessor::<f64>::new(config);
let input = arr2(&[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]);
let op = StreamingGaussianFilter::new(vec![1.0, 1.0], None);
let result = processor
.process_in_memory(&input.view(), op)
.expect("Operation failed");
assert_eq!(result.shape(), input.shape());
}
}
#[allow(dead_code)]
pub struct AdaptiveStreamProcessor<T> {
base_config: StreamConfig,
performance_monitor: PerformanceMonitor,
memory_manager: MemoryManager,
_phantom: std::marker::PhantomData<T>,
}
impl<T> AdaptiveStreamProcessor<T>
where
T: Float + FromPrimitive + Debug + Clone + Send + Sync + 'static,
{
pub fn new(_baseconfig: StreamConfig) -> Self {
Self {
base_config: _baseconfig,
performance_monitor: PerformanceMonitor::new(),
memory_manager: MemoryManager::new(),
_phantom: std::marker::PhantomData,
}
}
pub fn process_adaptive<D, Op>(
&mut self,
input: &ArrayView<T, D>,
op: Op,
) -> NdimageResult<Array<T, D>>
where
D: Dimension,
Op: StreamableOp<T, D> + AdaptiveOperation<T, D>,
{
let shape = input.shape();
let mut current_config = self.base_config.clone();
let complexity = op.estimate_complexity(shape);
current_config.chunk_size = self.memory_manager.calculate_optimal_chunk_size(
std::mem::size_of::<T>(),
shape,
complexity,
);
let mut output = Array::zeros(input.raw_dim());
let mut chunk_times = Vec::new();
for (chunk_idx, chunk_info) in self.chunk_iterator(shape, ¤t_config).enumerate() {
let start_time = std::time::Instant::now();
let chunk = self.extract_chunk_with_bounds(input, &chunk_info)?;
let result = op.apply_chunk(&chunk.view())?;
self.merge_chunk_result(&mut output.view_mut(), &result.view(), &chunk_info)?;
let chunk_time = start_time.elapsed();
chunk_times.push(chunk_time);
if chunk_idx > 0 && chunk_idx % 10 == 0 {
let avg_time =
chunk_times.iter().sum::<std::time::Duration>() / chunk_times.len() as u32;
let new_config = self
.performance_monitor
.adjust_config(¤t_config, avg_time);
if new_config.chunk_size != current_config.chunk_size {
current_config = new_config;
chunk_times.clear(); }
}
}
Ok(output)
}
#[cfg(feature = "gpu")]
pub fn process_gpu_accelerated<D, Op>(
&mut self,
input: &ArrayView<T, D>,
op: Op,
) -> NdimageResult<Array<T, D>>
where
D: Dimension,
Op: StreamableOp<T, D> + GpuStreamableOp<T, D>,
{
#[cfg(feature = "gpu")]
if let Ok(device_manager) = crate::backend::device_detection::get_device_manager() {
if let Ok(dm) = device_manager.lock() {
if let Some((backend, device_id)) =
dm.get_best_device(input.len() * std::mem::size_of::<T>())
{
return self.process_gpu_chunks(input, op, backend);
}
}
}
let result = op.apply_chunk(&input)?;
Ok(result)
}
fn extract_chunk_with_bounds<D>(
&self,
input: &ArrayView<T, D>,
chunk_info: &ChunkInfo,
) -> NdimageResult<Array<T, D>>
where
D: Dimension,
{
let mut slice_info = Vec::new();
for range in &chunk_info.ranges {
slice_info.push(Slice::from(range.clone()));
}
let chunk_view =
input.slice_each_axis(|ax| Slice::from(chunk_info.ranges[ax.axis.index()].clone()));
Ok(chunk_view.to_owned())
}
fn merge_chunk_result<D>(
&self,
output: &mut ArrayViewMut<T, D>,
result: &ArrayView<T, D>,
chunk_info: &ChunkInfo,
) -> NdimageResult<()>
where
D: Dimension,
{
let mut result_slice_info = Vec::new();
for (i, output_range) in chunk_info.output_ranges.iter().enumerate() {
let input_range = &chunk_info.ranges[i];
let offset_start = output_range.start - input_range.start;
let offset_end = offset_start + (output_range.end - output_range.start);
result_slice_info.push(offset_start..offset_end);
}
let result_slice = result.slice_each_axis(|ax| {
let range = &result_slice_info[ax.axis.index()];
Slice::from(range.start..range.end)
});
let mut output_slice = output.slice_each_axis_mut(|ax| {
let range = &chunk_info.output_ranges[ax.axis.index()];
Slice::from(range.start..range.end)
});
if result_slice.shape() != output_slice.shape() {
return Err(NdimageError::DimensionError(format!(
"Shape mismatch in chunk merging: result slice {:?} vs output slice {:?}",
result_slice.shape(),
output_slice.shape()
)));
}
output_slice.assign(&result_slice);
Ok(())
}
fn chunk_iterator(
&self,
shape: &[usize],
config: &StreamConfig,
) -> impl Iterator<Item = ChunkInfo> {
let element_size = std::mem::size_of::<T>();
let chunk_dims = self.calculate_optimal_chunk_dimensions(shape, element_size, config);
ChunkIterator::new(shape, &chunk_dims, &config.overlap)
}
fn calculate_optimal_chunk_dimensions(
&self,
shape: &[usize],
element_size: usize,
config: &StreamConfig,
) -> Vec<usize> {
let target_elements = config.chunk_size / element_size;
let ndim = shape.len();
let base_size = (target_elements as f64).powf(1.0 / ndim as f64) as usize;
shape
.iter()
.map(|&dim_size| base_size.min(dim_size))
.collect()
}
#[cfg(feature = "gpu")]
fn process_gpu_chunks<D, Op>(
&mut self,
input: &ArrayView<T, D>,
op: Op,
gpu_backend: crate::backend::Backend,
) -> NdimageResult<Array<T, D>>
where
D: Dimension,
Op: GpuStreamableOp<T, D>,
{
use crate::backend::{Backend, GpuContext};
let gpucontext: Box<dyn GpuContext> = match gpu_backend {
#[cfg(feature = "cuda")]
Backend::Cuda => {
use crate::backend::CudaContext;
Box::new(CudaContext::new(None)?)
}
#[cfg(feature = "opencl")]
Backend::OpenCL => {
use crate::backend::OpenCLContext;
Box::new(OpenCLContext::new(None)?)
}
_ => {
return Err(crate::error::NdimageError::GpuNotAvailable(
"Unsupported GPU backend".to_string(),
))?
}
};
let required_overlap = op.required_overlap();
let overlap = if required_overlap.is_empty() {
vec![0; input.ndim()]
} else {
required_overlap
};
let chunk_dims = self.calculate_optimal_chunk_dimensions(
input.shape(),
std::mem::size_of::<T>(),
&self.base_config,
);
let mut output = Array::<T, D>::zeros(input.raw_dim());
let chunk_iter = ChunkIterator::new(input.shape(), &chunk_dims, &overlap);
for chunk_info in chunk_iter {
let chunk_view = input.slice_each_axis(|ax| {
let range = &chunk_info.ranges[ax.axis.index()];
Slice::from(range.start..range.end)
});
if !op.is_gpu_suitable(chunk_view.shape()) {
let chunk_result = op.apply_chunk(&chunk_view)?;
let mut output_slice = output.slice_each_axis_mut(|ax| {
let range = &chunk_info.output_ranges[ax.axis.index()];
Slice::from(range.start..range.end)
});
let result_slice = chunk_result.slice_each_axis(|ax| {
let input_range = &chunk_info.ranges[ax.axis.index()];
let output_range = &chunk_info.output_ranges[ax.axis.index()];
let offset = output_range.start - input_range.start;
let size = output_range.end - output_range.start;
Slice::from(offset..offset + size)
});
output_slice.assign(&result_slice);
continue;
}
let chunk_result = op.apply_chunk_gpu(&chunk_view, gpucontext.as_ref())?;
if overlap.iter().any(|&x| x > 0) {
let overlap_info = OverlapInfo {
dimension: 0, output_start: chunk_info.output_ranges[0].start,
output_end: chunk_info.output_ranges[0].end,
overlap_size: overlap[0],
};
let mut output_slice = output.slice_each_axis_mut(|ax| {
let range = &chunk_info.output_ranges[ax.axis.index()];
Slice::from(range.start..range.end)
});
let result_slice = chunk_result.slice_each_axis(|ax| {
let input_range = &chunk_info.ranges[ax.axis.index()];
let output_range = &chunk_info.output_ranges[ax.axis.index()];
let offset = output_range.start - input_range.start;
let size = output_range.end - output_range.start;
Slice::from(offset..offset + size)
});
op.merge_overlap(&mut output_slice, &result_slice, &overlap_info)?;
} else {
let mut output_slice = output.slice_each_axis_mut(|ax| {
let range = &chunk_info.output_ranges[ax.axis.index()];
Slice::from(range.start..range.end)
});
output_slice.assign(&chunk_result);
}
}
Ok(output)
}
}
#[allow(dead_code)]
struct PerformanceMonitor {
history: Vec<PerformanceMetrics>,
}
impl PerformanceMonitor {
fn new() -> Self {
Self {
history: Vec::new(),
}
}
fn adjust_config(
&mut self,
current: &StreamConfig,
avg_time: std::time::Duration,
) -> StreamConfig {
let mut new_config = current.clone();
if avg_time.as_millis() < 100 {
new_config.chunk_size = (current.chunk_size as f64 * 1.2) as usize;
} else if avg_time.as_millis() > 1000 {
new_config.chunk_size = (current.chunk_size as f64 * 0.8) as usize;
}
new_config.chunk_size = new_config.chunk_size.max(64 * 1024);
self.history.push(PerformanceMetrics {
chunk_size: current.chunk_size,
processing_time: avg_time,
timestamp: std::time::Instant::now(),
});
new_config
}
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct PerformanceMetrics {
chunk_size: usize,
processing_time: std::time::Duration,
timestamp: std::time::Instant,
}
#[allow(dead_code)]
struct MemoryManager {
available_memory: usize,
cache_sizes: [usize; 3], }
impl MemoryManager {
fn new() -> Self {
Self {
available_memory: Self::detect_available_memory(),
cache_sizes: Self::detect_cache_sizes(),
}
}
fn calculate_optimal_chunk_size(
&self,
element_size: usize,
shape: &[usize],
complexity: OperationComplexity,
) -> usize {
let total_elements: usize = shape.iter().product();
let _total_bytes = total_elements * element_size;
let memory_fraction = match complexity {
OperationComplexity::Low => 0.1,
OperationComplexity::Medium => 0.05,
OperationComplexity::High => 0.02,
};
let target_size = (self.available_memory as f64 * memory_fraction) as usize;
if complexity == OperationComplexity::Low {
target_size.min(self.cache_sizes[2])
} else {
target_size
}
.max(64 * 1024) }
fn detect_available_memory() -> usize {
1_000_000_000 }
fn detect_cache_sizes() -> [usize; 3] {
[32 * 1024, 256 * 1024, 8 * 1024 * 1024] }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationComplexity {
Low, Medium, High, }
#[allow(dead_code)]
pub trait AdaptiveOperation<T, D>: StreamableOp<T, D>
where
T: Float + FromPrimitive + Debug + Clone,
D: Dimension,
{
fn estimate_complexity(&self, shape: &[usize]) -> OperationComplexity;
fn suggest_overlap(&self, _chunkdims: &[usize]) -> Vec<usize> {
self.required_overlap()
}
fn supports_parallel_chunks(&self) -> bool {
true
}
}
#[cfg(feature = "gpu")]
#[allow(dead_code)]
pub trait GpuStreamableOp<T, D>: StreamableOp<T, D>
where
T: Float + FromPrimitive + Debug + Clone,
D: Dimension,
{
fn apply_chunk_gpu(
&self,
chunk: &ArrayView<T, D>,
gpucontext: &dyn crate::backend::GpuContext,
) -> NdimageResult<Array<T, D>>;
fn is_gpu_suitable(&self, chunkshape: &[usize]) -> bool;
fn gpu_memory_requirement(&self, chunkshape: &[usize]) -> usize;
}
#[cfg(feature = "gpu")]
#[allow(dead_code)]
pub struct GpuContext {
device_id: u32,
memory_pool: Option<*mut u8>,
}
#[cfg(feature = "gpu")]
impl GpuContext {
pub fn new() -> NdimageResult<Self> {
Ok(Self {
device_id: 0, memory_pool: None,
})
}
pub fn device_id(&self) -> u32 {
self.device_id
}
pub fn allocate_memory(&mut self, size: usize) -> NdimageResult<*mut u8> {
Ok(std::ptr::null_mut())
}
pub fn free_memory(&mut self, ptr: *mut u8) -> NdimageResult<()> {
Ok(())
}
}
#[cfg(feature = "compression")]
struct GzipWriteAdapter {
file: File,
buffer: Vec<u8>,
level: u8,
}
#[cfg(feature = "compression")]
impl GzipWriteAdapter {
fn new(file: File, level: u8) -> Self {
Self {
file,
buffer: Vec::new(),
level,
}
}
}
#[cfg(feature = "compression")]
impl Write for GzipWriteAdapter {
fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
self.buffer.extend_from_slice(data);
Ok(data.len())
}
fn flush(&mut self) -> std::io::Result<()> {
if !self.buffer.is_empty() {
let compressed = oxiarc_deflate::gzip_compress(&self.buffer, self.level)
.map_err(|e| std::io::Error::other(e.to_string()))?;
self.file.write_all(&compressed)?;
self.file.flush()?;
self.buffer.clear();
}
Ok(())
}
}
#[cfg(feature = "compression")]
impl Drop for GzipWriteAdapter {
fn drop(&mut self) {
let _ = self.flush();
}
}
#[cfg(feature = "compression")]
struct Lz4WriteAdapter {
file: File,
buffer: Vec<u8>,
}
#[cfg(feature = "compression")]
impl Lz4WriteAdapter {
fn new(file: File) -> Self {
Self {
file,
buffer: Vec::new(),
}
}
}
#[cfg(feature = "compression")]
impl Write for Lz4WriteAdapter {
fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
self.buffer.extend_from_slice(data);
Ok(data.len())
}
fn flush(&mut self) -> std::io::Result<()> {
if !self.buffer.is_empty() {
let compressed = oxiarc_lz4::compress(&self.buffer)
.map_err(|e| std::io::Error::other(e.to_string()))?;
self.file.write_all(&compressed)?;
self.file.flush()?;
self.buffer.clear();
}
Ok(())
}
}
#[cfg(feature = "compression")]
impl Drop for Lz4WriteAdapter {
fn drop(&mut self) {
let _ = self.flush();
}
}
#[cfg(feature = "compression")]
struct ZstdWriteAdapter {
file: File,
buffer: Vec<u8>,
level: i32,
}
#[cfg(feature = "compression")]
impl ZstdWriteAdapter {
fn new(file: File, level: i32) -> Self {
Self {
file,
buffer: Vec::new(),
level,
}
}
}
#[cfg(feature = "compression")]
impl Write for ZstdWriteAdapter {
fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
self.buffer.extend_from_slice(data);
Ok(data.len())
}
fn flush(&mut self) -> std::io::Result<()> {
if !self.buffer.is_empty() {
let compressed = oxiarc_zstd::compress_with_level(&self.buffer, self.level)
.map_err(|e| std::io::Error::other(e.to_string()))?;
self.file.write_all(&compressed)?;
self.file.flush()?;
self.buffer.clear();
}
Ok(())
}
}
#[cfg(feature = "compression")]
impl Drop for ZstdWriteAdapter {
fn drop(&mut self) {
let _ = self.flush();
}
}
#[allow(dead_code)]
pub fn stream_process_file_compressed<T>(
input_path: &std::path::Path,
output_path: &std::path::Path,
shape: &[usize],
compression: CompressionType,
config: StreamConfig,
) -> NdimageResult<()>
where
T: Float + FromPrimitive + Debug + Clone + Send + Sync + 'static,
{
let input_file = File::open(input_path)
.map_err(|e| NdimageError::ComputationError(format!("Failed to open input file: {}", e)))?;
let mut input_reader: Box<dyn Read> = match compression {
CompressionType::None => Box::new(BufReader::new(input_file)),
CompressionType::Gzip => {
#[cfg(feature = "compression")]
{
let mut compressed_data = Vec::new();
std::io::Read::read_to_end(&mut BufReader::new(input_file), &mut compressed_data)
.map_err(|e| {
NdimageError::ComputationError(format!("Failed to read input: {}", e))
})?;
let decompressed =
oxiarc_deflate::gzip_decompress(&compressed_data).map_err(|e| {
NdimageError::ComputationError(format!("Failed to decompress gzip: {}", e))
})?;
Box::new(std::io::Cursor::new(decompressed))
}
#[cfg(not(feature = "compression"))]
return Err(NdimageError::InvalidInput(
"Gzip compression support not enabled".into(),
));
}
CompressionType::Lz4 => {
#[cfg(feature = "compression")]
{
let mut compressed_data = Vec::new();
std::io::Read::read_to_end(&mut BufReader::new(input_file), &mut compressed_data)
.map_err(|e| {
NdimageError::ComputationError(format!("Failed to read input: {}", e))
})?;
let decompressed = oxiarc_lz4::decompress(&compressed_data, 256 * 1024 * 1024)
.map_err(|e| {
NdimageError::ComputationError(format!("Failed to decompress LZ4: {}", e))
})?;
Box::new(std::io::Cursor::new(decompressed))
}
#[cfg(not(feature = "compression"))]
return Err(NdimageError::InvalidInput(
"LZ4 compression support not enabled".into(),
));
}
CompressionType::Zstd => {
#[cfg(feature = "compression")]
{
let mut compressed_data = Vec::new();
std::io::Read::read_to_end(&mut BufReader::new(input_file), &mut compressed_data)
.map_err(|e| {
NdimageError::ComputationError(format!("Failed to read input: {}", e))
})?;
let decompressed = oxiarc_zstd::decompress(&compressed_data).map_err(|e| {
NdimageError::ComputationError(format!("Failed to decompress Zstd: {}", e))
})?;
Box::new(std::io::Cursor::new(decompressed))
}
#[cfg(not(feature = "compression"))]
return Err(NdimageError::InvalidInput(
"Zstd compression support not enabled".into(),
));
}
};
let output_file = File::create(output_path).map_err(|e| {
NdimageError::ComputationError(format!("Failed to create output file: {}", e))
})?;
let mut output_writer: Box<dyn Write> = match compression {
CompressionType::None => Box::new(BufWriter::new(output_file)),
CompressionType::Gzip => {
#[cfg(feature = "compression")]
{
Box::new(GzipWriteAdapter::new(output_file, 6))
}
#[cfg(not(feature = "compression"))]
return Err(NdimageError::InvalidInput(
"Gzip compression support not enabled".into(),
));
}
CompressionType::Lz4 => {
#[cfg(feature = "compression")]
{
Box::new(Lz4WriteAdapter::new(output_file))
}
#[cfg(not(feature = "compression"))]
return Err(NdimageError::InvalidInput(
"LZ4 compression support not enabled".into(),
));
}
CompressionType::Zstd => {
#[cfg(feature = "compression")]
{
Box::new(ZstdWriteAdapter::new(output_file, 3))
}
#[cfg(not(feature = "compression"))]
return Err(NdimageError::InvalidInput(
"Zstd compression support not enabled".into(),
));
}
};
let element_size = std::mem::size_of::<T>();
let total_elements: usize = shape.iter().product();
let chunk_elements = config.chunk_size / element_size;
let mut elements_processed = 0;
while elements_processed < total_elements {
let chunk_size = (total_elements - elements_processed).min(chunk_elements);
let mut chunk_data = vec![0u8; chunk_size * element_size];
input_reader
.read_exact(&mut chunk_data)
.map_err(|e| NdimageError::ComputationError(format!("Failed to read chunk: {}", e)))?;
output_writer
.write_all(&chunk_data)
.map_err(|e| NdimageError::ComputationError(format!("Failed to write chunk: {}", e)))?;
elements_processed += chunk_size;
}
output_writer
.flush()
.map_err(|e| NdimageError::ComputationError(format!("Failed to flush output: {}", e)))?;
Ok(())
}
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
pub enum CompressionType {
None,
Lz4,
Zstd,
Gzip,
}
#[allow(dead_code)]
pub trait BatchStreamableOp<T, D>: Send + Sync
where
T: Float + FromPrimitive + Debug + Clone,
D: Dimension,
{
fn apply_batch(&self, chunks: &[ArrayView<T, D>]) -> NdimageResult<Vec<Array<T, D>>>;
fn required_batch_overlap(&self) -> Vec<usize>;
}