use scirs2_core::ndarray::{ArrayBase, Data, Ix1};
use std::marker::PhantomData;
use super::parallel::ParallelConfig;
use crate::error::{MetricsError, Result};
/// A metric that is computed by folding batches of paired values into an
/// accumulator and deriving the final score from that accumulator.
///
/// `ChunkedMetrics::compute_streaming` drives an implementor by calling
/// `init_state` once, `update_state` once per chunk and `finalize` at the end.
pub trait StreamingMetric<T> {
/// Accumulator carried from one batch to the next.
type State;
/// Creates a fresh accumulator.
fn init_state(&self) -> Self::State;
/// Folds one batch of true/predicted values into `state`.
fn update_state(
&self,
state: &mut Self::State,
batch_true: &[T],
batch_pred: &[T],
) -> Result<()>;
/// Derives the metric value from the accumulated `state`.
fn finalize(&self, state: &Self::State) -> Result<f64>;
}
/// Configuration for computing metrics over large inputs one chunk at a time.
#[derive(Debug, Clone)]
pub struct ChunkedMetrics {
/// Number of elements handed to the metric / row operation per chunk.
pub chunk_size: usize,
/// Parallel execution settings; stored by `with_parallel_config` but not
/// read by the compute methods visible in this file.
pub parallel_config: ParallelConfig,
}
impl Default for ChunkedMetrics {
    /// Builds the default configuration: 10000-element chunks combined with
    /// the default parallel configuration.
    fn default() -> Self {
        let parallel_config = ParallelConfig::default();
        Self {
            chunk_size: 10000,
            parallel_config,
        }
    }
}
impl ChunkedMetrics {
    /// Creates a processor with the default settings (see the `Default` impl).
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the number of elements processed per chunk.
    ///
    /// A value of `0` is stored as given, but the compute methods treat it as
    /// `1` so that chunking is always well defined.
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self
    }

    /// Replaces the parallel configuration.
    pub fn with_parallel_config(mut self, config: ParallelConfig) -> Self {
        self.parallel_config = config;
        self
    }

    /// Computes `metric` over `y_true` / `y_pred`, feeding it one chunk at a time.
    ///
    /// The metric state is initialised once, updated with every pair of
    /// chunks in order and finalised at the end. Empty inputs produce zero
    /// updates, so the result is whatever the metric returns for its initial
    /// state.
    ///
    /// # Errors
    /// Returns `MetricsError::DimensionMismatch` when the two arrays differ in
    /// length, and propagates any error from the metric itself.
    pub fn compute_streaming<T, S1, S2, M>(
        &self,
        y_true: &ArrayBase<S1, Ix1>,
        y_pred: &ArrayBase<S2, Ix1>,
        metric: &M,
    ) -> Result<f64>
    where
        T: Clone,
        S1: Data<Elem = T>,
        S2: Data<Elem = T>,
        M: StreamingMetric<T>,
    {
        if y_true.len() != y_pred.len() {
            return Err(MetricsError::DimensionMismatch(format!(
                "y_true and y_pred must have the same length, got {} and {}",
                y_true.len(),
                y_pred.len()
            )));
        }

        // A zero chunk size would otherwise divide by zero when counting chunks.
        let chunk_size = self.chunk_size.max(1);

        // Borrow the array storage when it is contiguous; only non-contiguous
        // views have to be copied into a temporary vector.
        let true_owned: Vec<T>;
        let true_values: &[T] = match y_true.as_slice() {
            Some(contiguous) => contiguous,
            None => {
                true_owned = y_true.iter().cloned().collect();
                true_owned.as_slice()
            }
        };
        let pred_owned: Vec<T>;
        let pred_values: &[T] = match y_pred.as_slice() {
            Some(contiguous) => contiguous,
            None => {
                pred_owned = y_pred.iter().cloned().collect();
                pred_owned.as_slice()
            }
        };

        let mut state = metric.init_state();
        // Both slices have equal length, so the two chunk iterators line up.
        for (true_chunk, pred_chunk) in true_values
            .chunks(chunk_size)
            .zip(pred_values.chunks(chunk_size))
        {
            metric.update_state(&mut state, true_chunk, pred_chunk)?;
        }
        metric.finalize(&state)
    }

    /// Applies `row_op` to consecutive chunks of `data` and merges the
    /// per-chunk results with `combine`.
    ///
    /// When `data` fits into a single chunk, `row_op` is applied to the whole
    /// slice and `combine` is not called.
    ///
    /// # Errors
    /// Propagates the first error returned by `row_op` or `combine`.
    pub fn compute_rowwise<T, R>(
        &self,
        data: &[T],
        row_op: impl Fn(&[T]) -> Result<R>,
        combine: impl Fn(&[R]) -> Result<R>,
    ) -> Result<R>
    where
        T: Clone,
        R: Clone,
    {
        // A zero chunk size would otherwise divide by zero when counting chunks.
        let chunk_size = self.chunk_size.max(1);
        if data.len() <= chunk_size {
            return row_op(data);
        }

        let mut results = Vec::with_capacity(data.len().div_ceil(chunk_size));
        for chunk in data.chunks(chunk_size) {
            results.push(row_op(chunk)?);
        }
        combine(&results)
    }
}
/// Accumulator for metrics that are updated one sample (or one batch) at a
/// time.
///
/// The struct owns a user-defined state `S` plus the number of samples seen
/// so far; the update and finalisation logic is supplied by the caller as
/// closures.
#[derive(Debug, Clone)]
pub struct IncrementalMetrics<T, S> {
    /// User-defined accumulator.
    state: S,
    /// Number of samples folded into `state` so far.
    count: usize,
    /// Ties the element type `T` to the struct without storing a value.
    _marker: PhantomData<T>,
}

