use anyhow::{anyhow, Result};
use bytes::{Bytes, BytesMut};
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tracing::debug;

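/// An immutable byte buffer backed by `bytes::Bytes`. Slices of it are handed
/// out as `SegmentView`s that share the underlying allocation instead of
/// copying, while an atomic counter tracks how many views are alive.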
#[derive(Debug)]
pub struct ZeroCopyBuffer {
    data: Bytes,
    size: usize,
    ref_count: Arc<AtomicUsize>,
    id: BufferId,
}

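/// Process-wide unique identifier for a buffer, drawn from a global atomic counter.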
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BufferId(u64);

impl BufferId {
    fn new() -> Self {
        static COUNTER: AtomicUsize = AtomicUsize::new(1);
        Self(COUNTER.fetch_add(1, Ordering::Relaxed) as u64)
    }
}

impl ZeroCopyBuffer {
    pub fn new(capacity: usize) -> Self {
        // Note: freezing an empty BytesMut yields a zero-length buffer; buffers
        // that carry data are built with `from_bytes`.
        let data = BytesMut::with_capacity(capacity).freeze();

        Self {
            data,
            size: 0,
            ref_count: Arc::new(AtomicUsize::new(1)),
            id: BufferId::new(),
        }
    }

    pub fn from_bytes(bytes: Bytes) -> Self {
        let size = bytes.len();

        Self {
            data: bytes,
            size,
            ref_count: Arc::new(AtomicUsize::new(1)),
            id: BufferId::new(),
        }
    }

    pub fn create_view(&self, offset: usize, length: usize) -> Result<SegmentView> {
        if offset + length > self.data.len() {
            return Err(anyhow!("Segment view bounds exceed buffer size"));
        }

        self.ref_count.fetch_add(1, Ordering::Relaxed);

        Ok(SegmentView {
            buffer_id: self.id,
            data: self.data.slice(offset..offset + length),
            offset,
            length,
            buffer_ref: self.ref_count.clone(),
            view_type: ViewType::Data,
        })
    }

    pub fn create_full_view(&self) -> SegmentView {
        self.ref_count.fetch_add(1, Ordering::Relaxed);

        SegmentView {
            buffer_id: self.id,
            data: self.data.clone(),
            offset: 0,
            length: self.data.len(),
            buffer_ref: self.ref_count.clone(),
            view_type: ViewType::Full,
        }
    }

    pub fn len(&self) -> usize {
        self.data.len()
    }

    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    pub fn ref_count(&self) -> usize {
        self.ref_count.load(Ordering::Relaxed)
    }

    pub fn id(&self) -> BufferId {
        self.id
    }
}

impl Drop for ZeroCopyBuffer {
    fn drop(&mut self) {
        let remaining_refs = self.ref_count.fetch_sub(1, Ordering::Relaxed);
        debug!("Buffer {:?} dropped, {} references remaining", self.id, remaining_refs - 1);
    }
}

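/// Tag describing what kind of content a `SegmentView` exposes from its buffer.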
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ViewType {
    Data,
    Metadata,
    Results,
    Full,
    Compressed,
}

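/// A zero-copy slice of a `ZeroCopyBuffer`. Creating, cloning, or sub-slicing a
/// view bumps the owning buffer's reference counter; dropping a view releases it.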
#[derive(Debug)]
pub struct SegmentView {
    buffer_id: BufferId,
    data: Bytes,
    offset: usize,
    length: usize,
    buffer_ref: Arc<AtomicUsize>,
    view_type: ViewType,
}

impl Clone for SegmentView {
    fn clone(&self) -> Self {
        // A cloned view is another live reference to the backing buffer, so the
        // shared counter must be bumped here; a derived Clone would skip this
        // and let Drop drive the count below its true value.
        self.buffer_ref.fetch_add(1, Ordering::Relaxed);

        Self {
            buffer_id: self.buffer_id,
            data: self.data.clone(),
            offset: self.offset,
            length: self.length,
            buffer_ref: self.buffer_ref.clone(),
            view_type: self.view_type,
        }
    }
}

impl SegmentView {
    pub fn as_bytes(&self) -> &Bytes {
        &self.data
    }

    pub fn as_slice(&self) -> &[u8] {
        &self.data
    }

    pub fn len(&self) -> usize {
        self.length
    }

    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    pub fn offset(&self) -> usize {
        self.offset
    }

    pub fn view_type(&self) -> ViewType {
        self.view_type
    }

    pub fn buffer_id(&self) -> BufferId {
        self.buffer_id
    }

    pub fn slice(&self, range: Range<usize>) -> Result<SegmentView> {
        if range.end > self.length {
            return Err(anyhow!("Slice range exceeds view length"));
        }

        self.buffer_ref.fetch_add(1, Ordering::Relaxed);

        Ok(SegmentView {
            buffer_id: self.buffer_id,
            data: self.data.slice(range.start..range.end),
            offset: self.offset + range.start,
            length: range.end - range.start,
            buffer_ref: self.buffer_ref.clone(),
            view_type: self.view_type,
        })
    }

    pub fn to_string_lossy(&self) -> String {
        String::from_utf8_lossy(&self.data).to_string()
    }

    pub fn parse_json<T>(&self) -> Result<T>
    where
        T: serde::de::DeserializeOwned,
    {
        let text = std::str::from_utf8(&self.data)?;
        Ok(serde_json::from_str(text)?)
    }
}

impl Drop for SegmentView {
    fn drop(&mut self) {
        let remaining_refs = self.buffer_ref.fetch_sub(1, Ordering::Relaxed);
        if remaining_refs == 1 {
            debug!("Last reference to buffer {:?} dropped", self.buffer_id);
        }
    }
}

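/// Size-tiered pool of reusable `ZeroCopyBuffer`s (small/medium/large), guarded
/// by async mutexes and tracking hit/miss statistics in `PoolStats`.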
pub struct BufferPool {
    small_buffers: Mutex<VecDeque<ZeroCopyBuffer>>,
    medium_buffers: Mutex<VecDeque<ZeroCopyBuffer>>,
    large_buffers: Mutex<VecDeque<ZeroCopyBuffer>>,
    stats: RwLock<PoolStats>,
    config: PoolConfig,
}

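/// Size thresholds and per-tier capacity limits for `BufferPool`.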
#[derive(Debug, Clone)]
pub struct PoolConfig {
    pub small_buffer_size: usize,
    pub medium_buffer_size: usize,
    pub large_buffer_size: usize,
    pub max_small_buffers: usize,
    pub max_medium_buffers: usize,
    pub max_large_buffers: usize,
}

impl Default for PoolConfig {
    fn default() -> Self {
        Self {
            small_buffer_size: 1024,        // 1 KB
            medium_buffer_size: 100 * 1024, // 100 KB
            large_buffer_size: 1024 * 1024, // 1 MB
            max_small_buffers: 100,
            max_medium_buffers: 50,
            max_large_buffers: 10,
        }
    }
}

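/// Allocation counters for a `BufferPool`; `hit_rate` is pool hits over total allocations.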
#[derive(Debug, Default, Clone)]
pub struct PoolStats {
    pub total_allocations: usize,
    pub pool_hits: usize,
    pub pool_misses: usize,
    pub small_buffers_active: usize,
    pub medium_buffers_active: usize,
    pub large_buffers_active: usize,
    pub total_memory_bytes: usize,
}

impl PoolStats {
    pub fn hit_rate(&self) -> f64 {
        if self.total_allocations == 0 {
            0.0
        } else {
            self.pool_hits as f64 / self.total_allocations as f64
        }
    }
}

impl BufferPool {
    pub fn new(config: PoolConfig) -> Self {
        Self {
            small_buffers: Mutex::new(VecDeque::new()),
            medium_buffers: Mutex::new(VecDeque::new()),
            large_buffers: Mutex::new(VecDeque::new()),
            stats: RwLock::new(PoolStats::default()),
            config,
        }
    }

    pub async fn acquire(&self, size: usize) -> Result<ZeroCopyBuffer> {
        let mut stats = self.stats.write().await;
        stats.total_allocations += 1;

        let buffer = if size <= self.config.small_buffer_size {
            self.acquire_small_buffer(&mut stats).await
        } else if size <= self.config.medium_buffer_size {
            self.acquire_medium_buffer(&mut stats).await
        } else {
            self.acquire_large_buffer(&mut stats, size).await
        };

        match buffer {
            Some(buf) => {
                stats.pool_hits += 1;
                Ok(buf)
            }
            None => {
                stats.pool_misses += 1;
                Ok(ZeroCopyBuffer::new(size))
            }
        }
    }

    async fn acquire_small_buffer(&self, stats: &mut PoolStats) -> Option<ZeroCopyBuffer> {
        let mut pool = self.small_buffers.lock().await;
        let buffer = pool.pop_front();

        if buffer.is_some() {
            stats.small_buffers_active += 1;
        }

        buffer
    }

    async fn acquire_medium_buffer(&self, stats: &mut PoolStats) -> Option<ZeroCopyBuffer> {
        let mut pool = self.medium_buffers.lock().await;
        let buffer = pool.pop_front();

        if buffer.is_some() {
            stats.medium_buffers_active += 1;
        }

        buffer
    }

    async fn acquire_large_buffer(&self, stats: &mut PoolStats, size: usize) -> Option<ZeroCopyBuffer> {
        let mut pool = self.large_buffers.lock().await;

        // Find the first pooled buffer with enough capacity for the request.
        let pos = pool.iter().position(|buf| buf.len() >= size);

        if let Some(idx) = pos {
            let buffer = pool.remove(idx);
            stats.large_buffers_active += 1;
            buffer
        } else {
            None
        }
    }

    pub async fn release(&self, buffer: ZeroCopyBuffer) -> Result<()> {
        // Views still reference this buffer; do not return it to the pool yet.
        if buffer.ref_count() > 1 {
            return Ok(());
        }

        let size = buffer.len();
        let mut stats = self.stats.write().await;

        if size <= self.config.small_buffer_size {
            let mut pool = self.small_buffers.lock().await;
            if pool.len() < self.config.max_small_buffers {
                pool.push_back(buffer);
                if stats.small_buffers_active > 0 {
                    stats.small_buffers_active -= 1;
                }
            }
        } else if size <= self.config.medium_buffer_size {
            let mut pool = self.medium_buffers.lock().await;
            if pool.len() < self.config.max_medium_buffers {
                pool.push_back(buffer);
                if stats.medium_buffers_active > 0 {
                    stats.medium_buffers_active -= 1;
                }
            }
        } else {
            let mut pool = self.large_buffers.lock().await;
            if pool.len() < self.config.max_large_buffers {
                pool.push_back(buffer);
                if stats.large_buffers_active > 0 {
                    stats.large_buffers_active -= 1;
                }
            }
        }

        Ok(())
    }

    pub async fn stats(&self) -> PoolStats {
        let stats = self.stats.read().await;
        stats.clone()
    }

    pub async fn clear(&self) -> Result<()> {
        // Take the stats lock first so lock ordering matches acquire()/release()
        // and cannot deadlock against them.
        let mut stats = self.stats.write().await;

        let mut small = self.small_buffers.lock().await;
        let mut medium = self.medium_buffers.lock().await;
        let mut large = self.large_buffers.lock().await;

        small.clear();
        medium.clear();
        large.clear();

        stats.small_buffers_active = 0;
        stats.medium_buffers_active = 0;
        stats.large_buffers_active = 0;

        Ok(())
    }
}

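/// Tracks every buffer handed out to the pipeline, enforces a total byte budget,
/// and recycles released buffers through an internal `BufferPool`.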
pub struct PipelineMemoryManager {
    buffer_pool: BufferPool,
    // Each active buffer is stored with the number of bytes charged against the limit.
    active_buffers: RwLock<std::collections::HashMap<BufferId, (Arc<ZeroCopyBuffer>, usize)>>,
    memory_limit: usize,
    current_usage: AtomicUsize,
}

impl PipelineMemoryManager {
    pub fn new(memory_limit_mb: usize) -> Self {
        let config = PoolConfig::default();
        let buffer_pool = BufferPool::new(config);

        Self {
            buffer_pool,
            active_buffers: RwLock::new(std::collections::HashMap::new()),
            memory_limit: memory_limit_mb * 1024 * 1024, // megabytes to bytes
            current_usage: AtomicUsize::new(0),
        }
    }

    pub async fn allocate(&self, size: usize) -> Result<Arc<ZeroCopyBuffer>> {
        let current = self.current_usage.load(Ordering::Relaxed);
        if current + size > self.memory_limit {
            return Err(anyhow!(
                "Memory limit exceeded: {} + {} > {}",
                current,
                size,
                self.memory_limit
            ));
        }

        let buffer = Arc::new(self.buffer_pool.acquire(size).await?);
        let buffer_id = buffer.id();

        {
            let mut active = self.active_buffers.write().await;
            active.insert(buffer_id, (buffer.clone(), size));
        }

        self.current_usage.fetch_add(size, Ordering::Relaxed);

        debug!("Allocated buffer {:?} of {} bytes", buffer_id, size);
        Ok(buffer)
    }

    pub async fn deallocate(&self, buffer_id: BufferId) -> Result<()> {
        let entry = {
            let mut active = self.active_buffers.write().await;
            active.remove(&buffer_id)
        };

        if let Some((buffer, size)) = entry {
            // Only return the buffer to the pool once the manager holds the last reference.
            if Arc::strong_count(&buffer) == 1 {
                if let Ok(owned_buffer) = Arc::try_unwrap(buffer) {
                    self.buffer_pool.release(owned_buffer).await?;
                }
            }

            // Credit back exactly what allocate() charged for this buffer.
            self.current_usage.fetch_sub(size, Ordering::Relaxed);
            debug!("Deallocated buffer {:?} of {} bytes", buffer_id, size);
        }

        Ok(())
    }

    pub fn current_usage(&self) -> usize {
        self.current_usage.load(Ordering::Relaxed)
    }

    pub fn utilization(&self) -> f64 {
        self.current_usage() as f64 / self.memory_limit as f64
    }

    pub async fn pool_stats(&self) -> PoolStats {
        self.buffer_pool.stats().await
    }

    pub async fn gc(&self) -> Result<usize> {
        let mut collected = 0;
        let mut to_remove = Vec::new();

        {
            let active = self.active_buffers.read().await;
            for (id, (buffer, _size)) in active.iter() {
                // A strong count of 1 means only the manager still references this buffer.
                if Arc::strong_count(buffer) == 1 {
                    to_remove.push(*id);
                }
            }
        }

        for buffer_id in to_remove {
            self.deallocate(buffer_id).await?;
            collected += 1;
        }

        if collected > 0 {
            debug!("Garbage collected {} unused buffers", collected);
        }

        Ok(collected)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_zero_copy_buffer() {
        let buffer = ZeroCopyBuffer::new(1024);
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.ref_count(), 1);

        let view = buffer.create_view(0, 0).unwrap();
        assert_eq!(buffer.ref_count(), 2);

        drop(view);
        assert_eq!(buffer.ref_count(), 1);
    }

    #[test]
    fn test_segment_view() {
        let data = Bytes::from("Hello, World!");
        let buffer = ZeroCopyBuffer::from_bytes(data);

        let view = buffer.create_view(0, 5).unwrap();
        assert_eq!(view.len(), 5);
        assert_eq!(view.to_string_lossy(), "Hello");

        let sub_view = view.slice(1..4).unwrap();
        assert_eq!(sub_view.len(), 3);
        assert_eq!(sub_view.to_string_lossy(), "ell");
    }

    #[tokio::test]
    async fn test_buffer_pool() {
        let config = PoolConfig::default();
        let pool = BufferPool::new(config);

        let buffer1 = pool.acquire(512).await.unwrap();
        let stats = pool.stats().await;
        assert_eq!(stats.total_allocations, 1);
        assert_eq!(stats.pool_misses, 1);

        pool.release(buffer1).await.unwrap();
        let _buffer2 = pool.acquire(512).await.unwrap();

        let stats = pool.stats().await;
        assert_eq!(stats.total_allocations, 2);
        assert_eq!(stats.pool_hits, 1);
    }

    #[tokio::test]
    async fn test_memory_manager() {
        let manager = PipelineMemoryManager::new(1); // 1 MB limit

        let buffer = manager.allocate(1024).await.unwrap();
        assert_eq!(manager.current_usage(), 1024);

        // Requests beyond the configured limit must be rejected.
        let large_buffer = manager.allocate(2 * 1024 * 1024).await;
        assert!(large_buffer.is_err());

        manager.deallocate(buffer.id()).await.unwrap();
    }

    #[tokio::test]
    async fn test_garbage_collection() {
        let manager = PipelineMemoryManager::new(10); // 10 MB limit

        let buffer = manager.allocate(1024).await.unwrap();
        let _buffer_id = buffer.id();

        // The buffer is still held outside the manager, so nothing is collected yet.
        let collected = manager.gc().await.unwrap();
        assert_eq!(collected, 0);

        drop(buffer);

        let collected = manager.gc().await.unwrap();
        assert_eq!(collected, 1);
    }
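
    // A minimal additional example (not part of the original tests): it shows that
    // full views and sub-slices share the same backing buffer rather than copying,
    // and that both report the id of the buffer they were created from.
    #[test]
    fn test_view_sharing_example() {
        let buffer = ZeroCopyBuffer::from_bytes(Bytes::from("shared backing store"));

        let full = buffer.create_full_view();
        let prefix = full.slice(0..6).unwrap();

        assert_eq!(full.len(), buffer.len());
        assert_eq!(prefix.to_string_lossy(), "shared");
        assert_eq!(full.buffer_id(), buffer.id());
        assert_eq!(prefix.buffer_id(), buffer.id());
    }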
}