use crate::core::error::{Error, Result};
use crate::storage::unified_memory::*;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, Range};
use std::ptr::NonNull;
use std::slice;
use std::sync::{Arc, Mutex, RwLock};
/// Cache line size assumed throughout (typical for x86_64 and most AArch64).
pub const CACHE_LINE_SIZE: usize = 64;
/// Page size assumed for pool backing allocations (standard 4 KiB pages).
pub const PAGE_SIZE: usize = 4096;
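/// A non-owning, zero-copy window over a contiguous buffer of `T`.
///
/// The view keeps the backing allocation alive through its `StorageHandle`,
/// but element destructors are never run: as written, values stored through a
/// view are leaked (not dropped) when the last view referencing them goes away.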
#[derive(Debug)]
pub struct ZeroCopyView<T> {
data: NonNull<T>,
len: usize,
capacity: usize,
layout: MemoryLayout,
_storage_handle: Arc<StorageHandle>,
_phantom: PhantomData<T>,
}
unsafe impl<T: Send> Send for ZeroCopyView<T> {}
unsafe impl<T: Sync> Sync for ZeroCopyView<T> {}
impl<T> ZeroCopyView<T> {
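    /// # Safety
    ///
    /// `data` must point to a buffer with room for `capacity` elements of
    /// `T`, and the first `len` elements must be initialized before the view
    /// is read. The buffer must stay valid for as long as `storage_handle`
    /// keeps the allocation alive.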
pub unsafe fn new(
data: NonNull<T>,
len: usize,
capacity: usize,
layout: MemoryLayout,
storage_handle: Arc<StorageHandle>,
) -> Self {
Self {
data,
len,
capacity,
layout,
_storage_handle: storage_handle,
_phantom: PhantomData,
}
}
pub fn len(&self) -> usize {
self.len
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn layout(&self) -> &MemoryLayout {
&self.layout
}
pub fn as_slice(&self) -> &[T] {
unsafe { slice::from_raw_parts(self.data.as_ptr(), self.len) }
}
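    /// # Safety
    ///
    /// The caller must guarantee exclusive access to the underlying buffer
    /// for the lifetime of the returned slice.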
pub unsafe fn as_mut_slice(&mut self) -> &mut [T] {
slice::from_raw_parts_mut(self.data.as_ptr(), self.len)
}
pub fn subview(&self, range: Range<usize>) -> Result<ZeroCopyView<T>> {
if range.start > self.len || range.end > self.len || range.start > range.end {
return Err(Error::InvalidOperation(
"Invalid range for subview".to_string(),
));
}
let new_len = range.end - range.start;
let new_data = unsafe { NonNull::new_unchecked(self.data.as_ptr().add(range.start)) };
Ok(unsafe {
ZeroCopyView::new(
new_data,
new_len,
self.capacity - range.start,
self.layout.clone(),
Arc::clone(&self._storage_handle),
)
})
}
pub fn as_ptr(&self) -> *const T {
self.data.as_ptr()
}
pub fn is_cache_aligned(&self) -> bool {
self.data.as_ptr() as usize % CACHE_LINE_SIZE == 0
}
pub fn memory_address(&self) -> usize {
self.data.as_ptr() as usize
}
}
impl<T> Deref for ZeroCopyView<T> {
type Target = [T];
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}
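/// Describes the physical layout of a buffer: element size, stride,
/// alignment, and optional cache/NUMA placement hints.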
#[derive(Debug, Clone)]
pub struct MemoryLayout {
pub start_address: usize,
pub element_size: usize,
pub stride: usize,
pub alignment: usize,
pub cache_aligned: bool,
pub numa_node: Option<u32>,
}
impl MemoryLayout {
pub fn new<T>() -> Self {
Self {
start_address: 0,
element_size: mem::size_of::<T>(),
stride: mem::size_of::<T>(),
alignment: mem::align_of::<T>(),
cache_aligned: false,
numa_node: None,
}
}
pub fn with_cache_alignment(mut self) -> Self {
self.cache_aligned = true;
self.alignment = self.alignment.max(CACHE_LINE_SIZE);
self
}
pub fn with_numa_node(mut self, node: u32) -> Self {
self.numa_node = Some(node);
self
}
}
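/// Routes allocation requests to per-cache-level pools and hands out
/// cache-line-aligned `ZeroCopyView`s.
///
/// A minimal usage sketch (doc-test ignored; note that the returned view is
/// *uninitialized* and must be fully written before it is read):
///
/// ```ignore
/// let mut allocator = CacheAwareAllocator::new()?;
/// let mut view = allocator.allocate_aligned::<u64>(1024, CacheLevel::L2)?;
/// unsafe {
///     for slot in view.as_mut_slice() {
///         std::ptr::write(slot, 0u64); // initialize every element
///     }
/// }
/// assert!(view.is_cache_aligned());
/// ```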
pub struct CacheAwareAllocator {
cache_topology: CacheTopology,
memory_pools: HashMap<CacheLevel, MemoryPool>,
stats: AllocationStats,
}
impl CacheAwareAllocator {
pub fn new() -> Result<Self> {
let cache_topology = CacheTopology::detect()?;
let mut memory_pools = HashMap::new();
        // Pool sizes are heuristics sized proportionally to each cache level.
        memory_pools.insert(CacheLevel::L1, MemoryPool::new(64 * 1024)?);
        memory_pools.insert(CacheLevel::L2, MemoryPool::new(512 * 1024)?);
        memory_pools.insert(CacheLevel::L3, MemoryPool::new(4 * 1024 * 1024)?);
        memory_pools.insert(CacheLevel::Memory, MemoryPool::new(64 * 1024 * 1024)?);
Ok(Self {
cache_topology,
memory_pools,
stats: AllocationStats::new(),
})
}
pub fn allocate_aligned<T>(
&mut self,
count: usize,
cache_level: CacheLevel,
) -> Result<ZeroCopyView<T>> {
        let size = count.checked_mul(mem::size_of::<T>()).ok_or_else(|| {
            Error::InvalidOperation("Allocation size overflows usize".to_string())
        })?;
let alignment = CACHE_LINE_SIZE.max(mem::align_of::<T>());
let pool = self
.memory_pools
.get_mut(&cache_level)
.ok_or_else(|| Error::InvalidOperation("Cache level not supported".to_string()))?;
let allocation = pool.allocate_aligned(size, alignment)?;
let layout = MemoryLayout {
start_address: allocation.ptr as usize,
element_size: mem::size_of::<T>(),
stride: mem::size_of::<T>(),
alignment,
cache_aligned: true,
numa_node: self.cache_topology.numa_node,
};
let storage_handle = Arc::new(StorageHandle::new(
StorageId(allocation.ptr as u64),
StorageType::InMemory,
Box::new(allocation),
StorageMetadata::new(size),
));
self.stats.record_allocation(size);
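        // The memory behind the new view is uninitialized; callers must write
        // all `count` elements (e.g. with `ptr::write`) before reading them.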
unsafe {
Ok(ZeroCopyView::new(
NonNull::new(allocation.ptr as *mut T).ok_or_else(|| {
Error::InvalidOperation("Null pointer allocation".to_string())
})?,
count,
count,
layout,
storage_handle,
))
}
}
pub fn stats(&self) -> &AllocationStats {
&self.stats
}
pub fn cache_topology(&self) -> &CacheTopology {
&self.cache_topology
}
}
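/// A snapshot of the host's cache hierarchy, used to pick where an
/// allocation should live.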
#[derive(Debug, Clone)]
pub struct CacheTopology {
pub l1_cache_size: usize,
pub l2_cache_size: usize,
pub l3_cache_size: usize,
pub cache_line_size: usize,
pub cpu_cores: usize,
pub numa_node: Option<u32>,
}
impl CacheTopology {
    /// Returns conservative defaults for a typical x86_64 part rather than
    /// querying the OS; only `cpu_cores` is detected at runtime.
    pub fn detect() -> Result<Self> {
        Ok(Self {
            l1_cache_size: 32 * 1024,
            l2_cache_size: 256 * 1024,
            l3_cache_size: 8 * 1024 * 1024,
            cache_line_size: CACHE_LINE_SIZE,
            cpu_cores: num_cpus::get(),
            numa_node: None,
        })
}
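    /// Picks the smallest cache level whose half-capacity can hold `size`,
    /// leaving headroom for the rest of the working set.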
pub fn optimal_cache_level(&self, size: usize) -> CacheLevel {
if size <= self.l1_cache_size / 2 {
CacheLevel::L1
} else if size <= self.l2_cache_size / 2 {
CacheLevel::L2
} else if size <= self.l3_cache_size / 2 {
CacheLevel::L3
} else {
CacheLevel::Memory
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheLevel {
L1,
L2,
L3,
Memory,
}
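/// A fixed-size, page-aligned arena managed with a first-fit free list.
/// Individual blocks are never returned to the free list; all memory is
/// released at once when the pool is dropped.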
pub struct MemoryPool {
    size: usize,
    free_blocks: Vec<MemoryBlock>,
    used_blocks: Vec<MemoryBlock>,
    base_ptr: NonNull<u8>,
}
impl MemoryPool {
pub fn new(size: usize) -> Result<Self> {
let layout = std::alloc::Layout::from_size_align(size, PAGE_SIZE)
.map_err(|_| Error::InvalidOperation("Invalid memory layout".to_string()))?;
let ptr = unsafe { std::alloc::alloc(layout) };
let base_ptr = NonNull::new(ptr)
.ok_or_else(|| Error::InvalidOperation("Memory allocation failed".to_string()))?;
        Ok(Self {
            size,
            free_blocks: vec![MemoryBlock {
                ptr: base_ptr.as_ptr(),
                size,
                alignment: PAGE_SIZE,
            }],
            used_blocks: Vec::new(),
            base_ptr,
        })
}
    /// Carves an aligned block out of the first free block that can hold it
    /// (first-fit). `alignment` must be a power of two.
    pub fn allocate_aligned(&mut self, size: usize, alignment: usize) -> Result<MemoryBlock> {
        debug_assert!(alignment.is_power_of_two());
        // Round the request up to a multiple of the alignment so that block
        // splits keep later allocations aligned as well.
        let aligned_size = (size + alignment - 1) & !(alignment - 1);
        // Find a fitting block first, then mutate the free list; mutating it
        // while iterating would not borrow-check.
        let index = self
            .free_blocks
            .iter()
            .position(|block| block.size >= block.ptr.align_offset(alignment) + aligned_size)
            .ok_or_else(|| Error::InvalidOperation("Not enough memory in pool".to_string()))?;
        let block = self.free_blocks[index]; // `MemoryBlock` is `Copy`
        // Bytes needed to bring the block's start up to `alignment`.
        let padding = block.ptr.align_offset(alignment);
        let allocated_block = MemoryBlock {
            ptr: unsafe { block.ptr.add(padding) },
            size: aligned_size,
            alignment,
        };
        let remaining = block.size - padding - aligned_size;
        if remaining > 0 {
            // Keep the tail of the split on the free list; any padding bytes
            // before the allocation stay unavailable until the pool is dropped.
            self.free_blocks[index] = MemoryBlock {
                ptr: unsafe { allocated_block.ptr.add(aligned_size) },
                size: remaining,
                alignment: block.alignment,
            };
        } else {
            self.free_blocks.remove(index);
        }
        self.used_blocks.push(allocated_block);
        Ok(allocated_block)
    }
}
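// Safety note: dropping the pool frees the entire backing region, so any
// view carved out of it must not outlive the allocator that owns the pool.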
impl Drop for MemoryPool {
fn drop(&mut self) {
unsafe {
let layout = std::alloc::Layout::from_size_align_unchecked(self.size, PAGE_SIZE);
std::alloc::dealloc(self.base_ptr.as_ptr(), layout);
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct MemoryBlock {
pub ptr: *mut u8,
pub size: usize,
pub alignment: usize,
}
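// Safety: `MemoryBlock` is a plain descriptor (pointer, size, alignment);
// the owning pool is responsible for synchronizing access to the bytes.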
unsafe impl Send for MemoryBlock {}
unsafe impl Sync for MemoryBlock {}
#[derive(Debug, Clone)]
pub struct AllocationStats {
pub total_allocated: usize,
pub allocation_count: usize,
pub peak_usage: usize,
pub current_usage: usize,
pub cache_hit_rate: f64,
}
impl AllocationStats {
pub fn new() -> Self {
Self {
total_allocated: 0,
allocation_count: 0,
peak_usage: 0,
current_usage: 0,
cache_hit_rate: 0.0,
}
}
pub fn record_allocation(&mut self, size: usize) {
self.total_allocated += size;
self.allocation_count += 1;
self.current_usage += size;
if self.current_usage > self.peak_usage {
self.peak_usage = self.current_usage;
}
}
pub fn record_deallocation(&mut self, size: usize) {
self.current_usage = self.current_usage.saturating_sub(size);
}
}
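/// A read-only, zero-copy view over a memory-mapped file. `len` is an
/// element count; `as_slice` clamps it to what the mapping actually holds.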
pub struct MemoryMappedView<T> {
mmap: memmap2::Mmap,
len: usize,
layout: MemoryLayout,
_phantom: PhantomData<T>,
}
impl<T> MemoryMappedView<T> {
pub fn from_file(file: std::fs::File, len: usize) -> Result<Self> {
let mmap = unsafe {
memmap2::Mmap::map(&file)
.map_err(|e| Error::InvalidOperation(format!("Memory mapping failed: {}", e)))?
};
let layout = MemoryLayout {
start_address: mmap.as_ptr() as usize,
element_size: mem::size_of::<T>(),
stride: mem::size_of::<T>(),
alignment: mem::align_of::<T>(),
cache_aligned: false,
numa_node: None,
};
Ok(Self {
mmap,
len,
layout,
_phantom: PhantomData,
})
}
pub fn as_slice(&self) -> &[T] {
unsafe {
slice::from_raw_parts(
self.mmap.as_ptr() as *const T,
self.len.min(self.mmap.len() / mem::size_of::<T>()),
)
}
}
pub fn layout(&self) -> &MemoryLayout {
&self.layout
}
pub fn len(&self) -> usize {
self.len
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
}
impl<T> Deref for MemoryMappedView<T> {
type Target = [T];
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}
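/// Bulk operations written to be friendly to the cache hierarchy: block-wise
/// traversal, explicit prefetching, and cache-sized block selection.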
pub trait CacheAwareOps<T> {
fn linear_scan<F>(&self, predicate: F) -> Vec<usize>
where
F: Fn(&T) -> bool;
fn blocked_operation<U, F>(&self, other: &[U], block_size: usize, op: F) -> Vec<T>
where
F: Fn(&T, &U) -> T,
T: Clone,
U: Clone;
fn prefetch(&self, indices: &[usize]);
fn optimal_block_size(&self) -> usize;
}
impl<T> CacheAwareOps<T> for ZeroCopyView<T> {
fn linear_scan<F>(&self, predicate: F) -> Vec<usize>
where
F: Fn(&T) -> bool,
{
let mut results = Vec::new();
let slice = self.as_slice();
let block_size = self.optimal_block_size();
for (block_start, chunk) in slice.chunks(block_size).enumerate() {
for (i, item) in chunk.iter().enumerate() {
if predicate(item) {
results.push(block_start * block_size + i);
}
}
}
results
}
    fn blocked_operation<U, F>(&self, other: &[U], block_size: usize, op: F) -> Vec<T>
    where
        F: Fn(&T, &U) -> T,
        T: Clone,
        U: Clone,
    {
        let slice = self.as_slice();
        let len = slice.len().min(other.len());
        let mut result = Vec::with_capacity(len);
        // Walk both inputs one block at a time so each block stays cache
        // resident, without first materializing an intermediate Vec of pairs.
        let block = block_size.max(1);
        for start in (0..len).step_by(block) {
            for i in start..(start + block).min(len) {
                result.push(op(&slice[i], &other[i]));
            }
        }
        result
    }
    fn prefetch(&self, indices: &[usize]) {
        let slice = self.as_slice();
        for &index in indices {
            if index < slice.len() {
                #[cfg(target_arch = "x86_64")]
                unsafe {
                    use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
                    // Hint the CPU to pull this element into all cache levels
                    // (const-generic form used by current `std::arch`).
                    _mm_prefetch::<{ _MM_HINT_T0 }>(slice.as_ptr().add(index) as *const i8);
                }
            }
        }
    }
    fn optimal_block_size(&self) -> usize {
        // Assume a 32 KiB L1 data cache; clamp the element size to 1 so that
        // zero-sized types cannot cause a division by zero.
        let cache_size = 32 * 1024;
        let element_size = mem::size_of::<T>().max(1);
        (cache_size / element_size).max(64)
    }
}
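/// Thread-safe facade that owns the allocator, tracks live views, and
/// records usage statistics.
///
/// A minimal usage sketch (doc-test ignored; the error type comes from this
/// crate's `core::error`):
///
/// ```ignore
/// let manager = ZeroCopyManager::new()?;
/// let view = manager.create_view((0..8).collect::<Vec<i64>>())?;
/// let odd_idx = view.linear_scan(|&x| x % 2 == 1); // indices of matches
/// assert_eq!(odd_idx, vec![1, 3, 5, 7]);
/// assert_eq!(manager.stats()?.views_created, 1);
/// ```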
pub struct ZeroCopyManager {
allocator: Mutex<CacheAwareAllocator>,
active_views: RwLock<HashMap<usize, ViewMetadata>>,
stats: Mutex<ZeroCopyStats>,
}
impl ZeroCopyManager {
pub fn new() -> Result<Self> {
Ok(Self {
allocator: Mutex::new(CacheAwareAllocator::new()?),
active_views: RwLock::new(HashMap::new()),
stats: Mutex::new(ZeroCopyStats::new()),
})
}
    pub fn create_view<T: Clone>(&self, data: Vec<T>) -> Result<ZeroCopyView<T>> {
        let len = data.len();
        let size = len * mem::size_of::<T>();
        // Take the allocator lock once for both the placement decision and
        // the allocation itself.
        let mut allocator = self
            .allocator
            .lock()
            .map_err(|_| Error::InvalidOperation("Failed to acquire allocator lock".to_string()))?;
        let cache_level = allocator.cache_topology().optimal_cache_level(size);
        let mut view = allocator.allocate_aligned(len, cache_level)?;
        drop(allocator);
        unsafe {
            // The new buffer is uninitialized, so move elements in with
            // `ptr::write`; plain assignment would drop uninitialized values.
            let dest = view.as_mut_slice().as_mut_ptr();
            for (i, item) in data.into_iter().enumerate() {
                std::ptr::write(dest.add(i), item);
            }
        }
let view_id = view.memory_address();
let metadata = ViewMetadata {
size,
cache_level,
creation_time: std::time::Instant::now(),
};
self.active_views
.write()
.map_err(|_| Error::InvalidOperation("Failed to acquire views lock".to_string()))?
.insert(view_id, metadata);
self.stats
.lock()
.map_err(|_| Error::InvalidOperation("Failed to acquire stats lock".to_string()))?
.record_view_creation(size);
Ok(view)
}
pub fn create_mmap_view<T>(&self, file_path: &str, len: usize) -> Result<MemoryMappedView<T>> {
let file = std::fs::File::open(file_path)
.map_err(|e| Error::InvalidOperation(format!("Failed to open file: {}", e)))?;
let view = MemoryMappedView::from_file(file, len)?;
self.stats
.lock()
.map_err(|_| Error::InvalidOperation("Failed to acquire stats lock".to_string()))?
.record_mmap_creation(len * mem::size_of::<T>());
Ok(view)
}
pub fn stats(&self) -> Result<ZeroCopyStats> {
self.stats
.lock()
.map(|stats| stats.clone())
.map_err(|_| Error::InvalidOperation("Failed to acquire stats lock".to_string()))
}
}
#[derive(Debug, Clone)]
struct ViewMetadata {
size: usize,
cache_level: CacheLevel,
creation_time: std::time::Instant,
}
#[derive(Debug, Clone)]
pub struct ZeroCopyStats {
pub views_created: usize,
pub mmap_views_created: usize,
pub total_memory: usize,
pub cache_hit_rate: f64,
pub avg_view_lifetime: std::time::Duration,
}
impl ZeroCopyStats {
pub fn new() -> Self {
Self {
views_created: 0,
mmap_views_created: 0,
total_memory: 0,
cache_hit_rate: 0.0,
avg_view_lifetime: std::time::Duration::ZERO,
}
}
pub fn record_view_creation(&mut self, size: usize) {
self.views_created += 1;
self.total_memory += size;
}
pub fn record_mmap_creation(&mut self, size: usize) {
self.mmap_views_created += 1;
self.total_memory += size;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_cache_topology_detection() {
let topology = CacheTopology::detect().expect("operation should succeed");
assert!(topology.l1_cache_size > 0);
assert!(topology.l2_cache_size > 0);
assert!(topology.l3_cache_size > 0);
assert!(topology.cpu_cores > 0);
}
#[test]
fn test_memory_layout() {
let layout = MemoryLayout::new::<i64>().with_cache_alignment();
assert_eq!(layout.element_size, 8);
assert!(layout.cache_aligned);
assert!(layout.alignment >= CACHE_LINE_SIZE);
}
#[test]
fn test_zero_copy_manager() {
let manager = ZeroCopyManager::new().expect("operation should succeed");
let data = vec![1i32, 2, 3, 4, 5];
let view = manager.create_view(data).expect("operation should succeed");
assert_eq!(view.len(), 5);
assert_eq!(view.as_slice(), &[1, 2, 3, 4, 5]);
let stats = manager.stats().expect("operation should succeed");
assert_eq!(stats.views_created, 1);
}
#[test]
fn test_cache_aware_operations() {
let manager = ZeroCopyManager::new().expect("operation should succeed");
let data = (0..1000).collect::<Vec<i32>>();
let view = manager.create_view(data).expect("operation should succeed");
let evens = view.linear_scan(|&x| x % 2 == 0);
assert_eq!(evens.len(), 500);
let block_size = view.optimal_block_size();
assert!(block_size > 0);
}
#[test]
fn test_subview_creation() {
let manager = ZeroCopyManager::new().expect("operation should succeed");
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let view = manager.create_view(data).expect("operation should succeed");
let subview = view.subview(2..7).expect("operation should succeed");
assert_eq!(subview.len(), 5);
assert_eq!(subview.as_slice(), &[3, 4, 5, 6, 7]);
}
}