impl<T, S> Default for IncrementalMetrics<T, S>
where
    S: Default,
{
    /// Equivalent to [`IncrementalMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T, S> IncrementalMetrics<T, S>
where
    S: Default,
{
    /// Creates an accumulator whose state is `S::default()` and whose sample
    /// count is zero.
    pub fn new() -> Self {
        Self::with_state(S::default())
    }
}

// Only `new` needs `S: Default`; everything else works for any state type.
impl<T, S> IncrementalMetrics<T, S> {
    /// Creates an accumulator that starts from the given `state` with a sample
    /// count of zero.
    pub fn with_state(state: S) -> Self {
        IncrementalMetrics {
            state,
            count: 0,
            _marker: PhantomData,
        }
    }

    /// Returns the current accumulator state.
    pub fn state(&self) -> &S {
        &self.state
    }

    /// Returns how many samples have been folded in so far.
    pub fn count(&self) -> usize {
        self.count
    }

    /// Folds a single `(y_true, y_pred)` pair into the state via `updatefn`.
    ///
    /// The sample count is only incremented when `updatefn` succeeds.
    ///
    /// # Errors
    /// Propagates the error returned by `updatefn`.
    pub fn update<F>(&mut self, y_true: T, y_pred: T, updatefn: F) -> Result<()>
    where
        F: FnOnce(&mut S, T, T) -> Result<()>,
    {
        updatefn(&mut self.state, y_true, y_pred)?;
        self.count += 1;
        Ok(())
    }

    /// Folds a whole batch into the state via `updatefn`.
    ///
    /// The sample count grows by the batch length, and only when `updatefn`
    /// succeeds.
    ///
    /// # Errors
    /// Returns `MetricsError::DimensionMismatch` when the slices differ in
    /// length, and propagates the error returned by `updatefn`.
    pub fn update_batch<F>(&mut self, y_true: &[T], y_pred: &[T], updatefn: F) -> Result<()>
    where
        F: Fn(&mut S, &[T], &[T]) -> Result<()>,
    {
        if y_true.len() != y_pred.len() {
            return Err(MetricsError::DimensionMismatch(
                "y_true and y_pred must have the same length".to_string(),
            ));
        }
        updatefn(&mut self.state, y_true, y_pred)?;
        self.count += y_true.len();
        Ok(())
    }

    /// Computes the final value from the state and the sample count.
    ///
    /// The accumulator is not consumed, so more samples can be added later.
    ///
    /// # Errors
    /// Propagates the error returned by `finalizefn`.
    pub fn finalize<F, R>(&self, finalizefn: F) -> Result<R>
    where
        F: FnOnce(&S, usize) -> Result<R>,
    {
        finalizefn(&self.state, self.count)
    }
}
/// A metric that consumes its input as a sequence of indexed chunks.
///
/// No caller of this trait is visible in this file; the name suggests the
/// chunks come from a memory-mapped source — TODO confirm against the users.
pub trait MemoryMappedMetric<T> {
/// Accumulator carried across chunks.
type State;
/// Creates a fresh accumulator.
fn init_state(&self) -> Self::State;
/// Folds the chunk with index `chunkidx` into `state`.
fn process_chunk(&self, state: &mut Self::State, chunkidx: usize, chunk: &[T]) -> Result<()>;
/// Derives the metric value from the accumulated `state`.
fn finalize(&self, state: &Self::State) -> Result<f64>;
}
use crossbeam_utils::CachePadded;
use scirs2_core::ndarray::{Array1, Array2, ArrayView1, ArrayView2, ArrayViewMut1, ArrayViewMut2};
use scirs2_core::numeric::Float;
use std::alloc::{alloc, dealloc, GlobalAlloc, Layout, System};
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::mem::{align_of, size_of, MaybeUninit};
use std::ptr::{addr_of_mut, NonNull};
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
/// Owner of the allocation facilities defined in this file: size-keyed pools,
/// the SIMD and arena allocators, the file-mapping manager, the node recycler
/// and the aggregate statistics.
#[derive(Debug)]
pub struct ZeroCopyMemoryManager {
/// Pools keyed by a `usize` (presumably the block size — TODO confirm);
/// compacted by `garbage_collect`.
memory_pools: HashMap<usize, MemoryPool>,
/// Allocator used by `allocate_simd_aligned`.
simd_allocator: SimdAlignedAllocator,
/// Arena allocator; created with a 1 MiB initial arena in `new`.
arena_allocator: ArenaAllocator,
/// Backend for `map_file`.
mmap_manager: MemoryMappingManager,
/// Recycler whose retired nodes are released by `garbage_collect`.
recycler: LockFreeRecycler,
/// Aggregate counters, exposed through `get_stats`.
stats: MemoryStats,
}
/// Pool of equally sized, equally aligned raw memory blocks with a free list.
#[derive(Debug)]
pub struct MemoryPool {
/// Size in bytes of every block in this pool.
block_size: usize,
/// Alignment in bytes of every block in this pool.
alignment: usize,
/// Blocks returned through `deallocate`, reused by `allocate`.
free_blocks: Arc<Mutex<Vec<NonNull<u8>>>>,
/// Number of blocks obtained from the global allocator and not yet released
/// by `compact`.
capacity: AtomicUsize,
/// Number of blocks currently handed out to callers.
allocated_count: AtomicUsize,
/// Hit/miss and timing counters.
pool_stats: PoolStatistics,
}
/// Allocator that hands out raw memory with a caller-chosen (normalised)
/// alignment and records usage statistics.
#[derive(Debug)]
pub struct SimdAlignedAllocator {
/// Pointers grouped by a `usize` key (presumably the alignment — TODO
/// confirm); never read or written by the code visible in this file.
alignment_cache: HashMap<usize, Vec<NonNull<u8>>>,
/// Counters updated by `allocate_aligned`.
simd_stats: SimdStats,
}
/// Allocator that serves requests from a list of arenas, creating a new arena
/// when the current one cannot satisfy a request.
#[derive(Debug)]
pub struct ArenaAllocator {
/// Arena tried first by `allocate`.
current_arena: Arc<Mutex<Arena>>,
/// Every arena created so far, including the current one.
arenas: Vec<Arc<Mutex<Arena>>>,
/// Minimum size in bytes of a newly created arena.
_default_arenasize: usize,
/// Arena counters.
arena_stats: ArenaStats,
}
/// A single contiguous memory region from which allocations are carved by
/// advancing an offset.
#[derive(Debug)]
pub struct Arena {
/// Start of the backing memory.
memory: NonNull<u8>,
/// Total size of the backing memory in bytes.
size: usize,
/// Number of bytes already handed out (including alignment padding).
offset: usize,
/// Alignment of the backing memory; set to 64 in `Arena::new`.
alignment: usize,
}
/// Registry of file mappings. `map_file` is currently a stub that always
/// returns an error.
#[derive(Debug)]
pub struct MemoryMappingManager {
/// Mappings keyed by a string (presumably the file path — TODO confirm).
mappings: HashMap<String, MemoryMapping>,
/// Mapping counters.
mapping_stats: MappingStats,
}
/// Description of one mapped region.
///
/// Nothing in this file constructs a `MemoryMapping`; the field notes below
/// are inferred from names and types only.
#[derive(Debug)]
pub struct MemoryMapping {
/// Presumably an OS file descriptor — TODO confirm.
file_handle: i32,
/// Start of the mapped region.
memory_region: NonNull<u8>,
/// Size of the mapped region in bytes (divided by the element size in
/// `ZeroCopyMemoryManager::map_file`).
size: usize,
/// Requested access mode.
access_mode: AccessMode,
/// Reference counter; not read or modified in this file.
ref_count: AtomicUsize,
}
/// Access mode requested for a file mapping.
///
/// The variants are only passed through in this file; their exact meaning is
/// defined by whichever mapping backend consumes them.
#[derive(Debug, Clone, Copy)]
pub enum AccessMode {
/// Read access only.
ReadOnly,
/// Read and write access.
ReadWrite,
/// Write access only.
WriteOnly,
/// Execute access.
Execute,
}
/// Recycler for `RecyclerNode`s built from per-size-class atomic list heads,
/// a set of hazard-pointer slots and a list of retired nodes.
#[derive(Debug)]
pub struct LockFreeRecycler {
/// One list head per size class (64 classes, all null after `new`).
free_lists: Vec<AtomicPtr<RecyclerNode>>,
/// Hazard-pointer slots (64 slots, all null after `new`).
hazard_pointers: Vec<AtomicPtr<RecyclerNode>>,
/// Nodes waiting to be freed by `reclaim_memory`.
retired_nodes: CachePadded<Mutex<Vec<*mut RecyclerNode>>>,
/// Recycler counters.
recycler_stats: RecyclerStats,
}
/// Intrusive list node used by `LockFreeRecycler`; aligned to 64 bytes.
#[repr(align(64))] #[derive(Debug)]
pub struct RecyclerNode {
/// Next node in the list.
next: AtomicPtr<RecyclerNode>,
/// Byte count reported as reclaimed when the node is freed.
size: usize,
/// Pointer to the payload; `reclaim_memory` does not free it.
data: NonNull<u8>,
/// Timestamp slot; not read or written in this file.
timestamp: AtomicUsize,
}
/// Read-only view over `len` contiguous elements of type `T` that borrows the
/// data for `'a` instead of copying it.
#[derive(Debug)]
pub struct ZeroCopyArrayView<'a, T> {
/// Pointer to the first element.
data: NonNull<T>,
/// Number of elements in the view.
len: usize,
/// Marks the view as a shared borrow of `T` for `'a`.
_lifetime: std::marker::PhantomData<&'a T>,
/// Manager that created the view.
memory_manager: &'a ZeroCopyMemoryManager,
}
/// Mutable view over `len` contiguous elements of type `T` that borrows the
/// data for `'a` instead of copying it.
#[derive(Debug)]
pub struct ZeroCopyArrayViewMut<'a, T> {
/// Pointer to the first element.
data: NonNull<T>,
/// Number of elements in the view.
len: usize,
/// Marks the view as an exclusive borrow of `T` for `'a`.
_lifetime: std::marker::PhantomData<&'a mut T>,
/// Manager that created the view.
memory_manager: &'a ZeroCopyMemoryManager,
}
/// Growable buffer of `T` backed by memory obtained from a `CustomAllocator`.
///
/// The first `length` slots are treated as initialised; the storage is handed
/// back to `allocator` when the buffer is dropped.
pub struct ZeroCopyBuffer<T> {
/// Pointer to the first slot.
data: NonNull<T>,
/// Number of slots the storage can hold.
capacity: usize,
/// Number of slots currently in use.
length: usize,
/// Layout passed to the allocator when releasing the storage.
layout: Layout,
/// Allocator that owns the storage.
allocator: Arc<dyn CustomAllocator>,
}
/// Interface implemented by the raw-memory allocators in this file.
///
/// Callers must pass the same `size` and `alignment` to `deallocate` and
/// `reallocate` that they used for the original `allocate` call; the
/// implementations in this file rebuild the `Layout` from those values.
pub trait CustomAllocator: Send + Sync + std::fmt::Debug {
/// Allocates `size` bytes with at least the given `alignment`.
fn allocate(&self, size: usize, alignment: usize) -> Result<NonNull<u8>>;
/// Releases a block previously returned by `allocate` / `reallocate`.
fn deallocate(&self, ptr: NonNull<u8>, size: usize, alignment: usize);
/// Resizes a block from `old_size` to `newsize` bytes and returns the
/// (possibly moved) block.
fn reallocate(
&self,
ptr: NonNull<u8>,
old_size: usize,
newsize: usize,
alignment: usize,
) -> Result<NonNull<u8>>;
/// Returns allocator statistics.
fn get_stats(&self) -> AllocatorStats;
/// Resets allocator-internal state (a no-op for every implementation in
/// this file).
fn reset(&self);
}
/// Allocator description combining per-thread block caches with a shared
/// fallback allocator.
///
/// NOTE(review): the `LocalKey` fields are held by value, whereas
/// `thread_local!` normally yields `&'static LocalKey<_>`; nothing in this file
/// constructs this struct — confirm how it is meant to be built.
pub struct ThreadLocalAllocator {
/// Per-thread free blocks keyed by a `usize` (presumably the size class —
/// TODO confirm).
local_pools: std::thread::LocalKey<UnsafeCell<HashMap<usize, Vec<NonNull<u8>>>>>,
/// Allocator presumably used when the local pools cannot serve a request —
/// TODO confirm.
global_fallback: Arc<dyn CustomAllocator>,
/// Per-thread statistics.
local_stats: std::thread::LocalKey<UnsafeCell<AllocatorStats>>,
}
/// State of a slab allocator for fixed-size objects.
///
/// Only the data layout is defined in this file; no methods are visible here,
/// so the field notes are inferred from names and types.
#[derive(Debug)]
pub struct SlabAllocator {
/// Size of one slab (presumably in bytes — TODO confirm).
slab_size: usize,
/// Size of one object (presumably in bytes — TODO confirm).
object_size: usize,
/// Number of objects that fit into one slab.
objects_per_slab: usize,
/// All slabs.
slabs: Vec<Slab>,
/// Pointers to currently free objects.
free_objects: Vec<NonNull<u8>>,
/// Slab counters.
slab_stats: SlabStats,
}
/// One slab belonging to a `SlabAllocator`.
#[derive(Debug)]
pub struct Slab {
/// Start of the slab memory.
memory: NonNull<u8>,
/// Bitmask words tracking slots (presumably one bit per object, set when
/// free — TODO confirm).
free_mask: Vec<u64>,
/// Number of free objects in this slab.
free_count: usize,
/// Identifier of the slab.
id: usize,
}
/// State of a buddy allocator.
///
/// Only the data layout is defined in this file; no methods are visible here,
/// so the field notes are inferred from names and types.
#[derive(Debug)]
pub struct BuddyAllocator {
/// Start of the managed memory block.
memory_block: NonNull<u8>,
/// Size of the managed block (presumably in bytes — TODO confirm).
block_size: usize,
/// Free blocks grouped in an outer vector (presumably indexed by order —
/// TODO confirm).
free_lists: Vec<Vec<NonNull<u8>>>,
/// Bitmap words tracking allocations.
allocation_bitmap: Vec<u64>,
/// Buddy allocator counters.
buddy_stats: BuddyStats,
}
/// Aggregate counters of a `ZeroCopyMemoryManager`.
///
/// In this file only `total_allocated` and `allocation_count` are updated (by
/// `allocate_buffer`); the remaining counters stay at their initial zero.
#[derive(Debug)]
pub struct MemoryStats {
/// Sum of the byte sizes of all buffers allocated.
pub total_allocated: AtomicUsize,
/// Total bytes released.
pub total_deallocated: AtomicUsize,
/// Peak usage.
pub peak_usage: AtomicUsize,
/// Current usage.
pub current_usage: AtomicUsize,
/// Number of buffer allocations.
pub allocation_count: AtomicUsize,
/// Number of deallocations.
pub deallocation_count: AtomicUsize,
/// Fragmentation ratio stored as an integer (scale not defined in this
/// file — TODO confirm).
pub fragmentation_ratio: AtomicUsize, }
/// Counters of a `MemoryPool`.
#[derive(Debug)]
pub struct PoolStatistics {
/// Allocations served from the free list.
pub hits: AtomicUsize,
/// Allocations that had to go to the global allocator.
pub misses: AtomicUsize,
/// Utilisation value; not updated in this file.
pub utilization: AtomicUsize,
/// Smoothed allocation time in nanoseconds (7/8 old value, 1/8 new sample),
/// updated on free-list hits only.
pub avg_allocation_time: AtomicUsize,
}
/// Counters of a `SimdAlignedAllocator`.
#[derive(Debug)]
pub struct SimdStats {
/// Number of allocations per effective (normalised) alignment.
pub allocations_by_alignment: HashMap<usize, AtomicUsize>,
/// Total bytes requested through `allocate_aligned`.
pub simd_memory_usage: AtomicUsize,
/// Vectorisation efficiency value; not updated in this file.
pub vectorization_efficiency: AtomicUsize,
}
/// Counters of an `ArenaAllocator`.
///
/// In this file only `fragmentation_waste` is written (reset to zero by
/// `ArenaAllocator::compact`).
#[derive(Debug)]
pub struct ArenaStats {
/// Number of arenas created.
pub arenas_created: AtomicUsize,
/// Total memory held by arenas.
pub total_arena_memory: AtomicUsize,
/// Arena utilisation value.
pub arena_utilization: AtomicUsize,
/// Memory lost to fragmentation.
pub fragmentation_waste: AtomicUsize,
}
/// Counters of a `MemoryMappingManager`; none of them is updated in this file.
#[derive(Debug)]
pub struct MappingStats {
/// Number of active mappings.
pub active_mappings: AtomicUsize,
/// Total mapped memory.
pub total_mapped_memory: AtomicUsize,
/// Mapping cache hits.
pub cache_hits: AtomicUsize,
/// Mapping cache misses.
pub cache_misses: AtomicUsize,
}
/// Counters of a `LockFreeRecycler`.
///
/// In this file only `memory_reclaimed` is updated (by `reclaim_memory`).
#[derive(Debug)]
pub struct RecyclerStats {
/// Successful recycle operations.
pub successful_recycles: AtomicUsize,
/// Failed recycle operations.
pub failed_recycles: AtomicUsize,
/// Hazard-pointer contentions.
pub hazard_contentions: AtomicUsize,
/// Sum of the `size` fields of all nodes freed by `reclaim_memory`.
pub memory_reclaimed: AtomicUsize,
}
/// Counters reported by `CustomAllocator::get_stats`.
///
/// Every implementation in this file returns a freshly zeroed instance.
#[derive(Debug)]
pub struct AllocatorStats {
/// Number of allocation requests.
pub allocation_requests: AtomicUsize,
/// Number of deallocation requests.
pub deallocation_requests: AtomicUsize,
/// Bytes allocated.
pub bytes_allocated: AtomicUsize,
/// Bytes deallocated.
pub bytes_deallocated: AtomicUsize,
/// Number of failed allocations.
pub allocation_failures: AtomicUsize,
}
/// Counters of a `SlabAllocator`; none of them is updated in this file.
#[derive(Debug)]
pub struct SlabStats {
/// Number of slabs allocated.
pub slabs_allocated: AtomicUsize,
/// Number of objects allocated.
pub objects_allocated: AtomicUsize,
/// Slab utilisation value.
pub slab_utilization: AtomicUsize,
/// Internal fragmentation value.
pub internal_fragmentation: AtomicUsize,
}
/// Counters of a `BuddyAllocator`; none of them is updated in this file.
#[derive(Debug)]
pub struct BuddyStats {
/// Allocation counters, one slot per order (`new` creates 32 slots).
pub allocations_by_order: Vec<AtomicUsize>,
/// Number of coalescing operations.
pub coalescing_operations: AtomicUsize,
/// Number of splitting operations.
pub splitting_operations: AtomicUsize,
/// External fragmentation value.
pub external_fragmentation: AtomicUsize,
}
impl ZeroCopyMemoryManager {
    /// Creates a manager with no pools, a 1 MiB initial arena and zeroed
    /// statistics.
    ///
    /// # Errors
    /// Propagates the error from `ArenaAllocator::new` when the initial arena
    /// cannot be allocated.
    pub fn new() -> Result<Self> {
        Ok(Self {
            memory_pools: HashMap::new(),
            simd_allocator: SimdAlignedAllocator::new(),
            arena_allocator: ArenaAllocator::new(1024 * 1024)?,
            mmap_manager: MemoryMappingManager::new(),
            recycler: LockFreeRecycler::new(),
            stats: MemoryStats::new(),
        })
    }

