use crate::error::{CoreError, CoreResult};
use crate::memory::metrics::{track_allocation, track_deallocation};
use ::ndarray::{Array, IxDyn};
use std::collections::{HashMap, VecDeque};
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;
/// Errors produced by the out-of-core array subsystem.
#[derive(Debug, thiserror::Error)]
pub enum OutOfCoreError {
    /// Underlying file-system I/O failure (converted from `std::io::Error`).
    #[error("I/O error: {0}")]
    IoError(#[from] std::io::Error),
    /// A chunk id was requested that is not present in the cache/registry.
    #[error("Chunk not found: {0}")]
    ChunkNotFound(String),
    /// A chunk or region shape was inconsistent with the array dimensions.
    #[error("Invalid chunk size: {0}")]
    InvalidChunkSize(String),
    /// Eviction could not find any candidate to free space for a new chunk.
    #[error("Cache is full and no evictable chunks found")]
    CacheFull,
    /// Chunk (de)serialization failed.
    #[error("Serialization error: {0}")]
    SerializationError(String),
    /// An element index exceeded the array bounds.
    #[error("Index out of bounds: {index} >= {size}")]
    IndexOutOfBounds { index: usize, size: usize },
}
impl From<OutOfCoreError> for CoreError {
fn from(err: OutOfCoreError) -> Self {
CoreError::ComputationError(crate::error::ErrorContext::new(err.to_string()))
}
}
/// Unique identifier of one chunk: the owning array's id plus the chunk's
/// position in the chunk grid.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChunkId {
    /// Id of the array this chunk belongs to.
    pub array_id: String,
    /// Grid coordinates of the chunk, one entry per array dimension.
    pub coordinates: Vec<usize>,
}
impl ChunkId {
pub fn new(arrayid: String, coordinates: Vec<usize>) -> Self {
Self {
array_id: arrayid,
coordinates,
}
}
}
impl std::fmt::Display for ChunkId {
    /// Render as `"<array_id>:<c0>,<c1>,…"` (comma-separated coordinates).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}:", self.array_id)?;
        for (i, coord) in self.coordinates.iter().enumerate() {
            if i > 0 {
                write!(f, ",")?;
            }
            write!(f, "{coord}")?;
        }
        Ok(())
    }
}
/// Bookkeeping for one chunk: identity, size, on-disk location, and the
/// access/dirty state used by cache eviction and write-back.
#[derive(Debug, Clone)]
pub struct ChunkMetadata {
    /// Identity of the chunk.
    pub id: ChunkId,
    /// Size of the chunk's data in bytes.
    pub size_bytes: usize,
    /// Shape of the chunk (elements per dimension).
    pub shape: Vec<usize>,
    /// Byte offset of the chunk within its array's backing file.
    pub file_offset: u64,
    /// Time of the most recent access (used by LRU/MRU eviction).
    pub last_accessed: Instant,
    /// Number of accesses (used by LFU eviction).
    pub access_count: u64,
    /// True when the cached copy has changes not yet written to storage.
    pub is_dirty: bool,
}
impl ChunkMetadata {
    /// Create metadata for a chunk of `f64` elements at `fileoffset`,
    /// with access statistics reset and the dirty flag cleared.
    pub fn new(id: ChunkId, shape: Vec<usize>, fileoffset: u64) -> Self {
        // Delegate so the size computation lives in exactly one place.
        Self::new_with_element_size(id, shape, fileoffset, std::mem::size_of::<f64>())
    }

    /// Create metadata for a chunk whose elements are `element_size` bytes
    /// each; `size_bytes` is the product of the shape times that size.
    pub fn new_with_element_size(
        id: ChunkId,
        shape: Vec<usize>,
        file_offset: u64,
        element_size: usize,
    ) -> Self {
        let size_bytes = shape.iter().product::<usize>() * element_size;
        Self {
            id,
            size_bytes,
            shape,
            file_offset,
            last_accessed: Instant::now(),
            access_count: 0,
            is_dirty: false,
        }
    }

    /// Record an access: refresh the timestamp and bump the counter.
    pub fn touch(&mut self) {
        self.last_accessed = Instant::now();
        self.access_count += 1;
    }

