// oxirs_stream/performance_optimizer/memory.rs

//! Memory management and pooling for performance optimization
//!
//! This module provides memory pooling capabilities to reduce allocation overhead
//! and improve performance for high-throughput streaming operations.

use super::config::MemoryPoolConfig;
use anyhow::Result;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::alloc::{alloc, dealloc, Layout};
use std::collections::VecDeque;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info};

/// Memory pool for efficient allocation and deallocation
///
/// Keeps a free list of previously allocated blocks so callers can reuse
/// memory instead of hitting the global allocator on every request.
pub struct MemoryPool {
    /// Pool configuration (capacity limit, compaction interval, ...)
    config: MemoryPoolConfig,
    /// Free list of blocks available for reuse; shared with every
    /// `MemoryHandle` so handles can return blocks on drop
    available_blocks: Arc<RwLock<VecDeque<MemoryBlock>>>,
    /// Statistics holder — NOTE(review): not updated by the hot paths;
    /// `stats()` recomputes a snapshot on demand instead
    stats: Arc<MemoryPoolStats>,
    /// Count of blocks currently handed out to callers
    total_allocated: AtomicUsize,
    /// High-water mark of `total_allocated`
    peak_allocated: AtomicUsize,
    /// Timestamp of the most recent `compact()` run
    last_compaction: Arc<RwLock<Instant>>,
}

/// Memory block structure
///
/// Owns a raw heap allocation created via `std::alloc::alloc`; the `layout`
/// is retained so exactly the same layout can be passed back to `dealloc`.
///
/// NOTE(review): this type has no `Drop` impl, so merely dropping a
/// `MemoryBlock` leaks its allocation — owners must call `dealloc` (or hand
/// the block back to the pool) explicitly.
#[derive(Debug)]
pub struct MemoryBlock {
    /// Start of the allocation; never null (checked at allocation time)
    ptr: NonNull<u8>,
    /// Requested size in bytes
    size: usize,
    /// Layout the allocation was created with (required by `dealloc`)
    layout: Layout,
    /// Creation time, used by compaction to age out stale blocks
    allocated_at: Instant,
}

// Safety: MemoryBlock is safe to send between threads as it represents
// owned memory that won't be accessed concurrently
unsafe impl Send for MemoryBlock {}
unsafe impl Sync for MemoryBlock {}

/// Memory pool statistics
///
/// Snapshot of pool activity. NOTE(review): several byte/hit counters are
/// placeholders — `MemoryPool::stats()` currently fills them with zeros.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryPoolStats {
    /// Total blocks allocated
    pub total_allocated: usize,
    /// Total blocks freed
    pub total_freed: usize,
    /// Currently allocated blocks
    pub current_allocated: usize,
    /// Peak allocated blocks
    pub peak_allocated: usize,
    /// Total bytes allocated
    pub total_bytes_allocated: usize,
    /// Total bytes freed
    pub total_bytes_freed: usize,
    /// Current bytes allocated
    pub current_bytes_allocated: usize,
    /// Peak bytes allocated
    pub peak_bytes_allocated: usize,
    /// Pool hits (reused blocks)
    pub pool_hits: usize,
    /// Pool misses (new allocations)
    pub pool_misses: usize,
    /// Fragmentation ratio
    pub fragmentation_ratio: f64,
    /// Average allocation size
    pub average_allocation_size: f64,
    /// Last compaction time (seconds since Unix epoch)
    pub last_compaction: Option<u64>,
}