    /// Allocates an empty buffer with room for `capacity` elements of `T`.
    ///
    /// The backing allocator is chosen by `get_optimal_allocator` from the
    /// byte size and alignment of the requested array layout.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` when the array layout is invalid or
    /// the allocator fails.
    pub fn allocate_buffer<T>(&self, capacity: usize) -> Result<ZeroCopyBuffer<T>> {
        let layout = Layout::array::<T>(capacity)
            .map_err(|_| MetricsError::MemoryError("Invalid layout".to_string()))?;
        let allocator = self.get_optimal_allocator(layout.size(), layout.align());
        let ptr = allocator.allocate(layout.size(), layout.align())?;
        self.stats
            .total_allocated
            .fetch_add(layout.size(), Ordering::Relaxed);
        self.stats.allocation_count.fetch_add(1, Ordering::Relaxed);
        Ok(ZeroCopyBuffer {
            data: ptr.cast::<T>(),
            capacity,
            length: 0,
            layout,
            allocator,
        })
    }

    /// Wraps a borrowed slice in a read-only view without copying it.
    pub fn create_view<'a, T>(&'a self, data: &'a [T]) -> ZeroCopyArrayView<'a, T> {
        ZeroCopyArrayView {
            // A reference is never null, so no runtime check is needed.
            data: NonNull::from(data).cast::<T>(),
            len: data.len(),
            _lifetime: std::marker::PhantomData,
            memory_manager: self,
        }
    }