    /// Flag the chunk as modified so it is persisted before eviction.
    pub fn mark_dirty(&mut self) {
        self.is_dirty = true;
    }
}
/// Policy used to choose eviction victims when the chunk cache is full.
#[derive(Debug, Clone)]
pub enum CachePolicy {
    /// Evict the least-recently-used chunk.
    Lru,
    /// Evict the least-frequently-used chunk (lowest access count).
    Lfu,
    /// First-in-first-out. NOTE(review): eviction reuses the recency queue,
    /// which is reordered on every access, so this currently behaves like
    /// `Lru` rather than strict insertion order.
    Fifo,
    /// Evict the most-recently-used chunk.
    Mru,
}
/// Tuning knobs for out-of-core arrays and their chunk caches.
#[derive(Debug, Clone)]
pub struct OutOfCoreConfig {
    /// Upper bound on cached chunk data, in bytes.
    pub max_cache_memory: usize,
    /// Upper bound on the number of chunks held in the cache.
    pub max_cached_chunks: usize,
    /// Shape of each chunk (elements per dimension).
    pub chunkshape: Vec<usize>,
    /// Eviction policy for the chunk cache.
    pub cache_policy: CachePolicy,
    /// Whether neighbouring chunks should be prefetched.
    /// NOTE(review): not consulted anywhere in this file — confirm usage.
    pub enable_prefetching: bool,
    /// Number of chunks to prefetch ahead (when prefetching is enabled).
    pub prefetch_count: usize,
    /// Whether chunk data should be compressed on disk.
    /// NOTE(review): not consulted anywhere in this file.
    pub enable_compression: bool,
    /// Buffer size for file I/O, in bytes.
    /// NOTE(review): not consulted anywhere in this file.
    pub io_buffersize: usize,
}
impl Default for OutOfCoreConfig {
fn default() -> Self {
Self {
max_cache_memory: 1024 * 1024 * 1024, max_cached_chunks: 100,
chunkshape: vec![1000, 1000], cache_policy: CachePolicy::Lru,
enable_prefetching: true,
prefetch_count: 4,
enable_compression: false,
io_buffersize: 64 * 1024, }
}
}
/// In-memory cache of decoded chunks with policy-driven eviction and
/// dirty-chunk tracking.
pub struct ChunkCache<T> {
    /// Cached chunk data keyed by chunk id.
    chunks: RwLock<HashMap<ChunkId, Array<T, IxDyn>>>,
    /// Per-chunk bookkeeping (size, access stats, dirty flag).
    pub(crate) metadata: RwLock<HashMap<ChunkId, ChunkMetadata>>,
    /// Recency queue: front = least recently used, back = most recent.
    access_order: Mutex<VecDeque<ChunkId>>,
    /// Cache limits and eviction policy.
    config: OutOfCoreConfig,
    /// Current number of bytes held by cached chunks.
    current_memory: Mutex<usize>,
}
impl<T> ChunkCache<T>
where
    T: Clone + Default + 'static + Send + Sync,
{
    /// Create an empty cache governed by `config`.
    pub fn new(config: OutOfCoreConfig) -> Self {
        Self {
            chunks: RwLock::new(HashMap::new()),
            metadata: RwLock::new(HashMap::new()),
            access_order: Mutex::new(VecDeque::new()),
            config,
            current_memory: Mutex::new(0),
        }
    }

    /// Return a clone of the cached chunk, updating its access statistics,
    /// or `None` on a cache miss.
    pub fn get(&self, chunkid: &ChunkId) -> Option<Array<T, IxDyn>> {
        let chunks = self.chunks.read().expect("Operation failed");
        if let Some(chunk) = chunks.get(chunkid) {
            self.update_access_stats(chunkid);
            Some(chunk.clone())
        } else {
            None
        }
    }

    /// Insert a chunk, evicting existing entries if necessary. Dirty chunks
    /// selected for eviction are persisted via `writer` before removal.
    pub(crate) fn put_with_writer<F>(
        &self,
        chunk_id: ChunkId,
        chunk: Array<T, IxDyn>,
        metadata: ChunkMetadata,
        writer: F,
    ) -> CoreResult<()>
    where
        F: Fn(&ChunkId) -> CoreResult<()>,
    {
        self.ensure_cache_space_with_writer(&metadata, writer)?;
        let chunk_size = chunk.len() * std::mem::size_of::<T>();
        // Bytes of a previously cached chunk replaced by this insert (if
        // any), so the memory accounting stays balanced.
        let mut replaced_size = 0usize;
        {
            // Lock order: chunks -> metadata -> access_order -> current_memory
            // (matches `remove`; keep it consistent to avoid deadlocks).
            let mut chunks = self.chunks.write().expect("Operation failed");
            let mut metadata_map = self.metadata.write().expect("Operation failed");
            let mut access_order = self.access_order.lock().expect("Operation failed");
            let mut current_memory = self.current_memory.lock().expect("Operation failed");
            if let Some(old_chunk) = chunks.insert(chunk_id.clone(), chunk) {
                // Re-inserting an existing id: account for the evicted bytes
                // and drop the stale queue entry, otherwise memory is
                // double-counted and the queue accumulates duplicates.
                replaced_size = old_chunk.len() * std::mem::size_of::<T>();
                access_order.retain(|id| id != &chunk_id);
            }
            metadata_map.insert(chunk_id.clone(), metadata);
            access_order.push_back(chunk_id.clone());
            *current_memory = current_memory.saturating_sub(replaced_size) + chunk_size;
        }
        if replaced_size > 0 {
            track_deallocation("OutOfCoreCache", replaced_size, 0);
        }
        track_allocation("OutOfCoreCache", chunk_size, 0);
        Ok(())
    }

    /// Insert without a write-back handler (dirty evictees are dropped).
    pub fn put(
        &self,
        chunk_id: ChunkId,
        chunk: Array<T, IxDyn>,
        metadata: ChunkMetadata,
    ) -> CoreResult<()> {
        self.put_with_writer(chunk_id, chunk, metadata, |_| Ok(()))
    }

    /// Remove a chunk and all its bookkeeping, returning it if present.
    pub fn remove(&self, chunkid: &ChunkId) -> Option<Array<T, IxDyn>> {
        let mut chunks = self.chunks.write().expect("Operation failed");
        let mut metadata_map = self.metadata.write().expect("Operation failed");
        let mut access_order = self.access_order.lock().expect("Operation failed");
        let mut current_memory = self.current_memory.lock().expect("Operation failed");
        if let Some(chunk) = chunks.remove(chunkid) {
            let chunk_size = chunk.len() * std::mem::size_of::<T>();
            metadata_map.remove(chunkid);
            access_order.retain(|id| id != chunkid);
            *current_memory = current_memory.saturating_sub(chunk_size);
            track_deallocation("OutOfCoreCache", chunk_size, 0);
            Some(chunk)
        } else {
            None
        }
    }

    /// Bump last-access time/count and move the id to the back (most-recent
    /// end) of the access queue.
    fn update_access_stats(&self, chunkid: &ChunkId) {
        let mut metadata_map = self.metadata.write().expect("Operation failed");
        if let Some(metadata) = metadata_map.get_mut(chunkid) {
            metadata.touch();
        }
        let mut access_order = self.access_order.lock().expect("Operation failed");
        access_order.retain(|id| id != chunkid);
        access_order.push_back(chunkid.clone());
    }

    /// Evict if admitting `new_metadata` would exceed the memory or count
    /// budget. Dirty evictees are persisted via `writer` first.
    pub(crate) fn ensure_cache_space_with_writer<F>(
        &self,
        new_metadata: &ChunkMetadata,
        writer: F,
    ) -> CoreResult<()>
    where
        F: Fn(&ChunkId) -> CoreResult<()>,
    {
        let current_memory = *self.current_memory.lock().expect("Operation failed");
        let current_count = self.chunks.read().expect("Operation failed").len();
        let needs_eviction = current_memory + new_metadata.size_bytes
            > self.config.max_cache_memory
            || current_count >= self.config.max_cached_chunks;
        if needs_eviction {
            // NOTE(review): a single eviction may not free enough memory for
            // a chunk much larger than the evictee; confirm this suffices for
            // the chunk sizes in use.
            self.evict_chunks_with_writer(1, writer)?;
        }
        Ok(())
    }

    #[allow(dead_code)]
    fn ensure_cache_space(&self, newmetadata: &ChunkMetadata) -> CoreResult<()> {
        self.ensure_cache_space_with_writer(newmetadata, |_| Ok(()))
    }

    /// Evict `count` chunks chosen by the configured policy, writing dirty
    /// ones back through `writer` first.
    fn evict_chunks_with_writer<F>(&self, count: usize, writer: F) -> CoreResult<()>
    where
        F: Fn(&ChunkId) -> CoreResult<()>,
    {
        let chunks_to_evict = self.select_eviction_candidates(count)?;
        for chunk_id in chunks_to_evict {
            // Copy the dirty flag out so the metadata read guard is released
            // before `writer` runs: writers typically call back into the
            // cache (e.g. `get` -> `update_access_stats`), which takes the
            // metadata write lock and would deadlock against a still-held
            // read guard on the same `RwLock`.
            let is_dirty = self
                .metadata
                .read()
                .expect("Operation failed")
                .get(&chunk_id)
                .map(|m| m.is_dirty)
                .unwrap_or(false);
            if is_dirty {
                writer(&chunk_id)?;
            }
            self.remove(&chunk_id);
        }
        Ok(())
    }

    #[allow(dead_code)]
    fn evict_chunks(&self, count: usize) -> CoreResult<()> {
        self.evict_chunks_with_writer(count, |_| Ok(()))
    }

    /// Pick up to `count` eviction victims according to the cache policy.
    ///
    /// # Errors
    /// Returns `CacheFull` when no candidate exists.
    fn select_eviction_candidates(&self, count: usize) -> CoreResult<Vec<ChunkId>> {
        let access_order = self.access_order.lock().expect("Operation failed");
        let metadata_map = self.metadata.read().expect("Operation failed");
        let candidates: Vec<ChunkId> = match self.config.cache_policy {
            // Queue front holds the least-recently-used ids.
            CachePolicy::Lru => access_order.iter().take(count).cloned().collect(),
            CachePolicy::Mru => access_order.iter().rev().take(count).cloned().collect(),
            // NOTE(review): the queue is reordered on every access, so Fifo
            // behaves like Lru rather than insertion order.
            CachePolicy::Fifo => access_order.iter().take(count).cloned().collect(),
            CachePolicy::Lfu => {
                let mut candidates: Vec<_> = metadata_map
                    .iter()
                    .map(|(id, metadata)| (id.clone(), metadata.access_count))
                    .collect();
                candidates.sort_by_key(|(_, count)| *count);
                candidates
                    .into_iter()
                    .take(count)
                    .map(|(id, _)| id)
                    .collect()
            }
        };
        if candidates.is_empty() {
            Err(OutOfCoreError::CacheFull.into())
        } else {
            Ok(candidates)
        }
    }

    /// Snapshot of the current occupancy. `hit_rate` is not tracked yet and
    /// is always reported as 0.0.
    pub fn get_statistics(&self) -> CacheStatistics {
        let chunks = self.chunks.read().expect("Operation failed");
        let metadata_map = self.metadata.read().expect("Operation failed");
        let current_memory = *self.current_memory.lock().expect("Operation failed");
        let dirty_count = metadata_map.values().filter(|m| m.is_dirty).count();
        CacheStatistics {
            cached_chunks: chunks.len(),
            memory_usage: current_memory,
            dirty_chunks: dirty_count,
            hit_rate: 0.0, // placeholder: hit/miss counters are not recorded
        }
    }

    /// Ids of all chunks currently marked dirty (does not clear the flags).
    pub fn flush_dirty_chunks(&self) -> CoreResult<Vec<ChunkId>> {
        let metadata_map = self.metadata.read().expect("Operation failed");
        let dirty_chunks: Vec<ChunkId> = metadata_map
            .iter()
            .filter(|(_, metadata)| metadata.is_dirty)
            .map(|(id, _)| id.clone())
            .collect();
        Ok(dirty_chunks)
    }

    /// Clear the dirty flag after a successful write-back.
    pub fn mark_clean(&self, chunkid: &ChunkId) {
        let mut metadata_map = self.metadata.write().expect("Operation failed");
        if let Some(metadata) = metadata_map.get_mut(chunkid) {
            metadata.is_dirty = false;
        }
    }

    /// Flag a chunk as modified so it is persisted before eviction/flush.
    pub fn mark_dirty(&self, chunkid: &ChunkId) {
        let mut metadata_map = self.metadata.write().expect("Operation failed");
        if let Some(metadata) = metadata_map.get_mut(chunkid) {
            metadata.mark_dirty();
        }
    }
}
/// Point-in-time snapshot of a `ChunkCache`'s occupancy.
#[derive(Debug, Clone)]
pub struct CacheStatistics {
    /// Number of chunks currently cached.
    pub cached_chunks: usize,
    /// Bytes of chunk data currently cached.
    pub memory_usage: usize,
    /// Number of cached chunks with unwritten modifications.
    pub dirty_chunks: usize,
    /// Cache hit rate. Currently always 0.0 (not tracked).
    pub hit_rate: f64,
}
/// Abstraction over where chunk bytes are persisted.
pub trait StorageBackend: Send + Sync {
    /// Read the chunk's bytes as described by `metadata`.
    fn read_chunk(&self, metadata: &ChunkMetadata) -> CoreResult<Vec<u8>>;
    /// Write `data` at the chunk's recorded location.
    fn write_chunk(&self, metadata: &ChunkMetadata, data: &[u8]) -> CoreResult<()>;
    /// Reserve storage for a new chunk of `size` bytes and return its metadata.
    fn allocate_chunk(&self, chunkid: &ChunkId, size: usize) -> CoreResult<ChunkMetadata>;
    /// Release the chunk's storage registration.
    fn deallocate_chunk(&self, chunkid: &ChunkId) -> CoreResult<()>;
    /// Flush any buffered writes to durable storage.
    fn flush(&self) -> CoreResult<()>;
}
/// Storage backend that keeps each array's chunks in a single `.dat` file
/// under a base directory.
pub struct FileStorageBackend {
    /// Directory holding one `<array_id>.dat` file per array.
    base_path: PathBuf,
    /// Lazily opened, shared file handles keyed by array id.
    file_handles: RwLock<HashMap<String, Arc<Mutex<File>>>>,
    /// Metadata for chunks allocated through this backend.
    chunk_registry: RwLock<HashMap<ChunkId, ChunkMetadata>>,
}
impl FileStorageBackend {
    /// Create a file-backed storage rooted at `basepath`, creating the
    /// directory if it does not exist.
    ///
    /// The second parameter is unused; it is kept only so existing two-
    /// argument call sites remain valid.
    ///
    /// # Errors
    /// Returns an I/O error if the base directory cannot be created.
    pub fn new<P: AsRef<Path>>(basepath: P, _path: P) -> CoreResult<Self> {
        let base_path = basepath.as_ref().to_path_buf();
        std::fs::create_dir_all(&base_path)?;
        Ok(Self {
            base_path,
            file_handles: RwLock::new(HashMap::new()),
            chunk_registry: RwLock::new(HashMap::new()),
        })
    }

