#![allow(dead_code)]
#![allow(missing_docs)]
use crate::error::{IoError, Result};
use memmap2::{Mmap, MmapMut, MmapOptions};
use scirs2_core::ndarray::{Array1, ArrayView, ArrayView1, ArrayViewMut, IxDyn};
use scirs2_core::parallel_ops::*;
use scirs2_core::simd_ops::{PlatformCapabilities, SimdUnifiedOps};
use std::fs::{File, OpenOptions};
use std::marker::PhantomData;
use std::mem;
use std::path::Path;
use std::slice;
#[cfg(feature = "async")]
use tokio::sync::Semaphore;
/// Immutable zero-copy view over a memory-mapped file, interpreting the
/// mapped bytes as a flat buffer of `T` with the given logical `shape`.
pub struct ZeroCopyArrayView<'a, T> {
    // Borrowed read-only memory map backing the view.
    mmap: &'a Mmap,
    // Logical dimensions; their product is the element count.
    shape: Vec<usize>,
    // Records the element type without storing any `T`.
    _phantom: PhantomData<T>,
}
impl<'a, T> ZeroCopyArrayView<'a, T>
where
    T: 'static + Copy,
{
    /// Runs `op` over the underlying element slice and returns its result.
    ///
    /// # Errors
    /// Currently infallible; `Result` is kept for interface stability.
    pub fn apply_simd_operation<F>(&self, op: F) -> Result<Vec<T>>
    where
        F: Fn(&[T]) -> Vec<T>,
    {
        let slice = self.as_slice();
        Ok(op(slice))
    }

    /// Validates that `mmap` is large enough for `shape` and builds the view.
    ///
    /// # Errors
    /// Returns `IoError::FormatError` when the byte size implied by `shape`
    /// overflows `usize`, or when the map is smaller than that size.
    pub fn new(mmap: &'a Mmap, shape: Vec<usize>) -> Result<Self> {
        // Checked arithmetic: a pathological shape must not wrap around and
        // slip past the size validation below — that would later make the
        // `from_raw_parts` in `as_slice` undefined behavior.
        let expected_bytes = shape
            .iter()
            .try_fold(mem::size_of::<T>(), |bytes, &dim| bytes.checked_mul(dim))
            .ok_or_else(|| {
                IoError::FormatError(format!(
                    "Shape {:?} overflows usize for element size {}",
                    shape,
                    mem::size_of::<T>()
                ))
            })?;
        if mmap.len() < expected_bytes {
            return Err(IoError::FormatError(format!(
                "Memory map too small: expected {} bytes, got {}",
                expected_bytes,
                mmap.len()
            )));
        }
        Ok(Self {
            mmap,
            shape,
            _phantom: PhantomData,
        })
    }

    /// Reinterprets the mapped bytes as an n-dimensional `ArrayView`.
    pub fn as_array_view(&self) -> ArrayView<T, IxDyn> {
        let ptr = self.mmap.as_ptr() as *const T;
        // SAFETY: `new` validated that the map holds at least
        // `product(shape) * size_of::<T>()` bytes. Alignment relies on the
        // mmap base being page-aligned — TODO confirm T never exceeds page
        // alignment at call sites.
        let slice = unsafe { slice::from_raw_parts(ptr, self.shape.iter().product()) };
        ArrayView::from_shape(IxDyn(&self.shape), slice).expect("Shape mismatch in zero-copy view")
    }

    /// Borrows the mapped bytes as a flat `&[T]` of `product(shape)` elements.
    pub fn as_slice(&self) -> &[T] {
        let ptr = self.mmap.as_ptr() as *const T;
        let len = self.shape.iter().product();
        // SAFETY: size validated in `new`; see alignment note in
        // `as_array_view`.
        unsafe { slice::from_raw_parts(ptr, len) }
    }
}
/// Mutable counterpart of [`ZeroCopyArrayView`] backed by a writable map.
pub struct ZeroCopyArrayViewMut<'a, T> {
    // Exclusively borrowed writable memory map.
    mmap: &'a mut MmapMut,
    // Logical dimensions; their product is the element count.
    shape: Vec<usize>,
    // Records the element type without storing any `T`.
    _phantom: PhantomData<T>,
}
impl<'a, T> ZeroCopyArrayViewMut<'a, T>
where
    T: 'static + Copy,
{
    /// Applies `op` to the underlying element slice in place.
    ///
    /// # Errors
    /// Currently infallible; `Result` is kept for interface stability.
    pub fn apply_simd_operation_inplace<F>(&mut self, op: F) -> Result<()>
    where
        F: Fn(&mut [T]),
    {
        let slice = self.as_slice_mut();
        op(slice);
        Ok(())
    }

    /// Validates that `mmap` is large enough for `shape` and builds the view.
    ///
    /// # Errors
    /// Returns `IoError::FormatError` when the byte size implied by `shape`
    /// overflows `usize`, or when the map is smaller than that size.
    pub fn new(mmap: &'a mut MmapMut, shape: Vec<usize>) -> Result<Self> {
        // Checked arithmetic: an overflowing shape must not wrap and bypass
        // the size check — that would make `as_slice_mut` undefined behavior.
        let expected_bytes = shape
            .iter()
            .try_fold(mem::size_of::<T>(), |bytes, &dim| bytes.checked_mul(dim))
            .ok_or_else(|| {
                IoError::FormatError(format!(
                    "Shape {:?} overflows usize for element size {}",
                    shape,
                    mem::size_of::<T>()
                ))
            })?;
        if mmap.len() < expected_bytes {
            return Err(IoError::FormatError(format!(
                "Memory map too small: expected {} bytes, got {}",
                expected_bytes,
                mmap.len()
            )));
        }
        Ok(Self {
            mmap,
            shape,
            _phantom: PhantomData,
        })
    }

    /// Reinterprets the mapped bytes as a mutable n-dimensional view.
    pub fn as_array_view_mut(&mut self) -> ArrayViewMut<T, IxDyn> {
        let ptr = self.mmap.as_mut_ptr() as *mut T;
        // SAFETY: size validated in `new`; `&mut self` guarantees exclusive
        // access. Alignment relies on the page-aligned mmap base — TODO
        // confirm T never exceeds page alignment at call sites.
        let slice = unsafe { slice::from_raw_parts_mut(ptr, self.shape.iter().product()) };
        ArrayViewMut::from_shape(IxDyn(&self.shape), slice)
            .expect("Shape mismatch in zero-copy view")
    }

    /// Borrows the mapped bytes as a flat `&mut [T]`.
    pub fn as_slice_mut(&mut self) -> &mut [T] {
        let ptr = self.mmap.as_mut_ptr() as *mut T;
        let len = self.shape.iter().product();
        // SAFETY: size validated in `new`; exclusive access via `&mut self`.
        unsafe { slice::from_raw_parts_mut(ptr, len) }
    }
}
/// Reader that lazily memory-maps a file for zero-copy access.
pub struct ZeroCopyReader {
    // Handle kept open for the lifetime of the mapping.
    file: File,
    // Created on first access via `map_file`.
    mmap: Option<Mmap>,
}
impl ZeroCopyReader {
    /// Opens `path` read-only; mapping is deferred to first use.
    ///
    /// # Errors
    /// Returns `IoError::FileError` if the file cannot be opened.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(path).map_err(|e| IoError::FileError(e.to_string()))?;
        Ok(Self { file, mmap: None })
    }

    /// Returns the memory map, creating it on first call (subsequent calls
    /// reuse the cached map).
    ///
    /// # Errors
    /// Returns `IoError::FileError` if the mapping syscall fails.
    pub fn map_file(&mut self) -> Result<&Mmap> {
        if self.mmap.is_none() {
            // SAFETY: mapping a file is `unsafe` because another process may
            // mutate it concurrently; callers must not rely on the contents
            // staying stable while the map is alive.
            let mmap = unsafe {
                MmapOptions::new()
                    .map(&self.file)
                    .map_err(|e| IoError::FileError(e.to_string()))?
            };
            self.mmap = Some(mmap);
        }
        Ok(self.mmap.as_ref().expect("mmap was just initialized"))
    }

    /// Maps the file and wraps it in a typed, shape-checked view.
    pub fn read_array<T>(&mut self, shape: Vec<usize>) -> Result<ZeroCopyArrayView<T>>
    where
        T: 'static + Copy,
    {
        let mmap = self.map_file()?;
        ZeroCopyArrayView::new(mmap, shape)
    }

    /// Borrows `len` bytes of the mapped file starting at `offset`.
    ///
    /// # Errors
    /// Returns `IoError::Other` when the requested range overflows or lies
    /// outside the file.
    pub fn read_slice(&mut self, offset: usize, len: usize) -> Result<&[u8]> {
        let mmap = self.map_file()?;
        // checked_add: `offset + len` near usize::MAX must not wrap past the
        // bounds check below.
        let end = offset.checked_add(len).ok_or_else(|| {
            IoError::Other("Slice extends beyond file boundaries".to_string())
        })?;
        if end > mmap.len() {
            return Err(IoError::Other(
                "Slice extends beyond file boundaries".to_string(),
            ));
        }
        Ok(&mmap[offset..end])
    }
}
/// Writer that memory-maps a pre-sized file for zero-copy mutation.
pub struct ZeroCopyWriter {
    // Handle kept open (read+write) for the lifetime of the mapping.
    file: File,
    // Created on first access via `map_file_mut`.
    mmap: Option<MmapMut>,
}
impl ZeroCopyWriter {
    /// Creates (or truncates) `path` and extends it to `size` bytes so the
    /// whole region can be mapped writable.
    ///
    /// # Errors
    /// Returns `IoError::FileError` if the file cannot be created or resized.
    pub fn new<P: AsRef<Path>>(path: P, size: usize) -> Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)
            .map_err(|e| IoError::FileError(e.to_string()))?;
        // The file must have its final length before mapping; writing through
        // a map past EOF is not allowed.
        file.set_len(size as u64)
            .map_err(|e| IoError::FileError(e.to_string()))?;
        Ok(Self { file, mmap: None })
    }

    /// Returns the writable memory map, creating it on first call.
    ///
    /// # Errors
    /// Returns `IoError::FileError` if the mapping syscall fails (note:
    /// mapping a zero-length file typically fails).
    pub fn map_file_mut(&mut self) -> Result<&mut MmapMut> {
        if self.mmap.is_none() {
            // SAFETY: see `ZeroCopyReader::map_file` — the file must not be
            // concurrently truncated/mutated by other processes.
            let mmap = unsafe {
                MmapOptions::new()
                    .map_mut(&self.file)
                    .map_err(|e| IoError::FileError(e.to_string()))?
            };
            self.mmap = Some(mmap);
        }
        Ok(self.mmap.as_mut().expect("mmap was just initialized"))
    }

    /// Maps the file and wraps it in a typed, shape-checked mutable view.
    pub fn write_array<T>(&mut self, shape: Vec<usize>) -> Result<ZeroCopyArrayViewMut<T>>
    where
        T: 'static + Copy,
    {
        let mmap = self.map_file_mut()?;
        ZeroCopyArrayViewMut::new(mmap, shape)
    }

    /// Copies `data` into the mapped file at `offset`.
    ///
    /// # Errors
    /// Returns `IoError::Other` when the destination range overflows or lies
    /// outside the file.
    pub fn write_slice(&mut self, offset: usize, data: &[u8]) -> Result<()> {
        let mmap = self.map_file_mut()?;
        // checked_add: prevent `offset + data.len()` wraparound from
        // bypassing the bounds check.
        let end = offset.checked_add(data.len()).ok_or_else(|| {
            IoError::Other("Write extends beyond file boundaries".to_string())
        })?;
        if end > mmap.len() {
            return Err(IoError::Other(
                "Write extends beyond file boundaries".to_string(),
            ));
        }
        mmap[offset..end].copy_from_slice(data);
        Ok(())
    }

    /// Flushes dirty pages to disk (no-op if the file was never mapped).
    pub fn flush(&mut self) -> Result<()> {
        if let Some(ref mut mmap) = self.mmap {
            mmap.flush()
                .map_err(|e| IoError::FileError(e.to_string()))?;
        }
        Ok(())
    }
}
/// Minimal CSV reader that borrows the input buffer; lines and fields are
/// returned as slices into the original data (no allocation per byte).
pub struct ZeroCopyCsvReader<'a> {
    // Entire CSV payload, borrowed.
    data: &'a [u8],
    // Field separator byte (e.g. b',').
    delimiter: u8,
}