    /// Wraps a mutably borrowed slice in a mutable view without copying it.
    pub fn create_view_mut<'a, T>(&'a self, data: &'a mut [T]) -> ZeroCopyArrayViewMut<'a, T> {
        let len = data.len();
        ZeroCopyArrayViewMut {
            // A reference is never null, so no runtime check is needed.
            data: NonNull::from(data).cast::<T>(),
            len,
            _lifetime: std::marker::PhantomData,
            memory_manager: self,
        }
    }

    /// Allocates an empty buffer for `count` floats whose storage is aligned
    /// to at least `alignment` bytes.
    ///
    /// The alignment is normalised the same way `SimdAlignedAllocator` does
    /// (at least pointer alignment, rounded up to a power of two) and is never
    /// smaller than `T`'s own alignment. The stored layout uses that
    /// normalised alignment, so the buffer is later released with exactly the
    /// layout it was allocated with.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` when `count` is zero, the byte size
    /// or alignment overflows, the layout is invalid, or the allocation fails.
    pub fn allocate_simd_aligned<T: Float>(
        &mut self,
        count: usize,
        alignment: usize,
    ) -> Result<ZeroCopyBuffer<T>> {
        let invalid_layout = || MetricsError::MemoryError("Invalid SIMD layout".to_string());

        // Zero-sized requests are rejected: the global allocator must not be
        // called with a zero-sized layout.
        let size = count
            .checked_mul(size_of::<T>())
            .filter(|&bytes| bytes > 0)
            .ok_or_else(invalid_layout)?;
        let alignment = alignment
            .max(align_of::<T>())
            .max(align_of::<usize>())
            .checked_next_power_of_two()
            .ok_or_else(invalid_layout)?;
        // Validate before allocating so a bad request cannot leak memory.
        let layout = Layout::from_size_align(size, alignment).map_err(|_| invalid_layout())?;

        let ptr = self.simd_allocator.allocate_aligned(size, alignment)?;
        Ok(ZeroCopyBuffer {
            data: ptr.cast::<T>(),
            capacity: count,
            length: 0,
            layout,
            allocator: Arc::new(SystemAllocator),
        })
    }

    /// Maps a file and exposes its contents as a read-only view of `T`.
    ///
    /// The view length is the mapping size divided by the size of `T`.
    ///
    /// # Errors
    /// Propagates the error from the mapping manager (which currently always
    /// fails), and returns `MetricsError::MemoryError` when `T` is zero-sized
    /// or the mapped region is not suitably aligned for `T`.
    pub fn map_file<T>(
        &self,
        file_path: &str,
        access_mode: AccessMode,
    ) -> Result<ZeroCopyArrayView<'_, T>> {
        let mapping = self.mmap_manager.map_file(file_path, access_mode)?;

        let elem_size = size_of::<T>();
        if elem_size == 0 {
            return Err(MetricsError::MemoryError(
                "Cannot map a file as zero-sized elements".to_string(),
            ));
        }
        if mapping.memory_region.as_ptr() as usize % align_of::<T>() != 0 {
            return Err(MetricsError::MemoryError(
                "Mapped region is not aligned for the element type".to_string(),
            ));
        }

        Ok(ZeroCopyArrayView {
            data: mapping.memory_region.cast::<T>(),
            len: mapping.size / elem_size,
            _lifetime: std::marker::PhantomData,
            memory_manager: self,
        })
    }

    /// Picks an allocator for a request of `size` bytes with `alignment`:
    /// pool for small blocks (<= 4096 bytes, alignment <= 64), SIMD wrapper
    /// for alignment > 64, arena wrapper for blocks >= 1 MiB, system allocator
    /// otherwise.
    fn get_optimal_allocator(&self, size: usize, alignment: usize) -> Arc<dyn CustomAllocator> {
        if size <= 4096 && alignment <= 64 {
            Arc::new(PoolAllocator::new(size))
        } else if alignment > 64 {
            Arc::new(SimdAllocatorWrapper::new())
        } else if size >= 1024 * 1024 {
            Arc::new(ArenaAllocatorWrapper::new())
        } else {
            Arc::new(SystemAllocator)
        }
    }

    /// Returns the aggregate statistics of this manager.
    pub fn get_stats(&self) -> &MemoryStats {
        &self.stats
    }

    /// Releases memory that is no longer needed and returns the number of
    /// bytes reported as reclaimed by the recycler, the pools and the arena
    /// allocator.
    ///
    /// # Errors
    /// Propagates the first error from any of the three sources.
    pub fn garbage_collect(&self) -> Result<usize> {
        let mut reclaimed = self.recycler.reclaim_memory()?;
        for pool in self.memory_pools.values() {
            reclaimed += pool.compact()?;
        }
        reclaimed += self.arena_allocator.compact()?;
        Ok(reclaimed)
    }
}
impl MemoryPool {
    /// Creates an empty pool of `_blocksize`-byte blocks with the given
    /// alignment. `initialcapacity` only pre-sizes the free list; no memory
    /// is allocated up front.
    pub fn new(_blocksize: usize, alignment: usize, initialcapacity: usize) -> Self {
        Self {
            block_size: _blocksize,
            alignment,
            free_blocks: Arc::new(Mutex::new(Vec::with_capacity(initialcapacity))),
            capacity: AtomicUsize::new(0),
            allocated_count: AtomicUsize::new(0),
            pool_stats: PoolStatistics::new(),
        }
    }

    /// Hands out one block, reusing a freed block when one is available and
    /// falling back to the global allocator otherwise.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` when the pool's block layout is
    /// invalid (including a zero block size) or the global allocator fails.
    ///
    /// # Panics
    /// Panics if the free-list mutex is poisoned.
    pub fn allocate(&self) -> Result<NonNull<u8>> {
        let start_time = std::time::Instant::now();

        // The guard is a temporary, so the lock is released before the
        // bookkeeping below.
        let recycled = self.free_blocks.lock().expect("Operation failed").pop();
        if let Some(ptr) = recycled {
            self.pool_stats.hits.fetch_add(1, Ordering::Relaxed);
            self.allocated_count.fetch_add(1, Ordering::Relaxed);
            let allocation_time = start_time.elapsed().as_nanos() as usize;
            self.update_avg_allocation_time(allocation_time);
            return Ok(ptr);
        }

        self.pool_stats.misses.fetch_add(1, Ordering::Relaxed);
        let layout = self.block_layout()?;
        // SAFETY: `block_layout` only returns layouts with a non-zero size.
        let raw = unsafe { alloc(layout) };
        let ptr = NonNull::new(raw)
            .ok_or_else(|| MetricsError::MemoryError("Allocation failed".to_string()))?;
        self.capacity.fetch_add(1, Ordering::Relaxed);
        self.allocated_count.fetch_add(1, Ordering::Relaxed);
        Ok(ptr)
    }

    /// Returns a block to the pool's free list.
    ///
    /// `ptr` must have been obtained from `allocate` on this pool.
    ///
    /// # Panics
    /// Panics if the free-list mutex is poisoned.
    pub fn deallocate(&self, ptr: NonNull<u8>) {
        self.free_blocks.lock().expect("Operation failed").push(ptr);
        self.allocated_count.fetch_sub(1, Ordering::Relaxed);
    }

    /// Releases the upper half of the free list back to the global allocator
    /// and returns the number of bytes freed.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` when there are blocks to release
    /// but the pool's block layout is invalid.
    ///
    /// # Panics
    /// Panics if the free-list mutex is poisoned.
    pub fn compact(&self) -> Result<usize> {
        let mut free_blocks = self.free_blocks.lock().expect("Operation failed");
        if free_blocks.is_empty() {
            return Ok(0);
        }
        // Validate once, before removing anything from the free list.
        let layout = self.block_layout()?;

        let keep_count = free_blocks.len() / 2;
        let to_deallocate = free_blocks.split_off(keep_count);
        let released = to_deallocate.len();
        for ptr in to_deallocate {
            // SAFETY: blocks on the free list were allocated by `allocate`
            // with this exact layout and are not referenced by any caller.
            unsafe { dealloc(ptr.as_ptr(), layout) };
        }

        // Count blocks directly instead of dividing bytes by the block size.
        self.capacity.fetch_sub(released, Ordering::Relaxed);
        Ok(released * self.block_size)
    }