    /// Get (or lazily open and cache) the data file for `arrayid`.
    fn get_file_handle(&self, arrayid: &str) -> CoreResult<Arc<Mutex<File>>> {
        let mut handles = self.file_handles.write().expect("Operation failed");
        if let Some(handle) = handles.get(arrayid) {
            Ok(handle.clone())
        } else {
            let file_path = self.base_path.join(format!("{arrayid}.dat"));
            // Open read/write WITHOUT truncating: the file may already hold
            // persisted chunks (offsets are derived from the file length in
            // `allocate_chunk`), and truncating here would silently destroy
            // them on every fresh open.
            let file = OpenOptions::new()
                .create(true)
                .read(true)
                .write(true)
                .open(file_path)?;
            let handle = Arc::new(Mutex::new(file));
            handles.insert(arrayid.to_string(), handle.clone());
            Ok(handle)
        }
    }
}
impl StorageBackend for FileStorageBackend {
    /// Read exactly `metadata.size_bytes` bytes from the chunk's recorded
    /// offset in its array file.
    fn read_chunk(&self, metadata: &ChunkMetadata) -> CoreResult<Vec<u8>> {
        let handle = self.get_file_handle(&metadata.id.array_id)?;
        let mut file = handle.lock().expect("Operation failed");
        file.seek(SeekFrom::Start(metadata.file_offset))?;
        let mut buffer = vec![0u8; metadata.size_bytes];
        file.read_exact(&mut buffer)?;
        Ok(buffer)
    }

    /// Write the chunk bytes at the chunk's recorded offset.
    fn write_chunk(&self, metadata: &ChunkMetadata, data: &[u8]) -> CoreResult<()> {
        let handle = self.get_file_handle(&metadata.id.array_id)?;
        let mut file = handle.lock().expect("Operation failed");
        file.seek(SeekFrom::Start(metadata.file_offset))?;
        file.write_all(data)?;
        Ok(())
    }

    /// Reserve space at the current end of the array file and register the
    /// resulting metadata.
    fn allocate_chunk(&self, chunkid: &ChunkId, size: usize) -> CoreResult<ChunkMetadata> {
        let handle = self.get_file_handle(&chunkid.array_id)?;
        // New chunks are appended, so the offset is the current file length.
        let file_offset = {
            let file = handle.lock().expect("Operation failed");
            file.metadata()?.len()
        };
        // Shape is recorded as a flat run of f64-sized elements.
        let shape = vec![size / std::mem::size_of::<f64>()];
        let metadata = ChunkMetadata::new(chunkid.clone(), shape, file_offset);
        self.chunk_registry
            .write()
            .expect("Operation failed")
            .insert(chunkid.clone(), metadata.clone());
        Ok(metadata)
    }

    /// Drop the chunk from the registry (file space is not reclaimed).
    fn deallocate_chunk(&self, chunkid: &ChunkId) -> CoreResult<()> {
        self.chunk_registry
            .write()
            .expect("Operation failed")
            .remove(chunkid);
        Ok(())
    }

