1use anyhow::Result;
41use bytes::{BufMut, Bytes, BytesMut};
42use serde::{Deserialize, Serialize};
43use std::collections::VecDeque;
44use std::ops::Range;
45use std::sync::Arc;
46use tokio::sync::RwLock;
47
/// Coordinates pooled buffer allocation and zero-copy-style operations
/// (buffer creation/reuse, batch processing, splicing), recording usage
/// counters in [`ZeroCopyStats`].
pub struct ZeroCopyManager {
    // Configuration captured at construction; only `buffer_pool_size` is
    // visibly consulted in this file — other knobs appear unused here.
    config: ZeroCopyConfig,
    // FIFO pool of previously returned `Bytes` buffers available for reuse.
    buffer_pool: Arc<RwLock<BufferPool>>,
    // Counters updated by every manager operation; cloned out by `stats()`.
    stats: Arc<RwLock<ZeroCopyStats>>,
}
54
55impl ZeroCopyManager {
56 pub fn new(config: ZeroCopyConfig) -> Result<Self> {
58 Ok(Self {
59 config: config.clone(),
60 buffer_pool: Arc::new(RwLock::new(BufferPool::new(config.buffer_pool_size))),
61 stats: Arc::new(RwLock::new(ZeroCopyStats::default())),
62 })
63 }
64
65 pub async fn create_buffer(&self, size: usize) -> Result<ZeroCopyBuffer> {
67 let mut stats = self.stats.write().await;
68 stats.buffers_allocated += 1;
69
70 let mut pool = self.buffer_pool.write().await;
72 if let Some(buf) = pool.acquire(size) {
73 stats.pool_hits += 1;
74 drop(pool);
75 drop(stats);
76 return Ok(ZeroCopyBuffer::from_bytes(buf));
77 }
78
79 stats.pool_misses += 1;
80 drop(pool);
81 drop(stats);
82
83 let mut buffer = BytesMut::with_capacity(size);
85 buffer.resize(size, 0);
86 Ok(ZeroCopyBuffer::from_bytes_mut(buffer))
87 }
88
89 pub async fn return_buffer(&self, buffer: Bytes) {
91 let mut pool = self.buffer_pool.write().await;
92 pool.release(buffer);
93
94 let mut stats = self.stats.write().await;
95 stats.buffers_returned += 1;
96 }
97
98 pub async fn stats(&self) -> ZeroCopyStats {
100 self.stats.read().await.clone()
101 }
102
103 pub async fn batch_process<F>(&self, buffers: Vec<Bytes>, processor: F) -> Result<Vec<Bytes>>
105 where
106 F: Fn(&[u8]) -> Vec<u8>,
107 {
108 let mut results = Vec::with_capacity(buffers.len());
109
110 for buffer in buffers {
112 let processed = processor(&buffer);
113 results.push(Bytes::from(processed));
114 }
115
116 let mut stats = self.stats.write().await;
117 stats.batch_operations += 1;
118 stats.total_bytes_processed += results.iter().map(|b| b.len() as u64).sum::<u64>();
119
120 Ok(results)
121 }
122
123 pub async fn splice(&self, buffers: Vec<Bytes>) -> Result<SplicedBuffer> {
125 let total_len = buffers.iter().map(|b| b.len()).sum();
126
127 let mut stats = self.stats.write().await;
128 stats.splice_operations += 1;
129 stats.bytes_saved += total_len as u64; Ok(SplicedBuffer {
132 buffers,
133 total_length: total_len,
134 })
135 }
136}
137
138impl Default for ZeroCopyManager {
139 fn default() -> Self {
140 Self::new(ZeroCopyConfig::default()).expect("Failed to create zero-copy manager")
141 }
142}
143
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Tunables for [`ZeroCopyManager`].
///
/// NOTE(review): within this file only `buffer_pool_size` is visibly read;
/// the remaining fields look like feature flags consumed elsewhere — confirm.
pub struct ZeroCopyConfig {
    // Master enable switch (not consulted in this file).
    pub enabled: bool,

    // Maximum number of buffers the reuse pool may hold.
    pub buffer_pool_size: usize,

    // Upper bound on the size of a buffer eligible for pooling (not
    // consulted in this file).
    pub max_pooled_buffer_size: usize,

    // Enables SIMD-style batch processing paths (not consulted here).
    pub enable_simd: bool,

    // Enables memory-mapped buffers (not consulted here).
    pub enable_mmap: bool,

