Skip to main content

oxigdal_core/memory/
pool.rs

//! Memory Pool Management
//!
//! This module provides memory pools for efficient buffer reuse:
//! - Buffer size classes (512B, 4KB, 64KB, 1MB, etc.)
//! - Pool growing and shrinking
//! - Memory limit enforcement
//! - Pool fragmentation tracking
//! - Automatic pool compaction
9
10// Arc with PoolInner that has unsafe Send+Sync impls is intentional
11#![allow(clippy::arc_with_non_send_sync)]
12// Unsafe code is necessary for memory pool operations
13#![allow(unsafe_code)]
14
use crate::error::{OxiGdalError, Result};
use parking_lot::RwLock;
use std::alloc::{Layout, alloc, alloc_zeroed, dealloc};
use std::collections::HashMap;
use std::ptr::NonNull;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22
/// Standard buffer size classes, in ascending order of capacity.
pub const SIZE_CLASSES: &[usize] = &[
    512,        // 512 bytes
    4096,       // 4 KB
    16384,      // 16 KB
    65536,      // 64 KB
    262_144,    // 256 KB
    1_048_576,  // 1 MB
    4_194_304,  // 4 MB
    16_777_216, // 16 MB
];

/// Default memory limit for a pool (1 GiB).
pub const DEFAULT_MEMORY_LIMIT: usize = 1024 * 1024 * 1024;
37
/// Counters describing pool activity.
///
/// All fields are atomics so the statistics can be updated concurrently
/// without taking the pool's lock.
#[derive(Debug, Default)]
pub struct PoolStats {
    /// Count of buffers handed out (pool hits plus system allocations).
    pub total_allocations: AtomicU64,
    /// Count of buffers returned to the pool.
    pub total_deallocations: AtomicU64,
    /// Allocations served from the free lists.
    pub cache_hits: AtomicU64,
    /// Allocations that fell through to the system allocator.
    pub cache_misses: AtomicU64,
    /// Bytes currently held by the pool's free lists.
    pub bytes_in_pool: AtomicUsize,
    /// High-water mark of `bytes_in_pool`.
    pub peak_bytes: AtomicUsize,
    /// Number of compaction passes performed.
    pub compactions: AtomicU64,
}
56
57impl PoolStats {
58    /// Create new statistics
59    #[must_use]
60    pub fn new() -> Self {
61        Self::default()
62    }
63
64    /// Record an allocation
65    pub fn record_allocation(&self, from_pool: bool) {
66        self.total_allocations.fetch_add(1, Ordering::Relaxed);
67        if from_pool {
68            self.cache_hits.fetch_add(1, Ordering::Relaxed);
69        } else {
70            self.cache_misses.fetch_add(1, Ordering::Relaxed);
71        }
72    }
73
74    /// Record a deallocation
75    pub fn record_deallocation(&self, size: usize) {
76        self.total_deallocations.fetch_add(1, Ordering::Relaxed);
77        let new_bytes = self.bytes_in_pool.fetch_add(size, Ordering::Relaxed) + size;
78
79        // Update peak
80        let mut peak = self.peak_bytes.load(Ordering::Relaxed);
81        while new_bytes > peak {
82            match self.peak_bytes.compare_exchange_weak(
83                peak,
84                new_bytes,
85                Ordering::Relaxed,
86                Ordering::Relaxed,
87            ) {
88                Ok(_) => break,
89                Err(x) => peak = x,
90            }
91        }
92    }
93
94    /// Record a compaction
95    pub fn record_compaction(&self, bytes_freed: usize) {
96        self.compactions.fetch_add(1, Ordering::Relaxed);
97        self.bytes_in_pool.fetch_sub(bytes_freed, Ordering::Relaxed);
98    }
99
100    /// Get cache hit rate
101    pub fn hit_rate(&self) -> f64 {
102        let hits = self.cache_hits.load(Ordering::Relaxed);
103        let misses = self.cache_misses.load(Ordering::Relaxed);
104        let total = hits + misses;
105        if total == 0 {
106            0.0
107        } else {
108            hits as f64 / total as f64
109        }
110    }
111
112    /// Get current pool size
113    pub fn current_size(&self) -> usize {
114        self.bytes_in_pool.load(Ordering::Relaxed)
115    }
116}
117
/// Tunable parameters for a memory pool.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Map from buffer size class to the number of buffers pre-allocated for it.
    pub size_classes: HashMap<usize, usize>,
    /// Upper bound on the total bytes the pool may hold.
    pub memory_limit: usize,
    /// Fraction of `memory_limit` at which returned buffers are freed instead
    /// of pooled.
    pub compaction_threshold: f64,
    /// Number of free buffers each class keeps after compaction.
    pub min_free_buffers: usize,
}
130
131impl Default for PoolConfig {
132    fn default() -> Self {
133        let mut size_classes = HashMap::new();
134        for &size in SIZE_CLASSES {
135            size_classes.insert(size, 8); // 8 buffers per class
136        }
137
138        Self {
139            size_classes,
140            memory_limit: DEFAULT_MEMORY_LIMIT,
141            compaction_threshold: 0.8,
142            min_free_buffers: 2,
143        }
144    }
145}
146
147impl PoolConfig {
148    /// Create new configuration
149    #[must_use]
150    pub fn new() -> Self {
151        Self::default()
152    }
153
154    /// Add a size class
155    #[must_use]
156    pub fn with_size_class(mut self, size: usize, initial_count: usize) -> Self {
157        self.size_classes.insert(size, initial_count);
158        self
159    }
160
161    /// Set memory limit
162    #[must_use]
163    pub fn with_memory_limit(mut self, limit: usize) -> Self {
164        self.memory_limit = limit;
165        self
166    }
167
168    /// Set compaction threshold
169    #[must_use]
170    pub fn with_compaction_threshold(mut self, threshold: f64) -> Self {
171        self.compaction_threshold = threshold;
172        self
173    }
174
175    /// Set minimum free buffers
176    #[must_use]
177    pub fn with_min_free_buffers(mut self, count: usize) -> Self {
178        self.min_free_buffers = count;
179        self
180    }
181}
182
183/// Buffer handle from pool
184pub struct PooledBuffer {
185    ptr: NonNull<u8>,
186    size: usize,
187    pool: Arc<PoolInner>,
188}
189
190impl PooledBuffer {
191    /// Get the buffer size
192    #[must_use]
193    pub fn size(&self) -> usize {
194        self.size
195    }
196
197    /// Get as slice
198    #[must_use]
199    pub fn as_slice(&self) -> &[u8] {
200        // SAFETY: ptr and size are valid. The buffer was properly allocated
201        // and we have shared access.
202        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.size) }
203    }
204
205    /// Get as mutable slice
206    pub fn as_mut_slice(&mut self) -> &mut [u8] {
207        // SAFETY: ptr and size are valid. We have exclusive mutable access.
208        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.size) }
209    }
210
211    /// Get typed slice
212    pub fn as_typed_slice<T: bytemuck::Pod>(&self) -> Result<&[T]> {
213        if self.size % std::mem::size_of::<T>() != 0 {
214            return Err(OxiGdalError::invalid_parameter(
215                "parameter",
216                "Buffer size not aligned to type".to_string(),
217            ));
218        }
219        let count = self.size / std::mem::size_of::<T>();
220        // SAFETY: Size alignment verified. The pointer is valid and count
221        // is within bounds. bytemuck::Pod ensures T is safe to read.
222        Ok(unsafe { std::slice::from_raw_parts(self.ptr.as_ptr() as *const T, count) })
223    }
224
225    /// Get typed mutable slice
226    pub fn as_typed_mut_slice<T: bytemuck::Pod>(&mut self) -> Result<&mut [T]> {
227        if self.size % std::mem::size_of::<T>() != 0 {
228            return Err(OxiGdalError::invalid_parameter(
229                "parameter",
230                "Buffer size not aligned to type".to_string(),
231            ));
232        }
233        let count = self.size / std::mem::size_of::<T>();
234        // SAFETY: Size alignment verified. We have exclusive mutable access.
235        Ok(unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr().cast::<T>(), count) })
236    }
237}
238
239impl Drop for PooledBuffer {
240    fn drop(&mut self) {
241        self.pool.return_buffer(self.ptr, self.size);
242    }
243}
244
245/// Internal pool state
246struct PoolInner {
247    /// Free buffers by size class
248    free_buffers: RwLock<HashMap<usize, Vec<NonNull<u8>>>>,
249    /// Configuration
250    config: PoolConfig,
251    /// Statistics
252    stats: Arc<PoolStats>,
253}
254
255impl PoolInner {
256    fn new(config: PoolConfig) -> Result<Self> {
257        let mut free_buffers = HashMap::new();
258
259        // Pre-allocate initial buffers
260        for (&size, &count) in &config.size_classes {
261            let mut buffers = Vec::new();
262            for _ in 0..count {
263                let ptr = Self::allocate_buffer(size)?;
264                buffers.push(ptr);
265            }
266            free_buffers.insert(size, buffers);
267        }
268
269        Ok(Self {
270            free_buffers: RwLock::new(free_buffers),
271            config,
272            stats: Arc::new(PoolStats::new()),
273        })
274    }
275
276    fn allocate_buffer(size: usize) -> Result<NonNull<u8>> {
277        let layout = Layout::from_size_align(size, 16)
278            .map_err(|e| OxiGdalError::allocation_error(e.to_string()))?;
279
280        // SAFETY: Layout is valid and we check for null after allocation.
281        unsafe {
282            let ptr = alloc(layout);
283            if ptr.is_null() {
284                return Err(OxiGdalError::allocation_error(
285                    "Failed to allocate buffer".to_string(),
286                ));
287            }
288            Ok(NonNull::new_unchecked(ptr))
289        }
290    }
291
292    fn get_buffer(&self, size: usize) -> Result<NonNull<u8>> {
293        // Find appropriate size class
294        let size_class = SIZE_CLASSES
295            .iter()
296            .find(|&&s| s >= size)
297            .copied()
298            .unwrap_or_else(|| size.next_power_of_two());
299
300        // Try to get from pool
301        {
302            let mut free_buffers = self.free_buffers.write();
303            if let Some(buffers) = free_buffers.get_mut(&size_class) {
304                if let Some(ptr) = buffers.pop() {
305                    self.stats.record_allocation(true);
306                    return Ok(ptr);
307                }
308            }
309        }
310
311        // Pool miss, allocate new buffer
312        self.stats.record_allocation(false);
313        Self::allocate_buffer(size_class)
314    }
315
316    fn return_buffer(&self, ptr: NonNull<u8>, size: usize) {
317        // Find appropriate size class
318        let size_class = SIZE_CLASSES
319            .iter()
320            .find(|&&s| s >= size)
321            .copied()
322            .unwrap_or_else(|| size.next_power_of_two());
323
324        let mut free_buffers = self.free_buffers.write();
325
326        // Check if we're over the memory limit
327        let current_size = self.stats.current_size();
328        let threshold =
329            (self.config.memory_limit as f64 * self.config.compaction_threshold) as usize;
330
331        if current_size >= threshold {
332            // Don't return to pool, deallocate immediately
333            drop(free_buffers);
334            // SAFETY: Layout matches allocation. We're cleaning up at threshold.
335            unsafe {
336                let layout = Layout::from_size_align_unchecked(size_class, 16);
337                dealloc(ptr.as_ptr(), layout);
338            }
339            return;
340        }
341
342        // Return to pool
343        free_buffers.entry(size_class).or_default().push(ptr);
344        self.stats.record_deallocation(size_class);
345    }
346
347    fn compact(&self) -> Result<()> {
348        let mut free_buffers = self.free_buffers.write();
349        let mut bytes_freed = 0;
350
351        // Keep only min_free_buffers in each size class
352        for (&size, buffers) in free_buffers.iter_mut() {
353            while buffers.len() > self.config.min_free_buffers {
354                if let Some(ptr) = buffers.pop() {
355                    // SAFETY: Layout matches allocation. Cleaning up excess buffers.
356                    unsafe {
357                        let layout = Layout::from_size_align_unchecked(size, 16);
358                        dealloc(ptr.as_ptr(), layout);
359                    }
360                    bytes_freed += size;
361                }
362            }
363        }
364
365        self.stats.record_compaction(bytes_freed);
366        Ok(())
367    }
368}
369
370impl Drop for PoolInner {
371    fn drop(&mut self) {
372        let free_buffers = self.free_buffers.write();
373        for (&size, buffers) in free_buffers.iter() {
374            for &ptr in buffers {
375                // SAFETY: Layout matches allocation during pool cleanup.
376                unsafe {
377                    let layout = Layout::from_size_align_unchecked(size, 16);
378                    dealloc(ptr.as_ptr(), layout);
379                }
380            }
381        }
382    }
383}
384
385// SAFETY: PoolInner can be safely sent between threads because:
386// - All access to the internal NonNull<u8> pointers is protected by RwLock
387// - The pointers represent heap-allocated memory that is valid across threads
388// - No thread-local state is accessed
389unsafe impl Send for PoolInner {}
390
391// SAFETY: PoolInner can be safely shared between threads because:
392// - All mutable access to internal state is protected by RwLock
393// - NonNull<u8> pointers are just addresses that can be safely read concurrently
394// - Atomic operations protect statistics
395unsafe impl Sync for PoolInner {}
396
397/// Memory pool for buffer reuse
398pub struct Pool {
399    inner: Arc<PoolInner>,
400}
401
402impl Pool {
403    /// Create a new pool with default configuration
404    pub fn new() -> Result<Self> {
405        Self::with_config(PoolConfig::default())
406    }
407
408    /// Create a new pool with custom configuration
409    pub fn with_config(config: PoolConfig) -> Result<Self> {
410        Ok(Self {
411            inner: Arc::new(PoolInner::new(config)?),
412        })
413    }
414
415    /// Allocate a buffer from the pool
416    pub fn allocate(&self, size: usize) -> Result<PooledBuffer> {
417        let ptr = self.inner.get_buffer(size)?;
418        Ok(PooledBuffer {
419            ptr,
420            size,
421            pool: Arc::clone(&self.inner),
422        })
423    }
424
425    /// Get statistics
426    #[must_use]
427    pub fn stats(&self) -> Arc<PoolStats> {
428        Arc::clone(&self.inner.stats)
429    }
430
431    /// Compact the pool
432    pub fn compact(&self) -> Result<()> {
433        self.inner.compact()
434    }
435
436    /// Get current pool size
437    #[must_use]
438    pub fn size(&self) -> usize {
439        self.stats().current_size()
440    }
441}
442
443impl Clone for Pool {
444    fn clone(&self) -> Self {
445        Self {
446            inner: Arc::clone(&self.inner),
447        }
448    }
449}
450
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_basic() {
        let pool = Pool::new().expect("Failed to create memory pool");

        let first = pool
            .allocate(1024)
            .expect("Failed to allocate 1024-byte buffer");
        // `size()` reports the requested length, not the rounded size class.
        assert_eq!(first.size(), 1024);

        let second = pool
            .allocate(2048)
            .expect("Failed to allocate 2048-byte buffer");
        assert_eq!(second.size(), 2048);
    }

    #[test]
    fn test_pool_reuse() {
        let pool = Pool::new().expect("Failed to create memory pool for reuse test");

        // Allocate and immediately drop so the buffer returns to the pool.
        {
            let _buffer = pool
                .allocate(1024)
                .expect("Failed to allocate buffer for reuse test");
        }

        let stats = pool.stats();
        assert_eq!(stats.total_allocations.load(Ordering::Relaxed), 1);
        assert_eq!(stats.total_deallocations.load(Ordering::Relaxed), 1);

        // Allocate the same class again; it should be served from the pool.
        let _buffer2 = pool
            .allocate(1024)
            .expect("Failed to allocate second buffer for reuse test");
        // Exact hit counts depend on the internal pooling strategy.
        assert!(stats.cache_hits.load(Ordering::Relaxed) >= 1);
    }

    #[test]
    fn test_pool_config() {
        let config = PoolConfig::new()
            .with_size_class(8192, 4)
            .with_memory_limit(1024 * 1024)
            .with_min_free_buffers(1);

        let pool = Pool::with_config(config).expect("Failed to create pool with custom config");
        let _buffer = pool
            .allocate(8000)
            .expect("Failed to allocate 8000-byte buffer");
    }

    #[test]
    fn test_buffer_slice() {
        let pool = Pool::new().expect("Failed to create pool for buffer slice test");
        let mut buf = pool
            .allocate(1024)
            .expect("Failed to allocate buffer for slice test");

        buf.as_mut_slice()[0] = 42;
        assert_eq!(buf.as_slice()[0], 42);
    }

    #[test]
    fn test_typed_buffer() {
        let pool = Pool::new().expect("Failed to create pool for typed buffer test");
        let mut buf = pool
            .allocate(4096)
            .expect("Failed to allocate buffer for typed slice test");

        let words: &mut [u32] = buf
            .as_typed_mut_slice()
            .expect("Failed to get mutable typed slice");
        words[0] = 12345;

        let readback: &[u32] = buf.as_typed_slice().expect("Failed to get typed slice");
        assert_eq!(readback[0], 12345);
    }

    #[test]
    fn test_pool_stats() {
        let pool = Pool::new().expect("Failed to create pool for stats test");

        let _a = pool
            .allocate(1024)
            .expect("Failed to allocate first buffer for stats test");
        let _b = pool
            .allocate(2048)
            .expect("Failed to allocate second buffer for stats test");

        let stats = pool.stats();
        assert!(stats.total_allocations.load(Ordering::Relaxed) >= 2);
    }
}