    /// Flush every cached file handle to disk.
    fn flush(&self) -> CoreResult<()> {
        let handles = self.file_handles.read().expect("Operation failed");
        for handle in handles.values() {
            handle.lock().expect("Operation failed").flush()?;
        }
        Ok(())
    }
}
/// A logically large n-dimensional array whose data lives in chunked
/// storage, with only a bounded working set held in memory.
pub struct OutOfCoreArray<T> {
    /// Identifier used for storage file naming and chunk ids.
    array_id: String,
    /// Logical shape of the whole array.
    shape: Vec<usize>,
    /// In-memory chunk cache (shared so closures can write back on evict).
    cache: Arc<ChunkCache<T>>,
    /// Backend that persists chunk bytes.
    storage: Arc<dyn StorageBackend>,
    /// Chunking and caching configuration.
    config: OutOfCoreConfig,
    /// Maps chunk grid coordinates to the chunk's id.
    chunk_map: RwLock<HashMap<Vec<usize>, ChunkId>>,
}
impl<T> OutOfCoreArray<T>
where
    T: Clone + Default + 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
{
    /// Create an array of logical `shape` backed by `storage`, with a fresh
    /// chunk cache built from `config`.
    pub fn new(
        array_id: String,
        shape: Vec<usize>,
        storage: Arc<dyn StorageBackend>,
        config: OutOfCoreConfig,
    ) -> Self {
        let cache = Arc::new(ChunkCache::new(config.clone()));
        Self {
            array_id,
            shape,
            cache,
            storage,
            config,
            chunk_map: RwLock::new(HashMap::new()),
        }
    }

    /// Logical shape of the whole array.
    pub fn shape(&self) -> &[usize] {
        &self.shape
    }

    /// Total number of logical elements.
    pub fn len(&self) -> usize {
        self.shape.iter().product()
    }

    /// True when the array has zero elements.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Map element indices to the grid coordinates of the chunk that
    /// contains them.
    ///
    /// # Errors
    /// Returns `InvalidChunkSize` when the number of indices differs from
    /// the array's dimensionality.
    #[allow(dead_code)]
    fn calculate_chunk_coords(&self, indices: &[usize]) -> CoreResult<Vec<usize>> {
        if indices.len() != self.shape.len() {
            return Err(OutOfCoreError::InvalidChunkSize(format!(
                "Index dimensions {} don't match array dimensions {}",
                indices.len(),
                self.shape.len()
            ))
            .into());
        }
        let chunk_coords: Vec<usize> = indices
            .iter()
            .zip(self.config.chunkshape.iter())
            .map(|(&idx, &chunk_size)| idx / chunk_size)
            .collect();
        Ok(chunk_coords)
    }

    /// Fetch the chunk at `chunkcoords`, consulting (in order) the in-memory
    /// cache, backing storage, and finally creating a fresh default-filled
    /// chunk.
    fn get_chunk(&self, chunkcoords: &[usize]) -> CoreResult<Array<T, IxDyn>> {
        // Resolve the id in a short-lived read lock so the cache/storage
        // calls below do not run while `chunk_map` is held.
        let chunk_id_opt = {
            let chunk_map = self.chunk_map.read().expect("Operation failed");
            chunk_map.get(chunkcoords).cloned()
        };
        if let Some(chunk_id) = chunk_id_opt {
            if let Some(chunk) = self.cache.get(&chunk_id) {
                return Ok(chunk);
            }
            self.load_chunk_from_storage(&chunk_id)
        } else {
            self.create_new_chunk(chunkcoords)
        }
    }

    /// Like `get_chunk`, but also marks the chunk dirty. Note the returned
    /// array is a clone of the cached one; mutating it does not update the
    /// cached copy.
    #[allow(dead_code)]
    fn get_chunk_mut(&self, chunkcoords: &[usize]) -> CoreResult<Array<T, IxDyn>> {
        let chunk = self.get_chunk(chunkcoords)?;
        {
            let chunk_map = self.chunk_map.read().expect("Operation failed");
            if let Some(chunk_id) = chunk_map.get(chunkcoords) {
                self.cache.mark_dirty(chunk_id);
            }
        }
        Ok(chunk)
    }

    /// Replace the chunk at `chunkcoords` with `data` and mark it dirty so
    /// it is written back on flush/eviction.
    fn set_chunk(&self, chunkcoords: &[usize], data: Array<T, IxDyn>) -> CoreResult<()> {
        // Ensure the chunk (and its id mapping) exists before overwriting.
        let _ = self.get_chunk(chunkcoords)?;
        let chunk_id = {
            let chunk_map = self.chunk_map.read().expect("Operation failed");
            chunk_map
                .get(chunkcoords)
                .ok_or_else(|| {
                    OutOfCoreError::ChunkNotFound(format!("Chunk not found: {chunkcoords:?}"))
                })?
                .clone()
        };
        // NOTE(review): metadata is rebuilt here with file_offset 0 (also in
        // `load_chunk_from_storage` / `write_chunk_to_storage`); this is only
        // correct if the backend resolves real offsets itself or every chunk
        // of an array lives at offset 0 — confirm against the backend in use.
        let metadata = ChunkMetadata::new(
            chunk_id.clone(),
            data.shape().to_vec(),
            0,
        );
        let writer = |chunk_id: &ChunkId| self.write_chunk_to_storage(chunk_id);
        self.cache
            .put_with_writer(chunk_id.clone(), data, metadata, writer)?;
        self.cache.mark_dirty(&chunk_id);
        Ok(())
    }

    /// Read a chunk's bytes from storage, decode them, and insert the result
    /// into the cache.
    fn load_chunk_from_storage(&self, chunkid: &ChunkId) -> CoreResult<Array<T, IxDyn>> {
        // NOTE(review): assumes the configured chunk shape and offset 0 —
        // see the note in `set_chunk`.
        let metadata = ChunkMetadata::new(
            chunkid.clone(),
            self.config.chunkshape.clone(),
            0,
        );
        let data = self.storage.read_chunk(&metadata)?;
        let chunk = self.deserialize_chunk_data(&data, &metadata.shape)?;
        let writer = |chunk_id: &ChunkId| self.write_chunk_to_storage(chunk_id);
        self.cache
            .put_with_writer(chunkid.clone(), chunk.clone(), metadata, writer)?;
        Ok(chunk)
    }

    /// Allocate storage for a brand-new chunk, register its coordinates, and
    /// cache a default-filled array for it.
    fn create_new_chunk(&self, chunkcoords: &[usize]) -> CoreResult<Array<T, IxDyn>> {
        let chunk_id = ChunkId::new(self.array_id.clone(), chunkcoords.to_vec());
        // Edge chunks may be smaller than the configured chunk shape.
        let chunkshape = self.calculate_actual_chunkshape(chunkcoords);
        let chunk = Array::<T, IxDyn>::default(IxDyn(&chunkshape));
        let chunk_size = chunk.len() * std::mem::size_of::<T>();
        let metadata = self.storage.allocate_chunk(&chunk_id, chunk_size)?;
        let mut chunk_map = self.chunk_map.write().expect("Operation failed");
        chunk_map.insert(chunkcoords.to_vec(), chunk_id.clone());
        let writer = |chunk_id: &ChunkId| self.write_chunk_to_storage(chunk_id);
        self.cache
            .put_with_writer(chunk_id, chunk.clone(), metadata, writer)?;
        Ok(chunk)
    }

    /// Per-dimension extent of the chunk at `chunkcoords`, clipped to the
    /// array bounds (edge chunks may be partial).
    fn calculate_actual_chunkshape(&self, chunkcoords: &[usize]) -> Vec<usize> {
        chunkcoords
            .iter()
            .zip(self.config.chunkshape.iter())
            .zip(self.shape.iter())
            .map(|((&coord, &chunk_size), &total_size)| {
                let start = coord * chunk_size;
                let end = ((coord + 1) * chunk_size).min(total_size);
                end - start
            })
            .collect()
    }

    /// Decode stored bytes into an array of `shape`. An empty buffer yields
    /// a default-filled array (chunk was never written).
    ///
    /// # Errors
    /// `SerializationError` if decoding fails or the element count does not
    /// match `shape`.
    fn deserialize_chunk_data(&self, data: &[u8], shape: &[usize]) -> CoreResult<Array<T, IxDyn>> {
        let cfg = oxicode::config::standard();
        if data.is_empty() {
            return Ok(Array::<T, IxDyn>::default(IxDyn(shape)));
        }
        match oxicode::serde::decode_owned_from_slice::<Vec<T>, _>(data, cfg) {
            Ok((vec_data, _len)) => {
                let total_elements: usize = shape.iter().product();
                if vec_data.len() != total_elements {
                    return Err(OutOfCoreError::SerializationError(format!(
                        "Data length {} does not match expected shape {:?} (total: {})",
                        vec_data.len(),
                        shape,
                        total_elements
                    ))
                    .into());
                }
                Array::from_shape_vec(IxDyn(shape), vec_data)
                    .map_err(|e| OutOfCoreError::SerializationError(e.to_string()).into())
            }
            Err(e) => Err(OutOfCoreError::SerializationError(e.to_string()).into()),
        }
    }

    /// Encode a chunk's elements (flattened in iteration order) for storage.
    fn serialize_chunk_data(&self, chunk: &Array<T, IxDyn>) -> CoreResult<Vec<u8>> {
        let vec_data: Vec<T> = chunk.iter().cloned().collect();
        let cfg = oxicode::config::standard();
        oxicode::serde::encode_to_vec(&vec_data, cfg)
            .map_err(|e| OutOfCoreError::SerializationError(e.to_string()).into())
    }

    /// Create a view over the half-open index ranges `ranges` (one
    /// `(start, end)` pair per dimension).
    pub fn view_region(&self, ranges: &[(usize, usize)]) -> CoreResult<RegionView<T>> {
        RegionView::new(self, ranges)
    }

    /// Run `processor` over every chunk of the array (loading/creating each
    /// as needed). The processor receives the chunk and its grid coordinates.
    pub fn process_chunks<F>(&self, mut processor: F) -> CoreResult<()>
    where
        F: FnMut(&Array<T, IxDyn>, &[usize]) -> CoreResult<()>,
    {
        // Number of chunks along each dimension (ceiling division).
        let total_chunks: Vec<usize> = self
            .shape
            .iter()
            .zip(self.config.chunkshape.iter())
            .map(|(&total, &chunk_size)| total.div_ceil(chunk_size))
            .collect();
        self.iterate_chunk_coords(
            &total_chunks,
            &mut processor,
            &mut vec![0; total_chunks.len()],
            0,
        )
    }

    /// Depth-first recursion over the chunk grid; at full depth the chunk at
    /// `current_coords` is fetched and handed to `processor`.
    fn iterate_chunk_coords<F>(
        &self,
        total_chunks: &[usize],
        processor: &mut F,
        current_coords: &mut Vec<usize>,
        dimension: usize,
    ) -> CoreResult<()>
    where
        F: FnMut(&Array<T, IxDyn>, &[usize]) -> CoreResult<()>,
    {
        if dimension == total_chunks.len() {
            let chunk = self.get_chunk(current_coords)?;
            processor(&chunk, current_coords)?;
        } else {
            for i in 0..total_chunks[dimension] {
                current_coords[dimension] = i;
                self.iterate_chunk_coords(total_chunks, processor, current_coords, dimension + 1)?;
            }
        }
        Ok(())
    }

    /// Write every dirty chunk back to storage, then flush the backend.
    pub fn flush(&self) -> CoreResult<()> {
        let dirty_chunk_ids = self.cache.flush_dirty_chunks()?;
        for chunk_id in dirty_chunk_ids {
            self.write_chunk_to_storage(&chunk_id)?;
        }
        self.storage.flush()?;
        Ok(())
    }

    /// Serialize the cached chunk and persist it, then mark it clean.
    /// A chunk missing from the cache is silently skipped.
    fn write_chunk_to_storage(&self, chunkid: &ChunkId) -> CoreResult<()> {
        if let Some(chunk) = self.cache.get(chunkid) {
            let chunk_map = self.chunk_map.read().expect("Operation failed");
            // Value is unused; this lookup only verifies the id is still
            // registered before writing.
            let _chunk_coords = chunk_map
                .iter()
                .find(|(_, id)| *id == chunkid)
                .map(|(coords, _)| coords.clone())
                .ok_or_else(|| OutOfCoreError::ChunkNotFound(chunkid.to_string()))?;
            // NOTE(review): file_offset 0 again — see note in `set_chunk`.
            let metadata = ChunkMetadata::new(
                chunkid.clone(),
                chunk.shape().to_vec(),
                0,
            );
            let data = self.serialize_chunk_data(&chunk)?;
            self.storage.write_chunk(&metadata, &data)?;
            self.cache.mark_clean(chunkid);
        }
        Ok(())
    }

    /// Aggregate per-array statistics including a cache snapshot.
    pub fn get_statistics(&self) -> ArrayStatistics {
        let cache_stats = self.cache.get_statistics();
        let chunk_map = self.chunk_map.read().expect("Operation failed");
        ArrayStatistics {
            array_id: self.array_id.clone(),
            total_elements: self.len(),
            total_chunks: chunk_map.len(),
            cache_stats,
        }
    }
}
/// A rectangular sub-region of an `OutOfCoreArray`, described by one
/// half-open `(start, end)` index range per dimension.
pub struct RegionView<'a, T> {
    /// The array the region belongs to.
    array: &'a OutOfCoreArray<T>,
    /// Half-open index ranges, one per array dimension.
    ranges: Vec<(usize, usize)>,
}
impl<'a, T> RegionView<'a, T>
where
    T: Clone + Default + 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
{
    /// Validate that `ranges` supplies one `(start, end)` pair per array
    /// dimension and build the view.
    ///
    /// # Errors
    /// `InvalidChunkSize` on a dimensionality mismatch.
    fn new(array: &'a OutOfCoreArray<T>, ranges: &[(usize, usize)]) -> CoreResult<Self> {
        if ranges.len() != array.shape.len() {
            return Err(OutOfCoreError::InvalidChunkSize(format!(
                "Range dimensions {} don't match array dimensions {}",
                ranges.len(),
                array.shape.len()
            ))
            .into());
        }
        Ok(Self {
            array,
            ranges: ranges.to_vec(),
        })
    }

    /// Per-dimension extents of the region (`end - start`).
    pub fn shape(&self) -> Vec<usize> {
        self.ranges.iter().map(|(start, end)| end - start).collect()
    }

    /// Visit every chunk overlapping the region. The processor receives the
    /// chunk, its grid coordinates, and the intersecting ranges expressed in
    /// chunk-local coordinates.
    pub fn process_intersecting_chunks<F>(&self, mut processor: F) -> CoreResult<()>
    where
        F: FnMut(&Array<T, IxDyn>, &[usize], &[(usize, usize)]) -> CoreResult<()>,
    {
        // Per-dimension half-open range of chunk indices touched by the
        // region; the last touched chunk is the one containing `end - 1`.
        // NOTE(review): an empty range (end == 0) would underflow here —
        // confirm callers never pass empty ranges.
        let chunk_ranges: Vec<(usize, usize)> = self
            .ranges
            .iter()
            .zip(self.array.config.chunkshape.iter())
            .map(|((start, end), &chunk_size)| {
                let chunk_start = start / chunk_size;
                let chunk_end = (end - 1) / chunk_size + 1;
                (chunk_start, chunk_end)
            })
            .collect();
        self.iterate_region_chunks(
            &chunk_ranges,
            &mut processor,
            &mut vec![0; chunk_ranges.len()],
            0,
        )
    }

    /// Depth-first recursion over the touched chunk coordinates; at full
    /// depth the chunk is fetched and handed to `processor` together with
    /// its intersection with the region.
    fn iterate_region_chunks<F>(
        &self,
        chunk_ranges: &[(usize, usize)],
        processor: &mut F,
        current_coords: &mut Vec<usize>,
        dimension: usize,
    ) -> CoreResult<()>
    where
        F: FnMut(&Array<T, IxDyn>, &[usize], &[(usize, usize)]) -> CoreResult<()>,
    {
        if dimension == chunk_ranges.len() {
            let chunk = self.array.get_chunk(current_coords)?;
            let intersection = self.calculate_chunk_intersection(current_coords);
            processor(&chunk, current_coords, &intersection)?;
        } else {
            let (start, end) = chunk_ranges[dimension];
            for i in start..end {
                current_coords[dimension] = i;
                self.iterate_region_chunks(chunk_ranges, processor, current_coords, dimension + 1)?;
            }
        }
        Ok(())
    }

    /// Per-dimension overlap of the chunk at `chunkcoords` with the region,
    /// translated into chunk-local indices (0-based within the chunk).
    fn calculate_chunk_intersection(&self, chunkcoords: &[usize]) -> Vec<(usize, usize)> {
        chunkcoords
            .iter()
            .zip(self.array.config.chunkshape.iter())
            .zip(self.ranges.iter())
            .map(|((&coord, &chunk_size), &(region_start, region_end))| {
                // Global index span covered by this chunk.
                let chunk_start = coord * chunk_size;
                let chunk_end = chunk_start + chunk_size;
                // Clip to the region, then shift to chunk-local coordinates.
                let intersect_start = chunk_start.max(region_start) - chunk_start;
                let intersect_end = chunk_end.min(region_end) - chunk_start;
                (intersect_start, intersect_end)
            })
            .collect()
    }
}
/// Per-array statistics, combining logical size with a cache snapshot.
#[derive(Debug, Clone)]
pub struct ArrayStatistics {
    /// Identifier of the array.
    pub array_id: String,
    /// Total number of logical elements (product of the shape).
    pub total_elements: usize,
    /// Number of chunks that have been materialized so far.
    pub total_chunks: usize,
    /// Snapshot of the array's chunk cache.
    pub cache_stats: CacheStatistics,
}
/// Registry of out-of-core arrays (type-erased via `Any`) and named storage
/// backends, sharing a default configuration.
pub struct OutOfCoreManager {
    /// Arrays keyed by id; each box holds an `Arc<OutOfCoreArray<T>>`.
    arrays: RwLock<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>,
    /// Storage backends registered by name.
    storage_backends: RwLock<HashMap<String, Arc<dyn StorageBackend>>>,
    /// Default configuration for newly created arrays.
    config: OutOfCoreConfig,
}
impl OutOfCoreManager {
    /// Create a manager whose new arrays default to `config`.
    pub fn new(config: OutOfCoreConfig) -> Self {
        Self {
            arrays: RwLock::new(HashMap::new()),
            storage_backends: RwLock::new(HashMap::new()),
            config,
        }
    }