    // Minimum size for reusing a buffer (not consulted here).
    pub reuse_threshold: usize,
}
165
166impl Default for ZeroCopyConfig {
167 fn default() -> Self {
168 Self {
169 enabled: true,
170 buffer_pool_size: 1000,
171 max_pooled_buffer_size: 1024 * 1024, enable_simd: true,
173 enable_mmap: false,
174 reuse_threshold: 512, }
176 }
177}
178
#[derive(Clone)]
/// A cheaply clonable handle to buffer data; `Clone`/`share` bump the `Arc`
/// refcount instead of copying bytes.
pub struct ZeroCopyBuffer {
    // Shared ownership of either an owned (`BytesMut`) or shared (`Bytes`)
    // backing buffer.
    data: Arc<BufferData>,
}
184
/// Backing storage for [`ZeroCopyBuffer`].
enum BufferData {
    // Mutable buffer created by this module (e.g. a fresh allocation).
    Owned(BytesMut),
    // Immutable buffer already shared via `Bytes` (e.g. from the pool).
    Shared(Bytes),
}
189
190impl ZeroCopyBuffer {
191 pub fn from_bytes_mut(buf: BytesMut) -> Self {
193 Self {
194 data: Arc::new(BufferData::Owned(buf)),
195 }
196 }
197
198 pub fn from_bytes(buf: Bytes) -> Self {
200 Self {
201 data: Arc::new(BufferData::Shared(buf)),
202 }
203 }
204
205 pub fn share(&self) -> Self {
207 Self {
208 data: Arc::clone(&self.data),
209 }
210 }
211
212 pub fn slice(&self, range: Range<usize>) -> Result<Bytes> {
214 match &*self.data {
215 BufferData::Owned(buf) => {
216 let bytes: Bytes = buf.clone().freeze();
217 Ok(bytes.slice(range))
218 }
219 BufferData::Shared(bytes) => Ok(bytes.slice(range)),
220 }
221 }
222
223 pub fn len(&self) -> usize {
225 match &*self.data {
226 BufferData::Owned(buf) => buf.len(),
227 BufferData::Shared(bytes) => bytes.len(),
228 }
229 }
230
231 pub fn is_empty(&self) -> bool {
233 self.len() == 0
234 }
235
236 pub fn as_bytes(&self) -> Bytes {
238 match &*self.data {
239 BufferData::Owned(buf) => buf.clone().freeze(),
240 BufferData::Shared(bytes) => bytes.clone(),
241 }
242 }
243
244 pub fn ref_count(&self) -> usize {
246 Arc::strong_count(&self.data)
247 }
248}
249
/// A zero-copy concatenation: an ordered list of `Bytes` segments presented
/// as one logical buffer. Bytes are only copied if `read_all` is called.
pub struct SplicedBuffer {
    // Segments in logical order; each is independently reference-counted.
    buffers: Vec<Bytes>,
    // Sum of all segment lengths, computed once at splice time.
    total_length: usize,
}
255
256impl SplicedBuffer {
257 pub fn len(&self) -> usize {
259 self.total_length
260 }
261
262 pub fn is_empty(&self) -> bool {
264 self.total_length == 0
265 }
266
267 pub fn read_all(&self) -> Bytes {
269 let mut result = BytesMut::with_capacity(self.total_length);
270 for buffer in &self.buffers {
271 result.put_slice(buffer);
272 }
273 result.freeze()
274 }
275
276 pub fn segments(&self) -> impl Iterator<Item = &Bytes> {
278 self.buffers.iter()
279 }
280
281 pub fn segment_count(&self) -> usize {
283 self.buffers.len()
284 }
285}
286
/// Bounded FIFO pool of reusable `Bytes` buffers.
struct BufferPool {
    // Buffers awaiting reuse, oldest at the front.
    buffers: VecDeque<Bytes>,
    // Capacity cap; `release` drops buffers once this many are pooled.
    max_size: usize,
}
292
293impl BufferPool {
294 fn new(max_size: usize) -> Self {
295 Self {
296 buffers: VecDeque::with_capacity(max_size),
297 max_size,
298 }
299 }
300
301 fn acquire(&mut self, _size: usize) -> Option<Bytes> {
302 self.buffers.pop_front()
303 }
304
305 fn release(&mut self, buffer: Bytes) {
306 if self.buffers.len() < self.max_size {
307 self.buffers.push_back(buffer);
308 }
309 }
311
312 fn size(&self) -> usize {
313 self.buffers.len()
314 }
315}
316
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
/// Counters maintained by [`ZeroCopyManager`]; cloned out via `stats()`.
pub struct ZeroCopyStats {
    // Incremented on every `create_buffer` call (hit or miss).
    pub buffers_allocated: u64,

    // Incremented on every `return_buffer` call.
    pub buffers_returned: u64,

    // `create_buffer` calls satisfied from the pool.
    pub pool_hits: u64,

    // `create_buffer` calls that required a fresh allocation.
    pub pool_misses: u64,

    // Total output bytes produced by `batch_process`.
    pub total_bytes_processed: u64,

    // Bytes credited as saved by `splice` (the spliced total length).
    pub bytes_saved: u64,