    /// Builds the layout shared by all blocks of this pool; zero-sized blocks
    /// are rejected because the global allocator does not support them.
    fn block_layout(&self) -> Result<Layout> {
        if self.block_size == 0 {
            return Err(MetricsError::MemoryError("Invalid layout".to_string()));
        }
        Layout::from_size_align(self.block_size, self.alignment)
            .map_err(|_| MetricsError::MemoryError("Invalid layout".to_string()))
    }

    /// Folds `newtime` (nanoseconds) into the smoothed allocation time using
    /// weights 7/8 (old) and 1/8 (new); the first sample is stored as is.
    fn update_avg_allocation_time(&self, newtime: usize) {
        let current_avg = self.pool_stats.avg_allocation_time.load(Ordering::Relaxed);
        let new_avg = if current_avg == 0 {
            newtime
        } else {
            current_avg.saturating_mul(7).saturating_add(newtime) / 8
        };
        self.pool_stats
            .avg_allocation_time
            .store(new_avg, Ordering::Relaxed);
    }
}
impl SimdAlignedAllocator {
    /// Creates an allocator with an empty cache and zeroed statistics.
    pub fn new() -> Self {
        Self {
            alignment_cache: HashMap::new(),
            simd_stats: SimdStats::new(),
        }
    }

    /// Allocates `size` bytes aligned to at least `alignment`.
    ///
    /// The effective alignment is `alignment` raised to at least the
    /// alignment of `usize` and rounded up to a power of two. Callers that
    /// later free the block must use that effective alignment.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` when `size` is zero, the alignment
    /// cannot be rounded up without overflow, the layout is invalid, or the
    /// global allocator fails.
    pub fn allocate_aligned(&mut self, size: usize, alignment: usize) -> Result<NonNull<u8>> {
        let invalid_layout = || MetricsError::MemoryError("Invalid SIMD layout".to_string());

        // The global allocator must not be called with a zero-sized layout.
        if size == 0 {
            return Err(invalid_layout());
        }
        let alignment = alignment
            .max(align_of::<usize>())
            .checked_next_power_of_two()
            .ok_or_else(invalid_layout)?;
        let layout = Layout::from_size_align(size, alignment).map_err(|_| invalid_layout())?;

        // SAFETY: `layout` has a non-zero size (checked above).
        let raw = unsafe { alloc(layout) };
        let ptr = NonNull::new(raw)
            .ok_or_else(|| MetricsError::MemoryError("SIMD allocation failed".to_string()))?;

        self.simd_stats
            .simd_memory_usage
            .fetch_add(size, Ordering::Relaxed);
        self.simd_stats
            .allocations_by_alignment
            .entry(alignment)
            .or_insert_with(|| AtomicUsize::new(0))
            .fetch_add(1, Ordering::Relaxed);
        Ok(ptr)
    }
}

impl Default for SimdAlignedAllocator {
    /// Equivalent to [`SimdAlignedAllocator::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl ArenaAllocator {
    /// Creates an allocator with one initial arena of `_default_arenasize`
    /// bytes.
    ///
    /// # Errors
    /// Propagates the error from `Arena::new`.
    pub fn new(_default_arenasize: usize) -> Result<Self> {
        let initial_arena = Arc::new(Mutex::new(Arena::new(_default_arenasize)?));
        Ok(Self {
            current_arena: initial_arena.clone(),
            arenas: vec![initial_arena],
            _default_arenasize,
            arena_stats: ArenaStats::new(),
        })
    }

    /// Allocates `size` bytes with the given `alignment` from the current
    /// arena, switching to a freshly created arena when the current one
    /// cannot serve the request.
    ///
    /// A new arena is at least the default arena size and at least twice the
    /// requested size. It becomes the current arena, so later requests keep
    /// filling it instead of creating yet another arena.
    ///
    /// # Errors
    /// Propagates errors from `Arena::new` and from the allocation in the new
    /// arena.
    ///
    /// # Panics
    /// Panics if an arena mutex is poisoned.
    pub fn allocate(&mut self, size: usize, alignment: usize) -> Result<NonNull<u8>> {
        // The guard is a temporary, so the lock is released right away.
        let first_attempt = self
            .current_arena
            .lock()
            .expect("Operation failed")
            .allocate(size, alignment);
        if let Ok(ptr) = first_attempt {
            return Ok(ptr);
        }

        let new_arena_size = self._default_arenasize.max(size.saturating_mul(2));
        let new_arena = Arc::new(Mutex::new(Arena::new(new_arena_size)?));
        self.arenas.push(Arc::clone(&new_arena));
        // Make the new arena current; otherwise every request after the first
        // arena fills up would allocate a brand-new arena.
        self.current_arena = Arc::clone(&new_arena);

        let result = new_arena
            .lock()
            .expect("Operation failed")
            .allocate(size, alignment);
        result
    }

    /// Resets every arena so that its memory can be handed out again.
    /// Pointers returned earlier must no longer be used afterwards.
    ///
    /// # Panics
    /// Panics if an arena mutex is poisoned.
    pub fn reset(&self) {
        for arena in &self.arenas {
            arena.lock().expect("Operation failed").reset();
        }
    }

    /// Resets the fragmentation counter and reports zero reclaimed bytes; no
    /// memory is actually released.
    pub fn compact(&self) -> Result<usize> {
        self.arena_stats
            .fragmentation_waste
            .store(0, Ordering::Relaxed);
        Ok(0)
    }
}
impl Arena {
pub fn new(size: usize) -> Result<Self> {
let layout = Layout::from_size_align(size, 64) .map_err(|_| MetricsError::MemoryError("Invalid arena layout".to_string()))?;
let ptr = unsafe { alloc(layout) };
if ptr.is_null() {
return Err(MetricsError::MemoryError(
"Arena allocation failed".to_string(),
));
}
Ok(Self {
memory: NonNull::new(ptr).expect("Operation failed"),
size,
offset: 0,
alignment: 64,
})
}
pub fn allocate(&mut self, size: usize, alignment: usize) -> Result<NonNull<u8>> {
let aligned_offset = (self.offset + alignment - 1) & !(alignment - 1);
if aligned_offset + size > self.size {
return Err(MetricsError::MemoryError("Arena exhausted".to_string()));
}
let ptr = unsafe { self.memory.as_ptr().add(aligned_offset) };
self.offset = aligned_offset + size;
Ok(NonNull::new(ptr).expect("Operation failed"))
}
pub fn reset(&mut self) {
self.offset = 0;
}
}
impl MemoryMappingManager {
    /// Creates a manager with no mappings and zeroed statistics.
    pub fn new() -> Self {
        Self {
            mappings: HashMap::new(),
            mapping_stats: MappingStats::new(),
        }
    }

    /// Placeholder for mapping a file into memory.
    ///
    /// # Errors
    /// Always returns `MetricsError::MemoryError`, because memory mapping is
    /// not implemented yet; both arguments are ignored.
    pub fn map_file(&self, _file_path: &str, _accessmode: AccessMode) -> Result<&MemoryMapping> {
        Err(MetricsError::MemoryError(
            "Memory mapping not implemented".to_string(),
        ))
    }
}

impl Default for MemoryMappingManager {
    /// Equivalent to [`MemoryMappingManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl LockFreeRecycler {
pub fn new() -> Self {
const NUM_SIZE_CLASSES: usize = 64;
Self {
free_lists: (0..NUM_SIZE_CLASSES)
.map(|_| AtomicPtr::new(std::ptr::null_mut()))
.collect(),
hazard_pointers: (0..NUM_SIZE_CLASSES)
.map(|_| AtomicPtr::new(std::ptr::null_mut()))
.collect(),
retired_nodes: CachePadded::new(Mutex::new(Vec::new())),
recycler_stats: RecyclerStats::new(),
}
}
pub fn reclaim_memory(&self) -> Result<usize> {
let mut reclaimed = 0;
let mut retired = self.retired_nodes.lock().expect("Operation failed");
for node_ptr in retired.drain(..) {
unsafe {
let node = Box::from_raw(node_ptr);
reclaimed += node.size;
}
}
self.recycler_stats
.memory_reclaimed
.fetch_add(reclaimed, Ordering::Relaxed);
Ok(reclaimed)
}
}
/// `CustomAllocator` that forwards directly to the global allocator.
#[derive(Debug)]
pub struct SystemAllocator;