impl<'a> ZeroCopyCsvReader<'a> {
    /// Wraps `data` with the given field `delimiter`.
    pub fn new(data: &'a [u8], delimiter: u8) -> Self {
        Self { data, delimiter }
    }

    /// Iterates over `\n`-terminated lines as byte slices (terminator
    /// excluded; a trailing newline does not produce an extra empty line).
    pub fn lines(&self) -> ZeroCopyLineIterator<'a> {
        ZeroCopyLineIterator {
            data: self.data,
            pos: 0,
        }
    }

    /// Splits `line` on the delimiter and returns the UTF-8 fields.
    ///
    /// A line with N delimiters yields N+1 fields, including empty ones —
    /// the previous implementation silently dropped a trailing empty field
    /// (`"a,b,"` gave `["a", "b"]`) while keeping interior empty fields,
    /// which was inconsistent. Non-UTF-8 fields are skipped, and an empty
    /// line still yields no fields (both preserved from the original).
    pub fn parse_line(&self, line: &'a [u8]) -> Vec<&'a str> {
        if line.is_empty() {
            return Vec::new();
        }
        line.split(|&byte| byte == self.delimiter)
            .filter_map(|field| std::str::from_utf8(field).ok())
            .collect()
    }
}

/// Iterator over newline-delimited byte slices of a borrowed buffer.
pub struct ZeroCopyLineIterator<'a> {
    // Whole buffer being scanned.
    data: &'a [u8],
    // Cursor: start of the next unread line.
    pos: usize,
}