    /// Register a named storage backend for later use by `create_array`.
    pub fn register_storage_backend(&self, name: String, backend: Arc<dyn StorageBackend>) {
        let mut backends = self.storage_backends.write().expect("Operation failed");
        backends.insert(name, backend);
    }

    /// Create and register a new out-of-core array.
    ///
    /// * `storage_name` — previously registered backend to use; `None`
    ///   creates a file backend rooted at `./out_of_core_data`.
    /// * `config` — per-array configuration; `None` uses the manager's.
    ///
    /// # Errors
    /// Fails when `storage_name` is unknown or the default backend cannot
    /// be created.
    pub fn create_array<T>(
        &self,
        array_id: String,
        shape: Vec<usize>,
        storage_name: Option<String>,
        config: Option<OutOfCoreConfig>,
    ) -> CoreResult<Arc<OutOfCoreArray<T>>>
    where
        T: Clone + Default + 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
    {
        let storage_backends = self.storage_backends.read().expect("Operation failed");
        let storage = if let Some(name) = storage_name {
            storage_backends
                .get(&name)
                .ok_or_else(|| OutOfCoreError::ChunkNotFound(format!("Storage backend: {name}")))?
                .clone()
        } else {
            Arc::new(FileStorageBackend::new(
                "./out_of_core_data",
                "./out_of_core_data",
            )?)
        };
        let config = config.unwrap_or_else(|| self.config.clone());
        let array = Arc::new(OutOfCoreArray::new(
            array_id.clone(),
            shape,
            storage,
            config,
        ));
        let mut arrays = self.arrays.write().expect("Operation failed");
        arrays.insert(array_id, Box::new(array.clone()));
        Ok(array)
    }