    // Number of `batch_process` calls.
    pub batch_operations: u64,

    // Number of `splice` calls.
    pub splice_operations: u64,
}
344
345impl ZeroCopyStats {
346 pub fn pool_hit_rate(&self) -> f64 {
348 let total_requests = self.pool_hits + self.pool_misses;
349 if total_requests == 0 {
350 0.0
351 } else {
352 self.pool_hits as f64 / total_requests as f64
353 }
354 }
355
356 pub fn avg_bytes_saved(&self) -> f64 {
358 if self.batch_operations + self.splice_operations == 0 {
359 0.0
360 } else {
361 self.bytes_saved as f64 / (self.batch_operations + self.splice_operations) as f64
362 }
363 }
364}
365
/// Byte-batch processor dispatching over [`SimdOperation`].
///
/// NOTE(review): despite the name, the visible implementation is scalar —
/// no SIMD intrinsics are used, and `chunk_size` is never consulted.
pub struct SimdBatchProcessor {
    // Stored at construction; not read by any method in this file.
    chunk_size: usize,
}
370
371impl SimdBatchProcessor {
372 pub fn new(chunk_size: usize) -> Self {
374 Self { chunk_size }
375 }
376
377 pub fn process_batch(&self, data: &[u8], operation: SimdOperation) -> Vec<u8> {
379 match operation {
380 SimdOperation::Copy => data.to_vec(),
381 SimdOperation::XorMask(mask) => self.xor_batch(data, mask),
382 SimdOperation::Sum => self.sum_batch(data),
383 SimdOperation::Max => self.max_batch(data),
384 }
385 }
386
387 fn xor_batch(&self, data: &[u8], mask: u8) -> Vec<u8> {
388 data.iter().map(|&b| b ^ mask).collect()
390 }
391
392 fn sum_batch(&self, data: &[u8]) -> Vec<u8> {
393 let sum: u64 = data.iter().map(|&b| b as u64).sum();
394 sum.to_le_bytes().to_vec()
395 }
396
397 fn max_batch(&self, data: &[u8]) -> Vec<u8> {
398 let max = data.iter().max().copied().unwrap_or(0);
399 vec![max]
400 }
401}
402
#[derive(Debug, Clone, Copy)]
/// Operations supported by [`SimdBatchProcessor::process_batch`].
pub enum SimdOperation {
    // Returns an exact copy of the input bytes.
    Copy,
    // XORs every byte with the given mask.
    XorMask(u8),
    // Sums all bytes; result is the 8-byte little-endian u64 total.
    Sum,
    // Result is a single byte: the maximum input byte (0 for empty input).
    Max,
}
415
#[cfg(unix)]
/// Placeholder for a memory-mapped file buffer (Unix only).
///
/// NOTE(review): no mapping is actually performed anywhere in this file —
/// `size` is always 0 and `slice` always returns an empty slice.
pub struct MemoryMappedBuffer {
    #[allow(dead_code)]
    // Path recorded at construction; never read back in this file.
    path: std::path::PathBuf,
    // Mapped length; always 0 in the current stub.
    size: usize,
}
423
424#[cfg(unix)]
425impl MemoryMappedBuffer {
426 pub fn from_file(_path: &std::path::Path) -> Result<Self> {
428 Ok(Self {
431 path: _path.to_path_buf(),
432 size: 0,
433 })
434 }
435
436 pub fn size(&self) -> usize {
438 self.size
439 }
440
441 pub fn slice(&self, _range: Range<usize>) -> Result<&[u8]> {
443 Ok(&[])
445 }
446}
447
/// Generic reference-counted wrapper: `share`/`clone` produce new handles to
/// the same value without copying it.
pub struct SharedRefBuffer<T> {
    // Shared ownership of the wrapped value.
    data: Arc<T>,
}
452
453impl<T> SharedRefBuffer<T> {
454 pub fn new(data: T) -> Self {
456 Self {
457 data: Arc::new(data),
458 }
459 }
460
461 pub fn share(&self) -> Self {
463 Self {
464 data: Arc::clone(&self.data),
465 }
466 }
467
468 pub fn ref_count(&self) -> usize {
470 Arc::strong_count(&self.data)
471 }
472
473 pub fn get(&self) -> &T {
475 &self.data
476 }
477}
478
479impl<T> Clone for SharedRefBuffer<T> {
480 fn clone(&self) -> Self {
481 self.share()
482 }
483}
484
#[cfg(test)]
mod tests {
    use super::*;

    // Fresh allocation path: requested size is honored and zero-filled.
    #[tokio::test]
    async fn test_zero_copy_buffer_creation() {
        let manager = ZeroCopyManager::default();
        let buffer = manager.create_buffer(1024).await.unwrap();

        assert_eq!(buffer.len(), 1024);
        assert!(!buffer.is_empty());
    }