impl CustomAllocator for SystemAllocator {
    /// Allocates `size` bytes with the given `alignment`.
    ///
    /// Zero-sized requests never touch the heap: they receive a non-null,
    /// suitably aligned placeholder pointer that must not be dereferenced.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` for an invalid layout or when the
    /// global allocator fails.
    fn allocate(&self, size: usize, alignment: usize) -> Result<NonNull<u8>> {
        let layout = Layout::from_size_align(size, alignment)
            .map_err(|_| MetricsError::MemoryError("Invalid layout".to_string()))?;
        if layout.size() == 0 {
            // The global allocator must not be called with a zero-sized
            // layout; the alignment is non-zero, so this pointer is non-null.
            return Ok(NonNull::new(layout.align() as *mut u8).expect("Operation failed"));
        }
        // SAFETY: `layout` has a non-zero size (checked above).
        let raw = unsafe { alloc(layout) };
        NonNull::new(raw)
            .ok_or_else(|| MetricsError::MemoryError("System allocation failed".to_string()))
    }

    /// Releases a block obtained from `allocate` with the same `size` and
    /// `alignment`. Zero-sized blocks own no memory and are ignored.
    ///
    /// # Panics
    /// Panics if `size` and `alignment` do not form a valid layout.
    fn deallocate(&self, ptr: NonNull<u8>, size: usize, alignment: usize) {
        if size == 0 {
            return;
        }
        let layout = Layout::from_size_align(size, alignment).expect("Operation failed");
        // SAFETY: the caller guarantees `ptr` came from `allocate` with this
        // exact layout.
        unsafe { dealloc(ptr.as_ptr(), layout) };
    }

    /// Moves a block to a new allocation of `newsize` bytes, copying the
    /// first `min(old_size, newsize)` bytes. The old block is released only
    /// after the new one has been allocated successfully.
    fn reallocate(
        &self,
        ptr: NonNull<u8>,
        old_size: usize,
        newsize: usize,
        alignment: usize,
    ) -> Result<NonNull<u8>> {
        let new_ptr = self.allocate(newsize, alignment)?;
        // SAFETY: both blocks are valid for at least the copied length and
        // are distinct allocations (a zero-length copy accesses no memory).
        unsafe {
            std::ptr::copy_nonoverlapping(ptr.as_ptr(), new_ptr.as_ptr(), old_size.min(newsize));
        }
        self.deallocate(ptr, old_size, alignment);
        Ok(new_ptr)
    }

    /// Returns zeroed statistics; this allocator does not track usage.
    fn get_stats(&self) -> AllocatorStats {
        AllocatorStats::new()
    }

    /// No-op: there is no internal state to reset.
    fn reset(&self) {}
}
/// `CustomAllocator` that always allocates blocks of one fixed size.
#[derive(Debug)]
pub struct PoolAllocator {
    /// Size in bytes of every block handed out.
    block_size: usize,
}

impl PoolAllocator {
    /// Creates an allocator whose blocks are `_blocksize` bytes large.
    pub fn new(_blocksize: usize) -> Self {
        Self {
            block_size: _blocksize,
        }
    }
}

impl CustomAllocator for PoolAllocator {
    /// Allocates one block of `block_size` bytes for a request of `size`
    /// bytes.
    ///
    /// When the block size is zero no heap memory is touched: the caller
    /// receives a non-null, suitably aligned placeholder pointer that must
    /// not be dereferenced.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` when `size` exceeds the block size,
    /// the layout is invalid, or the global allocator fails.
    fn allocate(&self, size: usize, alignment: usize) -> Result<NonNull<u8>> {
        if size > self.block_size {
            return Err(MetricsError::MemoryError(
                "Size exceeds pool block size".to_string(),
            ));
        }
        let layout = Layout::from_size_align(self.block_size, alignment)
            .map_err(|_| MetricsError::MemoryError("Invalid pool layout".to_string()))?;
        if layout.size() == 0 {
            // The global allocator must not be called with a zero-sized
            // layout; the alignment is non-zero, so this pointer is non-null.
            return Ok(NonNull::new(layout.align() as *mut u8).expect("Operation failed"));
        }
        // SAFETY: `layout` has a non-zero size (checked above).
        let raw = unsafe { alloc(layout) };
        NonNull::new(raw)
            .ok_or_else(|| MetricsError::MemoryError("Pool allocation failed".to_string()))
    }

    /// Releases a block obtained from `allocate`. The block is always freed
    /// with the pool's block size; the size argument is ignored.
    ///
    /// # Panics
    /// Panics if the block size and `alignment` do not form a valid layout.
    fn deallocate(&self, ptr: NonNull<u8>, _size: usize, alignment: usize) {
        if self.block_size == 0 {
            // Zero-sized blocks own no memory.
            return;
        }
        let layout = Layout::from_size_align(self.block_size, alignment).expect("Operation failed");
        // SAFETY: the caller guarantees `ptr` came from `allocate` on this
        // allocator with the same alignment.
        unsafe { dealloc(ptr.as_ptr(), layout) };
    }

    /// Keeps the block in place when `newsize` still fits into one block.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` when `newsize` exceeds the block
    /// size: a fixed-size pool cannot grow a block. The original block stays
    /// valid in that case.
    fn reallocate(
        &self,
        ptr: NonNull<u8>,
        _old_size: usize,
        newsize: usize,
        _alignment: usize,
    ) -> Result<NonNull<u8>> {
        if newsize <= self.block_size {
            Ok(ptr)
        } else {
            // Allocating a larger block from this pool can never succeed, so
            // report the failure directly.
            Err(MetricsError::MemoryError(
                "Size exceeds pool block size".to_string(),
            ))
        }
    }

    /// Returns zeroed statistics; this allocator does not track usage.
    fn get_stats(&self) -> AllocatorStats {
        AllocatorStats::new()
    }

    /// No-op: there is no internal state to reset.
    fn reset(&self) {}
}
/// `CustomAllocator` that raises every request to SIMD-friendly alignment
/// (at least 32 bytes, rounded up to a power of two).
#[derive(Debug)]
pub struct SimdAllocatorWrapper;

impl SimdAllocatorWrapper {
    /// Creates the (stateless) allocator.
    pub fn new() -> Self {
        Self
    }

    /// Effective alignment for a requested `alignment`, or `None` when
    /// rounding up to a power of two would overflow.
    fn simd_alignment(alignment: usize) -> Option<usize> {
        alignment.max(32).checked_next_power_of_two()
    }
}

impl Default for SimdAllocatorWrapper {
    /// Equivalent to [`SimdAllocatorWrapper::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl CustomAllocator for SimdAllocatorWrapper {
    /// Allocates `size` bytes with the effective SIMD alignment.
    ///
    /// Zero-sized requests never touch the heap: they receive a non-null,
    /// suitably aligned placeholder pointer that must not be dereferenced.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` for an invalid layout (including an
    /// alignment that cannot be rounded up) or when the global allocator
    /// fails.
    fn allocate(&self, size: usize, alignment: usize) -> Result<NonNull<u8>> {
        let invalid_layout = || MetricsError::MemoryError("Invalid SIMD layout".to_string());
        let simd_alignment = Self::simd_alignment(alignment).ok_or_else(invalid_layout)?;
        let layout =
            Layout::from_size_align(size, simd_alignment).map_err(|_| invalid_layout())?;
        if layout.size() == 0 {
            // The global allocator must not be called with a zero-sized
            // layout; the alignment is non-zero, so this pointer is non-null.
            return Ok(NonNull::new(layout.align() as *mut u8).expect("Operation failed"));
        }
        // SAFETY: `layout` has a non-zero size (checked above).
        let raw = unsafe { alloc(layout) };
        NonNull::new(raw)
            .ok_or_else(|| MetricsError::MemoryError("SIMD allocation failed".to_string()))
    }

    /// Releases a block obtained from `allocate` with the same `size` and
    /// requested `alignment`. Zero-sized blocks own no memory and are ignored.
    ///
    /// # Panics
    /// Panics if `size` and `alignment` do not form a valid layout.
    fn deallocate(&self, ptr: NonNull<u8>, size: usize, alignment: usize) {
        if size == 0 {
            return;
        }
        let simd_alignment = Self::simd_alignment(alignment).expect("Operation failed");
        let layout = Layout::from_size_align(size, simd_alignment).expect("Operation failed");
        // SAFETY: the caller guarantees `ptr` came from `allocate` with the
        // same size and requested alignment, hence the same layout.
        unsafe { dealloc(ptr.as_ptr(), layout) };
    }

    /// Moves a block to a new allocation of `newsize` bytes, copying the
    /// first `min(old_size, newsize)` bytes. The old block is released only
    /// after the new one has been allocated successfully.
    fn reallocate(
        &self,
        ptr: NonNull<u8>,
        old_size: usize,
        newsize: usize,
        alignment: usize,
    ) -> Result<NonNull<u8>> {
        let new_ptr = self.allocate(newsize, alignment)?;
        // SAFETY: both blocks are valid for at least the copied length and
        // are distinct allocations (a zero-length copy accesses no memory).
        unsafe {
            std::ptr::copy_nonoverlapping(ptr.as_ptr(), new_ptr.as_ptr(), old_size.min(newsize));
        }
        self.deallocate(ptr, old_size, alignment);
        Ok(new_ptr)
    }

