use crate::compression::{compress_data, decompress_data, CompressionAlgorithm};
use crate::error::{IoError, Result};
use byteorder::{ByteOrder, LittleEndian};
use memmap2::{Mmap, MmapMut, MmapOptions};
use scirs2_core::ndarray::{Array, ArrayView, Dimension, IxDyn};
use scirs2_core::numeric::ScientificNumber;
use statrs::statistics::Statistics;
use std::collections::{HashMap, VecDeque};
use std::fs::{File, OpenOptions};
use std::io::{BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
/// Configuration for an [`OutOfCoreArray`]: chunking, caching, compression
/// and write-back behavior.
#[derive(Debug, Clone)]
pub struct OutOfCoreConfig {
/// Target number of elements per chunk (used to derive the per-dimension
/// chunk shape; see `calculate_chunkshape`).
pub chunk_size: usize,
/// Maximum total size of the in-memory chunk cache, in bytes.
pub cache_size_bytes: usize,
/// Optional compression applied to each chunk on disk; `None` stores raw bytes.
pub compression: Option<CompressionAlgorithm>,
/// When true, dirty chunks evicted from the cache are written back to disk.
pub write_through: bool,
/// Directory for temporary files; `None` means the platform default.
pub temp_dir: Option<PathBuf>,
}
impl Default for OutOfCoreConfig {
fn default() -> Self {
Self {
chunk_size: 1024 * 1024, cache_size_bytes: 1024 * 1024 * 1024, compression: None,
write_through: true,
temp_dir: None,
}
}
}
/// In-memory description of an out-of-core array, serialized into a
/// fixed-size file header by `write_metadata` / `read_metadata`.
#[derive(Debug, Clone)]
struct ArrayMetadata {
/// Full logical shape of the array.
shape: Vec<usize>,
/// Rust type name of the element type (informational; not persisted back).
#[allow(dead_code)]
dtype: String,
/// Size of one element in bytes.
element_size: usize,
/// Shape of a single (full) chunk.
chunkshape: Vec<usize>,
/// Compression algorithm used for chunk payloads, if any.
compression: Option<CompressionAlgorithm>,
/// Total number of chunks covering the array.
num_chunks: usize,
/// Byte offset of each stored chunk in the file; 0 = not yet written.
chunk_offsets: Vec<u64>,
/// Stored (compressed) size of each chunk in bytes; 0 = not yet written.
chunk_sizes: Vec<usize>,
}
/// A chunked, disk-backed array of `T` with an in-memory chunk cache.
///
/// Reads go through a read-only memory map when the array was `open`ed;
/// writes go through the cache and reach disk on eviction or `flush`.
pub struct OutOfCoreArray<T> {
/// Path of the backing file.
file_path: PathBuf,
/// Header metadata (shape, chunking, compression, chunk directory).
metadata: ArrayMetadata,
/// Read-only memory map of the backing file; `None` for freshly created arrays.
mmap: Option<Mmap>,
/// Reserved for mutable memory mapping; never populated in this code.
#[allow(dead_code)]
mmap_mut: Option<MmapMut>,
/// Configuration this array was created/opened with.
config: OutOfCoreConfig,
/// Shared cache of decoded chunks.
cache: Arc<RwLock<ChunkCache<T>>>,
_phantom: std::marker::PhantomData<T>,
}
/// Cache of decoded chunks, bounded by a byte budget.
struct ChunkCache<T> {
/// Maximum total size of cached chunk data, in bytes.
max_size_bytes: usize,
/// Current total size of cached chunk data, in bytes.
current_size_bytes: usize,
/// Cached chunks keyed by chunk id.
chunks: HashMap<usize, CachedChunk<T>>,
/// Chunk ids in insertion order; the front is the next eviction candidate.
lru_queue: VecDeque<usize>,
}
/// A single cached chunk plus its bookkeeping flags.
struct CachedChunk<T> {
/// Decoded chunk elements.
data: Vec<T>,
/// True when the cached copy has modifications not yet written to disk.
dirty: bool,
/// Number of accesses; currently not consulted by the eviction policy.
#[allow(dead_code)]
access_count: usize,
}
impl<T: ScientificNumber + Clone> OutOfCoreArray<T> {
/// Create a new out-of-core array file at `path` with logical `shape`,
/// using [`OutOfCoreConfig::default`].
pub fn create<P: AsRef<Path>>(path: P, shape: &[usize]) -> Result<Self> {
    let config = OutOfCoreConfig::default();
    Self::create_with_config(path, shape, config)
}
pub fn create_with_config<P: AsRef<Path>>(
path: P,
shape: &[usize],
config: OutOfCoreConfig,
) -> Result<Self> {
let file_path = path.as_ref().to_path_buf();
let chunkshape = Self::calculate_chunkshape(shape, config.chunk_size);
let chunks_per_dim: Vec<_> = shape
.iter()
.zip(&chunkshape)
.map(|(&dim, &chunk)| (dim + chunk - 1) / chunk)
.collect();
let num_chunks = chunks_per_dim.iter().product();
let metadata = ArrayMetadata {
shape: shape.to_vec(),
dtype: std::any::type_name::<T>().to_string(),
element_size: std::mem::size_of::<T>(),
chunkshape,
compression: config.compression,
num_chunks,
chunk_offsets: vec![0; num_chunks],
chunk_sizes: vec![0; num_chunks],
};
let mut file = File::create(&file_path)
.map_err(|e| IoError::FileError(format!("Failed to create file: {e}")))?;
Self::write_metadata(&mut file, &metadata)?;
if config.compression.is_none() {
let total_size = shape.iter().product::<usize>() * std::mem::size_of::<T>();
file.set_len((Self::metadata_size() + total_size) as u64)
.map_err(|e| IoError::FileError(format!("Failed to set file size: {e}")))?;
}
let cache = Arc::new(RwLock::new(ChunkCache::<T> {
max_size_bytes: config.cache_size_bytes,
current_size_bytes: 0,
chunks: HashMap::new(),
lru_queue: VecDeque::new(),
}));
Ok(Self {
file_path,
metadata,
mmap: None,
mmap_mut: None,
config,
cache: Arc::new(std::sync::RwLock::new(ChunkCache::<T> {
max_size_bytes: 128 * 1024 * 1024, current_size_bytes: 0,
chunks: HashMap::new(),
lru_queue: VecDeque::new(),
})),
_phantom: std::marker::PhantomData,
})
}
/// Open an existing out-of-core array file using the default configuration.
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
    let config = OutOfCoreConfig::default();
    Self::open_with_config(path, config)
}
/// Open an existing out-of-core array file with an explicit `config`.
///
/// Reads and validates the metadata header, then memory-maps the whole
/// file for reads. The chunk cache is sized from `config.cache_size_bytes`.
///
/// # Errors
/// `IoError::FileNotFound` if the file cannot be opened, or
/// `IoError::ParseError` if the header is invalid or mapping fails.
pub fn open_with_config<P: AsRef<Path>>(path: P, config: OutOfCoreConfig) -> Result<Self> {
    let file_path = path.as_ref().to_path_buf();
    let mut file = File::open(&file_path)
        .map_err(|_| IoError::FileNotFound(file_path.to_string_lossy().to_string()))?;
    let metadata = Self::read_metadata(&mut file)?;
    // The map is read-only; soundness relies on the backing file not being
    // truncated while mapped (per memmap2's documented contract).
    let mmap = unsafe {
        MmapOptions::new()
            .map(&file)
            .map_err(|e| IoError::ParseError(format!("Failed to create memory map: {e}")))?
    };
    let cache = ChunkCache::<T> {
        max_size_bytes: config.cache_size_bytes,
        current_size_bytes: 0,
        chunks: HashMap::new(),
        lru_queue: VecDeque::new(),
    };
    Ok(Self {
        file_path,
        metadata,
        mmap: Some(mmap),
        mmap_mut: None,
        config,
        cache: Arc::new(RwLock::new(cache)),
        _phantom: std::marker::PhantomData,
    })
}
/// Logical shape of the array.
pub fn shape(&self) -> &[usize] {
    self.metadata.shape.as_slice()
}
/// Total number of elements (the product of all dimensions).
pub fn len(&self) -> usize {
    self.metadata.shape.iter().copied().product()
}
/// True when the array holds no elements (some dimension is zero).
pub fn is_empty(&self) -> bool {
    self.len() == 0
}
/// Derive a per-dimension chunk shape whose element count approximates
/// `targetsize`: take the ndim-th root of the target, clamp it to at
/// least 1, then clip each dimension at the array extent.
fn calculate_chunkshape(shape: &[usize], targetsize: usize) -> Vec<usize> {
    let ndim = shape.len();
    let per_dim = ((targetsize as f64).powf(1.0 / ndim as f64) as usize).max(1);
    shape.iter().map(|&dim| dim.min(per_dim)).collect()
}
/// Number of bytes reserved at the start of the file for the metadata header.
fn metadata_size() -> usize {
    4096
}
/// Serialize `metadata` into the fixed-size header buffer and write it.
///
/// Layout (little-endian): magic "OOCARRAY" (8 bytes) | version u32 (=1) |
/// ndim u32 | shape u64 x ndim | element_size u32 | chunkshape u64 x ndim |
/// compression id u8. Must stay in sync with `read_metadata`.
/// NOTE(review): no bounds check against `metadata_size()`; an extremely
/// high-dimensional shape would panic on the slice indexing below.
fn write_metadata(file: &mut File, metadata: &ArrayMetadata) -> Result<()> {
let mut buffer = vec![0u8; Self::metadata_size()];
let mut cursor = 0;
// Magic bytes identifying the file format.
buffer[0..8].copy_from_slice(b"OOCARRAY");
cursor += 8;
// Format version.
LittleEndian::write_u32(&mut buffer[cursor..], 1);
cursor += 4;
LittleEndian::write_u32(&mut buffer[cursor..], metadata.shape.len() as u32);
cursor += 4;
for &dim in &metadata.shape {
LittleEndian::write_u64(&mut buffer[cursor..], dim as u64);
cursor += 8;
}
LittleEndian::write_u32(&mut buffer[cursor..], metadata.element_size as u32);
cursor += 4;
for &dim in &metadata.chunkshape {
LittleEndian::write_u64(&mut buffer[cursor..], dim as u64);
cursor += 8;
}
// Numeric id for the compression algorithm (0 = none). `read_metadata`
// must recognize every id emitted here.
let compression_id = match metadata.compression {
None => 0,
Some(CompressionAlgorithm::Gzip) => 1,
Some(CompressionAlgorithm::Zstd) => 2,
Some(CompressionAlgorithm::Lz4) => 3,
Some(CompressionAlgorithm::Bzip2) => 4,
Some(CompressionAlgorithm::Brotli) => 5,
Some(CompressionAlgorithm::Snappy) => 6,
Some(CompressionAlgorithm::FpZip) => 7,
Some(CompressionAlgorithm::DeltaLz4) => 8,
};
buffer[cursor] = compression_id;
file.write_all(&buffer)
.map_err(|e| IoError::FileError(format!("Failed to write metadata: {e}")))
}
/// Parse the fixed-size metadata header from `file`.
///
/// Mirrors the layout produced by `write_metadata` (little-endian):
/// magic "OOCARRAY" | version u32 | ndim u32 | shape u64 x ndim |
/// element_size u32 | chunkshape u64 x ndim | compression id u8.
///
/// # Errors
/// `IoError::ParseError` on a short read, bad magic, unsupported version,
/// or unknown compression id.
fn read_metadata(file: &mut File) -> Result<ArrayMetadata> {
    let mut buffer = vec![0u8; Self::metadata_size()];
    file.read_exact(&mut buffer)
        .map_err(|e| IoError::ParseError(format!("Failed to read metadata: {e}")))?;
    let mut cursor = 0;
    if &buffer[0..8] != b"OOCARRAY" {
        // FIX: message previously read "Invalid _file format" (extraction typo).
        return Err(IoError::ParseError("Invalid file format".to_string()));
    }
    cursor += 8;
    let version = LittleEndian::read_u32(&buffer[cursor..]);
    if version != 1 {
        return Err(IoError::ParseError(format!(
            "Unsupported version: {version}"
        )));
    }
    cursor += 4;
    let ndim = LittleEndian::read_u32(&buffer[cursor..]) as usize;
    cursor += 4;
    let mut shape = Vec::with_capacity(ndim);
    for _ in 0..ndim {
        shape.push(LittleEndian::read_u64(&buffer[cursor..]) as usize);
        cursor += 8;
    }
    let element_size = LittleEndian::read_u32(&buffer[cursor..]) as usize;
    cursor += 4;
    let mut chunkshape = Vec::with_capacity(ndim);
    for _ in 0..ndim {
        chunkshape.push(LittleEndian::read_u64(&buffer[cursor..]) as usize);
        cursor += 8;
    }
    // FIX: previously only ids 0-4 were recognized although `write_metadata`
    // emits ids up to 8, so files written with Brotli/Snappy/FpZip/DeltaLz4
    // compression failed to reopen.
    let compression = match buffer[cursor] {
        0 => None,
        1 => Some(CompressionAlgorithm::Gzip),
        2 => Some(CompressionAlgorithm::Zstd),
        3 => Some(CompressionAlgorithm::Lz4),
        4 => Some(CompressionAlgorithm::Bzip2),
        5 => Some(CompressionAlgorithm::Brotli),
        6 => Some(CompressionAlgorithm::Snappy),
        7 => Some(CompressionAlgorithm::FpZip),
        8 => Some(CompressionAlgorithm::DeltaLz4),
        _ => return Err(IoError::ParseError("Invalid compression type".to_string())),
    };
    // Derive the chunk count; per-chunk offsets/sizes are not persisted in
    // v1 headers, so the directory starts out zeroed.
    let chunks_per_dim: Vec<_> = shape
        .iter()
        .zip(&chunkshape)
        .map(|(&dim, &chunk)| (dim + chunk - 1) / chunk)
        .collect();
    let num_chunks = chunks_per_dim.iter().product();
    Ok(ArrayMetadata {
        shape,
        dtype: String::new(),
        element_size,
        chunkshape,
        compression,
        num_chunks,
        chunk_offsets: vec![0; num_chunks],
        chunk_sizes: vec![0; num_chunks],
    })
}
/// Return the decoded data for `chunkid`, consulting the cache first.
///
/// On a cache miss the chunk is read (and decompressed) from disk and
/// then published to the cache, possibly evicting older entries.
fn get_chunk(&self, chunkid: usize) -> Result<Vec<T>> {
    // Fast path: hit under a shared read lock, released before any I/O.
    let hit = {
        let guard = self.cache.read().expect("Operation failed");
        guard.chunks.get(&chunkid).map(|c| c.data.clone())
    };
    if let Some(data) = hit {
        return Ok(data);
    }
    // Miss: load from disk, then insert under the write lock.
    let data = self.read_chunk_from_disk(chunkid)?;
    {
        let mut guard = self.cache.write().expect("Operation failed");
        self.update_cache(&mut guard, chunkid, data.clone());
    }
    Ok(data)
}
/// Read and decode one chunk directly from the memory-mapped file,
/// bypassing the cache.
///
/// Requires the array to have been `open`ed (`self.mmap` is `Some`);
/// freshly `create`d arrays have no map and return an error.
fn read_chunk_from_disk(&self, chunkid: usize) -> Result<Vec<T>> {
if let Some(ref mmap) = self.mmap {
// Elements per full chunk. NOTE(review): edge chunks are not
// special-cased here — assumes chunks are stored padded to this size.
let chunk_size = self.metadata.chunkshape.iter().product::<usize>();
// Fixed-slot offset used by the uncompressed layout below.
let offset = Self::metadata_size() + chunkid * chunk_size * self.metadata.element_size;
if let Some(compression) = self.metadata.compression {
let compressed_size = self.metadata.chunk_sizes[chunkid];
let compressed_offset = self.metadata.chunk_offsets[chunkid];
// A size of 0 marks a chunk that was never written: return zeros.
if compressed_size == 0 {
let chunk_size = self.metadata.chunkshape.iter().product::<usize>();
return Ok(vec![T::zero(); chunk_size]);
}
let compressed_data = &mmap
[compressed_offset as usize..(compressed_offset as usize + compressed_size)];
let decompressed_data =
decompress_data(compressed_data, compression).map_err(|e| {
IoError::ParseError(format!("Failed to decompress chunk: {}", e))
})?;
let chunk_size = self.metadata.chunkshape.iter().product::<usize>();
let mut data = Vec::with_capacity(chunk_size);
// Decode element-by-element; stops early (yielding a short vector)
// if the payload is smaller than a full chunk.
for i in 0..chunk_size {
let start = i * self.metadata.element_size;
let end = start + self.metadata.element_size;
if end <= decompressed_data.len() {
let value = T::from_le_bytes(&decompressed_data[start..end]);
data.push(value);
} else {
break;
}
}
Ok(data)
} else {
// Uncompressed layout: chunk `i` occupies fixed slot `i` after the header.
let bytes = &mmap[offset..offset + chunk_size * self.metadata.element_size];
let mut data = Vec::with_capacity(chunk_size);
for i in 0..chunk_size {
let start = i * self.metadata.element_size;
let end = start + self.metadata.element_size;
let value = T::from_le_bytes(&bytes[start..end]);
data.push(value);
}
Ok(data)
}
} else {
Err(IoError::ParseError(
"Array not opened for reading".to_string(),
))
}
}
/// Persist one chunk to the backing file.
///
/// Uncompressed chunks live at a fixed offset derived from the chunk id;
/// compressed chunks are appended at end-of-file on first write, or
/// rewritten at their recorded offset.
///
/// # Errors
/// `IoError::FileError` on any open/seek/write/sync failure.
fn write_chunk_to_disk(&self, chunkid: usize, data: &[T]) -> Result<()> {
    // BUG FIX: this previously opened with `.truncate(true)`, which wiped
    // the metadata header and every other chunk each time a single chunk
    // was written. Open for in-place writing instead.
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .open(&self.file_path)
        .map_err(|e| IoError::FileError(format!("Failed to open file for writing: {}", e)))?;
    // Serialize elements to little-endian bytes.
    let mut chunk_bytes = Vec::with_capacity(data.len() * self.metadata.element_size);
    for value in data {
        value.write_le(&mut chunk_bytes)?;
    }
    if let Some(compression) = self.metadata.compression {
        let compressed_data = compress_data(&chunk_bytes, compression, None)
            .map_err(|e| IoError::FileError(format!("Failed to compress chunk: {}", e)))?;
        // First write appends at EOF; later writes reuse the recorded offset.
        // NOTE(review): the new offset/size are not stored back into
        // `metadata` (method takes `&self`), so a compressed chunk written
        // here is not yet discoverable by `read_chunk_from_disk` — confirm.
        let offset = if self.metadata.chunk_offsets[chunkid] == 0 {
            file.seek(SeekFrom::End(0))
                .map_err(|e| IoError::FileError(format!("Failed to seek to end: {}", e)))?
        } else {
            self.metadata.chunk_offsets[chunkid]
        };
        file.seek(SeekFrom::Start(offset))
            .map_err(|e| IoError::FileError(format!("Failed to seek: {}", e)))?;
        file.write_all(&compressed_data).map_err(|e| {
            IoError::FileError(format!("Failed to write compressed data: {}", e))
        })?;
    } else {
        // Fixed layout: chunk `i` occupies slot `i` after the header.
        let chunk_size = self.metadata.chunkshape.iter().product::<usize>();
        let offset = Self::metadata_size() + chunkid * chunk_size * self.metadata.element_size;
        file.seek(SeekFrom::Start(offset as u64))
            .map_err(|e| IoError::FileError(format!("Failed to seek: {}", e)))?;
        file.write_all(&chunk_bytes)
            .map_err(|e| IoError::FileError(format!("Failed to write data: {}", e)))?;
    }
    file.sync_all()
        .map_err(|e| IoError::FileError(format!("Failed to sync file: {}", e)))?;
    Ok(())
}
/// Insert `data` for `chunkid` into the cache, evicting queued chunks
/// until the byte budget is respected.
///
/// Evicted dirty chunks are written back to disk when
/// `config.write_through` is set; write-back failures are logged to
/// stderr rather than propagated.
fn update_cache(&self, cache: &mut ChunkCache<T>, chunkid: usize, data: Vec<T>) {
let chunk_size_bytes = data.len() * std::mem::size_of::<T>();
// Evict from the queue front until the new chunk fits (or nothing is left).
while cache.current_size_bytes + chunk_size_bytes > cache.max_size_bytes
&& !cache.lru_queue.is_empty()
{
if let Some(evict_id) = cache.lru_queue.pop_front() {
if let Some(evicted) = cache.chunks.remove(&evict_id) {
cache.current_size_bytes -= evicted.data.len() * std::mem::size_of::<T>();
// Persist modified chunks before dropping them.
if evicted.dirty && self.config.write_through {
if let Err(e) = self.write_chunk_to_disk(evict_id, &evicted.data) {
eprintln!(
"Warning: Failed to write back dirty chunk {}: {}",
evict_id, e
);
}
}
}
}
}
// NOTE(review): the queue is never reordered on access, so eviction is
// FIFO (insertion order) rather than true LRU — confirm intended.
cache.chunks.insert(
chunkid,
CachedChunk {
data,
dirty: false,
access_count: 1,
},
);
cache.lru_queue.push_back(chunkid);
cache.current_size_bytes += chunk_size_bytes;
}
/// Apply `processor` to every stored chunk of the array, in chunk-id
/// order, collecting the results.
///
/// `chunk_size` is accepted for backward compatibility but unused:
/// iteration always follows the array's own chunking
/// (`metadata.num_chunks`).
///
/// # Errors
/// Propagates the first error from chunk loading, view construction, or
/// `processor`.
pub fn process_chunks<F, R>(&self, chunk_size: usize, processor: F) -> Result<Vec<R>>
where
    F: Fn(ArrayView<T, IxDyn>) -> Result<R>,
    R: Send,
{
    // FIX: the result capacity was previously derived from the unrelated
    // `chunk_size` argument; the loop runs once per stored chunk.
    let _ = chunk_size;
    let mut results = Vec::with_capacity(self.metadata.num_chunks);
    for chunk_id in 0..self.metadata.num_chunks {
        let chunk_data = self.get_chunk(chunk_id)?;
        let chunkshape = self.get_chunkshape(chunk_id);
        // View the flat chunk buffer with its (possibly truncated edge) shape.
        let array_view = ArrayView::from_shape(chunkshape, &chunk_data)
            .map_err(|e| IoError::ParseError(format!("Failed to create array view: {}", e)))?;
        results.push(processor(array_view)?);
    }
    Ok(results)
}
/// Compute the actual shape of chunk `chunkid`, clipping edge chunks at
/// the array bounds.
fn get_chunkshape(&self, chunkid: usize) -> IxDyn {
let mut chunk_coords = Vec::with_capacity(self.metadata.shape.len());
let mut temp_id = chunkid;
// Chunk counts per dimension (ceiling division for partial chunks).
let chunks_per_dim: Vec<_> = self
.metadata
.shape
.iter()
.zip(&self.metadata.chunkshape)
.map(|(&dim, &chunk)| (dim + chunk - 1) / chunk)
.collect();
// Decode the flat id into per-dimension coordinates (row-major, last
// dimension fastest); digits come out reversed and are flipped below.
for &chunks in chunks_per_dim.iter().rev() {
chunk_coords.push(temp_id % chunks);
temp_id /= chunks;
}
chunk_coords.reverse();
// Clip each dimension's extent at the array boundary.
let chunkshape: Vec<_> = chunk_coords
.iter()
.zip(&self.metadata.shape)
.zip(&self.metadata.chunkshape)
.map(|((&coord, &dim), &chunk_dim)| {
let start = coord * chunk_dim;
let end = ((coord + 1) * chunk_dim).min(dim);
end - start
})
.collect();
IxDyn(&chunkshape)
}
/// Read a rectangular window starting at element coordinate `start` with
/// extent `shape` into a newly allocated dense array.
///
/// # Errors
/// `IoError::ParseError` when the rank of `start`/`shape` does not match
/// the array, or the window extends beyond the array bounds.
pub fn view_window(&self, start: &[usize], shape: &[usize]) -> Result<Array<T, IxDyn>> {
    let ndim = self.metadata.shape.len();
    if start.len() != ndim || shape.len() != ndim {
        return Err(IoError::ParseError("Invalid window dimensions".to_string()));
    }
    // Every window edge must stay inside the array.
    let in_bounds = start
        .iter()
        .zip(shape)
        .zip(&self.metadata.shape)
        .all(|((&s, &sz), &dim)| s + sz <= dim);
    if !in_bounds {
        return Err(IoError::ParseError(
            "Window extends beyond array bounds".to_string(),
        ));
    }
    let mut result = Array::zeros(IxDyn(shape));
    // Inclusive range of chunk coordinates touched by the window.
    let start_chunks: Vec<_> = start
        .iter()
        .zip(&self.metadata.chunkshape)
        .map(|(&s, &chunk)| s / chunk)
        .collect();
    let end_chunks: Vec<_> = start
        .iter()
        .zip(shape)
        .zip(&self.metadata.chunkshape)
        .map(|((&s, &sz), &chunk)| (s + sz - 1) / chunk)
        .collect();
    self.copy_chunks_to_window(start, &start_chunks, &end_chunks, &mut result)?;
    Ok(result)
}
/// Visit every chunk overlapping the window and copy each chunk's
/// overlapping region into `result`.
///
/// `start_chunks`/`end_chunks` are inclusive per-dimension chunk
/// coordinate bounds; `window_start` is in element coordinates.
fn copy_chunks_to_window(
&self,
window_start: &[usize],
start_chunks: &[usize],
end_chunks: &[usize],
result: &mut Array<T, IxDyn>,
) -> Result<()> {
let mut chunk_coords = start_chunks.to_vec();
loop {
let chunk_id = self.coords_to_chunk_id(&chunk_coords);
let chunk_data = self.get_chunk(chunk_id)?;
let chunkshape = self.get_chunkshape(chunk_id);
// Element coordinate where this chunk begins.
let chunk_start: Vec<_> = chunk_coords
.iter()
.zip(&self.metadata.chunkshape)
.map(|(&coord, &size)| coord * size)
.collect();
// Per-dimension intersection of the chunk extent with the window.
let overlap_start: Vec<_> = chunk_start
.iter()
.zip(window_start)
.map(|(&chunk_s, &win_s)| chunk_s.max(win_s))
.collect();
let overlap_end: Vec<_> = chunk_start
.iter()
.zip(chunkshape.slice())
.zip(window_start)
.zip(result.shape())
.map(|(((chunk_s, chunk_sz), win_s), win_sz)| {
(chunk_s + chunk_sz).min(win_s + win_sz)
})
.collect();
// Copy only when the intersection is non-empty in every dimension.
if overlap_start.iter().zip(&overlap_end).all(|(s, e)| s < e) {
// Translate the overlap into chunk-local source coordinates...
let chunksrc_start: Vec<_> = overlap_start
.iter()
.zip(&chunk_start)
.map(|(overlap, chunk)| overlap - chunk)
.collect();
let chunksrc_end: Vec<_> = overlap_end
.iter()
.zip(&chunk_start)
.map(|(overlap, chunk)| overlap - chunk)
.collect();
// ...and into window-local destination coordinates.
let result_dst_start: Vec<_> = overlap_start
.iter()
.zip(window_start)
.map(|(overlap, win)| overlap - win)
.collect();
let result_dst_end: Vec<_> = overlap_end
.iter()
.zip(window_start)
.map(|(overlap, win)| overlap - win)
.collect();
self.copy_chunk_region(
&chunk_data,
chunkshape.slice(),
&chunksrc_start,
&chunksrc_end,
result,
&result_dst_start,
&result_dst_end,
)?;
}
// Advance to the next chunk coordinate; stop after the last one.
if !self.increment_chunk_coords(&mut chunk_coords, start_chunks, end_chunks) {
break;
}
}
Ok(())
}
/// Convert per-dimension chunk coordinates into a flat chunk id
/// (row-major: the last dimension varies fastest).
fn coords_to_chunk_id(&self, coords: &[usize]) -> usize {
    // Number of chunks along each dimension, rounding up for edge chunks.
    let chunks_per_dim: Vec<_> = self
        .metadata
        .shape
        .iter()
        .zip(&self.metadata.chunkshape)
        .map(|(&dim, &chunk)| (dim + chunk - 1) / chunk)
        .collect();
    // Accumulate from the fastest-varying dimension outward.
    // FIX: dropped a needless `.enumerate()` whose index was unused.
    let mut chunk_id = 0;
    let mut multiplier = 1;
    for (&coord, &chunks_in_dim) in coords.iter().zip(&chunks_per_dim).rev() {
        chunk_id += coord * multiplier;
        multiplier *= chunks_in_dim;
    }
    chunk_id
}
/// Copy a rectangular sub-region of a flat chunk buffer into `result`.
///
/// `chunkshape` describes the row-major layout of `chunk_data`. Only 1-D
/// and 2-D arrays are supported; higher ranks return an error.
fn copy_chunk_region(
&self,
chunk_data: &[T],
chunkshape: &[usize],
src_start: &[usize],
src_end: &[usize],
result: &mut Array<T, IxDyn>,
dst_start: &[usize],
_dst_end: &[usize],
) -> Result<()> {
match chunkshape.len() {
1 => {
let src_len = src_end[0] - src_start[0];
for i in 0..src_len {
let src_idx = src_start[0] + i;
let dst_idx = dst_start[0] + i;
result[[dst_idx]] = chunk_data[src_idx];
}
}
2 => {
for i in 0..(src_end[0] - src_start[0]) {
for j in 0..(src_end[1] - src_start[1]) {
// Row-major flattening of the chunk-local (i, j) source index.
let src_idx = (src_start[0] + i) * chunkshape[1] + (src_start[1] + j);
let dst_idx = [dst_start[0] + i, dst_start[1] + j];
result[&dst_idx[..]] = chunk_data[src_idx];
}
}
}
_ => {
return Err(IoError::ParseError(
"High dimensional copying not yet implemented".to_string(),
));
}
}
Ok(())
}
/// Odometer-style advance of `coords` within the inclusive box
/// [`start_chunks`, `end_chunks`], last dimension fastest.
///
/// Returns `false` once every coordinate combination has been visited.
fn increment_chunk_coords(
    &self,
    coords: &mut [usize],
    start_chunks: &[usize],
    end_chunks: &[usize],
) -> bool {
    for i in (0..coords.len()).rev() {
        // Room left along this axis: bump it and stop carrying.
        if coords[i] < end_chunks[i] {
            coords[i] += 1;
            return true;
        }
        // Axis exhausted: reset and carry into the next-slower axis.
        coords[i] = start_chunks[i];
    }
    false
}
/// Write `data` into the array at element coordinate `start`.
///
/// Each touched chunk is read, modified, and stored back into the cache
/// marked dirty; it reaches disk on eviction or [`Self::flush`].
/// NOTE(review): insertion here does not enforce the cache byte budget or
/// trigger eviction — confirm whether that is intended.
pub fn write_window(&mut self, start: &[usize], data: &ArrayView<T, IxDyn>) -> Result<()> {
if start.len() != self.metadata.shape.len() || data.ndim() != self.metadata.shape.len() {
return Err(IoError::FileError("Invalid window dimensions".to_string()));
}
for (i, &start_val) in start.iter().enumerate() {
if start_val + data.shape()[i] > self.metadata.shape[i] {
return Err(IoError::FileError(
"Window extends beyond array bounds".to_string(),
));
}
}
// Inclusive chunk-coordinate bounds of the region being written.
let start_chunks: Vec<_> = start
.iter()
.zip(&self.metadata.chunkshape)
.map(|(&s, &chunk)| s / chunk)
.collect();
let end_chunks: Vec<_> = start
.iter()
.zip(data.shape())
.zip(&self.metadata.chunkshape)
.map(|((&s, &sz), &chunk)| (s + sz - 1) / chunk)
.collect();
let mut chunk_coords = start_chunks.clone();
loop {
let chunk_id = self.coords_to_chunk_id(&chunk_coords);
// Read-modify-write: fetch the current chunk contents first.
let mut chunk_data = self.get_chunk(chunk_id)?;
let chunkshape = self.get_chunkshape(chunk_id);
let chunk_start: Vec<_> = chunk_coords
.iter()
.zip(&self.metadata.chunkshape)
.map(|(&coord, &size)| coord * size)
.collect();
// Per-dimension intersection of this chunk with the target window.
let overlap_start: Vec<_> = chunk_start
.iter()
.zip(start)
.map(|(&chunk_s, &win_s)| chunk_s.max(win_s))
.collect();
let overlap_end: Vec<_> = chunk_start
.iter()
.zip(chunkshape.slice())
.zip(start)
.zip(data.shape())
.map(|(((chunk_s, chunk_sz), win_s), win_sz)| {
(chunk_s + chunk_sz).min(win_s + win_sz)
})
.collect();
if overlap_start.iter().zip(&overlap_end).all(|(s, e)| s < e) {
self.write_to_chunk_region(
&mut chunk_data,
chunkshape.slice(),
&chunk_start,
&overlap_start,
&overlap_end,
data,
start,
)?;
{
// Publish the modified chunk to the cache as dirty.
let mut cache = self.cache.write().expect("Operation failed");
if let Some(cached_chunk) = cache.chunks.get_mut(&chunk_id) {
cached_chunk.data = chunk_data;
cached_chunk.dirty = true;
} else {
let chunk_size_bytes = chunk_data.len() * std::mem::size_of::<T>();
cache.chunks.insert(
chunk_id,
CachedChunk {
data: chunk_data,
dirty: true,
access_count: 1,
},
);
cache.current_size_bytes += chunk_size_bytes;
cache.lru_queue.push_back(chunk_id);
}
}
}
if !self.increment_chunk_coords(&mut chunk_coords, &start_chunks, &end_chunks) {
break;
}
}
Ok(())
}
/// Copy the overlap region from `source_data` into a chunk's flat buffer.
///
/// `overlap_start`/`overlap_end` are in global element coordinates and
/// are translated into chunk-local and source-local coordinates here.
/// Only 1-D and 2-D arrays are supported.
fn write_to_chunk_region(
&self,
chunk_data: &mut [T],
chunkshape: &[usize],
chunk_start: &[usize],
overlap_start: &[usize],
overlap_end: &[usize],
source_data: &ArrayView<T, IxDyn>,
source_start: &[usize],
) -> Result<()> {
// Overlap origin relative to the chunk...
let chunk_local_start: Vec<_> = overlap_start
.iter()
.zip(chunk_start)
.map(|(overlap, chunk)| overlap - chunk)
.collect();
// ...and relative to the source view.
let source_local_start: Vec<_> = overlap_start
.iter()
.zip(source_start)
.map(|(overlap, source)| overlap - source)
.collect();
match chunkshape.len() {
1 => {
let len = overlap_end[0] - overlap_start[0];
for i in 0..len {
let chunk_idx = chunk_local_start[0] + i;
let source_idx = [source_local_start[0] + i];
chunk_data[chunk_idx] = source_data[&source_idx[..]];
}
}
2 => {
for i in 0..(overlap_end[0] - overlap_start[0]) {
for j in 0..(overlap_end[1] - overlap_start[1]) {
// Row-major flattening of the chunk-local (i, j) index.
let chunk_idx =
(chunk_local_start[0] + i) * chunkshape[1] + (chunk_local_start[1] + j);
let source_idx = [source_local_start[0] + i, source_local_start[1] + j];
chunk_data[chunk_idx] = source_data[&source_idx[..]];
}
}
}
_ => {
return Err(IoError::ParseError(
"High dimensional writing not yet implemented".to_string(),
));
}
}
Ok(())
}
/// Write every dirty cached chunk back to disk and mark it clean.
///
/// # Errors
/// Propagates the first `IoError` from `write_chunk_to_disk`.
pub fn flush(&mut self) -> Result<()> {
    let mut cache = self.cache.write().expect("Operation failed");
    for (&chunk_id, chunk) in cache.chunks.iter_mut() {
        if chunk.dirty {
            self.write_chunk_to_disk(chunk_id, &chunk.data)?;
            // FIX: previously the dirty flag was never cleared, so every
            // subsequent flush (and every eviction) rewrote unchanged chunks.
            chunk.dirty = false;
        }
    }
    Ok(())
}
}
/// Streaming processor over array chunks: `process` is invoked once per
/// chunk, then `finalize` consumes the processor.
pub trait ChunkProcessor<T> {
/// Handle one chunk view.
fn process(&mut self, chunk: ArrayView<T, IxDyn>) -> Result<()>;
/// Complete processing, consuming the processor.
fn finalize(self) -> Result<()>;
}
/// External-memory sorter: buffers input into sorted runs spilled to
/// disk, then k-way merges the runs into one sorted output stream.
pub struct OutOfCoreSorter<T> {
/// Directory where sorted run files are written.
temp_dir: PathBuf,
/// Maximum number of elements per sorted run.
chunk_size: usize,
/// Paths of the spilled run files, in creation order.
chunk_files: Vec<PathBuf>,
_phantom: std::marker::PhantomData<T>,
}
impl<T: ScientificNumber + Ord + Clone> OutOfCoreSorter<T> {
pub fn new(temp_dir: PathBuf, chunk_size: usize) -> Result<Self> {
std::fs::create_dir_all(&temp_dir)
.map_err(|e| IoError::FileError(format!("Failed to create temp dir: {}", e)))?;
Ok(Self {
temp_dir,
chunk_size,
chunk_files: Vec::new(),
_phantom: std::marker::PhantomData,
})
}
pub fn add_data(&mut self, data: &[T]) -> Result<()> {
for chunk in data.chunks(self.chunk_size) {
let mut sorted_chunk = chunk.to_vec();
sorted_chunk.sort();
let chunk_file = self
.temp_dir
.join(format!("chunk_{}.tmp", self.chunk_files.len()));
let mut file = File::create(&chunk_file)
.map_err(|e| IoError::FileError(format!("Failed to create chunk file: {}", e)))?;
for value in &sorted_chunk {
value.write_le(&mut file)?;
}
self.chunk_files.push(chunk_file);
}
Ok(())
}
/// K-way merge all spilled sorted runs into `output`, consuming the
/// sorter and deleting its temporary files afterwards.
///
/// A min-heap (via `Reverse`) holds the head value of each run; the
/// global minimum is popped, written, and replaced by the next value
/// from the same run. NOTE(review): read errors after the first element
/// are treated as end-of-run (`if let Ok`), so a truncated or corrupt
/// run file ends silently — confirm intended.
pub fn merge<W: Write>(self, output: &mut W) -> Result<()> {
let readers: Vec<_> = self
.chunk_files
.iter()
.map(File::open)
.collect::<std::io::Result<_>>()
.map_err(|e| IoError::ParseError(format!("Failed to open chunk file: {}", e)))?;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
let mut buffered_readers: Vec<_> = readers.into_iter().map(BufReader::new).collect();
let mut heap: BinaryHeap<Reverse<(T, usize)>> = BinaryHeap::new();
// Seed the heap with the first value of every run.
for (reader_id, reader) in buffered_readers.iter_mut().enumerate() {
if let Ok(value) = <T as ScientificNumberRead>::read_le(reader) {
heap.push(Reverse((value, reader_id)));
}
}
// Repeatedly emit the global minimum, refilling from its source run.
while let Some(Reverse((value, reader_id))) = heap.pop() {
value.write_le(output)?;
if let Ok(next_value) =
<T as ScientificNumberRead>::read_le(&mut buffered_readers[reader_id])
{
heap.push(Reverse((next_value, reader_id)));
}
}
// Best-effort cleanup of the temporary run files.
for chunk_file in &self.chunk_files {
let _ = std::fs::remove_file(chunk_file);
}
Ok(())
}
}
/// A lazily concatenated view over several [`ArraySource`]s along one axis.
pub struct VirtualArray<T> {
/// The underlying sources, in concatenation order.
arrays: Vec<Box<dyn ArraySource<T>>>,
/// Combined shape (sum of the sources' extents along `axis`).
shape: Vec<usize>,
/// Axis along which the sources are concatenated.
axis: usize,
}
/// A readable n-dimensional data source usable inside a [`VirtualArray`].
pub trait ArraySource<T>: Send + Sync {
/// Shape of this source.
fn shape(&self) -> &[usize];
/// Read a dense rectangular region starting at `start` with extent `shape`.
fn read_region(&self, start: &[usize], shape: &[usize]) -> Result<Array<T, IxDyn>>;
}
impl<T: Clone> VirtualArray<T> {
/// Build a virtual concatenation of `arrays` along `axis`.
///
/// All arrays must share rank and agree on every dimension except `axis`.
///
/// # Errors
/// `IoError::ParseError` when `arrays` is empty or the shapes are
/// inconsistent.
pub fn concatenate(arrays: Vec<Box<dyn ArraySource<T>>>, axis: usize) -> Result<Self> {
    if arrays.is_empty() {
        // FIX: error text previously read "No _arrays provided" (typo).
        return Err(IoError::ParseError("No arrays provided".to_string()));
    }
    let firstshape = arrays[0].shape();
    for array in &arrays[1..] {
        let shape = array.shape();
        if shape.len() != firstshape.len() {
            return Err(IoError::ParseError(
                "Inconsistent array dimensions".to_string(),
            ));
        }
        // Every non-concatenation dimension must match the first array.
        for (i, (&a, &b)) in shape.iter().zip(firstshape).enumerate() {
            if i != axis && a != b {
                return Err(IoError::ParseError(format!(
                    "Inconsistent shape along axis {}: {} vs {}",
                    i, a, b
                )));
            }
        }
    }
    // The concatenated extent along `axis` is the sum of the parts.
    let mut shape = firstshape.to_vec();
    shape[axis] = arrays.iter().map(|a| a.shape()[axis]).sum();
    Ok(Self {
        arrays,
        shape,
        axis,
    })
}
/// Combined shape of the virtual concatenation.
pub fn shape(&self) -> &[usize] {
&self.shape
}
/// Read a region of the virtual concatenation.
///
/// Only regions that fall entirely within a single source array are
/// currently supported; a region spanning multiple sources returns an
/// explicit error.
///
/// # Errors
/// `IoError::ParseError` when the region covers no data or spans more
/// than one source; propagates errors from the underlying source.
pub fn read_region(&self, start: &[usize], shape: &[usize]) -> Result<Array<T, IxDyn>> {
    let end_pos = start[self.axis] + shape[self.axis];
    let mut current_pos = 0;
    let mut result_parts = Vec::new();
    for array in &self.arrays {
        let array_size = array.shape()[self.axis];
        let array_end = current_pos + array_size;
        // Does this source overlap the requested span along `axis`?
        if current_pos < end_pos && array_end > start[self.axis] {
            let local_start = start[self.axis].saturating_sub(current_pos);
            let local_end = (end_pos - current_pos).min(array_size);
            let mut local_region_start = start.to_vec();
            local_region_start[self.axis] = local_start;
            let mut local_regionshape = shape.to_vec();
            local_regionshape[self.axis] = local_end - local_start;
            let part = array.read_region(&local_region_start, &local_regionshape)?;
            result_parts.push(part);
        }
        current_pos = array_end;
        if current_pos >= end_pos {
            break;
        }
    }
    // FIX: the previous implementation silently returned only the FIRST
    // part when the region spanned several sources, yielding wrong data.
    // Until cross-source stitching is implemented, fail loudly instead.
    let mut parts = result_parts.into_iter();
    match (parts.next(), parts.next()) {
        (Some(single), None) => Ok(single),
        (Some(_), Some(_)) => Err(IoError::ParseError(
            "Regions spanning multiple source arrays are not yet supported".to_string(),
        )),
        (None, _) => Err(IoError::ParseError(
            "No data in requested region".to_string(),
        )),
    }
}
}
/// Iterator state for sliding a fixed-shape window over an
/// [`OutOfCoreArray`] with per-dimension strides.
pub struct SlidingWindow<'a, T> {
/// The array being traversed.
array: &'a OutOfCoreArray<T>,
/// Shape of each yielded window.
windowshape: Vec<usize>,
/// Step between consecutive windows, per dimension.
stride: Vec<usize>,
/// Element coordinate of the next window's corner.
current_position: Vec<usize>,
}
impl<'a, T: ScientificNumber + Clone> SlidingWindow<'a, T> {
pub fn new(
array: &'a OutOfCoreArray<T>,
windowshape: Vec<usize>,
stride: Vec<usize>,
) -> Result<Self> {
if windowshape.len() != array.shape().len() || stride.len() != array.shape().len() {
return Err(IoError::ParseError("Dimension mismatch".to_string()));
}
Ok(Self {
array,
windowshape,
stride,
current_position: vec![0; array.shape().len()],
})
}
}
/// Yields successive windows in row-major order: the position advances by
/// `stride` along the last dimension first and carries into earlier
/// dimensions when the window would overrun an axis.
impl<T: ScientificNumber + Clone> Iterator for SlidingWindow<'_, T> {
type Item = Result<Array<T, IxDyn>>;
fn next(&mut self) -> Option<Self::Item> {
// Finished once the window no longer fits at the current position.
for (i, &pos) in self.current_position.iter().enumerate() {
if pos + self.windowshape[i] > self.array.shape()[i] {
return None;
}
}
let window = self
.array
.view_window(&self.current_position, &self.windowshape);
// Odometer-style advance: bump the fastest axis by its stride; on
// overrun reset it to 0 (except axis 0, which is left past the end
// so the check above terminates the iterator) and carry onward.
let mut carry = true;
for i in (0..self.current_position.len()).rev() {
if carry {
self.current_position[i] += self.stride[i];
if self.current_position[i] + self.windowshape[i] <= self.array.shape()[i] {
carry = false;
} else if i > 0 {
self.current_position[i] = 0;
}
}
}
Some(window)
}
}
/// Serialize a numeric value as little-endian bytes.
trait ScientificNumberWrite {
fn write_le<W: Write>(&self, writer: &mut W) -> Result<()>;
}
/// Deserialize a numeric value from little-endian bytes.
trait ScientificNumberRead: Sized {
fn read_le<R: Read>(reader: &mut R) -> Result<Self>;
}
impl<T: ScientificNumber> ScientificNumberWrite for T {
    /// Write `self` to `writer` as little-endian bytes.
    fn write_le<W: Write>(&self, writer: &mut W) -> Result<()> {
        match writer.write_all(&self.to_le_bytes()) {
            Ok(()) => Ok(()),
            Err(e) => Err(IoError::FileError(format!(
                "Failed to write numeric value: {e}"
            ))),
        }
    }
}
impl<T: ScientificNumber> ScientificNumberRead for T {
    /// Read one `T` from `reader`, interpreting the bytes as little-endian.
    fn read_le<R: Read>(reader: &mut R) -> Result<Self> {
        let mut bytes = vec![0u8; std::mem::size_of::<T>()];
        reader
            .read_exact(&mut bytes)
            .map_err(|e| IoError::ParseError(format!("Failed to read numeric value: {e}")))?;
        Ok(T::from_le_bytes(&bytes))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Creating an array records its shape and total element count.
#[test]
fn test_out_of_core_array_creation() -> Result<()> {
let temp_dir = TempDir::new().expect("Operation failed");
let file_path = temp_dir.path().join("test_array.ooc");
let array = OutOfCoreArray::<f64>::create(&file_path, &[1000, 1000])?;
assert_eq!(array.shape(), &[1000, 1000]);
assert_eq!(array.len(), 1_000_000);
Ok(())
}
// Derived chunk shapes never exceed the target element count or the
// array bounds, and every dimension is positive.
#[test]
fn test_chunk_calculation() {
let shape = vec![10000, 5000, 100];
let chunkshape = OutOfCoreArray::<f64>::calculate_chunkshape(&shape, 1_000_000);
let chunk_elements: usize = chunkshape.iter().product();
assert!(chunk_elements <= 1_000_000);
for (&dim, &chunk) in shape.iter().zip(&chunkshape) {
assert!(chunk <= dim);
assert!(chunk > 0);
}
}
// A 10x10 window with stride 5 over a 100x100 array fits at 19
// positions per axis (starts 0, 5, ..., 90), so 19*19 windows total.
#[test]
fn test_sliding_window() -> Result<()> {
let temp_dir = TempDir::new().expect("Operation failed");
let file_path = temp_dir.path().join("test_window.ooc");
let array = OutOfCoreArray::<f64>::create(&file_path, &[100, 100])?;
let window = SlidingWindow::new(&array, vec![10, 10], vec![5, 5])?;
let windows: Vec<_> = window.collect();
assert_eq!(windows.len(), 19 * 19);
Ok(())
}
}