    /// Look up a registered array, downcasting to its element type.
    /// Returns `None` when the id is unknown or `T` does not match.
    pub fn get_array<T>(&self, arrayid: &str) -> Option<Arc<OutOfCoreArray<T>>>
    where
        T: Clone + Default + 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
    {
        let arrays = self.arrays.read().expect("Operation failed");
        arrays
            .get(arrayid)
            .and_then(|boxed| boxed.downcast_ref::<Arc<OutOfCoreArray<T>>>())
            .cloned()
    }

    /// Unregister an array; returns whether it existed.
    pub fn remove_array(&self, arrayid: &str) -> bool {
        let mut arrays = self.arrays.write().expect("Operation failed");
        arrays.remove(arrayid).is_some()
    }

    /// Ids of all registered arrays.
    pub fn list_arrays(&self) -> Vec<String> {
        let arrays = self.arrays.read().expect("Operation failed");
        arrays.keys().cloned().collect()
    }

    /// If `array_box` holds an `OutOfCoreArray<T>`, add its cache statistics
    /// to the running totals and return `true`; otherwise return `false`.
    fn accumulate_stats<T>(
        array_box: &(dyn std::any::Any + Send + Sync),
        total_memory_usage: &mut usize,
        total_cached_chunks: &mut usize,
    ) -> bool
    where
        T: Clone + Default + 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
    {
        if let Some(array) = array_box.downcast_ref::<Arc<OutOfCoreArray<T>>>() {
            let stats = array.get_statistics();
            *total_memory_usage += stats.cache_stats.memory_usage;
            *total_cached_chunks += stats.cache_stats.cached_chunks;
            true
        } else {
            false
        }
    }