    // share() bumps the shared Arc, so all handles report the same count.
    #[tokio::test]
    async fn test_buffer_sharing() {
        let manager = ZeroCopyManager::default();
        let buffer = manager.create_buffer(100).await.unwrap();

        let shared1 = buffer.share();
        let shared2 = buffer.share();

        assert_eq!(buffer.ref_count(), shared1.ref_count());
        assert_eq!(shared1.ref_count(), shared2.ref_count());
    }

    // slice() over an owned buffer returns the requested sub-range.
    #[tokio::test]
    async fn test_zero_copy_slicing() {
        let _manager = ZeroCopyManager::default();
        let mut buffer = BytesMut::with_capacity(100);
        buffer.extend_from_slice(b"Hello, World!");

        let zc_buffer = ZeroCopyBuffer::from_bytes_mut(buffer);
        let slice = zc_buffer.slice(0..5).unwrap();

        assert_eq!(&slice[..], b"Hello");
    }

    // Returning a buffer and re-requesting the same size should be a pool hit.
    #[tokio::test]
    async fn test_buffer_pool() {
        let config = ZeroCopyConfig {
            buffer_pool_size: 10,
            ..Default::default()
        };

        let manager = ZeroCopyManager::new(config).unwrap();

        let buffer = manager.create_buffer(512).await.unwrap();
        let bytes = buffer.as_bytes();

        manager.return_buffer(bytes.clone()).await;

        let stats_before = manager.stats().await;
        let _buffer2 = manager.create_buffer(512).await.unwrap();
        let stats_after = manager.stats().await;

        assert!(stats_after.pool_hits > stats_before.pool_hits);
    }

    // Splicing preserves total length and segment count; read_all joins them.
    #[tokio::test]
    async fn test_splice_buffers() {
        let manager = ZeroCopyManager::default();

        let buf1 = Bytes::from("Hello, ");
        let buf2 = Bytes::from("World!");

        let spliced = manager.splice(vec![buf1, buf2]).await.unwrap();

        assert_eq!(spliced.len(), 13);
        assert_eq!(spliced.segment_count(), 2);

        let combined = spliced.read_all();
        assert_eq!(&combined[..], b"Hello, World!");
    }

    // batch_process applies the closure per buffer, preserving order.
    #[tokio::test]
    async fn test_batch_processing() {
        let manager = ZeroCopyManager::default();

        let buffers = vec![
            Bytes::from("data1"),
            Bytes::from("data2"),
            Bytes::from("data3"),
        ];

        let results = manager
            .batch_process(buffers, |data| data.to_vec())
            .await
            .unwrap();

        assert_eq!(results.len(), 3);
        assert_eq!(&results[0][..], b"data1");
        assert_eq!(&results[1][..], b"data2");
        assert_eq!(&results[2][..], b"data3");
    }

    // XOR with 0xFF complements each byte; Max returns the single max byte.
    #[tokio::test]
    async fn test_simd_batch_processor() {
        let processor = SimdBatchProcessor::new(64);

        let data = vec![1u8, 2, 3, 4, 5];

        let xor_result = processor.process_batch(&data, SimdOperation::XorMask(0xFF));
        assert_eq!(xor_result, vec![254, 253, 252, 251, 250]);

        let max_result = processor.process_batch(&data, SimdOperation::Max);
        assert_eq!(max_result, vec![5]);
    }

    // Count is 3: the original handle plus the two shares.
    #[tokio::test]
    async fn test_shared_ref_buffer() {
        let data = vec![1, 2, 3, 4, 5];
        let buffer = SharedRefBuffer::new(data);

        let shared1 = buffer.share();
        let shared2 = buffer.share();

        assert_eq!(buffer.ref_count(), 3); assert_eq!(shared1.get(), &vec![1, 2, 3, 4, 5]);
        assert_eq!(shared2.get(), &vec![1, 2, 3, 4, 5]);
    }

    // One miss then one hit yields a non-zero hit rate.
    #[tokio::test]
    async fn test_pool_hit_rate() {
        let manager = ZeroCopyManager::default();

        let buf1 = manager.create_buffer(512).await.unwrap();
        manager.return_buffer(buf1.as_bytes()).await;

        let _buf2 = manager.create_buffer(512).await.unwrap();

        let stats = manager.stats().await;
        assert!(stats.pool_hit_rate() > 0.0);
    }

    // buffers_allocated counts every create_buffer call, hit or miss.
    #[tokio::test]
    async fn test_zero_copy_stats() {
        let manager = ZeroCopyManager::default();

        let _buf1 = manager.create_buffer(100).await.unwrap();
        let _buf2 = manager.create_buffer(200).await.unwrap();

        let stats = manager.stats().await;
        assert_eq!(stats.buffers_allocated, 2);
    }
}