use crate::memory_efficient::chunked::ChunkingStrategy;
use crate::memory_efficient::memmap::MemoryMappedArray;
use ::ndarray::Array1;
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
#[cfg(feature = "parallel")]
use crate::parallel_ops::*;
/// Chunk-wise processing of a memory-mapped array.
///
/// The array is viewed as consecutive chunks whose size is derived from a
/// [`ChunkingStrategy`]; callbacks receive each chunk together with its
/// zero-based chunk index.
pub trait MemoryMappedChunks<A: Clone + Copy + 'static + Send + Sync> {
    /// Returns how many chunks `strategy` splits this array into.
    fn chunk_count(&self, strategy: ChunkingStrategy) -> usize;

    /// Applies `f` to each chunk (as a read-only slice) and its index,
    /// collecting the results in chunk order.
    fn process_chunks<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
    where
        F: Fn(&[A], usize) -> R;

    /// Applies `f` to a mutable copy of each chunk and its index;
    /// implementations are expected to persist modified data back to the
    /// underlying storage.
    fn process_chunks_mut<F>(&mut self, strategy: ChunkingStrategy, f: F)
    where
        F: Fn(&mut [A], usize);
}
/// Parallel counterpart of [`MemoryMappedChunks`].
///
/// Chunks may be processed concurrently, so the callback must be
/// `Send + Sync` and any produced results `Send`.
#[cfg(feature = "parallel")]
pub trait MemoryMappedChunksParallel<A: Clone + Copy + 'static + Send + Sync>:
    MemoryMappedChunks<A>
{
    /// Like [`MemoryMappedChunks::process_chunks`], but chunks may be
    /// handled on multiple threads; results are still returned in chunk
    /// order.
    fn process_chunks_parallel<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send;

    /// Like [`MemoryMappedChunks::process_chunks_mut`], but chunk callbacks
    /// may run concurrently before the data is written back.
    fn process_chunks_mut_parallel<F>(&mut self, strategy: ChunkingStrategy, f: F)
    where
        F: Fn(&mut [A], usize) + Send + Sync;
}
/// Lazy iterator over the chunks of a [`MemoryMappedArray`], yielding each
/// chunk as an owned `Array1<A>`.
pub struct ChunkIter<'a, A>
where
    A: Clone + Copy + 'static + Send + Sync,
{
    /// The array being iterated.
    array: &'a MemoryMappedArray<A>,
    /// Index of the next chunk to yield.
    current_idx: usize,
    /// Total number of chunks under `strategy`.
    total_chunks: usize,
    /// Strategy used to derive the per-chunk element count.
    strategy: ChunkingStrategy,
}
impl<A> Iterator for ChunkIter<'_, A>
where
    A: Clone + Copy + 'static + Send + Sync,
{
    type Item = Array1<A>;

    /// Yields the next chunk as an owned 1-D array, or `None` once all
    /// chunks have been produced or the mapping cannot be read as 1-D.
    fn next(&mut self) -> Option<Self::Item> {
        if self.current_idx >= self.total_chunks {
            return None;
        }
        let chunk_idx = self.current_idx;
        self.current_idx += 1;
        // Elements per chunk implied by the strategy. `Auto` and `Advanced`
        // fall back to roughly 100 equally sized chunks.
        let chunk_size = match self.strategy {
            ChunkingStrategy::Fixed(size) => size,
            ChunkingStrategy::NumChunks(n) => self.array.size.div_ceil(n),
            ChunkingStrategy::Auto => (self.array.size / 100).max(1),
            ChunkingStrategy::FixedBytes(bytes) => {
                let element_size = std::mem::size_of::<A>();
                (bytes / element_size).max(1)
            }
            ChunkingStrategy::Advanced(_) => (self.array.size / 100).max(1),
        };
        // Clamp BOTH ends to the array length. `NumChunks(n)` with
        // `n > size` yields `chunk_size >= 1`, so an unclamped start index
        // could exceed `end_idx` and make `slice` panic; trailing chunks
        // become empty instead.
        let start_idx = (chunk_idx * chunk_size).min(self.array.size);
        let end_idx = (start_idx + chunk_size).min(self.array.size);
        match self.array.as_array::<crate::ndarray::Ix1>() {
            Ok(array_1d) => Some(array_1d.slice(crate::s![start_idx..end_idx]).to_owned()),
            Err(_) => None,
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Exact: the number of chunks is fixed up front.
        let remaining = self.total_chunks - self.current_idx;
        (remaining, Some(remaining))
    }
}
// The total chunk count is known when the iterator is built, so the length
// reported by `size_hint` is exact. NOTE(review): if `as_array` fails,
// `next` returns `None` early and the reported length overestimates —
// confirm whether that failure mode is reachable mid-iteration.
impl<A> ExactSizeIterator for ChunkIter<'_, A> where A: Clone + Copy + 'static + Send + Sync {}
/// Extension trait providing a lazy, iterator-based view of the chunks of a
/// memory-mapped array.
pub trait MemoryMappedChunkIter<A: Clone + Copy + 'static + Send + Sync> {
    /// Returns an iterator that yields each chunk as an owned `Array1<A>`.
    fn chunks(&self, strategy: ChunkingStrategy) -> ChunkIter<A>;
}
/// Elements per chunk implied by `strategy` for an array of `total_elements`
/// elements of `element_size` bytes each. Always at least 1.
fn elements_per_chunk(
    strategy: ChunkingStrategy,
    total_elements: usize,
    element_size: usize,
) -> usize {
    match strategy {
        ChunkingStrategy::Fixed(size) => size,
        ChunkingStrategy::NumChunks(n) => total_elements.div_ceil(n),
        // Auto/Advanced: aim for roughly 100 equally sized chunks.
        ChunkingStrategy::Auto | ChunkingStrategy::Advanced(_) => (total_elements / 100).max(1),
        ChunkingStrategy::FixedBytes(bytes) => (bytes / element_size).max(1),
    }
}

impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedChunks<A> for MemoryMappedArray<A> {
    /// Number of chunks `strategy` splits this array into.
    fn chunk_count(&self, strategy: ChunkingStrategy) -> usize {
        match strategy {
            // `NumChunks` is taken literally, even if `n` exceeds the
            // element count (trailing chunks are then empty).
            ChunkingStrategy::NumChunks(n) => n,
            _ => {
                let chunk = elements_per_chunk(strategy, self.size, std::mem::size_of::<A>());
                self.size.div_ceil(chunk)
            }
        }
    }

    /// Applies `f` to a copy of each chunk, collecting results in chunk
    /// order. If the mapping cannot be read as 1-D, the result is empty.
    fn process_chunks<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
    where
        F: Fn(&[A], usize) -> R,
    {
        let total_chunks = self.chunk_count(strategy);
        // Chunk size is loop-invariant; compute it once instead of per chunk.
        let chunk_size = elements_per_chunk(strategy, self.size, std::mem::size_of::<A>());
        let mut results = Vec::with_capacity(total_chunks);
        if let Ok(array_1d) = self.as_array::<crate::ndarray::Ix1>() {
            for chunk_idx in 0..total_chunks {
                // Clamp BOTH ends so `NumChunks(n)` with `n > size` yields
                // empty trailing chunks instead of `start > end`, which
                // would panic inside `slice`.
                let start_idx = (chunk_idx * chunk_size).min(self.size);
                let end_idx = (start_idx + chunk_size).min(self.size);
                let chunk_data = array_1d.slice(crate::s![start_idx..end_idx]).to_vec();
                results.push(f(&chunk_data, chunk_idx));
            }
        }
        results
    }

    /// Applies `f` to a mutable copy of each chunk, then writes the
    /// (possibly modified) data back to the backing file and reloads the
    /// mapping. I/O failures are deliberately ignored (best-effort).
    fn process_chunks_mut<F>(&mut self, strategy: ChunkingStrategy, f: F)
    where
        F: Fn(&mut [A], usize),
    {
        let total_chunks = self.chunk_count(strategy);
        let element_size = std::mem::size_of::<A>();
        let chunk_size = elements_per_chunk(strategy, self.size, element_size);
        // Open the backing file once for all write-backs rather than once
        // per chunk. If it cannot be opened, callbacks still run (as
        // before) but nothing is persisted.
        let mut file = OpenOptions::new().write(true).open(&self.file_path).ok();
        for chunk_idx in 0..total_chunks {
            // See `process_chunks` for why the start index is clamped too.
            let start_idx = (chunk_idx * chunk_size).min(self.size);
            let end_idx = (start_idx + chunk_size).min(self.size);
            // Copy the chunk out of the mapping so the callback can mutate
            // it freely. `to_vec` works for any layout, unlike the previous
            // `as_slice().expect(..)` which panicked on non-contiguous data.
            let mut chunk_data = match self.as_array::<crate::ndarray::Ix1>() {
                Ok(array_1d) => array_1d.slice(crate::s![start_idx..end_idx]).to_vec(),
                Err(_) => continue,
            };
            f(&mut chunk_data, chunk_idx);
            if let Some(file) = file.as_mut() {
                // Byte offset of the chunk's first element within the file.
                let effective_offset = self.offset + start_idx * element_size;
                if file.seek(SeekFrom::Start(effective_offset as u64)).is_ok() {
                    // SAFETY: `chunk_data` is a live `Vec<A>` of `Copy`
                    // elements; viewing its buffer as `len * size_of::<A>()`
                    // bytes is in-bounds and valid for the duration of the
                    // write.
                    let bytes = unsafe {
                        std::slice::from_raw_parts(
                            chunk_data.as_ptr() as *const u8,
                            chunk_data.len() * element_size,
                        )
                    };
                    let _ = file.write_all(bytes);
                    let _ = file.flush();
                }
            }
        }
        // Refresh the mapping so subsequent reads observe the new contents.
        let _ = self.reload();
    }
}
#[cfg(feature = "parallel")]
impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedChunksParallel<A>
    for MemoryMappedArray<A>
{
    /// Processes every chunk in parallel via rayon; the result vector is
    /// still ordered by chunk index. Returns an empty vector if the mapping
    /// cannot be read as 1-D.
    fn process_chunks_parallel<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
    {
        let total_chunks = self.chunk_count(strategy);
        let size = self.size;
        // The chunk size does not depend on the chunk index; compute it
        // once instead of once per chunk.
        let chunk_size = match strategy {
            ChunkingStrategy::Fixed(fixed) => fixed,
            ChunkingStrategy::NumChunks(n) => size.div_ceil(n),
            // Auto/Advanced: roughly 100 equally sized chunks.
            ChunkingStrategy::Auto | ChunkingStrategy::Advanced(_) => (size / 100).max(1),
            ChunkingStrategy::FixedBytes(bytes) => {
                (bytes / std::mem::size_of::<A>()).max(1)
            }
        };
        let array_1d = match self.as_array::<crate::ndarray::Ix1>() {
            Ok(arr) => arr,
            Err(_) => return Vec::new(),
        };
        (0..total_chunks)
            .into_par_iter()
            .map(|chunk_idx| {
                // Clamp BOTH ends: with `NumChunks(n)` and `n > size`,
                // an unclamped start would exceed `end_idx` and make
                // `slice` panic; such trailing chunks are empty instead.
                let start_idx = (chunk_idx * chunk_size).min(size);
                let end_idx = (start_idx + chunk_size).min(size);
                let chunk_data = array_1d.slice(crate::s![start_idx..end_idx]).to_vec();
                f(&chunk_data, chunk_idx)
            })
            .collect()
    }

    /// Runs `f` on a mutable copy of each chunk in parallel, then writes
    /// the results back to the backing file sequentially and reloads the
    /// mapping. I/O failures are deliberately ignored (best-effort).
    fn process_chunks_mut_parallel<F>(&mut self, strategy: ChunkingStrategy, f: F)
    where
        F: Fn(&mut [A], usize) + Send + Sync,
    {
        let total_chunks = self.chunk_count(strategy);
        let element_size = std::mem::size_of::<A>();
        let size = self.size;
        let chunk_size = match strategy {
            ChunkingStrategy::Fixed(fixed) => fixed,
            ChunkingStrategy::NumChunks(n) => size.div_ceil(n),
            ChunkingStrategy::Auto | ChunkingStrategy::Advanced(_) => (size / 100).max(1),
            ChunkingStrategy::FixedBytes(bytes) => (bytes / element_size).max(1),
        };
        let file_path = self.file_path.clone();
        let offset = self.offset;
        let array_1d = match self.as_array::<crate::ndarray::Ix1>() {
            Ok(arr) => arr,
            Err(_) => return,
        };
        // Phase 1: apply the callback to a private copy of each chunk in
        // parallel; collect (start, data) pairs for the write-back phase.
        let modifications: Vec<_> = (0..total_chunks)
            .into_par_iter()
            .map(|chunk_idx| {
                // Clamped at both ends for the same reason as the read path.
                let start_idx = (chunk_idx * chunk_size).min(size);
                let end_idx = (start_idx + chunk_size).min(size);
                let mut chunk_data = array_1d.slice(crate::s![start_idx..end_idx]).to_vec();
                f(&mut chunk_data, chunk_idx);
                (start_idx, chunk_data)
            })
            .collect();
        // Phase 2: persist sequentially, opening the file once rather than
        // once per chunk.
        if let Ok(mut file) = OpenOptions::new().write(true).open(&file_path) {
            for (start_idx, chunk_data) in modifications {
                let effective_offset = offset + start_idx * element_size;
                if file.seek(SeekFrom::Start(effective_offset as u64)).is_ok() {
                    // SAFETY: `chunk_data` is a live `Vec<A>` of `Copy`
                    // elements; viewing its buffer as
                    // `len * size_of::<A>()` bytes is in-bounds and valid
                    // for the duration of the write.
                    let bytes = unsafe {
                        std::slice::from_raw_parts(
                            chunk_data.as_ptr() as *const u8,
                            chunk_data.len() * element_size,
                        )
                    };
                    let _ = file.write_all(bytes);
                    let _ = file.flush();
                }
            }
        }
        // Refresh the mapping so `self` observes the updated contents.
        let _ = self.reload();
    }
}
impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedChunkIter<A> for MemoryMappedArray<A> {
    /// Builds a lazy chunk iterator over this array for `strategy`, with the
    /// total chunk count fixed up front.
    fn chunks(&self, strategy: ChunkingStrategy) -> ChunkIter<A> {
        let total_chunks = self.chunk_count(strategy);
        ChunkIter {
            array: self,
            current_idx: 0,
            total_chunks,
            strategy,
        }
    }
}