    /// Returns zeroed statistics; this allocator does not track usage.
    fn get_stats(&self) -> AllocatorStats {
        AllocatorStats::new()
    }

    /// No-op: there is no internal state to reset.
    fn reset(&self) {}
}
/// `CustomAllocator` selected for very large requests. In this file it
/// allocates straight from the global allocator; it does not use
/// `ArenaAllocator`.
#[derive(Debug)]
pub struct ArenaAllocatorWrapper;

impl ArenaAllocatorWrapper {
    /// Creates the (stateless) allocator.
    pub fn new() -> Self {
        Self
    }
}

impl Default for ArenaAllocatorWrapper {
    /// Equivalent to [`ArenaAllocatorWrapper::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl CustomAllocator for ArenaAllocatorWrapper {
    /// Allocates `size` bytes with the given `alignment`.
    ///
    /// Zero-sized requests never touch the heap: they receive a non-null,
    /// suitably aligned placeholder pointer that must not be dereferenced.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` for an invalid layout or when the
    /// global allocator fails.
    fn allocate(&self, size: usize, alignment: usize) -> Result<NonNull<u8>> {
        let layout = Layout::from_size_align(size, alignment)
            .map_err(|_| MetricsError::MemoryError("Invalid arena layout".to_string()))?;
        if layout.size() == 0 {
            // The global allocator must not be called with a zero-sized
            // layout; the alignment is non-zero, so this pointer is non-null.
            return Ok(NonNull::new(layout.align() as *mut u8).expect("Operation failed"));
        }
        // SAFETY: `layout` has a non-zero size (checked above).
        let raw = unsafe { alloc(layout) };
        NonNull::new(raw)
            .ok_or_else(|| MetricsError::MemoryError("Arena allocation failed".to_string()))
    }

    /// Releases a block obtained from `allocate` with the same `size` and
    /// `alignment`. Zero-sized blocks own no memory and are ignored.
    ///
    /// # Panics
    /// Panics if `size` and `alignment` do not form a valid layout.
    fn deallocate(&self, ptr: NonNull<u8>, size: usize, alignment: usize) {
        if size == 0 {
            return;
        }
        let layout = Layout::from_size_align(size, alignment).expect("Operation failed");
        // SAFETY: the caller guarantees `ptr` came from `allocate` with this
        // exact layout.
        unsafe { dealloc(ptr.as_ptr(), layout) };
    }

    /// Moves a block to a new allocation of `newsize` bytes, copying the
    /// first `min(old_size, newsize)` bytes. The old block is released only
    /// after the new one has been allocated successfully.
    fn reallocate(
        &self,
        ptr: NonNull<u8>,
        old_size: usize,
        newsize: usize,
        alignment: usize,
    ) -> Result<NonNull<u8>> {
        let new_ptr = self.allocate(newsize, alignment)?;
        // SAFETY: both blocks are valid for at least the copied length and
        // are distinct allocations (a zero-length copy accesses no memory).
        unsafe {
            std::ptr::copy_nonoverlapping(ptr.as_ptr(), new_ptr.as_ptr(), old_size.min(newsize));
        }
        self.deallocate(ptr, old_size, alignment);
        Ok(new_ptr)
    }

    /// Returns zeroed statistics; this allocator does not track usage.
    fn get_stats(&self) -> AllocatorStats {
        AllocatorStats::new()
    }

    /// No-op: there is no internal state to reset.
    fn reset(&self) {}
}
impl MemoryStats {
pub fn new() -> Self {
Self {
total_allocated: AtomicUsize::new(0),
total_deallocated: AtomicUsize::new(0),
peak_usage: AtomicUsize::new(0),
current_usage: AtomicUsize::new(0),
allocation_count: AtomicUsize::new(0),
deallocation_count: AtomicUsize::new(0),
fragmentation_ratio: AtomicUsize::new(0),
}
}
}
impl PoolStatistics {
pub fn new() -> Self {
Self {
hits: AtomicUsize::new(0),
misses: AtomicUsize::new(0),
utilization: AtomicUsize::new(0),
avg_allocation_time: AtomicUsize::new(0),
}
}
}
impl SimdStats {
pub fn new() -> Self {
Self {
allocations_by_alignment: HashMap::new(),
simd_memory_usage: AtomicUsize::new(0),
vectorization_efficiency: AtomicUsize::new(0),
}
}
}
impl ArenaStats {
pub fn new() -> Self {
Self {
arenas_created: AtomicUsize::new(0),
total_arena_memory: AtomicUsize::new(0),
arena_utilization: AtomicUsize::new(0),
fragmentation_waste: AtomicUsize::new(0),
}
}
}
impl MappingStats {
pub fn new() -> Self {
Self {
active_mappings: AtomicUsize::new(0),
total_mapped_memory: AtomicUsize::new(0),
cache_hits: AtomicUsize::new(0),
cache_misses: AtomicUsize::new(0),
}
}
}
impl RecyclerStats {
pub fn new() -> Self {
Self {
successful_recycles: AtomicUsize::new(0),
failed_recycles: AtomicUsize::new(0),
hazard_contentions: AtomicUsize::new(0),
memory_reclaimed: AtomicUsize::new(0),
}
}
}
impl AllocatorStats {
pub fn new() -> Self {
Self {
allocation_requests: AtomicUsize::new(0),
deallocation_requests: AtomicUsize::new(0),
bytes_allocated: AtomicUsize::new(0),
bytes_deallocated: AtomicUsize::new(0),
allocation_failures: AtomicUsize::new(0),
}
}
}
impl SlabStats {
pub fn new() -> Self {
Self {
slabs_allocated: AtomicUsize::new(0),
objects_allocated: AtomicUsize::new(0),
slab_utilization: AtomicUsize::new(0),
internal_fragmentation: AtomicUsize::new(0),
}
}
}
impl BuddyStats {
pub fn new() -> Self {
Self {
allocations_by_order: (0..32).map(|_| AtomicUsize::new(0)).collect(),
coalescing_operations: AtomicUsize::new(0),
splitting_operations: AtomicUsize::new(0),
external_fragmentation: AtomicUsize::new(0),
}
}
}
impl<T> ZeroCopyBuffer<T> {
    /// Number of elements currently stored.
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns `true` when the buffer holds no elements.
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Number of elements the current storage can hold.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Appends `value` without growing the storage.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` when the buffer is already full.
    pub fn push(&mut self, value: T) -> Result<()> {
        if self.length >= self.capacity {
            return Err(MetricsError::MemoryError(
                "Buffer capacity exceeded".to_string(),
            ));
        }
        // SAFETY: `length < capacity`, so the slot lies inside the storage
        // and holds no live value yet.
        unsafe {
            std::ptr::write(self.data.as_ptr().add(self.length), value);
        }
        self.length += 1;
        Ok(())
    }

    /// Borrows the stored elements as a slice.
    pub fn as_slice(&self) -> &[T] {
        // SAFETY: the first `length` slots are initialised and `data` is
        // non-null and aligned for `T`.
        unsafe { std::slice::from_raw_parts(self.data.as_ptr(), self.length) }
    }

    /// Borrows the stored elements as a mutable slice.
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        // SAFETY: as for `as_slice`; `&mut self` guarantees exclusive access.
        unsafe { std::slice::from_raw_parts_mut(self.data.as_ptr(), self.length) }
    }

    /// Sets the number of elements to `newsize`.
    ///
    /// * Shrinking drops the truncated elements.
    /// * Growing reallocates the storage when `newsize` exceeds the capacity
    ///   and fills every new slot with zero bytes.
    ///
    /// NOTE(review): zero-filling yields valid values only for types whose
    /// all-zero bit pattern is valid (integers, floats, ...). Do not grow
    /// buffers of other element types through this method.
    ///
    /// # Errors
    /// Returns `MetricsError::MemoryError` when the new layout is invalid, and
    /// propagates the allocator's error when reallocation fails; the buffer is
    /// left unchanged in both cases.
    pub fn resize(&mut self, newsize: usize) -> Result<()> {
        if newsize < self.length {
            let dropped = self.length - newsize;
            // Shorten first so a panicking destructor cannot cause a double drop.
            self.length = newsize;
            // SAFETY: slots `newsize..newsize + dropped` were initialised and
            // are no longer reachable through the buffer.
            unsafe {
                let tail =
                    std::ptr::slice_from_raw_parts_mut(self.data.as_ptr().add(newsize), dropped);
                std::ptr::drop_in_place(tail);
            }
            return Ok(());
        }

        if newsize > self.capacity {
            let invalid_layout = || MetricsError::MemoryError("Invalid layout".to_string());
            // Checked size computation; keeps the alignment used at allocation.
            let new_bytes = Layout::array::<T>(newsize)
                .map_err(|_| invalid_layout())?
                .size();
            let new_layout = Layout::from_size_align(new_bytes, self.layout.align())
                .map_err(|_| invalid_layout())?;

            let new_ptr = self.allocator.reallocate(
                self.data.cast::<u8>(),
                self.layout.size(),
                new_layout.size(),
                self.layout.align(),
            )?;
            self.data = new_ptr.cast::<T>();
            self.capacity = newsize;
            // Keep the layout in sync so the storage is later released with
            // the size it was actually allocated with.
            self.layout = new_layout;
        }

        if newsize > self.length {
            // SAFETY: `newsize <= capacity`, so the range lies inside the
            // storage and holds no live values.
            unsafe {
                std::ptr::write_bytes(
                    self.data.as_ptr().add(self.length),
                    0,
                    newsize - self.length,
                );
            }
            self.length = newsize;
        }
        Ok(())
    }
}
impl<T> Drop for ZeroCopyBuffer<T> {
    /// Drops every stored element in index order, then hands the storage back
    /// to the allocator using the recorded layout.
    fn drop(&mut self) {
        let base = self.data.as_ptr();
        let mut index = 0;
        while index < self.length {
            // SAFETY: slots below `length` are treated as initialised by this
            // type, and each one is dropped exactly once.
            unsafe { std::ptr::drop_in_place(base.add(index)) };
            index += 1;
        }

        let (size, align) = (self.layout.size(), self.layout.align());
        self.allocator
            .deallocate(self.data.cast::<u8>(), size, align);
    }
}
impl<'a, T> ZeroCopyArrayView<'a, T> {
    /// Number of elements in the view.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when the view contains no elements.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the element at `index`, or `None` when it is out of bounds.
    pub fn get(&self, index: usize) -> Option<&T> {
        self.as_slice().get(index)
    }

    /// Borrows the viewed elements as a slice.
    pub fn as_slice(&self) -> &[T] {
        // SAFETY: the view was built over `len` valid, initialised elements
        // that stay borrowed for `'a`.
        unsafe { std::slice::from_raw_parts(self.data.as_ptr(), self.len) }
    }

    /// Creates a view over `len` elements starting at `start`.
    ///
    /// # Errors
    /// Returns `MetricsError::IndexError` when `start + len` overflows or
    /// reaches past the end of this view.
    pub fn subview(&self, start: usize, len: usize) -> Result<ZeroCopyArrayView<'a, T>> {
        // Checked addition: a wrapped sum could otherwise pass the bounds
        // test and produce an out-of-range view.
        let in_bounds = start
            .checked_add(len)
            .map_or(false, |end| end <= self.len);
        if !in_bounds {
            return Err(MetricsError::IndexError(
                "Subview bounds exceed array".to_string(),
            ));
        }
        Ok(ZeroCopyArrayView {
            // SAFETY: `start <= self.len`, so the offset pointer stays within
            // (or one past the end of) the viewed data and is non-null.
            data: unsafe { NonNull::new_unchecked(self.data.as_ptr().add(start)) },
            len,
            _lifetime: std::marker::PhantomData,
            memory_manager: self.memory_manager,
        })
    }
}
impl<'a, T> ZeroCopyArrayViewMut<'a, T> {
    /// Number of elements in the view.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when the view contains no elements.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns a shared reference to the element at `index`, or `None` when
    /// the index is out of bounds.
    pub fn get(&self, index: usize) -> Option<&T> {
        if index >= self.len {
            return None;
        }
        // SAFETY: `index < len`, so the element lies inside the viewed data.
        let element = unsafe { &*self.data.as_ptr().add(index) };
        Some(element)
    }

