lens_core/pipeline/memory.rs

//! Zero-Copy Memory Management
//!
//! Implements zero-copy buffer management for the fused pipeline:
//! - Reference-counted buffers backed by `bytes::Bytes`
//! - Segment views without data copying
//! - Size-tiered buffer pooling for efficient reuse
//! - Pipeline-wide memory accounting with limits and garbage collection
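//!
//! A minimal usage sketch (`ignore`d since the exact crate paths are assumed):
//!
//! ```ignore
//! use bytes::Bytes;
//!
//! // Wrap existing bytes without copying, then hand out cheap views.
//! let buffer = ZeroCopyBuffer::from_bytes(Bytes::from_static(b"hello world"));
//! let view = buffer.create_view(0, 5)?;
//! assert_eq!(view.as_slice(), b"hello");
//! ```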

use anyhow::{anyhow, Result};
use bytes::{Bytes, BytesMut};
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tracing::debug;

/// Zero-copy buffer with reference counting and segment views
#[derive(Debug)]
pub struct ZeroCopyBuffer {
    data: Bytes,
    ref_count: Arc<AtomicUsize>,
    id: BufferId,
}

/// Unique buffer identifier
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BufferId(u64);

impl BufferId {
    fn new() -> Self {
        static COUNTER: AtomicUsize = AtomicUsize::new(1);
        Self(COUNTER.fetch_add(1, Ordering::Relaxed) as u64)
    }
}

impl ZeroCopyBuffer {
    /// Create a new zero-copy buffer with the specified capacity hint
    pub fn new(capacity: usize) -> Self {
        // Note: freezing an empty `BytesMut` yields an empty `Bytes`, so a
        // freshly created buffer has length 0; `capacity` is a sizing hint.
        let data = BytesMut::with_capacity(capacity).freeze();

        Self {
            data,
            ref_count: Arc::new(AtomicUsize::new(1)),
            id: BufferId::new(),
        }
    }

    /// Create buffer from existing data without copying
    pub fn from_bytes(bytes: Bytes) -> Self {
        Self {
            data: bytes,
            ref_count: Arc::new(AtomicUsize::new(1)),
            id: BufferId::new(),
        }
    }

    /// Create a segment view of this buffer
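    ///
    /// The view holds a reference on this buffer; a sketch (`ignore`d):
    ///
    /// ```ignore
    /// let buffer = ZeroCopyBuffer::from_bytes(Bytes::from_static(b"abcdef"));
    /// let view = buffer.create_view(2, 3)?; // bytes "cde", no copy
    /// assert_eq!(buffer.ref_count(), 2);    // buffer + view
    /// ```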
    pub fn create_view(&self, offset: usize, length: usize) -> Result<SegmentView> {
        let end = offset
            .checked_add(length)
            .ok_or_else(|| anyhow!("Segment view bounds overflow"))?;
        if end > self.data.len() {
            return Err(anyhow!("Segment view bounds exceed buffer size"));
        }

        // Increment reference count for the view
        self.ref_count.fetch_add(1, Ordering::Relaxed);

        Ok(SegmentView {
            buffer_id: self.id,
            data: self.data.slice(offset..end),
            offset,
            length,
            buffer_ref: self.ref_count.clone(),
            view_type: ViewType::Data,
        })
    }

    /// Create a view of the entire buffer
    pub fn create_full_view(&self) -> SegmentView {
        self.ref_count.fetch_add(1, Ordering::Relaxed);

        SegmentView {
            buffer_id: self.id,
            data: self.data.clone(),
            offset: 0,
            length: self.data.len(),
            buffer_ref: self.ref_count.clone(),
            view_type: ViewType::Full,
        }
    }

    /// Get buffer size
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Check if buffer is empty
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Get current reference count
    pub fn ref_count(&self) -> usize {
        self.ref_count.load(Ordering::Relaxed)
    }

    /// Get buffer ID
    pub fn id(&self) -> BufferId {
        self.id
    }
}

impl Drop for ZeroCopyBuffer {
    fn drop(&mut self) {
        // `fetch_sub` returns the previous count, so subtract one for the log.
        let prev_refs = self.ref_count.fetch_sub(1, Ordering::Relaxed);
        debug!("Buffer {:?} dropped, {} references remaining", self.id, prev_refs - 1);
    }
}

/// View type for different kinds of segment views
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ViewType {
    /// View of data segment
    Data,
    /// View of metadata segment
    Metadata,
    /// View of search results
    Results,
    /// Full buffer view
    Full,
    /// Compressed data view
    Compressed,
}

/// Zero-copy segment view into a buffer
#[derive(Debug)]
pub struct SegmentView {
    buffer_id: BufferId,
    data: Bytes,
    offset: usize,
    length: usize,
    buffer_ref: Arc<AtomicUsize>,
    view_type: ViewType,
}

impl Clone for SegmentView {
    fn clone(&self) -> Self {
        // A derived `Clone` would copy `buffer_ref` without incrementing the
        // count, so `Drop` would later underflow it; bump the count manually.
        self.buffer_ref.fetch_add(1, Ordering::Relaxed);
        Self {
            buffer_id: self.buffer_id,
            data: self.data.clone(),
            offset: self.offset,
            length: self.length,
            buffer_ref: self.buffer_ref.clone(),
            view_type: self.view_type,
        }
    }
}

impl SegmentView {
    /// Get the data as bytes (zero-copy)
    pub fn as_bytes(&self) -> &Bytes {
        &self.data
    }

    /// Get data as a slice
    pub fn as_slice(&self) -> &[u8] {
        &self.data
    }

    /// Get view length
    pub fn len(&self) -> usize {
        self.length
    }

    /// Check if view is empty
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Get offset within parent buffer
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Get view type
    pub fn view_type(&self) -> ViewType {
        self.view_type
    }

    /// Get buffer ID this view belongs to
    pub fn buffer_id(&self) -> BufferId {
        self.buffer_id
    }

    /// Create a sub-view of this view (zero-copy)
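    ///
    /// The range is relative to this view, not the parent buffer; a sketch
    /// (`ignore`d, `buffer` assumed in scope):
    ///
    /// ```ignore
    /// let view = buffer.create_view(4, 10)?;
    /// let sub = view.slice(2..5)?;
    /// assert_eq!(sub.len(), 3);
    /// assert_eq!(sub.offset(), 6); // offset within the parent buffer
    /// ```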
    pub fn slice(&self, range: Range<usize>) -> Result<SegmentView> {
        if range.start > range.end || range.end > self.length {
            return Err(anyhow!("Slice range is invalid for view length"));
        }

        // Increment reference count for the new view
        self.buffer_ref.fetch_add(1, Ordering::Relaxed);

        Ok(SegmentView {
            buffer_id: self.buffer_id,
            data: self.data.slice(range.start..range.end),
            offset: self.offset + range.start,
            length: range.end - range.start,
            buffer_ref: self.buffer_ref.clone(),
            view_type: self.view_type,
        })
    }

    /// Convert to string (for text data)
    pub fn to_string_lossy(&self) -> String {
        String::from_utf8_lossy(&self.data).to_string()
    }

    /// Parse as JSON
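    ///
    /// A sketch (`ignore`d; `Doc` is a hypothetical deserializable type and
    /// `buffer` is assumed in scope):
    ///
    /// ```ignore
    /// #[derive(serde::Deserialize)]
    /// struct Doc { id: u64 }
    ///
    /// let view = buffer.create_full_view();
    /// let doc: Doc = view.parse_json()?;
    /// ```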
    pub fn parse_json<T>(&self) -> Result<T>
    where
        T: serde::de::DeserializeOwned,
    {
        let text = std::str::from_utf8(&self.data)?;
        Ok(serde_json::from_str(text)?)
    }
}

impl Drop for SegmentView {
    fn drop(&mut self) {
        // `fetch_sub` returns the previous count; 1 means this was the last
        // outstanding reference to the underlying buffer.
        let prev_refs = self.buffer_ref.fetch_sub(1, Ordering::Relaxed);
        if prev_refs == 1 {
            debug!("Last reference to buffer {:?} dropped", self.buffer_id);
        }
    }
}

/// Memory pool for efficient buffer management
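///
/// Buffers are recycled through size-tiered free lists. A minimal async
/// sketch (`ignore`d; assumes a Tokio runtime):
///
/// ```ignore
/// let pool = BufferPool::new(PoolConfig::default());
/// let buf = pool.acquire(512).await?;  // first acquisition is a pool miss
/// pool.release(buf).await?;            // buffer becomes reusable
/// assert_eq!(pool.stats().await.pool_misses, 1);
/// ```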
pub struct BufferPool {
    small_buffers: Mutex<VecDeque<ZeroCopyBuffer>>,  // < 1KB
    medium_buffers: Mutex<VecDeque<ZeroCopyBuffer>>, // 1KB - 100KB
    large_buffers: Mutex<VecDeque<ZeroCopyBuffer>>,  // > 100KB
    stats: RwLock<PoolStats>,
    config: PoolConfig,
}

#[derive(Debug, Clone)]
pub struct PoolConfig {
    pub small_buffer_size: usize,
    pub medium_buffer_size: usize,
    pub large_buffer_size: usize,
    pub max_small_buffers: usize,
    pub max_medium_buffers: usize,
    pub max_large_buffers: usize,
}

impl Default for PoolConfig {
    fn default() -> Self {
        Self {
            small_buffer_size: 1024,        // 1KB
            medium_buffer_size: 100 * 1024, // 100KB
            large_buffer_size: 1024 * 1024, // 1MB
            max_small_buffers: 100,
            max_medium_buffers: 50,
            max_large_buffers: 10,
        }
    }
}

#[derive(Debug, Default, Clone)]
pub struct PoolStats {
    pub total_allocations: usize,
    pub pool_hits: usize,
    pub pool_misses: usize,
    pub small_buffers_active: usize,
    pub medium_buffers_active: usize,
    pub large_buffers_active: usize,
    pub total_memory_bytes: usize,
}

impl PoolStats {
    pub fn hit_rate(&self) -> f64 {
        if self.total_allocations == 0 {
            0.0
        } else {
            self.pool_hits as f64 / self.total_allocations as f64
        }
    }
}

impl BufferPool {
    /// Create a new buffer pool
    pub fn new(config: PoolConfig) -> Self {
        Self {
            small_buffers: Mutex::new(VecDeque::new()),
            medium_buffers: Mutex::new(VecDeque::new()),
            large_buffers: Mutex::new(VecDeque::new()),
            stats: RwLock::new(PoolStats::default()),
            config,
        }
    }

    /// Acquire a buffer from the pool, or allocate a new one on a miss
    pub async fn acquire(&self, size: usize) -> Result<ZeroCopyBuffer> {
        let mut stats = self.stats.write().await;
        stats.total_allocations += 1;

        let buffer = if size <= self.config.small_buffer_size {
            self.acquire_small_buffer(&mut stats).await
        } else if size <= self.config.medium_buffer_size {
            self.acquire_medium_buffer(&mut stats).await
        } else {
            self.acquire_large_buffer(&mut stats, size).await
        };

        match buffer {
            Some(buf) => {
                stats.pool_hits += 1;
                Ok(buf)
            }
            None => {
                stats.pool_misses += 1;
                Ok(ZeroCopyBuffer::new(size))
            }
        }
    }

    async fn acquire_small_buffer(&self, stats: &mut PoolStats) -> Option<ZeroCopyBuffer> {
        let mut pool = self.small_buffers.lock().await;
        let buffer = pool.pop_front();

        if buffer.is_some() {
            stats.small_buffers_active += 1;
        }

        buffer
    }

    async fn acquire_medium_buffer(&self, stats: &mut PoolStats) -> Option<ZeroCopyBuffer> {
        let mut pool = self.medium_buffers.lock().await;
        let buffer = pool.pop_front();

        if buffer.is_some() {
            stats.medium_buffers_active += 1;
        }

        buffer
    }

    async fn acquire_large_buffer(&self, stats: &mut PoolStats, size: usize) -> Option<ZeroCopyBuffer> {
        let mut pool = self.large_buffers.lock().await;

        // Find a pooled large buffer that is big enough for the request
        let pos = pool.iter().position(|buf| buf.len() >= size);

        if let Some(idx) = pos {
            let buffer = pool.remove(idx);
            stats.large_buffers_active += 1;
            buffer
        } else {
            None
        }
    }

    /// Return a buffer to the pool for reuse
    pub async fn release(&self, buffer: ZeroCopyBuffer) -> Result<()> {
        // Only pool buffers that have no outstanding views
        if buffer.ref_count() > 1 {
            return Ok(()); // Let it drop naturally
        }

        let size = buffer.len();
        let mut stats = self.stats.write().await;

        // If the matching tier is already full, the buffer is simply dropped.
        if size <= self.config.small_buffer_size {
            let mut pool = self.small_buffers.lock().await;
            if pool.len() < self.config.max_small_buffers {
                pool.push_back(buffer);
                if stats.small_buffers_active > 0 {
                    stats.small_buffers_active -= 1;
                }
            }
        } else if size <= self.config.medium_buffer_size {
            let mut pool = self.medium_buffers.lock().await;
            if pool.len() < self.config.max_medium_buffers {
                pool.push_back(buffer);
                if stats.medium_buffers_active > 0 {
                    stats.medium_buffers_active -= 1;
                }
            }
        } else {
            let mut pool = self.large_buffers.lock().await;
            if pool.len() < self.config.max_large_buffers {
                pool.push_back(buffer);
                if stats.large_buffers_active > 0 {
                    stats.large_buffers_active -= 1;
                }
            }
        }

        Ok(())
    }

    /// Get pool statistics
    pub async fn stats(&self) -> PoolStats {
        let stats = self.stats.read().await;
        stats.clone()
    }

    /// Clear all pooled buffers
    pub async fn clear(&self) -> Result<()> {
        let mut small = self.small_buffers.lock().await;
        let mut medium = self.medium_buffers.lock().await;
        let mut large = self.large_buffers.lock().await;

        small.clear();
        medium.clear();
        large.clear();

        let mut stats = self.stats.write().await;
        stats.small_buffers_active = 0;
        stats.medium_buffers_active = 0;
        stats.large_buffers_active = 0;

        Ok(())
    }
}

/// Memory manager for the entire pipeline
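///
/// Enforces a global memory limit and can garbage-collect unreferenced
/// buffers. A minimal async sketch (`ignore`d; assumes a Tokio runtime):
///
/// ```ignore
/// let manager = PipelineMemoryManager::new(64); // 64 MB limit
/// let buffer = manager.allocate(4096).await?;
/// assert!(manager.utilization() > 0.0);
/// manager.deallocate(buffer.id()).await?;
/// ```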
pub struct PipelineMemoryManager {
    buffer_pool: BufferPool,
    // Maps each live buffer to its handle and the size reserved for it, so
    // deallocation releases exactly what allocation charged.
    active_buffers: RwLock<std::collections::HashMap<BufferId, (Arc<ZeroCopyBuffer>, usize)>>,
    memory_limit: usize,
    current_usage: AtomicUsize,
}

impl PipelineMemoryManager {
    /// Create a new memory manager
    pub fn new(memory_limit_mb: usize) -> Self {
        let config = PoolConfig::default();
        let buffer_pool = BufferPool::new(config);

        Self {
            buffer_pool,
            active_buffers: RwLock::new(std::collections::HashMap::new()),
            memory_limit: memory_limit_mb * 1024 * 1024, // Convert to bytes
            current_usage: AtomicUsize::new(0),
        }
    }

    /// Allocate a new buffer
    pub async fn allocate(&self, size: usize) -> Result<Arc<ZeroCopyBuffer>> {
        // Reserve the requested size atomically so concurrent allocations
        // cannot race past the memory limit between check and update.
        let mut current = self.current_usage.load(Ordering::Relaxed);
        loop {
            let proposed = current
                .checked_add(size)
                .ok_or_else(|| anyhow!("Memory usage counter overflow"))?;
            if proposed > self.memory_limit {
                return Err(anyhow!("Memory limit exceeded: {} + {} > {}",
                                 current, size, self.memory_limit));
            }
            match self.current_usage.compare_exchange_weak(
                current, proposed, Ordering::Relaxed, Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(actual) => current = actual,
            }
        }

        let buffer = match self.buffer_pool.acquire(size).await {
            Ok(buf) => Arc::new(buf),
            Err(e) => {
                // Roll back the reservation if acquisition fails
                self.current_usage.fetch_sub(size, Ordering::Relaxed);
                return Err(e);
            }
        };
        let buffer_id = buffer.id();

        // Track the buffer together with its reserved size
        {
            let mut active = self.active_buffers.write().await;
            active.insert(buffer_id, (buffer.clone(), size));
        }

        debug!("Allocated buffer {:?} of {} bytes", buffer_id, size);
        Ok(buffer)
    }

    /// Deallocate a buffer
    pub async fn deallocate(&self, buffer_id: BufferId) -> Result<()> {
        let entry = {
            let mut active = self.active_buffers.write().await;
            active.remove(&buffer_id)
        };

        if let Some((buffer, size)) = entry {
            // Return the buffer to the pool if we hold the only reference;
            // otherwise our handle just drops and callers keep theirs.
            if let Ok(owned_buffer) = Arc::try_unwrap(buffer) {
                self.buffer_pool.release(owned_buffer).await?;
            }

            // Release exactly the size that `allocate` reserved
            self.current_usage.fetch_sub(size, Ordering::Relaxed);
            debug!("Deallocated buffer {:?} of {} bytes", buffer_id, size);
        }

        Ok(())
    }

    /// Get current memory usage
    pub fn current_usage(&self) -> usize {
        self.current_usage.load(Ordering::Relaxed)
    }

    /// Get memory utilization as a fraction (0.0 to 1.0)
    pub fn utilization(&self) -> f64 {
        self.current_usage() as f64 / self.memory_limit as f64
    }

    /// Get buffer pool statistics
    pub async fn pool_stats(&self) -> PoolStats {
        self.buffer_pool.stats().await
    }

    /// Force garbage collection of unused buffers
    pub async fn gc(&self) -> Result<usize> {
        let mut collected = 0;
        let mut to_remove = Vec::new();

        {
            let active = self.active_buffers.read().await;
            for (id, (buffer, _size)) in active.iter() {
                // If the map holds the only reference, the buffer is unused
                if Arc::strong_count(buffer) == 1 {
                    to_remove.push(*id);
                }
            }
        }

        for buffer_id in to_remove {
            self.deallocate(buffer_id).await?;
            collected += 1;
        }

        if collected > 0 {
            debug!("Garbage collected {} unused buffers", collected);
        }

        Ok(collected)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_zero_copy_buffer() {
        let buffer = ZeroCopyBuffer::new(1024);
        assert_eq!(buffer.len(), 0); // Empty until data is added
        assert_eq!(buffer.ref_count(), 1);

        let view = buffer.create_view(0, 0).unwrap();
        assert_eq!(buffer.ref_count(), 2); // Buffer + view

        drop(view);
        assert_eq!(buffer.ref_count(), 1); // Just the buffer
    }

    #[test]
    fn test_segment_view() {
        let data = Bytes::from("Hello, World!");
        let buffer = ZeroCopyBuffer::from_bytes(data);

        let view = buffer.create_view(0, 5).unwrap();
        assert_eq!(view.len(), 5);
        assert_eq!(view.to_string_lossy(), "Hello");

        let sub_view = view.slice(1..4).unwrap();
        assert_eq!(sub_view.len(), 3);
        assert_eq!(sub_view.to_string_lossy(), "ell");
    }

    #[tokio::test]
    async fn test_buffer_pool() {
        let config = PoolConfig::default();
        let pool = BufferPool::new(config);

        // First acquisition should be a miss
        let buffer1 = pool.acquire(512).await.unwrap();
        let stats = pool.stats().await;
        assert_eq!(stats.total_allocations, 1);
        assert_eq!(stats.pool_misses, 1);

        // Release and reacquire should be a hit
        pool.release(buffer1).await.unwrap();
        let _buffer2 = pool.acquire(512).await.unwrap();

        let stats = pool.stats().await;
        assert_eq!(stats.total_allocations, 2);
        assert_eq!(stats.pool_hits, 1);
    }

    #[tokio::test]
    async fn test_memory_manager() {
        let manager = PipelineMemoryManager::new(1); // 1MB limit

        let buffer = manager.allocate(1024).await.unwrap();
        assert_eq!(manager.current_usage(), 1024); // Usage reflects the reservation

        // Test the memory limit
        let large_buffer = manager.allocate(2 * 1024 * 1024).await;
        assert!(large_buffer.is_err()); // Should exceed the 1MB limit

        manager.deallocate(buffer.id()).await.unwrap();
    }

    #[tokio::test]
    async fn test_garbage_collection() {
        let manager = PipelineMemoryManager::new(10); // 10MB limit

        let buffer = manager.allocate(1024).await.unwrap();

        // Buffer is still referenced, so it shouldn't be collected
        let collected = manager.gc().await.unwrap();
        assert_eq!(collected, 0);

        drop(buffer);

        // Now the buffer can be collected
        let collected = manager.gc().await.unwrap();
        assert_eq!(collected, 1);
    }
}