    /// Aggregate statistics across all arrays of the supported element types
    /// (`f64`, `f32`, `i32`, `i64`, `u8`). Arrays of other element types are
    /// counted in `total_arrays` but contribute no cache numbers.
    pub fn get_overall_statistics(&self) -> ManagerStatistics {
        let arrays = self.arrays.read().expect("Operation failed");
        let total_arrays = arrays.len();
        let mut total_memory_usage = 0;
        let mut total_cached_chunks = 0;
        for array_box in arrays.values() {
            let any_ref: &(dyn std::any::Any + Send + Sync) = array_box.as_ref();
            // Try each supported element type in turn; `||` short-circuits
            // at the first successful downcast.
            let _ = Self::accumulate_stats::<f64>(any_ref, &mut total_memory_usage, &mut total_cached_chunks)
                || Self::accumulate_stats::<f32>(any_ref, &mut total_memory_usage, &mut total_cached_chunks)
                || Self::accumulate_stats::<i32>(any_ref, &mut total_memory_usage, &mut total_cached_chunks)
                || Self::accumulate_stats::<i64>(any_ref, &mut total_memory_usage, &mut total_cached_chunks)
                || Self::accumulate_stats::<u8>(any_ref, &mut total_memory_usage, &mut total_cached_chunks);
        }
        ManagerStatistics {
            total_arrays,
            total_memory_usage,
            total_cached_chunks,
        }
    }
}
impl Default for OutOfCoreManager {
fn default() -> Self {
Self::new(OutOfCoreConfig::default())
}
}
/// Totals aggregated by `OutOfCoreManager::get_overall_statistics`.
#[derive(Debug, Clone)]
pub struct ManagerStatistics {
    /// Number of registered arrays (all element types).
    pub total_arrays: usize,
    /// Sum of cached bytes across arrays of supported element types.
    pub total_memory_usage: usize,
    /// Sum of cached chunk counts across arrays of supported element types.
    pub total_cached_chunks: usize,
}
/// Process-wide singleton manager, created lazily on first use.
static GLOBAL_MANAGER: std::sync::OnceLock<Arc<OutOfCoreManager>> = std::sync::OnceLock::new();

/// Return (and lazily initialize) the shared global manager.
#[allow(dead_code)]
pub fn global_manager() -> Arc<OutOfCoreManager> {
    Arc::clone(GLOBAL_MANAGER.get_or_init(|| Arc::new(OutOfCoreManager::default())))
}
/// Convenience helpers for creating and converting out-of-core arrays.
pub mod utils {
    use super::*;

    /// Create an out-of-core array through the global manager using default
    /// storage and configuration.
    pub fn create_simple_array<T>(
        array_id: String,
        shape: Vec<usize>,
    ) -> CoreResult<Arc<OutOfCoreArray<T>>>
    where
        T: Clone + Default + 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
    {
        let manager = global_manager();
        manager.create_array(array_id, shape, None, None)
    }

    /// Stream every chunk of the dataset at `data_path` through
    /// `chunk_processor` without materializing the whole array. The array
    /// id is derived from the file stem of `data_path`.
    ///
    /// # Panics
    /// Panics if `data_path` has no parent directory or no file stem.
    pub fn process_large_dataset<T, F>(
        data_path: &Path,
        shape: Vec<usize>,
        chunk_processor: F,
    ) -> CoreResult<()>
    where
        T: Clone + Default + 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
        F: FnMut(&Array<T, IxDyn>, &[usize]) -> CoreResult<()>,
    {
        let storage = Arc::new(FileStorageBackend::new(
            data_path.parent().expect("Operation failed"),
            data_path,
        )?);
        let config = OutOfCoreConfig::default();
        let array = OutOfCoreArray::new(
            data_path
                .file_stem()
                .expect("Operation failed")
                .to_string_lossy()
                .to_string(),
            shape,
            storage,
            config,
        );
        array.process_chunks(chunk_processor)?;
        Ok(())
    }

    /// Depth-first recursion over the chunk grid: at full depth, slice the
    /// matching region out of `sourcearray` and store it into `targetarray`.
    fn copy_chunks_recursive<T>(
        sourcearray: &Array<T, IxDyn>,
        targetarray: &OutOfCoreArray<T>,
        chunks_per_dim: &[usize],
        chunkcoords: &mut Vec<usize>,
        dimension: usize,
    ) -> CoreResult<()>
    where
        T: Clone + Default + 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
    {
        if dimension == chunks_per_dim.len() {
            let chunkshape = &targetarray.config.chunkshape;
            // Global index range covered by this chunk, clipped to the
            // source array's bounds (edge chunks may be partial).
            let mut slices = vec![];
            for (i, (&coord, &chunk_size)) in chunkcoords.iter().zip(chunkshape.iter()).enumerate()
            {
                let start = coord * chunk_size;
                let end = ((coord + 1) * chunk_size).min(sourcearray.shape()[i]);
                slices.push(start..end);
            }
            let chunk_data = extract_chunk_data(sourcearray, &slices)?;
            targetarray.set_chunk(chunkcoords, chunk_data)?;
            Ok(())
        } else {
            for i in 0..chunks_per_dim[dimension] {
                chunkcoords[dimension] = i;
                copy_chunks_recursive(
                    sourcearray,
                    targetarray,
                    chunks_per_dim,
                    chunkcoords,
                    dimension + 1,
                )?;
            }
            Ok(())
        }
    }

    /// Copy the sub-array selected by `slices` (one range per dimension)
    /// into an owned array.
    fn extract_chunk_data<T>(
        array: &Array<T, IxDyn>,
        slices: &[std::ops::Range<usize>],
    ) -> CoreResult<Array<T, IxDyn>>
    where
        T: Clone,
    {
        use ndarray::{SliceInfo, SliceInfoElem};
        let slice_info: Vec<SliceInfoElem> = slices
            .iter()
            .map(|range| SliceInfoElem::Slice {
                start: range.start as isize,
                end: Some(range.end as isize),
                step: 1,
            })
            .collect();
        let slice_info = SliceInfo::<Vec<SliceInfoElem>, IxDyn, IxDyn>::try_from(slice_info)
            .map_err(|e| OutOfCoreError::InvalidChunkSize(e.to_string()))?;
        Ok(array.slice(slice_info).to_owned())
    }