impl<'a> Iterator for ZeroCopyLineIterator<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.data.len() {
            return None;
        }
        let start = self.pos;
        // Scan to end of line or end of buffer.
        while self.pos < self.data.len() && self.data[self.pos] != b'\n' {
            self.pos += 1;
        }
        let line = &self.data[start..self.pos];
        // Skip the newline itself so the next call starts on fresh content.
        if self.pos < self.data.len() {
            self.pos += 1;
        }
        Some(line)
    }
}
/// Cursor-style reader over a borrowed byte buffer with zero-copy slicing.
pub struct ZeroCopyBinaryReader<'a> {
    // Whole input buffer, borrowed.
    data: &'a [u8],
    // Current read offset in bytes.
    pos: usize,
}

impl<'a> ZeroCopyBinaryReader<'a> {
    /// Starts reading at the beginning of `data`.
    pub fn new(data: &'a [u8]) -> Self {
        Self { data, pos: 0 }
    }

    /// Reads one `T` at the cursor (native byte order) and advances.
    ///
    /// # Errors
    /// Returns `IoError::Other` when fewer than `size_of::<T>()` bytes remain.
    pub fn read<T: Copy>(&mut self) -> Result<T> {
        let size = mem::size_of::<T>();
        // checked_add guards against cursor+size wraparound.
        let end = self
            .pos
            .checked_add(size)
            .filter(|&e| e <= self.data.len())
            .ok_or_else(|| IoError::Other("Not enough data".to_string()))?;
        // SAFETY: bounds checked above; `read_unaligned` tolerates any
        // alignment of the source pointer.
        let value = unsafe {
            let ptr = self.data.as_ptr().add(self.pos) as *const T;
            ptr.read_unaligned()
        };
        self.pos = end;
        Ok(value)
    }

    /// Borrows the next `len` bytes without copying and advances.
    pub fn read_slice(&mut self, len: usize) -> Result<&'a [u8]> {
        let end = self
            .pos
            .checked_add(len)
            .filter(|&e| e <= self.data.len())
            .ok_or_else(|| IoError::Other("Not enough data".to_string()))?;
        let slice = &self.data[self.pos..end];
        self.pos = end;
        Ok(slice)
    }

    /// Returns the unread remainder of the buffer without advancing.
    pub fn remaining(&self) -> &'a [u8] {
        &self.data[self.pos..]
    }

    /// Reads `count` native-endian f32 values into an owned array.
    ///
    /// The cursor may sit at any byte offset, so the source is not
    /// guaranteed to be 4-byte aligned. The previous implementation built a
    /// `&[f32]` via `slice::from_raw_parts` from that pointer, which is
    /// undefined behavior for a misaligned pointer; we now copy element-wise
    /// with `read_unaligned`.
    pub fn read_f32_array_simd(&mut self, count: usize) -> Result<Array1<f32>> {
        let bytes_needed = count
            .checked_mul(mem::size_of::<f32>())
            .ok_or_else(|| IoError::Other("Not enough data for f32 array".to_string()))?;
        let end = self
            .pos
            .checked_add(bytes_needed)
            .filter(|&e| e <= self.data.len())
            .ok_or_else(|| IoError::Other("Not enough data for f32 array".to_string()))?;
        let mut values = Vec::with_capacity(count);
        // SAFETY: `end <= data.len()` guarantees every element read below
        // stays inside `data`; `read_unaligned` handles any alignment.
        unsafe {
            let base = self.data.as_ptr().add(self.pos) as *const f32;
            for i in 0..count {
                values.push(base.add(i).read_unaligned());
            }
        }
        self.pos = end;
        Ok(Array1::from_vec(values))
    }

    /// Reads `count` native-endian f64 values into an owned array.
    ///
    /// Same alignment fix as [`Self::read_f32_array_simd`].
    pub fn read_f64_array_simd(&mut self, count: usize) -> Result<Array1<f64>> {
        let bytes_needed = count
            .checked_mul(mem::size_of::<f64>())
            .ok_or_else(|| IoError::Other("Not enough data for f64 array".to_string()))?;
        let end = self
            .pos
            .checked_add(bytes_needed)
            .filter(|&e| e <= self.data.len())
            .ok_or_else(|| IoError::Other("Not enough data for f64 array".to_string()))?;
        let mut values = Vec::with_capacity(count);
        // SAFETY: bounds checked above; unaligned reads are explicit.
        unsafe {
            let base = self.data.as_ptr().add(self.pos) as *const f64;
            for i in 0..count {
                values.push(base.add(i).read_unaligned());
            }
        }
        self.pos = end;
        Ok(Array1::from_vec(values))
    }
}
/// Element-wise and linear-algebra helpers that operate directly on
/// memory-mapped buffers without first copying them into owned arrays.
pub mod simd_zero_copy {
    use super::*;
    use scirs2_core::ndarray::{Array2, ArrayView2};