73impl Default for MemoryPoolStats {
74    fn default() -> Self {
75        Self {
76            total_allocated: 0,
77            total_freed: 0,
78            current_allocated: 0,
79            peak_allocated: 0,
80            total_bytes_allocated: 0,
81            total_bytes_freed: 0,
82            current_bytes_allocated: 0,
83            peak_bytes_allocated: 0,
84            pool_hits: 0,
85            pool_misses: 0,
86            fragmentation_ratio: 0.0,
87            average_allocation_size: 0.0,
88            last_compaction: None,
89        }
90    }
91}
92
93impl MemoryPool {
94    /// Create a new memory pool with the given configuration
95    pub fn new(config: MemoryPoolConfig) -> Self {
96        Self {
97            config,
98            available_blocks: Arc::new(RwLock::new(VecDeque::new())),
99            stats: Arc::new(MemoryPoolStats::default()),
100            total_allocated: AtomicUsize::new(0),
101            peak_allocated: AtomicUsize::new(0),
102            last_compaction: Arc::new(RwLock::new(Instant::now())),
103        }
104    }
105
106    /// Allocate a memory block of the given size
107    pub fn allocate(&self, size: usize) -> Result<MemoryHandle> {
108        let layout = Layout::from_size_align(size, std::mem::align_of::<u8>())
109            .map_err(|e| anyhow::anyhow!("Invalid layout: {}", e))?;
110
111        // Try to reuse an existing block
112        if let Some(block) = self.try_reuse_block(size) {
113            self.update_stats_on_hit();
114            return Ok(MemoryHandle {
115                block,
116                pool: self.available_blocks.clone(),
117            });
118        }
119
120        // Allocate new block
121        let ptr = unsafe { alloc(layout) };
122        if ptr.is_null() {
123            return Err(anyhow::anyhow!("Failed to allocate memory"));
124        }
125
126        let block = MemoryBlock {
127            ptr: NonNull::new(ptr).unwrap(),
128            size,
129            layout,
130            allocated_at: Instant::now(),
131        };
132
133        self.update_stats_on_miss(size);
134
135        Ok(MemoryHandle {
136            block,
137            pool: self.available_blocks.clone(),
138        })
139    }
140
141    /// Try to reuse an existing memory block
142    fn try_reuse_block(&self, size: usize) -> Option<MemoryBlock> {
143        let mut available = self.available_blocks.write();
144
145        // Find a suitable block (first fit strategy)
146        for (i, block) in available.iter().enumerate() {
147            if block.size >= size {
148                return available.remove(i);
149            }
150        }
151
152        None
153    }
154
155    /// Update statistics on cache hit
156    fn update_stats_on_hit(&self) {
157        // In a real implementation, this would update atomic counters
158        // For now, we'll use a simplified approach
159        debug!("Memory pool hit");
160    }
161
162    /// Update statistics on cache miss
163    fn update_stats_on_miss(&self, size: usize) {
164        let current = self.total_allocated.fetch_add(1, Ordering::Relaxed) + 1;
165        let peak = self.peak_allocated.load(Ordering::Relaxed);
166
167        if current > peak {
168            self.peak_allocated.store(current, Ordering::Relaxed);
169        }
170
171        debug!("Memory pool miss, allocated block of size: {}", size);
172    }
173
174    /// Return a memory block to the pool
175    pub fn deallocate(&self, block: MemoryBlock) {
176        let mut available = self.available_blocks.write();
177
178        // Check if we should return the block to the pool or deallocate it
179        if available.len() < self.config.max_size / block.size {
180            available.push_back(block);
181            debug!("Returned block to pool");
182        } else {
183            // Pool is full, deallocate the block
184            unsafe {
185                dealloc(block.ptr.as_ptr(), block.layout);
186            }
187            debug!("Deallocated block (pool full)");
188        }
189
190        self.total_allocated.fetch_sub(1, Ordering::Relaxed);
191    }
192
193    /// Get memory pool statistics
194    pub fn stats(&self) -> MemoryPoolStats {
195        let available = self.available_blocks.read();
196        let current_allocated = self.total_allocated.load(Ordering::Relaxed);
197        let peak_allocated = self.peak_allocated.load(Ordering::Relaxed);
198
199        MemoryPoolStats {
200            total_allocated: current_allocated + available.len(),
201            total_freed: 0, // Would be tracked in a real implementation
202            current_allocated,
203            peak_allocated,
204            total_bytes_allocated: 0, // Would be tracked in a real implementation
205            total_bytes_freed: 0,
206            current_bytes_allocated: 0,
207            peak_bytes_allocated: 0,
208            pool_hits: 0,
209            pool_misses: 0,
210            fragmentation_ratio: self.calculate_fragmentation(),
211            average_allocation_size: 0.0,
212            last_compaction: Some(
213                std::time::SystemTime::now()
214                    .duration_since(std::time::UNIX_EPOCH)
215                    .unwrap()
216                    .as_secs(),
217            ),
218        }
219    }
220
221    /// Calculate fragmentation ratio
222    fn calculate_fragmentation(&self) -> f64 {
223        let available = self.available_blocks.read();
224        if available.is_empty() {
225            return 0.0;
226        }
227
228        let total_size: usize = available.iter().map(|b| b.size).sum();
229        let largest_block = available.iter().map(|b| b.size).max().unwrap_or(0);
230
231        if total_size == 0 {
232            0.0
233        } else {
234            1.0 - (largest_block as f64 / total_size as f64)
235        }
236    }
237
238    /// Compact the memory pool
239    pub fn compact(&self) -> Result<()> {
240        let mut available = self.available_blocks.write();
241        let mut last_compaction = self.last_compaction.write();
242
243        // Sort blocks by size for better allocation patterns
244        let mut blocks: Vec<_> = available.drain(..).collect();
245        blocks.sort_by_key(|b| b.size);
246
247        // Remove very old blocks to prevent memory leaks
248        let now = Instant::now();
249        let threshold = std::time::Duration::from_secs(300); // 5 minutes
250
251        blocks.retain(|block| now.duration_since(block.allocated_at) < threshold);
252
253        // Put blocks back
254        available.extend(blocks);
255        *last_compaction = now;
256
257        info!(
258            "Memory pool compacted, {} blocks remaining",
259            available.len()
260        );
261        Ok(())
262    }
263
264    /// Check if compaction is needed
265    pub fn needs_compaction(&self) -> bool {
266        let last_compaction = self.last_compaction.read();
267        let elapsed = last_compaction.elapsed();
268
269        elapsed.as_secs() > self.config.compaction_interval
270    }
271
272    /// Get the current pool size
273    pub fn pool_size(&self) -> usize {
274        self.available_blocks.read().len()
275    }
276
277    /// Get the total allocated memory
278    pub fn total_allocated(&self) -> usize {
279        self.total_allocated.load(Ordering::Relaxed)
280    }
281}
282
/// Handle for allocated memory
///
/// RAII-style wrapper: on drop the handle pushes its block back onto the
/// pool's shared free list (see the `Drop` impl below).
pub struct MemoryHandle {
    /// The owned allocation this handle grants access to
    block: MemoryBlock,
    /// Shared free list of the originating pool, used to return the block
    pool: Arc<RwLock<VecDeque<MemoryBlock>>>,
}