    /// Convert an in-memory ndarray into a chunked out-of-core array
    /// (registered with the global manager) and flush it to storage.
    pub fn convert_to_out_of_core<T>(
        array: &Array<T, IxDyn>,
        array_id: String,
        chunkshape: Vec<usize>,
    ) -> CoreResult<Arc<OutOfCoreArray<T>>>
    where
        T: Clone + Default + 'static + Send + Sync + serde::Serialize + serde::de::DeserializeOwned,
    {
        let config = OutOfCoreConfig {
            chunkshape,
            ..Default::default()
        };
        let manager = global_manager();
        let out_of_core_array =
            manager.create_array(array_id, array.shape().to_vec(), None, Some(config))?;
        let chunkshape = &out_of_core_array.config.chunkshape;
        let arrayshape = array.shape();
        // Chunk count per dimension (ceiling division).
        let chunks_per_dim: Vec<usize> = arrayshape
            .iter()
            .zip(chunkshape.iter())
            .map(|(&total, &chunk)| total.div_ceil(chunk))
            .collect();
        let mut chunk_coords = vec![0; chunks_per_dim.len()];
        copy_chunks_recursive(
            array,
            &out_of_core_array,
            &chunks_per_dim,
            &mut chunk_coords,
            0,
        )?;
        out_of_core_array.flush()?;
        Ok(out_of_core_array)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // ChunkId field wiring and Display formatting.
    #[test]
    fn test_chunk_id_creation() {
        let chunk_id = ChunkId::new("test_array".to_string(), vec![0, 1, 2]);
        assert_eq!(chunk_id.array_id, "test_array");
        assert_eq!(chunk_id.coordinates, vec![0, 1, 2]);
        assert_eq!(format!("{chunk_id}"), "test_array:0,1,2");
    }

    // touch() bumps the access counter; mark_dirty() sets the flag.
    #[test]
    fn test_chunk_metadata() {
        let chunk_id = ChunkId::new("test".to_string(), vec![0, 0]);
        let mut metadata = ChunkMetadata::new(chunk_id, vec![100, 100], 0);
        let initial_access_count = metadata.access_count;
        metadata.touch();
        assert!(metadata.access_count > initial_access_count);
        assert!(!metadata.is_dirty);
        metadata.mark_dirty();
        assert!(metadata.is_dirty);
    }

    // Documented default values of OutOfCoreConfig.
    #[test]
    fn test_out_of_core_config() {
        let config = OutOfCoreConfig::default();
        assert_eq!(config.max_cache_memory, 1024 * 1024 * 1024);
        assert_eq!(config.max_cached_chunks, 100);
        assert_eq!(config.chunkshape, vec![1000, 1000]);
        assert!(matches!(config.cache_policy, CachePolicy::Lru));
    }

    // Clone preserves the CachePolicy variant.
    #[test]
    fn test_cache_policy_variants() {
        let policies = [
            CachePolicy::Lru,
            CachePolicy::Lfu,
            CachePolicy::Fifo,
            CachePolicy::Mru,
        ];
        for policy in &policies {
            let cloned = policy.clone();
            match (policy, &cloned) {
                (CachePolicy::Lru, CachePolicy::Lru) => {}
                (CachePolicy::Lfu, CachePolicy::Lfu) => {}
                (CachePolicy::Fifo, CachePolicy::Fifo) => {}
                (CachePolicy::Mru, CachePolicy::Mru) => {}
                _ => panic!("Policy clone mismatch"),
            }
        }
    }

    // Allocate, write, flush, and deallocate a chunk through the file backend.
    #[test]
    fn test_file_storage_backend() -> CoreResult<()> {
        let temp_dir = TempDir::new().expect("Operation failed");
        let storage = FileStorageBackend::new(temp_dir.path(), temp_dir.path())?;
        let chunk_id = ChunkId::new("test_array".to_string(), vec![0, 0]);
        let metadata = storage.allocate_chunk(&chunk_id, 1024)?;
        assert_eq!(metadata.id, chunk_id);
        assert_eq!(metadata.size_bytes, 1024);
        let test_data = vec![1u8, 2, 3, 4, 5];
        storage.write_chunk(&metadata, &test_data)?;
        storage.flush()?;
        storage.deallocate_chunk(&chunk_id)?;
        Ok(())
    }

    // Basic put/get round-trip and statistics below the eviction limits.
    #[test]
    fn test_chunk_cache() {
        let config = OutOfCoreConfig {
            max_cached_chunks: 3,
            max_cache_memory: 2048,
            ..Default::default()
        };
        let cache = ChunkCache::<f64>::new(config);
        let chunk_id1 = ChunkId::new("test".to_string(), vec![0, 0]);
        let chunk_id2 = ChunkId::new("test".to_string(), vec![0, 1]);
        let chunk1 = Array::<f64, IxDyn>::zeros(IxDyn(&[10, 10]));
        let chunk2 = Array::<f64, IxDyn>::zeros(IxDyn(&[10, 10]));
        let metadata1 = ChunkMetadata::new(chunk_id1.clone(), vec![10, 10], 0);
        let metadata2 = ChunkMetadata::new(chunk_id2.clone(), vec![10, 10], 100);
        assert!(cache.put(chunk_id1.clone(), chunk1, metadata1).is_ok());
        assert!(cache.put(chunk_id2.clone(), chunk2, metadata2).is_ok());
        assert!(cache.get(&chunk_id1).is_some());
        assert!(cache.get(&chunk_id2).is_some());
        let stats = cache.get_statistics();
        assert_eq!(stats.cached_chunks, 2);
    }

    // With a 2-chunk LRU cache, inserting a third chunk evicts the least
    // recently used one (chunk 1).
    #[test]
    fn test_chunk_cache_eviction() {
        let config = OutOfCoreConfig {
            max_cached_chunks: 2,
            max_cache_memory: 2048,
            cache_policy: CachePolicy::Lru,
            ..Default::default()
        };
        let cache = ChunkCache::<f64>::new(config);
        let chunk_id1 = ChunkId::new("test".to_string(), vec![0, 0]);
        let chunk_id2 = ChunkId::new("test".to_string(), vec![0, 1]);
        let chunk_id3 = ChunkId::new("test".to_string(), vec![0, 2]);
        let chunk1 = Array::<f64, IxDyn>::zeros(IxDyn(&[10, 10]));
        let chunk2 = Array::<f64, IxDyn>::zeros(IxDyn(&[10, 10]));
        let chunk3 = Array::<f64, IxDyn>::zeros(IxDyn(&[10, 10]));
        let metadata1 = ChunkMetadata::new(chunk_id1.clone(), vec![10, 10], 0);
        let metadata2 = ChunkMetadata::new(chunk_id2.clone(), vec![10, 10], 100);
        let metadata3 = ChunkMetadata::new(chunk_id3.clone(), vec![10, 10], 200);
        assert!(cache.put(chunk_id1.clone(), chunk1, metadata1).is_ok());
        assert!(cache.put(chunk_id2.clone(), chunk2, metadata2).is_ok());
        assert!(cache.get(&chunk_id1).is_some());
        assert!(cache.get(&chunk_id2).is_some());
        let stats = cache.get_statistics();
        assert_eq!(stats.cached_chunks, 2);
        assert!(cache.put(chunk_id3.clone(), chunk3, metadata3).is_ok());
        assert!(cache.get(&chunk_id1).is_none());
        assert!(cache.get(&chunk_id2).is_some());
        assert!(cache.get(&chunk_id3).is_some());
        let stats = cache.get_statistics();
        assert_eq!(stats.cached_chunks, 2);
    }

    // set_chunk marks the chunk dirty; flush writes it back and clears the
    // dirty count.
    #[test]
    fn test_dirty_chunk_tracking() -> CoreResult<()> {
        let temp_dir = TempDir::new()?;
        let storage = Arc::new(FileStorageBackend::new(temp_dir.path(), temp_dir.path())?);
        let config = OutOfCoreConfig {
            chunkshape: vec![100, 100],
            max_cached_chunks: 2,
            ..Default::default()
        };
        let array =
            OutOfCoreArray::<f64>::new("test_dirty".to_string(), vec![200, 200], storage, config);
        let chunk_coords = vec![0, 0];
        let chunk_data = Array::<f64, IxDyn>::ones(IxDyn(&[100, 100]));
        array.set_chunk(&chunk_coords, chunk_data)?;
        let cache_stats = array.cache.get_statistics();
        assert_eq!(cache_stats.dirty_chunks, 1);
        array.flush()?;
        let cache_stats = array.cache.get_statistics();
        assert_eq!(cache_stats.dirty_chunks, 0);
        Ok(())
    }

    // Manager round-trip: create, look up, list, and remove a typed array.
    #[test]
    fn test_out_of_core_manager() -> CoreResult<()> {
        let manager = OutOfCoreManager::default();
        let array_id = "test_array".to_string();
        let shape = vec![1000, 1000];
        let array: Arc<OutOfCoreArray<f64>> =
            manager.create_array(array_id.clone(), shape.clone(), None, None)?;
        assert_eq!(array.shape(), &shape);
        assert_eq!(array.len(), 1_000_000);
        let retrieved: Option<Arc<OutOfCoreArray<f64>>> = manager.get_array(&array_id);
        assert!(retrieved.is_some());
        let array_list = manager.list_arrays();
        assert!(array_list.contains(&array_id));
        assert!(manager.remove_array(&array_id));
        assert!(!manager.list_arrays().contains(&array_id));
        Ok(())
    }

    // global_manager() always returns the same Arc.
    #[test]
    fn test_global_manager() {
        let manager = global_manager();
        let manager2 = global_manager();
        assert!(Arc::ptr_eq(&manager, &manager2));
        let stats = manager.get_overall_statistics();
        assert_eq!(stats.total_arrays, 0);
    }
}