    /// f32 operations over raw `Mmap` inputs.
    pub struct SimdZeroCopyOpsF32;

    impl SimdZeroCopyOpsF32 {
        /// Element-wise sum of two mapped f32 buffers described by `shape`.
        ///
        /// NOTE(review): despite the SIMD naming, the addition below is a
        /// plain scalar iterator zip — no explicit SIMD call is made here.
        pub fn add_mmap(a_mmap: &Mmap, b_mmap: &Mmap, shape: &[usize]) -> Result<Array1<f32>> {
            if a_mmap.len() != b_mmap.len() {
                return Err(IoError::Other(
                    "Memory maps must have same size".to_string(),
                ));
            }
            let count = shape.iter().product::<usize>();
            let expected_bytes = count * mem::size_of::<f32>();
            if a_mmap.len() < expected_bytes {
                return Err(IoError::Other("Memory map too small for shape".to_string()));
            }
            // SAFETY: lengths validated above; an mmap created at offset 0 is
            // page-aligned, which satisfies f32 alignment (assumes the maps
            // were created at aligned offsets — confirm at call sites).
            let a_slice = unsafe { slice::from_raw_parts(a_mmap.as_ptr() as *const f32, count) };
            let b_slice = unsafe { slice::from_raw_parts(b_mmap.as_ptr() as *const f32, count) };
            let a_view = ArrayView1::from_shape(count, a_slice).expect("Operation failed");
            let b_view = ArrayView1::from_shape(count, b_slice).expect("Operation failed");
            let result: Array1<f32> = a_view
                .iter()
                .zip(b_view.iter())
                .map(|(&a, &b)| a + b)
                .collect();
            Ok(result)
        }

        /// Multiplies every mapped f32 element by `scalar`.
        pub fn scalar_mul_mmap(mmap: &Mmap, scalar: f32, shape: &[usize]) -> Result<Array1<f32>> {
            let count = shape.iter().product::<usize>();
            let expected_bytes = count * mem::size_of::<f32>();
            if mmap.len() < expected_bytes {
                return Err(IoError::Other("Memory map too small for shape".to_string()));
            }
            // SAFETY: length validated above; alignment as in `add_mmap`.
            let slice = unsafe { slice::from_raw_parts(mmap.as_ptr() as *const f32, count) };
            let view = ArrayView1::from_shape(count, slice).expect("Operation failed");
            let result: Array1<f32> = view.iter().map(|&x| x * scalar).collect();
            Ok(result)
        }

        /// Dot product of two mapped f32 vectors of length `len`.
        pub fn dot_mmap(a_mmap: &Mmap, b_mmap: &Mmap, len: usize) -> Result<f32> {
            let expected_bytes = len * mem::size_of::<f32>();
            if a_mmap.len() < expected_bytes || b_mmap.len() < expected_bytes {
                return Err(IoError::Other("Memory maps too small".to_string()));
            }
            // SAFETY: lengths validated above; alignment as in `add_mmap`.
            let a_slice = unsafe { slice::from_raw_parts(a_mmap.as_ptr() as *const f32, len) };
            let b_slice = unsafe { slice::from_raw_parts(b_mmap.as_ptr() as *const f32, len) };
            let a_view = ArrayView1::from_shape(len, a_slice).expect("Operation failed");
            let b_view = ArrayView1::from_shape(len, b_slice).expect("Operation failed");
            let result: f32 = a_view.iter().zip(b_view.iter()).map(|(&a, &b)| a * b).sum();
            Ok(result)
        }
    }

    /// f64 operations over raw `Mmap` inputs.
    pub struct SimdZeroCopyOpsF64;

    impl SimdZeroCopyOpsF64 {
        /// Element-wise sum of two mapped f64 buffers described by `shape`.
        ///
        /// NOTE(review): scalar loop, as in the f32 variant.
        pub fn add_mmap(a_mmap: &Mmap, b_mmap: &Mmap, shape: &[usize]) -> Result<Array1<f64>> {
            if a_mmap.len() != b_mmap.len() {
                return Err(IoError::Other(
                    "Memory maps must have same size".to_string(),
                ));
            }
            let count = shape.iter().product::<usize>();
            let expected_bytes = count * mem::size_of::<f64>();
            if a_mmap.len() < expected_bytes {
                return Err(IoError::Other("Memory map too small for shape".to_string()));
            }
            // SAFETY: lengths validated above; alignment as in the f32 ops.
            let a_slice = unsafe { slice::from_raw_parts(a_mmap.as_ptr() as *const f64, count) };
            let b_slice = unsafe { slice::from_raw_parts(b_mmap.as_ptr() as *const f64, count) };
            let a_view = ArrayView1::from_shape(count, a_slice).expect("Operation failed");
            let b_view = ArrayView1::from_shape(count, b_slice).expect("Operation failed");
            let result: Array1<f64> = a_view
                .iter()
                .zip(b_view.iter())
                .map(|(&a, &b)| a + b)
                .collect();
            Ok(result)
        }