289impl MemoryHandle {
290    /// Get a raw pointer to the allocated memory
291    pub fn as_ptr(&self) -> *mut u8 {
292        self.block.ptr.as_ptr()
293    }
294
295    /// Get the size of the allocated memory
296    pub fn size(&self) -> usize {
297        self.block.size
298    }
299
300    /// Get a slice view of the allocated memory
301    ///
302    /// # Safety
303    /// The caller must ensure that the memory block is properly initialized
304    /// and that the returned slice is not accessed after the block is deallocated
305    pub unsafe fn as_slice(&self) -> &[u8] {
306        std::slice::from_raw_parts(self.block.ptr.as_ptr(), self.block.size)
307    }
308
309    /// Get a mutable slice view of the allocated memory
310    ///
311    /// # Safety
312    /// The caller must ensure that the memory block is properly initialized
313    /// and that the returned slice is not accessed after the block is deallocated
314    pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
315        std::slice::from_raw_parts_mut(self.block.ptr.as_ptr(), self.block.size)
316    }
317}
318
impl Drop for MemoryHandle {
    /// Returns the underlying block to the pool's free list so the memory
    /// can be reused by later allocations.
    fn drop(&mut self) {
        // Reassemble the block field-by-field (all fields are `Copy`)
        // because a value cannot be moved out of `self` inside `Drop::drop`.
        let block = MemoryBlock {
            ptr: self.block.ptr,
            size: self.block.size,
            layout: self.block.layout,
            allocated_at: self.block.allocated_at,
        };

        // NOTE(review): blocks are pushed back unconditionally — the pool's
        // capacity limit is not consulted and `total_allocated` is not
        // decremented here, unlike `MemoryPool::deallocate`; confirm this
        // asymmetry is intended.
        let mut available = self.pool.write();
        available.push_back(block);
    }
}

// Safety: the handle uniquely owns its block's allocation; the raw pointer
// is only exposed through `&self`/`&mut self` methods, so sending or
// sharing the handle across threads cannot by itself create unsynchronized
// access to the underlying memory.
unsafe impl Send for MemoryHandle {}
unsafe impl Sync for MemoryHandle {}

/// Memory allocation strategy
///
/// NOTE(review): this enum is not consumed anywhere in this module —
/// `MemoryPool::try_reuse_block` hard-codes first-fit; presumably other
/// code (or future work) selects among these. Verify before relying on it.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum AllocationStrategy {
    /// First fit - use the first block that fits
    #[default]
    FirstFit,
    /// Best fit - use the smallest block that fits
    BestFit,
    /// Worst fit - use the largest block that fits
    WorstFit,
    /// Next fit - start searching from the last allocated position
    NextFit,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a pool with default configuration for each test.
    fn default_pool() -> MemoryPool {
        MemoryPool::new(MemoryPoolConfig::default())
    }

    #[test]
    fn test_memory_pool_creation() {
        // A fresh pool has nothing pooled and nothing handed out.
        let pool = default_pool();
        assert_eq!(pool.total_allocated(), 0);
        assert_eq!(pool.pool_size(), 0);
    }

    #[test]
    fn test_memory_allocation() {
        // Allocating one block hands out exactly one block of that size.
        let pool = default_pool();
        let handle = pool.allocate(1024).unwrap();
        assert_eq!(pool.total_allocated(), 1);
        assert_eq!(handle.size(), 1024);
    }

    #[test]
    fn test_memory_pool_stats() {
        // An untouched pool reports zeroed allocation counters.
        let stats = default_pool().stats();
        assert_eq!(stats.current_allocated, 0);
        assert_eq!(stats.peak_allocated, 0);
    }

    #[test]
    fn test_memory_pool_compaction() {
        // Compacting an empty pool succeeds and is a no-op.
        assert!(default_pool().compact().is_ok());
    }
}