use anyhow::Result;
use bytes::{BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct ZeroCopyManager {
config: ZeroCopyConfig,
buffer_pool: Arc<RwLock<BufferPool>>,
stats: Arc<RwLock<ZeroCopyStats>>,
}
impl ZeroCopyManager {
pub fn new(config: ZeroCopyConfig) -> Result<Self> {
Ok(Self {
config: config.clone(),
buffer_pool: Arc::new(RwLock::new(BufferPool::new(config.buffer_pool_size))),
stats: Arc::new(RwLock::new(ZeroCopyStats::default())),
})
}
pub async fn create_buffer(&self, size: usize) -> Result<ZeroCopyBuffer> {
let mut stats = self.stats.write().await;
stats.buffers_allocated += 1;
let mut pool = self.buffer_pool.write().await;
if let Some(buf) = pool.acquire(size) {
stats.pool_hits += 1;
drop(pool);
drop(stats);
return Ok(ZeroCopyBuffer::from_bytes(buf));
}
stats.pool_misses += 1;
drop(pool);
drop(stats);
let mut buffer = BytesMut::with_capacity(size);
buffer.resize(size, 0);
Ok(ZeroCopyBuffer::from_bytes_mut(buffer))
}
pub async fn return_buffer(&self, buffer: Bytes) {
let mut pool = self.buffer_pool.write().await;
pool.release(buffer);
let mut stats = self.stats.write().await;
stats.buffers_returned += 1;
}
pub async fn stats(&self) -> ZeroCopyStats {
self.stats.read().await.clone()
}
pub async fn batch_process<F>(&self, buffers: Vec<Bytes>, processor: F) -> Result<Vec<Bytes>>
where
F: Fn(&[u8]) -> Vec<u8>,
{
let mut results = Vec::with_capacity(buffers.len());
for buffer in buffers {
let processed = processor(&buffer);
results.push(Bytes::from(processed));
}
let mut stats = self.stats.write().await;
stats.batch_operations += 1;
stats.total_bytes_processed += results.iter().map(|b| b.len() as u64).sum::<u64>();
Ok(results)
}
pub async fn splice(&self, buffers: Vec<Bytes>) -> Result<SplicedBuffer> {
let total_len = buffers.iter().map(|b| b.len()).sum();
let mut stats = self.stats.write().await;
stats.splice_operations += 1;
stats.bytes_saved += total_len as u64;
Ok(SplicedBuffer {
buffers,
total_length: total_len,
})
}
}
impl Default for ZeroCopyManager {
fn default() -> Self {
Self::new(ZeroCopyConfig::default()).expect("Failed to create zero-copy manager")
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ZeroCopyConfig {
pub enabled: bool,
pub buffer_pool_size: usize,
pub max_pooled_buffer_size: usize,
pub enable_simd: bool,
pub enable_mmap: bool,
pub reuse_threshold: usize,
}
impl Default for ZeroCopyConfig {
fn default() -> Self {
Self {
enabled: true,
buffer_pool_size: 1000,
max_pooled_buffer_size: 1024 * 1024, enable_simd: true,
enable_mmap: false,
reuse_threshold: 512, }
}
}
#[derive(Clone)]
pub struct ZeroCopyBuffer {
data: Arc<BufferData>,
}
enum BufferData {
Owned(BytesMut),
Shared(Bytes),
}
impl ZeroCopyBuffer {
pub fn from_bytes_mut(buf: BytesMut) -> Self {
Self {
data: Arc::new(BufferData::Owned(buf)),
}
}
pub fn from_bytes(buf: Bytes) -> Self {
Self {
data: Arc::new(BufferData::Shared(buf)),
}
}
pub fn share(&self) -> Self {
Self {
data: Arc::clone(&self.data),
}
}
pub fn slice(&self, range: Range<usize>) -> Result<Bytes> {
match &*self.data {
BufferData::Owned(buf) => {
let bytes: Bytes = buf.clone().freeze();
Ok(bytes.slice(range))
}
BufferData::Shared(bytes) => Ok(bytes.slice(range)),
}
}
pub fn len(&self) -> usize {
match &*self.data {
BufferData::Owned(buf) => buf.len(),
BufferData::Shared(bytes) => bytes.len(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn as_bytes(&self) -> Bytes {
match &*self.data {
BufferData::Owned(buf) => buf.clone().freeze(),
BufferData::Shared(bytes) => bytes.clone(),
}
}
pub fn ref_count(&self) -> usize {
Arc::strong_count(&self.data)
}
}
pub struct SplicedBuffer {
buffers: Vec<Bytes>,
total_length: usize,
}
impl SplicedBuffer {
pub fn len(&self) -> usize {
self.total_length
}
pub fn is_empty(&self) -> bool {
self.total_length == 0
}
pub fn read_all(&self) -> Bytes {
let mut result = BytesMut::with_capacity(self.total_length);
for buffer in &self.buffers {
result.put_slice(buffer);
}
result.freeze()
}
pub fn segments(&self) -> impl Iterator<Item = &Bytes> {
self.buffers.iter()
}
pub fn segment_count(&self) -> usize {
self.buffers.len()
}
}
struct BufferPool {
buffers: VecDeque<Bytes>,
max_size: usize,
}
impl BufferPool {
fn new(max_size: usize) -> Self {
Self {
buffers: VecDeque::with_capacity(max_size),
max_size,
}
}
fn acquire(&mut self, _size: usize) -> Option<Bytes> {
self.buffers.pop_front()
}
fn release(&mut self, buffer: Bytes) {
if self.buffers.len() < self.max_size {
self.buffers.push_back(buffer);
}
}
fn size(&self) -> usize {
self.buffers.len()
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ZeroCopyStats {
pub buffers_allocated: u64,
pub buffers_returned: u64,
pub pool_hits: u64,
pub pool_misses: u64,
pub total_bytes_processed: u64,
pub bytes_saved: u64,
pub batch_operations: u64,
pub splice_operations: u64,
}
impl ZeroCopyStats {
pub fn pool_hit_rate(&self) -> f64 {
let total_requests = self.pool_hits + self.pool_misses;
if total_requests == 0 {
0.0
} else {
self.pool_hits as f64 / total_requests as f64
}
}
pub fn avg_bytes_saved(&self) -> f64 {
if self.batch_operations + self.splice_operations == 0 {
0.0
} else {
self.bytes_saved as f64 / (self.batch_operations + self.splice_operations) as f64
}
}
}
pub struct SimdBatchProcessor {
chunk_size: usize,
}
impl SimdBatchProcessor {
pub fn new(chunk_size: usize) -> Self {
Self { chunk_size }
}
pub fn process_batch(&self, data: &[u8], operation: SimdOperation) -> Vec<u8> {
match operation {
SimdOperation::Copy => data.to_vec(),
SimdOperation::XorMask(mask) => self.xor_batch(data, mask),
SimdOperation::Sum => self.sum_batch(data),
SimdOperation::Max => self.max_batch(data),
}
}
fn xor_batch(&self, data: &[u8], mask: u8) -> Vec<u8> {
data.iter().map(|&b| b ^ mask).collect()
}
fn sum_batch(&self, data: &[u8]) -> Vec<u8> {
let sum: u64 = data.iter().map(|&b| b as u64).sum();
sum.to_le_bytes().to_vec()
}
fn max_batch(&self, data: &[u8]) -> Vec<u8> {
let max = data.iter().max().copied().unwrap_or(0);
vec![max]
}
}
#[derive(Debug, Clone, Copy)]
pub enum SimdOperation {
Copy,
XorMask(u8),
Sum,
Max,
}
#[cfg(unix)]
pub struct MemoryMappedBuffer {
#[allow(dead_code)]
path: std::path::PathBuf,
size: usize,
}
#[cfg(unix)]
impl MemoryMappedBuffer {
pub fn from_file(_path: &std::path::Path) -> Result<Self> {
Ok(Self {
path: _path.to_path_buf(),
size: 0,
})
}
pub fn size(&self) -> usize {
self.size
}
pub fn slice(&self, _range: Range<usize>) -> Result<&[u8]> {
Ok(&[])
}
}
pub struct SharedRefBuffer<T> {
data: Arc<T>,
}
impl<T> SharedRefBuffer<T> {
pub fn new(data: T) -> Self {
Self {
data: Arc::new(data),
}
}
pub fn share(&self) -> Self {
Self {
data: Arc::clone(&self.data),
}
}
pub fn ref_count(&self) -> usize {
Arc::strong_count(&self.data)
}
pub fn get(&self) -> &T {
&self.data
}
}
impl<T> Clone for SharedRefBuffer<T> {
fn clone(&self) -> Self {
self.share()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_zero_copy_buffer_creation() {
let manager = ZeroCopyManager::default();
let buffer = manager.create_buffer(1024).await.unwrap();
assert_eq!(buffer.len(), 1024);
assert!(!buffer.is_empty());
}
#[tokio::test]
async fn test_buffer_sharing() {
let manager = ZeroCopyManager::default();
let buffer = manager.create_buffer(100).await.unwrap();
let shared1 = buffer.share();
let shared2 = buffer.share();
assert_eq!(buffer.ref_count(), shared1.ref_count());
assert_eq!(shared1.ref_count(), shared2.ref_count());
}
#[tokio::test]
async fn test_zero_copy_slicing() {
let _manager = ZeroCopyManager::default();
let mut buffer = BytesMut::with_capacity(100);
buffer.extend_from_slice(b"Hello, World!");
let zc_buffer = ZeroCopyBuffer::from_bytes_mut(buffer);
let slice = zc_buffer.slice(0..5).unwrap();
assert_eq!(&slice[..], b"Hello");
}
#[tokio::test]
async fn test_buffer_pool() {
let config = ZeroCopyConfig {
buffer_pool_size: 10,
..Default::default()
};
let manager = ZeroCopyManager::new(config).unwrap();
let buffer = manager.create_buffer(512).await.unwrap();
let bytes = buffer.as_bytes();
manager.return_buffer(bytes.clone()).await;
let stats_before = manager.stats().await;
let _buffer2 = manager.create_buffer(512).await.unwrap();
let stats_after = manager.stats().await;
assert!(stats_after.pool_hits > stats_before.pool_hits);
}
#[tokio::test]
async fn test_splice_buffers() {
let manager = ZeroCopyManager::default();
let buf1 = Bytes::from("Hello, ");
let buf2 = Bytes::from("World!");
let spliced = manager.splice(vec![buf1, buf2]).await.unwrap();
assert_eq!(spliced.len(), 13);
assert_eq!(spliced.segment_count(), 2);
let combined = spliced.read_all();
assert_eq!(&combined[..], b"Hello, World!");
}
#[tokio::test]
async fn test_batch_processing() {
let manager = ZeroCopyManager::default();
let buffers = vec![
Bytes::from("data1"),
Bytes::from("data2"),
Bytes::from("data3"),
];
let results = manager
.batch_process(buffers, |data| data.to_vec())
.await
.unwrap();
assert_eq!(results.len(), 3);
assert_eq!(&results[0][..], b"data1");
assert_eq!(&results[1][..], b"data2");
assert_eq!(&results[2][..], b"data3");
}
#[tokio::test]
async fn test_simd_batch_processor() {
let processor = SimdBatchProcessor::new(64);
let data = vec![1u8, 2, 3, 4, 5];
let xor_result = processor.process_batch(&data, SimdOperation::XorMask(0xFF));
assert_eq!(xor_result, vec![254, 253, 252, 251, 250]);
let max_result = processor.process_batch(&data, SimdOperation::Max);
assert_eq!(max_result, vec![5]);
}
#[tokio::test]
async fn test_shared_ref_buffer() {
let data = vec![1, 2, 3, 4, 5];
let buffer = SharedRefBuffer::new(data);
let shared1 = buffer.share();
let shared2 = buffer.share();
assert_eq!(buffer.ref_count(), 3); assert_eq!(shared1.get(), &vec![1, 2, 3, 4, 5]);
assert_eq!(shared2.get(), &vec![1, 2, 3, 4, 5]);
}
#[tokio::test]
async fn test_pool_hit_rate() {
let manager = ZeroCopyManager::default();
let buf1 = manager.create_buffer(512).await.unwrap();
manager.return_buffer(buf1.as_bytes()).await;
let _buf2 = manager.create_buffer(512).await.unwrap();
let stats = manager.stats().await;
assert!(stats.pool_hit_rate() > 0.0);
}
#[tokio::test]
async fn test_zero_copy_stats() {
let manager = ZeroCopyManager::default();
let _buf1 = manager.create_buffer(100).await.unwrap();
let _buf2 = manager.create_buffer(200).await.unwrap();
let stats = manager.stats().await;
assert_eq!(stats.buffers_allocated, 2);
}
}