        /// General matrix multiply `C = alpha * A * B + beta * C` over two
        /// mapped f64 matrices (`C` starts zeroed, so `beta` only scales 0).
        pub fn gemm_mmap(
            a_mmap: &Mmap,
            b_mmap: &Mmap,
            ashape: (usize, usize),
            bshape: (usize, usize),
            alpha: f64,
            beta: f64,
        ) -> Result<Array2<f64>> {
            let (m, k1) = ashape;
            let (k2, n) = bshape;
            // Inner dimensions must agree for A(m x k) * B(k x n).
            if k1 != k2 {
                return Err(IoError::Other(
                    "Matrix dimensions don't match for multiplication".to_string(),
                ));
            }
            let a_expected = m * k1 * mem::size_of::<f64>();
            let b_expected = k2 * n * mem::size_of::<f64>();
            if a_mmap.len() < a_expected || b_mmap.len() < b_expected {
                return Err(IoError::Other(
                    "Memory maps too small for matrices".to_string(),
                ));
            }
            // SAFETY: lengths validated above; alignment as in `add_mmap`.
            let a_slice = unsafe { slice::from_raw_parts(a_mmap.as_ptr() as *const f64, m * k1) };
            let b_slice = unsafe { slice::from_raw_parts(b_mmap.as_ptr() as *const f64, k2 * n) };
            let a_view = ArrayView2::from_shape((m, k1), a_slice).expect("Operation failed");
            let b_view = ArrayView2::from_shape((k2, n), b_slice).expect("Operation failed");
            let mut c = Array2::<f64>::zeros((m, n));
            f64::simd_gemm(alpha, &a_view, &b_view, beta, &mut c);
            Ok(c)
        }
    }
}
/// Chunked, optionally NUMA-aware asynchronous processor over a
/// memory-mapped file of `T` elements.
pub struct AsyncZeroCopyProcessor<T> {
    // Lazily-mapped input file.
    reader: ZeroCopyReader,
    // Requested chunk size in elements (aligned before use).
    chunk_size: usize,
    // Preferred NUMA node, if detected or explicitly bound.
    numa_node: Option<usize>,
    // Placement policy applied before processing.
    memory_policy: NumaMemoryPolicy,
    // Concurrency / readahead tuning knobs.
    async_config: AsyncConfig,
    // Records the element type without storing any `T`.
    _phantom: PhantomData<T>,
}
/// Memory-placement policy for NUMA systems.
#[derive(Debug, Clone, Copy)]
pub enum NumaMemoryPolicy {
    /// Allocate on the node of the executing CPU.
    Local,
    /// Bind allocations to the given node index.
    Bind(usize),
    /// Interleave pages across all nodes.
    Interleave,
    /// Leave the OS default policy in place.
    Default,
}
/// Tuning knobs for asynchronous zero-copy processing.
#[derive(Debug, Clone)]
pub struct AsyncConfig {
    /// Maximum number of chunk tasks allowed in flight at once.
    pub max_concurrent_operations: usize,
    /// How many chunks ahead to prefetch (currently advisory only).
    pub prefetch_distance: usize,
    /// Whether readahead hints should be issued.
    pub enable_readahead: bool,
    /// Readahead window in bytes.
    pub readahead_size: usize,
    /// Whether to prefer io_uring-based I/O (Linux only).
    pub use_io_uring: bool,
    /// Access-pattern hint to pass to the OS.
    pub memory_advice: MemoryAdvice,
}
impl Default for AsyncConfig {
    /// Conservative defaults: four in-flight operations, an 8-chunk
    /// prefetch horizon, 64 KiB of readahead, sequential access advice,
    /// and io_uring enabled only when compiled for Linux.
    fn default() -> Self {
        let use_io_uring = cfg!(target_os = "linux");
        Self {
            max_concurrent_operations: 4,
            prefetch_distance: 8,
            enable_readahead: true,
            readahead_size: 64 * 1024,
            use_io_uring,
            memory_advice: MemoryAdvice::Sequential,
        }
    }
}
/// Access-pattern hint in the spirit of `madvise(2)` advice values.
#[derive(Debug, Clone, Copy)]
pub enum MemoryAdvice {
    /// No particular access pattern.
    Normal,
    /// Pages will be accessed in order.
    Sequential,
    /// Pages will be accessed in random order.
    Random,
    /// Pages will be needed soon; pre-fault them.
    WillNeed,
    /// Pages are no longer needed; they may be reclaimed.
    DontNeed,
}
impl<T: Copy + Send + Sync + 'static> AsyncZeroCopyProcessor<T> {
    /// Opens `path` and picks a NUMA node heuristically; memory policy
    /// defaults to `Local`.
    pub fn new<P: AsRef<Path>>(path: P, chunk_size: usize, config: AsyncConfig) -> Result<Self> {
        let reader = ZeroCopyReader::new(path)?;
        let numa_node = Self::detect_optimal_numa_node();
        Ok(Self {
            reader,
            chunk_size,
            numa_node,
            memory_policy: NumaMemoryPolicy::Local,
            async_config: config,
            _phantom: PhantomData,
        })
    }

    /// Like `new`, but pins the processor to an explicit NUMA node with a
    /// `Bind` policy.
    pub fn with_numa_binding<P: AsRef<Path>>(
        path: P,
        chunk_size: usize,
        numa_node: usize,
        config: AsyncConfig,
    ) -> Result<Self> {
        let reader = ZeroCopyReader::new(path)?;
        Ok(Self {
            reader,
            chunk_size,
            numa_node: Some(numa_node),
            memory_policy: NumaMemoryPolicy::Bind(numa_node),
            async_config: config,
            _phantom: PhantomData,
        })
    }

    /// Picks a NUMA node for this process.
    ///
    /// NOTE(review): on Linux this is a placeholder heuristic — it derives a
    /// node index from PID parity, not from actual topology. Elsewhere it
    /// reports no preference.
    fn detect_optimal_numa_node() -> Option<usize> {
        #[cfg(target_os = "linux")]
        {
            use std::process;
            Some(process::id() as usize % 2)
        }
        #[cfg(not(target_os = "linux"))]
        {
            None
        }
    }

    /// Placeholder for issuing `madvise`-style hints; currently a no-op for
    /// every advice variant (the match is kept exhaustive so new variants
    /// force review here).
    fn apply_memory_advice(&self, addr: *const u8, len: usize) -> Result<()> {
        match self.async_config.memory_advice {
            MemoryAdvice::Normal => {
            }
            MemoryAdvice::Sequential => {
            }
            MemoryAdvice::Random => {
            }
            MemoryAdvice::WillNeed => {
            }
            MemoryAdvice::DontNeed => {
            }
        }
        // Parameters intentionally unused until a real madvise call lands.
        let _ = (addr, len);
        Ok(())
    }

    /// Maps the file, splits it into aligned chunks, and runs `processor`
    /// over each chunk — concurrently via tokio tasks when the `async`
    /// feature is enabled, otherwise sequentially via ready futures.
    /// Results are returned in chunk order.
    ///
    /// NOTE(review): each chunk is copied into an owned `Vec` before being
    /// handed to a task (`chunk.to_vec()`), so this path is not actually
    /// zero-copy for the processed data.
    ///
    /// # Errors
    /// Fails when the file is smaller than `shape` requires, or when a
    /// spawned task panics/aborts.
    pub async fn process_async<F, R>(&mut self, shape: Vec<usize>, processor: F) -> Result<Vec<R>>
    where
        F: Fn(&[T]) -> R + Send + Sync + Clone + 'static,
        R: Send + 'static,
    {
        let _capabilities = PlatformCapabilities::detect();
        // Copy config out of `self` before mutably borrowing the reader.
        let numa_node = self.numa_node;
        let memory_advice = self.async_config.memory_advice;
        let memory_policy = self.memory_policy;
        let _max_concurrent_operations = self.async_config.max_concurrent_operations;
        let enable_readahead = self.async_config.enable_readahead;
        let aligned_chunk_size = self.calculate_aligned_chunk_size();
        let mmap = self.reader.map_file()?;
        let total_elements: usize = shape.iter().product();
        let element_size = mem::size_of::<T>();
        let total_bytes = total_elements * element_size;
        if mmap.len() < total_bytes {
            return Err(IoError::Other(
                "File too small for specified shape".to_string(),
            ));
        }
        apply_memory_advice_static(mmap.as_ptr(), mmap.len(), memory_advice)?;
        if let Some(numa_node) = numa_node {
            configure_numa_policy_static(numa_node, memory_policy)?;
        }
        let ptr = mmap.as_ptr() as *const T;
        // SAFETY: `mmap.len() >= total_bytes` was checked above. Alignment
        // relies on the page-aligned mmap base — TODO confirm for large T.
        let data_slice = unsafe { slice::from_raw_parts(ptr, total_elements) };
        let chunks: Vec<_> = data_slice.chunks(aligned_chunk_size).collect();
        let num_chunks = chunks.len();
        // Semaphore caps how many chunk tasks run at once.
        #[cfg(feature = "async")]
        let semaphore =
            std::sync::Arc::new(tokio::sync::Semaphore::new(_max_concurrent_operations));
        let tasks: Vec<_> = chunks
            .into_iter()
            .enumerate()
            .map(|(idx, chunk)| {
                let processor = processor.clone();
                #[cfg(feature = "async")]
                let permit = semaphore.clone();
                // Owned copy so the spawned task does not borrow the mmap.
                let chunk_data = chunk.to_vec();
                let _num_chunks_local = num_chunks;
                let _enable_readahead_local = enable_readahead;
                #[cfg(feature = "async")]
                {
                    tokio::spawn(async move {
                        let _permit = permit.acquire().await.expect("Operation failed");
                        // Readahead hook: intentionally empty for now.
                        if idx + 1 < _num_chunks_local && _enable_readahead_local {
                        }
                        (idx, processor(&chunk_data))
                    })
                }
                #[cfg(not(feature = "async"))]
                {
                    std::future::ready((idx, processor(&chunk_data)))
                }
            })
            .collect();
        // Collect results back into chunk order via the carried index.
        let mut results: Vec<Option<R>> = (0..tasks.len()).map(|_| None).collect();
        #[cfg(feature = "async")]
        {
            for task in tasks {
                let (idx, result) = task
                    .await
                    .map_err(|e| IoError::Other(format!("Async task failed: {}", e)))?;
                results[idx] = Some(result);
            }
        }
        #[cfg(not(feature = "async"))]
        {
            for task in tasks {
                let (idx, result) = task.await;
                results[idx] = Some(result);
            }
        }
        Ok(results
            .into_iter()
            .map(|r| r.expect("Operation failed"))
            .collect())
    }

    /// Reports (via stderr) the NUMA placement that would be applied.
    ///
    /// NOTE(review): logging only — no actual `mbind`/`set_mempolicy` call
    /// is made; a no-op off Linux.
    fn configure_numa_policy(&self, numanode: usize) -> Result<()> {
        #[cfg(target_os = "linux")]
        {
            match self.memory_policy {
                NumaMemoryPolicy::Bind(_node) => {
                    eprintln!("Binding memory to NUMA _node {_node}");
                }
                NumaMemoryPolicy::Interleave => {
                    eprintln!("Enabling NUMA interleaving");
                }
                NumaMemoryPolicy::Local => {
                    eprintln!("Using local NUMA _node {numanode}");
                }
                NumaMemoryPolicy::Default => {
                }
            }
        }
        Ok(())
    }

    /// Rounds the configured chunk size up to a page multiple.
    ///
    /// The second rounding step is a no-op: 4096 is already a multiple of
    /// the 64-byte cache line, so page alignment implies cache alignment.
    fn calculate_aligned_chunk_size(&self) -> usize {
        let base_chunk_size = self.chunk_size;
        // Page size is hard-coded; real page size may differ on some systems.
        let page_size = 4096;
        let cache_line_size = 64;
        let aligned_to_page = ((base_chunk_size + page_size - 1) / page_size) * page_size;
        ((aligned_to_page + cache_line_size - 1) / cache_line_size) * cache_line_size
    }

    /// Snapshot of the NUMA configuration this processor would use.
    pub fn get_numa_info(&self) -> NumaTopologyInfo {
        NumaTopologyInfo {
            current_node: self.numa_node,
            total_nodes: Self::get_total_numa_nodes(),
            memory_policy: self.memory_policy,
            node_distances: Self::get_numa_distances(),
        }
    }

    /// Counts NUMA nodes by listing `/sys/devices/system/node/node*` on
    /// Linux; reports 1 elsewhere or when sysfs is unreadable.
    fn get_total_numa_nodes() -> usize {
        #[cfg(target_os = "linux")]
        {
            std::fs::read_dir("/sys/devices/system/node/")
                .map(|entries| {
                    entries
                        .filter_map(|entry| entry.ok())
                        .filter(|entry| entry.file_name().to_string_lossy().starts_with("node"))
                        .count()
                })
                .unwrap_or(1)
        }
        #[cfg(not(target_os = "linux"))]
        {
            1
        }
    }

    /// Builds a synthetic node-distance matrix (10 on the diagonal, 20 off
    /// it) — real distances are not read from sysfs.
    fn get_numa_distances() -> Vec<Vec<u8>> {
        #[cfg(target_os = "linux")]
        {
            let num_nodes = Self::get_total_numa_nodes();
            let mut distances = vec![vec![0u8; num_nodes]; num_nodes];
            for (i, distance_row) in distances.iter_mut().enumerate().take(num_nodes) {
                for (j, distance_cell) in distance_row.iter_mut().enumerate().take(num_nodes) {
                    *distance_cell = if i == j { 10 } else { 20 };
                }
            }
            distances
        }
        #[cfg(not(target_os = "linux"))]
        {
            vec![vec![10]]
        }
    }
}
/// Snapshot of NUMA topology as seen by an `AsyncZeroCopyProcessor`.
#[derive(Debug, Clone)]
pub struct NumaTopologyInfo {
    /// Node this processor prefers, if any.
    pub current_node: Option<usize>,
    /// Total node count (always >= 1).
    pub total_nodes: usize,
    /// Placement policy in effect.
    pub memory_policy: NumaMemoryPolicy,
    /// Node-to-node distance matrix (synthetic values, not read from sysfs).
    pub node_distances: Vec<Vec<u8>>,
}
/// Synchronous chunked processor over a memory-mapped file of `T` elements.
pub struct ZeroCopyStreamProcessor<T> {
    // Lazily-mapped input file.
    reader: ZeroCopyReader,
    // Chunk size in elements.
    chunk_size: usize,
    // Records the element type without storing any `T`.
    _phantom: PhantomData<T>,
}
impl<T: Copy + 'static> ZeroCopyStreamProcessor<T> {
    /// Opens `path`; chunking happens in `process_parallel`.
    pub fn new<P: AsRef<Path>>(path: P, chunk_size: usize) -> Result<Self> {
        let reader = ZeroCopyReader::new(path)?;
        Ok(Self {
            reader,
            chunk_size,
            _phantom: PhantomData,
        })
    }

    /// Maps the file, reinterprets it as `product(shape)` values of `T`, and
    /// applies `processor` to each `chunk_size`-element chunk, returning the
    /// per-chunk results in order.
    ///
    /// Runs chunks in parallel only when SIMD support is detected and there
    /// are more than 10_000 elements; otherwise processes sequentially.
    ///
    /// # Errors
    /// Fails when the mapped file is smaller than `shape` requires.
    pub fn process_parallel<F, R>(&mut self, shape: Vec<usize>, processor: F) -> Result<Vec<R>>
    where
        F: Fn(&[T]) -> R + Send + Sync,
        R: Send,
        T: Send + Sync,
    {
        let capabilities = PlatformCapabilities::detect();
        let mmap = self.reader.map_file()?;
        let total_elements: usize = shape.iter().product();
        let element_size = mem::size_of::<T>();
        let total_bytes = total_elements * element_size;
        if mmap.len() < total_bytes {
            return Err(IoError::Other(
                "File too small for specified shape".to_string(),
            ));
        }
        let ptr = mmap.as_ptr() as *const T;
        // SAFETY: size validated above; alignment relies on the page-aligned
        // mmap base — TODO confirm for large T.
        let data_slice = unsafe { slice::from_raw_parts(ptr, total_elements) };
        if capabilities.simd_available && total_elements > 10000 {
            // NOTE(review): chunks are collected into a Vec before
            // `into_par_iter`; a parallel-slice adapter could avoid this
            // intermediate allocation if the parallel_ops re-exports allow.
            let results: Vec<R> = data_slice
                .chunks(self.chunk_size)
                .collect::<Vec<_>>()
                .into_par_iter()
                .map(&processor)
                .collect();
            Ok(results)
        } else {
            let results: Vec<R> = data_slice.chunks(self.chunk_size).map(processor).collect();
            Ok(results)
        }
    }
}
/// Placeholder hook for issuing `madvise`-style access hints.
///
/// Currently a no-op for every advice variant; the exhaustive match is kept
/// so that adding a new `MemoryAdvice` variant forces a review here.
#[allow(dead_code)]
fn apply_memory_advice_static(
    addr: *const u8,
    len: usize,
    memory_advice: MemoryAdvice,
) -> Result<()> {
    match memory_advice {
        MemoryAdvice::Normal
        | MemoryAdvice::Sequential
        | MemoryAdvice::Random
        | MemoryAdvice::WillNeed
        | MemoryAdvice::DontNeed => {}
    }
    // Parameters intentionally unused until a real madvise call is wired in.
    let _ = (addr, len);
    Ok(())
}
/// Reports (via stderr) the NUMA placement that would be applied for the
/// given policy. Logging only — no `mbind`/`set_mempolicy` syscall is made,
/// and the function is a complete no-op on non-Linux targets.
#[allow(dead_code)]
fn configure_numa_policy_static(numa_node: usize, memory_policy: NumaMemoryPolicy) -> Result<()> {
    #[cfg(target_os = "linux")]
    {
        match memory_policy {
            NumaMemoryPolicy::Default => {}
            NumaMemoryPolicy::Local => {
                eprintln!("Using local NUMA _node {numa_node}");
            }
            NumaMemoryPolicy::Bind(_node) => {
                eprintln!("Binding memory to NUMA _node {_node}");
            }
            NumaMemoryPolicy::Interleave => {
                eprintln!("Enabling NUMA interleaving");
            }
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Round-trips 100 f64 values through a temp file and a zero-copy view.
    #[test]
    fn test_zero_copy_reader() -> Result<()> {
        let mut file = NamedTempFile::new().map_err(|e| IoError::FileError(e.to_string()))?;
        let data: Vec<f64> = (0..100).map(|i| i as f64).collect();
        // SAFETY: reinterpreting an initialized Vec<f64> as its raw bytes;
        // length is data.len() * size_of::<f64>() (= 8).
        let bytes = unsafe { slice::from_raw_parts(data.as_ptr() as *const u8, data.len() * 8) };
        file.write_all(bytes)
            .map_err(|e| IoError::FileError(e.to_string()))?;
        let mut reader = ZeroCopyReader::new(file.path())?;
        let array_view = reader.read_array::<f64>(vec![10, 10])?;
        let view = array_view.as_array_view();
        assert_eq!(view.shape(), &[10, 10]);
        assert_eq!(view[[0, 0]], 0.0);
        assert_eq!(view[[9, 9]], 99.0);
        Ok(())
    }

    /// Line splitting and field parsing over a small in-memory CSV.
    #[test]
    fn test_zero_copy_csv() {
        let data = b"a,b,c\n1,2,3\n4,5,6";
        let reader = ZeroCopyCsvReader::new(data, b',');
        let lines: Vec<_> = reader.lines().collect();
        assert_eq!(lines.len(), 3);
        let fields = reader.parse_line(lines[0]);
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    /// Element-wise add of two mapped f32 files: result[i] = i + 2i = 3i.
    #[test]
    fn test_simd_zero_copy_add() -> Result<()> {
        let mut file1 = NamedTempFile::new().map_err(|e| IoError::FileError(e.to_string()))?;
        let mut file2 = NamedTempFile::new().map_err(|e| IoError::FileError(e.to_string()))?;
        let data1: Vec<f32> = (0..100).map(|i| i as f32).collect();
        let data2: Vec<f32> = (0..100).map(|i| (i * 2) as f32).collect();
        // SAFETY: raw-byte views of initialized Vec<f32> buffers (4 bytes
        // per element).
        let bytes1 = unsafe { slice::from_raw_parts(data1.as_ptr() as *const u8, data1.len() * 4) };
        let bytes2 = unsafe { slice::from_raw_parts(data2.as_ptr() as *const u8, data2.len() * 4) };
        file1
            .write_all(bytes1)
            .map_err(|e| IoError::FileError(e.to_string()))?;
        file2
            .write_all(bytes2)
            .map_err(|e| IoError::FileError(e.to_string()))?;
        // SAFETY: files are fully written above and not mutated concurrently.
        let mmap1 = unsafe {
            MmapOptions::new()
                .map(&file1)
                .map_err(|e| IoError::FileError(e.to_string()))?
        };
        let mmap2 = unsafe {
            MmapOptions::new()
                .map(&file2)
                .map_err(|e| IoError::FileError(e.to_string()))?
        };
        let result = simd_zero_copy::SimdZeroCopyOpsF32::add_mmap(&mmap1, &mmap2, &[100])?;
        assert_eq!(result.len(), 100);
        assert_eq!(result[0], 0.0);
        assert_eq!(result[50], 150.0);
        assert_eq!(result[99], 297.0);
        Ok(())
    }

    /// Default config values stay in sync with `AsyncConfig::default`.
    #[test]
    fn test_async_config() {
        let config = AsyncConfig::default();
        assert_eq!(config.max_concurrent_operations, 4);
        assert!(config.enable_readahead);
        assert_eq!(config.readahead_size, 64 * 1024);
    }

    /// Topology helpers report at least one node and a square matrix.
    #[test]
    fn test_numa_topology_info() {
        let total_nodes = AsyncZeroCopyProcessor::<f64>::get_total_numa_nodes();
        assert!(total_nodes >= 1);
        let distances = AsyncZeroCopyProcessor::<f64>::get_numa_distances();
        assert_eq!(distances.len(), total_nodes);
        if !distances.is_empty() {
            assert_eq!(distances[0].len(), total_nodes);
        }
    }

    /// `MemoryAdvice` variants pattern-match as expected.
    #[test]
    fn test_memory_advice() {
        let advice = MemoryAdvice::Sequential;
        match advice {
            MemoryAdvice::Sequential => {}
            _ => panic!("Unexpected memory advice"),
        }
    }

    /// `NumaMemoryPolicy` variants pattern-match and carry their payload.
    #[test]
    fn test_numa_memory_policy() {
        let policy = NumaMemoryPolicy::Local;
        match policy {
            NumaMemoryPolicy::Local => {}
            _ => panic!("Unexpected NUMA policy"),
        }
        let bind_policy = NumaMemoryPolicy::Bind(0);
        if let NumaMemoryPolicy::Bind(node) = bind_policy {
            assert_eq!(node, 0);
        }
    }

    /// End-to-end async chunked processing over 1000 f64 values.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_async_zero_copy_processor() -> Result<()> {
        let mut file = NamedTempFile::new().map_err(|e| IoError::FileError(e.to_string()))?;
        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();
        // SAFETY: raw-byte view of an initialized Vec<f64> (8 bytes/element).
        let bytes = unsafe { slice::from_raw_parts(data.as_ptr() as *const u8, data.len() * 8) };
        file.write_all(bytes)
            .map_err(|e| IoError::FileError(e.to_string()))?;
        let config = AsyncConfig::default();
        let mut processor = AsyncZeroCopyProcessor::new(file.path(), 100, config)?;
        let shape = vec![1000];
        let results = processor
            .process_async(shape, |chunk: &[f64]| chunk.iter().sum::<f64>())
            .await?;
        assert!(!results.is_empty());
        let numa_info = processor.get_numa_info();
        assert!(numa_info.total_nodes >= 1);
        Ok(())
    }
}