    /// Returns a mutable reference to the element at `index`, or `None` when
    /// the index is out of bounds.
    pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
        if index >= self.len {
            return None;
        }
        // SAFETY: `index < len`, and `&mut self` guarantees exclusive access.
        let element = unsafe { &mut *self.data.as_ptr().add(index) };
        Some(element)
    }

    /// Borrows the viewed elements as a shared slice.
    pub fn as_slice(&self) -> &[T] {
        let (start, count) = (self.data.as_ptr(), self.len);
        // SAFETY: the view covers `len` valid, initialised elements.
        unsafe { std::slice::from_raw_parts(start, count) }
    }

    /// Borrows the viewed elements as a mutable slice.
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        let (start, count) = (self.data.as_ptr(), self.len);
        // SAFETY: as for `as_slice`; `&mut self` guarantees exclusive access.
        unsafe { std::slice::from_raw_parts_mut(start, count) }
    }
}
// Unit tests for the chunked, row-wise and incremental metric helpers.
#[cfg(test)]
mod tests {
use super::*;
use scirs2_core::ndarray::Array1;
// Streaming mean absolute error: the state is (sum of |true - pred|, sample count).
struct StreamingMAE;
impl StreamingMetric<f64> for StreamingMAE {
type State = (f64, usize);
fn init_state(&self) -> Self::State {
(0.0, 0)
}
fn update_state(
&self,
state: &mut Self::State,
batch_true: &[f64],
batch_pred: &[f64],
) -> Result<()> {
for (y_t, y_p) in batch_true.iter().zip(batch_pred.iter()) {
state.0 += (y_t - y_p).abs();
state.1 += 1;
}
Ok(())
}
fn finalize(&self, state: &Self::State) -> Result<f64> {
// An empty stream has no mean.
if state.1 == 0 {
return Err(MetricsError::DivisionByZero);
}
Ok(state.0 / state.1 as f64)
}
}
// Chunked streaming (chunk size 2 over 5 samples, so the last chunk is
// partial) must match the MAE computed in one pass.
#[test]
fn test_chunked_streaming_metric() {
let y_true = Array1::from_vec(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
let y_pred = Array1::from_vec(vec![1.2, 2.3, 2.9, 4.1, 5.2]);
let chunked = ChunkedMetrics::new().with_chunk_size(2);
let mae = chunked
.compute_streaming(&y_true, &y_pred, &StreamingMAE)
.expect("Operation failed");
let expected_mae = y_true
.iter()
.zip(y_pred.iter())
.map(|(t, p)| (t - p).abs())
.sum::<f64>()
/ y_true.len() as f64;
assert!((mae - expected_mae).abs() < 1e-10);
}
// Sum of squares computed per 10-element chunk and then summed must equal
// the sum of squares over the whole input.
#[test]
fn test_compute_rowwise() {
let data: Vec<f64> = (0..100).map(|x| x as f64).collect();
let row_op = |chunk: &[f64]| -> Result<f64> { Ok(chunk.iter().map(|x| x * x).sum()) };
let combine = |results: &[f64]| -> Result<f64> { Ok(results.iter().sum()) };
let chunked = ChunkedMetrics::new().with_chunk_size(10);
let result = chunked
.compute_rowwise(&data, row_op, combine)
.expect("Operation failed");
let expected: f64 = data.iter().map(|x| x * x).sum();
assert!((result - expected).abs() < 1e-10);
}
// Mean squared error accumulated sample by sample and in one batch must both
// match the directly computed value.
#[test]
fn test_incremental_metrics() {
let data = vec![(1.0, 1.2), (2.0, 1.8), (3.0, 3.1), (4.0, 4.2), (5.0, 4.9)];
let mse_update = |state: &mut f64, y_true: f64, y_pred: f64| -> Result<()> {
*state += (y_true - y_pred).powi(2);
Ok(())
};
let mse_finalize = |state: &f64, count: usize| -> Result<f64> {
if count == 0 {
return Err(MetricsError::DivisionByZero);
}
Ok(*state / count as f64)
};
let expected_mse =
data.iter().map(|&(t, p)| (t - p) * (t - p)).sum::<f64>() / data.len() as f64;
// Per-sample path.
let mut incremental = IncrementalMetrics::<f64, f64>::new();
for &(y_true, y_pred) in &data {
incremental
.update(y_true, y_pred, mse_update)
.expect("Operation failed");
}
let mse = incremental
.finalize(mse_finalize)
.expect("Operation failed");
assert!((mse - expected_mse).abs() < 1e-10);
// Batch path over the same data.
let (y_true, y_pred): (Vec<_>, Vec<_>) = data.iter().cloned().unzip();
let batch_update = |state: &mut f64, y_true: &[f64], y_pred: &[f64]| -> Result<()> {
for (t, p) in y_true.iter().zip(y_pred.iter()) {
*state += (t - p).powi(2);
}
Ok(())
};
let mut incremental_batch = IncrementalMetrics::<f64, f64>::new();
incremental_batch
.update_batch(&y_true, &y_pred, batch_update)
.expect("Operation failed");
let mse_batch = incremental_batch
.finalize(mse_finalize)
.expect("Operation failed");
assert!((mse_batch - expected_mse).abs() < 1e-10);
}
}