use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::ptr::NonNull;
use std::slice;
use memmap2::{MmapMut, MmapOptions};
use anyhow::{Result, Context};
use crossbeam_utils::CachePadded;
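/// Tiered zero-copy memory manager: lock-free shared block pools,
/// memory-mapped ring buffers, and a software-simulated DMA layer.
///
/// A minimal usage sketch (marked `ignore` because the crate path is not
/// shown here):
///
/// ```ignore
/// let manager = ZeroCopyMemoryManager::new()?;
/// // 1 KiB fits the smallest tier, so this hands back a 64 KiB block.
/// let mut block = manager.allocate(1024).expect("pool exhausted");
/// unsafe { block.write_bytes(b"payload")? };
/// manager.deallocate(block);
/// ```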
pub struct ZeroCopyMemoryManager {
shared_pools: Vec<Arc<SharedMemoryPool>>,
mmap_buffers: Vec<Arc<MemoryMappedBuffer>>,
dma_manager: Arc<DirectMemoryAccessManager>,
stats: Arc<ZeroCopyStats>,
}
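/// A fixed-size block pool backed by an anonymous memory mapping. Free
/// blocks are tracked one bit per block in lock-free atomic bitmaps, and
/// block sizes are rounded up to a 64-byte cache line.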
pub struct SharedMemoryPool {
memory_region: MmapMut,
free_blocks: Vec<AtomicU64>,
block_size: usize,
total_blocks: usize,
allocator_head: CachePadded<AtomicUsize>,
pool_id: u32,
}
impl SharedMemoryPool {
pub fn new(pool_id: u32, total_size: usize, block_size: usize) -> Result<Self> {
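        // Round the block size up to a 64-byte (cache-line) boundary.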
let aligned_block_size = (block_size + 63) & !63;
let total_blocks = total_size / aligned_block_size;
let memory_region = MmapOptions::new()
.len(total_blocks * aligned_block_size)
.map_anon()
.context("Failed to create memory mapped region")?;
let bitmap_size = (total_blocks + 63) / 64;
let mut free_blocks = Vec::with_capacity(bitmap_size);
for i in 0..bitmap_size {
let bits = if i == bitmap_size - 1 && total_blocks % 64 != 0 {
let valid_bits = total_blocks % 64;
(1u64 << valid_bits) - 1
} else {
                u64::MAX
            };
free_blocks.push(AtomicU64::new(bits));
}
log::info!("🚀 Created shared memory pool {} with {} blocks of {} bytes each",
pool_id, total_blocks, aligned_block_size);
Ok(Self {
memory_region,
free_blocks,
block_size: aligned_block_size,
total_blocks,
allocator_head: CachePadded::new(AtomicUsize::new(0)),
pool_id,
})
}
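    /// Lock-free allocation: scan the free bitmaps with compare-and-swap,
    /// starting near the most recently allocated block to reduce contention.
    /// Returns `None` when the pool is exhausted.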
#[inline(always)]
pub fn allocate_block(&self) -> Option<ZeroCopyBlock> {
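        // `allocator_head` holds the next block index to try; dividing by 64
        // converts it to a bitmap word index.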
let start_index = self.allocator_head.load(Ordering::Relaxed) / 64;
for attempt in 0..self.free_blocks.len() {
let bitmap_index = (start_index + attempt) % self.free_blocks.len();
let bitmap = &self.free_blocks[bitmap_index];
let mut current = bitmap.load(Ordering::Acquire);
while current != 0 {
let bit_pos = current.trailing_zeros() as usize;
let mask = 1u64 << bit_pos;
match bitmap.compare_exchange_weak(
current,
current & !mask,
Ordering::AcqRel,
Ordering::Relaxed
) {
Ok(_) => {
let block_index = bitmap_index * 64 + bit_pos;
if block_index >= self.total_blocks {
bitmap.fetch_or(mask, Ordering::Relaxed);
break;
}
let offset = block_index * self.block_size;
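                        // SAFETY: block_index < total_blocks, so the offset
                        // stays inside the mapping and the pointer is non-null.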
let ptr = unsafe {
NonNull::new_unchecked(
self.memory_region.as_ptr().add(offset) as *mut u8
)
};
                        // Resume the next scan at the block after this one.
                        // (Storing a block index keeps the /64 conversion in
                        // allocate_block correct.)
                        self.allocator_head.store(
                            block_index + 1,
                            Ordering::Relaxed
                        );
return Some(ZeroCopyBlock {
ptr,
size: self.block_size,
pool_id: self.pool_id,
block_index,
});
}
Err(new_current) => {
current = new_current;
continue;
}
}
}
}
        None
    }
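    /// Return a block to this pool's free bitmap. Mismatched pools and
    /// double frees are logged rather than treated as fatal.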
#[inline(always)]
pub fn deallocate_block(&self, block: ZeroCopyBlock) {
        if block.pool_id != self.pool_id {
            log::error!(
                "Attempted to deallocate a block from pool {} via pool {}",
                block.pool_id, self.pool_id
            );
            return;
        }
        let bitmap_index = block.block_index / 64;
        let bit_pos = block.block_index % 64;
        let mask = 1u64 << bit_pos;
        if bitmap_index < self.free_blocks.len() {
            let prev = self.free_blocks[bitmap_index].fetch_or(mask, Ordering::Release);
            if prev & mask != 0 {
                log::error!("Double free of block {} in pool {}", block.block_index, self.pool_id);
            }
        }
}
pub fn available_blocks(&self) -> usize {
self.free_blocks.iter()
.map(|bitmap| bitmap.load(Ordering::Relaxed).count_ones() as usize)
.sum()
}
}
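/// A raw, owning handle to one block of pool memory. It borrows nothing, so
/// the caller must ensure it never outlives the pool that allocated it.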
pub struct ZeroCopyBlock {
ptr: NonNull<u8>,
size: usize,
pool_id: u32,
block_index: usize,
}
impl ZeroCopyBlock {
#[inline(always)]
pub fn as_ptr(&self) -> *mut u8 {
self.ptr.as_ptr()
}
#[inline(always)]
pub unsafe fn as_slice(&self) -> &[u8] {
slice::from_raw_parts(self.ptr.as_ptr(), self.size)
}
#[inline(always)]
pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
slice::from_raw_parts_mut(self.ptr.as_ptr(), self.size)
}
#[inline(always)]
pub fn size(&self) -> usize {
self.size
}
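    /// Copy `data` into the block via the crate's SIMD memcpy. Unsafe
    /// because the destination is raw pool memory: the caller must keep the
    /// owning pool alive for the duration of the write.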
#[inline(always)]
pub unsafe fn write_bytes(&mut self, data: &[u8]) -> Result<()> {
if data.len() > self.size {
return Err(anyhow::anyhow!("Data too large for block"));
}
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
self.ptr.as_ptr(),
data.as_ptr(),
data.len()
);
Ok(())
}
#[inline(always)]
pub unsafe fn read_bytes(&self, len: usize) -> Result<&[u8]> {
if len > self.size {
return Err(anyhow::anyhow!("Read length exceeds block size"));
}
Ok(slice::from_raw_parts(self.ptr.as_ptr(), len))
}
}
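// SAFETY: the pool hands out each block at most once, so a ZeroCopyBlock has
// exclusive access to its slice of the mapping; moving or sharing it across
// threads is sound provided the owning pool outlives it.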
unsafe impl Send for ZeroCopyBlock {}
unsafe impl Sync for ZeroCopyBlock {}
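/// A single-producer/single-consumer ring buffer over an anonymous memory
/// mapping. The Acquire/Release pairing on `read_pos`/`write_pos` is only
/// sufficient for one concurrent writer and one concurrent reader.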
pub struct MemoryMappedBuffer {
mmap: MmapMut,
read_pos: CachePadded<AtomicUsize>,
write_pos: CachePadded<AtomicUsize>,
size: usize,
buffer_id: u64,
}
impl MemoryMappedBuffer {
pub fn new(buffer_id: u64, size: usize) -> Result<Self> {
let mmap = MmapOptions::new()
.len(size)
.map_anon()
.context("Failed to create memory mapped buffer")?;
log::info!("🚀 Created memory mapped buffer {} with size {} bytes", buffer_id, size);
Ok(Self {
mmap,
read_pos: CachePadded::new(AtomicUsize::new(0)),
write_pos: CachePadded::new(AtomicUsize::new(0)),
size,
buffer_id,
})
}
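    /// Producer side: copy `data` into the ring, splitting the copy when it
    /// wraps past the end of the mapping. Fails without writing if the data
    /// does not fit in the available space.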
#[inline(always)]
pub fn write_data(&self, data: &[u8]) -> Result<usize> {
let data_len = data.len();
let current_write = self.write_pos.load(Ordering::Relaxed);
let current_read = self.read_pos.load(Ordering::Acquire);
let available_space = if current_write >= current_read {
self.size - (current_write - current_read) - 1
} else {
current_read - current_write - 1
};
if data_len > available_space {
return Err(anyhow::anyhow!("Insufficient buffer space"));
}
unsafe {
let write_ptr = self.mmap.as_ptr().add(current_write) as *mut u8;
if current_write + data_len <= self.size {
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
write_ptr, data.as_ptr(), data_len
);
} else {
let first_part = self.size - current_write;
let second_part = data_len - first_part;
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
write_ptr, data.as_ptr(), first_part
);
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
self.mmap.as_ptr() as *mut u8,
data.as_ptr().add(first_part),
second_part
);
}
}
let new_write_pos = (current_write + data_len) % self.size;
self.write_pos.store(new_write_pos, Ordering::Release);
Ok(data_len)
}
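    /// Consumer side: copy up to `buffer.len()` bytes out of the ring,
    /// handling wraparound. Returns the number of bytes read (0 if empty).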
#[inline(always)]
pub fn read_data(&self, buffer: &mut [u8]) -> Result<usize> {
let buffer_len = buffer.len();
let current_read = self.read_pos.load(Ordering::Relaxed);
let current_write = self.write_pos.load(Ordering::Acquire);
let available_data = if current_write >= current_read {
current_write - current_read
} else {
self.size - (current_read - current_write)
};
        if available_data == 0 {
            return Ok(0);
        }
let read_len = buffer_len.min(available_data);
unsafe {
let read_ptr = self.mmap.as_ptr().add(current_read);
if current_read + read_len <= self.size {
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
buffer.as_mut_ptr(), read_ptr, read_len
);
} else {
let first_part = self.size - current_read;
let second_part = read_len - first_part;
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
buffer.as_mut_ptr(), read_ptr, first_part
);
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
buffer.as_mut_ptr().add(first_part),
self.mmap.as_ptr(),
second_part
);
}
}
let new_read_pos = (current_read + read_len) % self.size;
self.read_pos.store(new_read_pos, Ordering::Release);
Ok(read_len)
}
#[inline(always)]
pub fn available_data(&self) -> usize {
let current_read = self.read_pos.load(Ordering::Relaxed);
let current_write = self.write_pos.load(Ordering::Relaxed);
if current_write >= current_read {
current_write - current_read
} else {
self.size - (current_read - current_write)
}
}
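    /// Free space in bytes. One slot is always reserved so a full buffer
    /// remains distinguishable from an empty one.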
#[inline(always)]
pub fn available_space(&self) -> usize {
self.size - self.available_data() - 1
}
}
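/// Software-simulated DMA: transfers are SIMD memcpys dispatched round-robin
/// across the channels. No hardware DMA engine is driven here.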
pub struct DirectMemoryAccessManager {
dma_channels: Vec<Arc<DMAChannel>>,
channel_allocator: AtomicUsize,
dma_stats: Arc<DMAStats>,
}
impl DirectMemoryAccessManager {
pub fn new(num_channels: usize) -> Result<Self> {
let mut dma_channels = Vec::with_capacity(num_channels);
for i in 0..num_channels {
dma_channels.push(Arc::new(DMAChannel::new(i)?));
}
log::info!("🚀 Created DMA manager with {} channels", num_channels);
Ok(Self {
dma_channels,
channel_allocator: AtomicUsize::new(0),
dma_stats: Arc::new(DMAStats::new()),
})
}
#[inline(always)]
pub async fn dma_transfer(&self, src: &[u8], dst: &mut [u8]) -> Result<usize> {
if src.len() != dst.len() {
return Err(anyhow::anyhow!("Source and destination sizes don't match"));
}
let channel_index = self.channel_allocator.fetch_add(1, Ordering::Relaxed) % self.dma_channels.len();
let channel = &self.dma_channels[channel_index];
let transferred = channel.transfer(src, dst).await?;
self.dma_stats.bytes_transferred.fetch_add(transferred as u64, Ordering::Relaxed);
self.dma_stats.transfers_completed.fetch_add(1, Ordering::Relaxed);
Ok(transferred)
}
}
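/// A simulated DMA channel. `transfer_queue` and `status` are currently
/// unused placeholders; `transfer` performs the copy inline.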
pub struct DMAChannel {
channel_id: usize,
transfer_queue: crossbeam_queue::ArrayQueue<DMATransfer>,
status: AtomicU64,
}
impl DMAChannel {
pub fn new(channel_id: usize) -> Result<Self> {
Ok(Self {
channel_id,
transfer_queue: crossbeam_queue::ArrayQueue::new(1024),
status: AtomicU64::new(0),
})
}
#[inline(always)]
pub async fn transfer(&self, src: &[u8], dst: &mut [u8]) -> Result<usize> {
let transfer_size = src.len();
unsafe {
super::hardware_optimizations::SIMDMemoryOps::memcpy_simd_optimized(
dst.as_mut_ptr(),
src.as_ptr(),
transfer_size
);
}
Ok(transfer_size)
}
}
#[derive(Debug)]
pub struct DMATransfer {
pub src_addr: usize,
pub dst_addr: usize,
pub size: usize,
pub flags: u32,
}
pub struct DMAStats {
pub bytes_transferred: AtomicU64,
pub transfers_completed: AtomicU64,
pub transfer_errors: AtomicU64,
}
impl DMAStats {
pub fn new() -> Self {
Self {
bytes_transferred: AtomicU64::new(0),
transfers_completed: AtomicU64::new(0),
transfer_errors: AtomicU64::new(0),
}
}
}
pub struct ZeroCopyStats {
pub blocks_allocated: AtomicU64,
pub blocks_freed: AtomicU64,
pub bytes_transferred: AtomicU64,
pub mmap_buffer_usage: AtomicU64,
}
impl ZeroCopyStats {
pub fn new() -> Self {
Self {
blocks_allocated: AtomicU64::new(0),
blocks_freed: AtomicU64::new(0),
bytes_transferred: AtomicU64::new(0),
mmap_buffer_usage: AtomicU64::new(0),
}
}
pub fn print_stats(&self) {
let allocated = self.blocks_allocated.load(Ordering::Relaxed);
let freed = self.blocks_freed.load(Ordering::Relaxed);
let bytes = self.bytes_transferred.load(Ordering::Relaxed);
let mmap_usage = self.mmap_buffer_usage.load(Ordering::Relaxed);
log::info!("🚀 Zero-Copy Stats:");
log::info!(" 📦 Blocks: Allocated={}, Freed={}, Active={}",
allocated, freed, allocated.saturating_sub(freed));
log::info!(" 📊 Bytes Transferred: {} ({:.2} MB)",
bytes, bytes as f64 / 1024.0 / 1024.0);
log::info!(" 💾 Memory Mapped Usage: {} ({:.2} MB)",
mmap_usage, mmap_usage as f64 / 1024.0 / 1024.0);
}
}
impl ZeroCopyMemoryManager {
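    /// Build the default configuration: three pool tiers (64 KiB, 1 MiB, and
    /// 16 MiB blocks), eight 256 MiB ring buffers, and 16 simulated DMA
    /// channels. Anonymous mappings are typically committed lazily by the
    /// OS, so the multi-GiB reservations need not consume physical memory up
    /// front.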
pub fn new() -> Result<Self> {
let mut shared_pools = Vec::new();
let mut mmap_buffers = Vec::new();
shared_pools.push(Arc::new(SharedMemoryPool::new(0, 1024 * 1024 * 1024, 64 * 1024)?));
shared_pools.push(Arc::new(SharedMemoryPool::new(1, 4 * 1024 * 1024 * 1024, 1024 * 1024)?));
shared_pools.push(Arc::new(SharedMemoryPool::new(2, 8 * 1024 * 1024 * 1024, 16 * 1024 * 1024)?));
        for i in 0..8 {
            mmap_buffers.push(Arc::new(MemoryMappedBuffer::new(i, 256 * 1024 * 1024)?));
        }
        let dma_manager = Arc::new(DirectMemoryAccessManager::new(16)?);
        let stats = Arc::new(ZeroCopyStats::new());
log::info!("🚀 Zero-Copy Memory Manager initialized");
log::info!(" 📦 Memory Pools: {}", shared_pools.len());
log::info!(" 💾 Mapped Buffers: {}", mmap_buffers.len());
log::info!(" 🔄 DMA Channels: 16");
Ok(Self {
shared_pools,
mmap_buffers,
dma_manager,
stats,
})
}
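    /// Route the request to the smallest pool tier whose blocks fit `size`.
    /// Returns `None` if that tier is exhausted; there is no fallback to a
    /// larger tier.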
#[inline(always)]
pub fn allocate(&self, size: usize) -> Option<ZeroCopyBlock> {
        let pool = if size <= 64 * 1024 {
            &self.shared_pools[0]
        } else if size <= 1024 * 1024 {
            &self.shared_pools[1]
        } else {
            &self.shared_pools[2]
        };
if let Some(block) = pool.allocate_block() {
self.stats.blocks_allocated.fetch_add(1, Ordering::Relaxed);
Some(block)
} else {
None
}
}
#[inline(always)]
pub fn deallocate(&self, block: ZeroCopyBlock) {
let pool_id = block.pool_id as usize;
if pool_id < self.shared_pools.len() {
self.shared_pools[pool_id].deallocate_block(block);
self.stats.blocks_freed.fetch_add(1, Ordering::Relaxed);
}
}
#[inline(always)]
pub fn get_mmap_buffer(&self, buffer_id: usize) -> Option<Arc<MemoryMappedBuffer>> {
self.mmap_buffers.get(buffer_id).cloned()
}
#[inline(always)]
pub fn get_dma_manager(&self) -> Arc<DirectMemoryAccessManager> {
self.dma_manager.clone()
}
pub fn get_stats(&self) -> Arc<ZeroCopyStats> {
self.stats.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_shared_memory_pool() -> Result<()> {
let pool = SharedMemoryPool::new(0, 1024 * 1024, 4096)?;
let block1 = pool.allocate_block().expect("Should allocate block");
assert_eq!(block1.size(), 4096);
let block2 = pool.allocate_block().expect("Should allocate another block");
assert_eq!(block2.size(), 4096);
pool.deallocate_block(block1);
pool.deallocate_block(block2);
Ok(())
}
#[tokio::test]
async fn test_memory_mapped_buffer() -> Result<()> {
let buffer = MemoryMappedBuffer::new(0, 1024 * 1024)?;
let test_data = b"Hello, Zero-Copy World!";
let written = buffer.write_data(test_data)?;
assert_eq!(written, test_data.len());
let mut read_buffer = vec![0u8; test_data.len()];
let read = buffer.read_data(&mut read_buffer)?;
assert_eq!(read, test_data.len());
assert_eq!(&read_buffer, test_data);
Ok(())
}
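    // A wraparound sketch: a deliberately tiny buffer (capacity = size - 1)
    // forces the second write to split across the end of the mapping. It
    // assumes memcpy_simd_optimized handles short copies, which the read and
    // write paths above already require.
    #[tokio::test]
    async fn test_memory_mapped_buffer_wraparound() -> Result<()> {
        let buffer = MemoryMappedBuffer::new(1, 16)?;
        let chunk = b"0123456789";
        assert_eq!(buffer.write_data(chunk)?, chunk.len());
        let mut out = vec![0u8; chunk.len()];
        assert_eq!(buffer.read_data(&mut out)?, chunk.len());
        // The second write starts at offset 10 and wraps past offset 15.
        assert_eq!(buffer.write_data(chunk)?, chunk.len());
        let mut out2 = vec![0u8; chunk.len()];
        assert_eq!(buffer.read_data(&mut out2)?, chunk.len());
        assert_eq!(&out2, chunk);
        Ok(())
    }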
#[tokio::test]
async fn test_dma_transfer() -> Result<()> {
let dma_manager = DirectMemoryAccessManager::new(4)?;
let src = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
let mut dst = vec![0u8; 8];
let transferred = dma_manager.dma_transfer(&src, &mut dst).await?;
assert_eq!(transferred, 8);
assert_eq!(src, dst);
Ok(())
}
#[tokio::test]
async fn test_zero_copy_manager() -> Result<()> {
let manager = ZeroCopyMemoryManager::new()?;
let small_block = manager.allocate(1024).expect("Should allocate small block");
assert_eq!(small_block.size(), 65536);
let large_block = manager.allocate(5 * 1024 * 1024).expect("Should allocate large block");
assert_eq!(large_block.size(), 16 * 1024 * 1024);
manager.deallocate(small_block);
manager.deallocate(large_block);
Ok(())
}
}