// webp_screenshot_rust/memory_pool.rs

1//! Memory pool for efficient buffer management
2
3use parking_lot::Mutex;
4use std::collections::VecDeque;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use crate::error::{MemoryPoolError, MemoryPoolResult};
10
/// A buffer that automatically returns to the pool when dropped.
pub struct PooledBuffer {
    // Backing storage; `None` after `into_vec` or once `drop` has run.
    data: Option<Vec<u8>>,
    // Logical size in bytes (the size requested at acquisition).
    size: usize,
    // Owning pool; `None` for direct (unpooled) fallback allocations, in
    // which case the buffer is simply freed on drop instead of returned.
    pool: Option<Arc<MemoryPool>>,
}
17
18impl PooledBuffer {
19    /// Get the buffer data
20    pub fn data(&self) -> &[u8] {
21        self.data.as_ref().map(|v| v.as_slice()).unwrap_or(&[])
22    }
23
24    /// Get mutable buffer data
25    pub fn data_mut(&mut self) -> &mut [u8] {
26        self.data.as_mut().map(|v| v.as_mut_slice()).unwrap_or(&mut [])
27    }
28
29    /// Get the buffer size
30    pub fn size(&self) -> usize {
31        self.size
32    }
33
34    /// Take ownership of the buffer data (removes from pool management)
35    pub fn into_vec(mut self) -> Vec<u8> {
36        // CRITICAL: Decrement current_memory_usage since buffer is leaving pool management
37        // The caller takes ownership and the memory will be freed by the OS when Vec<u8> is dropped
38        if let Some(ref pool) = self.pool {
39            pool.stats.current_memory_usage.fetch_sub(
40                self.size,
41                Ordering::Relaxed,
42            );
43        }
44
45        self.pool = None; // Prevent returning to pool
46        self.data.take().unwrap_or_default()
47    }
48}
49
50impl Drop for PooledBuffer {
51    fn drop(&mut self) {
52        if let (Some(pool), Some(data)) = (self.pool.take(), self.data.take()) {
53            pool.release_internal(data, self.size);
54        }
55    }
56}
57
/// Buffer metadata for pool management.
#[derive(Debug)]
struct BufferEntry {
    buffer: Vec<u8>,
    size: usize,
    last_used: Instant,
    use_count: u32,
}

impl BufferEntry {
    /// Wrap a buffer with fresh bookkeeping metadata (just used, never reused).
    fn new(buffer: Vec<u8>, size: usize) -> Self {
        BufferEntry {
            last_used: Instant::now(),
            use_count: 0,
            buffer,
            size,
        }
    }

    /// Whether this buffer suits a request of `requested_size` bytes: it must
    /// be large enough, but no more than twice the requested size so big
    /// buffers are not wasted on small requests.
    fn matches_size(&self, requested_size: usize) -> bool {
        let big_enough = self.size >= requested_size;
        let not_wasteful = self.size <= requested_size * 2;
        big_enough && not_wasteful
    }

    /// Whether this buffer has sat unused for longer than `timeout`.
    fn is_expired(&self, timeout: Duration) -> bool {
        self.last_used.elapsed() > timeout
    }
}
85
/// Point-in-time statistics snapshot for the memory pool.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Number of idle buffers currently held in the pool, ready for reuse.
    pub available_buffers: usize,
    /// Total buffers allocated under pool tracking (direct fallback
    /// allocations made when the pool limit is reached are not counted).
    pub total_buffers_created: u64,
    /// Active memory plus memory held by pooled buffers, in bytes.
    pub total_memory_allocated: usize,
    /// High-water mark of active memory usage, in bytes.
    pub peak_memory_usage: usize,
    /// How many acquisitions were satisfied by reusing a pooled buffer.
    pub memory_reuse_count: u64,
    pub current_memory_usage: usize,  // Active (in-use) allocations only
    pub pooled_memory: usize,          // Memory held in pool buffers
    /// Acquisitions satisfied from the pool.
    pub buffer_hits: u64,
    /// Acquisitions that required a fresh allocation.
    pub buffer_misses: u64,
}
99
/// Memory pool for efficient buffer reuse.
///
/// Cloning a `MemoryPool` is cheap: clones share the same buffer list and
/// statistics through `Arc`s.
pub struct MemoryPool {
    // Idle buffers awaiting reuse, guarded by a parking_lot mutex.
    inner: Arc<Mutex<MemoryPoolInner>>,
    // Lock-free counters, updated without taking the pool lock.
    stats: Arc<PoolStatsAtomic>,
    // Configuration captured at construction; never mutated afterwards.
    config: PoolConfig,
}

// Mutex-protected pool state: the queue of idle buffers.
struct MemoryPoolInner {
    buffers: VecDeque<BufferEntry>,
}

// Atomic counterpart of `PoolStats`; see `MemoryPool::stats()` for how these
// are folded into a snapshot.
struct PoolStatsAtomic {
    total_buffers_created: AtomicU64,
    memory_reuse_count: AtomicU64,
    peak_memory_usage: AtomicUsize,
    // Tracks ACTIVE (handed-out) allocations only; pooled buffers excluded.
    current_memory_usage: AtomicUsize,
    buffer_hits: AtomicU64,
    buffer_misses: AtomicU64,
}
119
/// Configuration for the memory pool.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Maximum number of buffers to keep in the pool.
    pub max_buffers: usize,
    /// Maximum total memory to keep allocated (bytes).
    pub max_memory: usize,
    /// Buffer expiration timeout.
    pub buffer_timeout: Duration,
    /// Whether to pre-allocate buffers.
    pub preallocate: bool,
    /// Default buffer size for pre-allocation.
    pub default_buffer_size: usize,
}

impl Default for PoolConfig {
    /// Defaults: 500 MB memory cap (2x headroom to account for OCR
    /// processing time), 60 s idle timeout, no pre-allocation, and a
    /// default buffer sized for one Full HD RGBA frame.
    fn default() -> Self {
        PoolConfig {
            max_memory: 500 * 1024 * 1024,
            max_buffers: 10,
            default_buffer_size: 1920 * 1080 * 4,
            buffer_timeout: Duration::from_secs(60),
            preallocate: false,
        }
    }
}
146
147impl MemoryPool {
148    /// Create a new memory pool with default configuration
149    pub fn new() -> Arc<Self> {
150        Self::with_config(PoolConfig::default())
151    }
152
153    /// Create a new memory pool with custom configuration
154    pub fn with_config(config: PoolConfig) -> Arc<Self> {
155        let pool = Arc::new(Self {
156            inner: Arc::new(Mutex::new(MemoryPoolInner {
157                buffers: VecDeque::new(),
158            })),
159            stats: Arc::new(PoolStatsAtomic {
160                total_buffers_created: AtomicU64::new(0),
161                memory_reuse_count: AtomicU64::new(0),
162                peak_memory_usage: AtomicUsize::new(0),
163                current_memory_usage: AtomicUsize::new(0),
164                buffer_hits: AtomicU64::new(0),
165                buffer_misses: AtomicU64::new(0),
166            }),
167            config,
168        });
169
170        if pool.config.preallocate {
171            pool.preallocate_buffers();
172        }
173
174        pool
175    }
176
177    /// Pre-allocate buffers for better performance
178    fn preallocate_buffers(&self) {
179        let mut inner = self.inner.lock();
180        let num_buffers = self.config.max_buffers.min(4);
181
182        for _ in 0..num_buffers {
183            let buffer = vec![0u8; self.config.default_buffer_size];
184            let entry = BufferEntry::new(buffer, self.config.default_buffer_size);
185            inner.buffers.push_back(entry);
186
187            self.stats
188                .total_buffers_created
189                .fetch_add(1, Ordering::Relaxed);
190            // Pre-allocated buffers are in the pool, so they're not "active"
191            // Don't add to current_memory_usage
192        }
193    }
194
195    /// Acquire a buffer from the pool
196    pub fn acquire(self: &Arc<Self>, size: usize) -> MemoryPoolResult<PooledBuffer> {
197        if size == 0 {
198            return Err(MemoryPoolError::InvalidBufferSize { size });
199        }
200
201        let mut inner = self.inner.lock();
202
203        // Clean up expired buffers
204        self.cleanup_expired_buffers(&mut inner);
205
206        // Diagnostic logging for large allocations
207        if size > 10 * 1024 * 1024 {
208            let current = self.stats.current_memory_usage.load(Ordering::Relaxed);
209            eprintln!(
210                "[MemoryPool] Large allocation requested: {:.2} MB, current pool usage: {:.2} MB / {:.2} MB",
211                size as f64 / (1024.0 * 1024.0),
212                current as f64 / (1024.0 * 1024.0),
213                self.config.max_memory as f64 / (1024.0 * 1024.0)
214            );
215        }
216
217        // Try to find a suitable buffer
218        if let Some(index) = self.find_suitable_buffer(&inner, size) {
219            let mut entry = inner.buffers.remove(index).unwrap();
220            entry.last_used = Instant::now();
221            entry.use_count += 1;
222
223            // When taking buffer from pool, it becomes "active" again
224            // Increment current_memory_usage to track active allocations
225            self.stats.current_memory_usage.fetch_add(
226                entry.size,
227                Ordering::Relaxed,
228            );
229
230            // Resize if necessary
231            if entry.buffer.len() < size {
232                let size_diff = size - entry.size;
233                entry.buffer.resize(size, 0);
234                self.stats.current_memory_usage.fetch_add(
235                    size_diff,
236                    Ordering::Relaxed,
237                );
238                entry.size = size;
239            }
240
241            self.stats.buffer_hits.fetch_add(1, Ordering::Relaxed);
242            self.stats.memory_reuse_count.fetch_add(1, Ordering::Relaxed);
243
244            return Ok(PooledBuffer {
245                data: Some(entry.buffer),
246                size,
247                pool: Some(Arc::clone(self)),
248            });
249        }
250
251        drop(inner); // Release lock before allocation
252
253        // No suitable buffer found, allocate new one
254        self.stats.buffer_misses.fetch_add(1, Ordering::Relaxed);
255        self.allocate_new_buffer(size)
256    }
257
258    /// Find a suitable buffer in the pool
259    fn find_suitable_buffer(&self, inner: &MemoryPoolInner, size: usize) -> Option<usize> {
260        inner
261            .buffers
262            .iter()
263            .position(|entry| entry.matches_size(size))
264    }
265
266    /// Allocate a new buffer
267    fn allocate_new_buffer(self: &Arc<Self>, size: usize) -> MemoryPoolResult<PooledBuffer> {
268        // Check if we can allocate more memory
269        let current_memory = self.stats.current_memory_usage.load(Ordering::Relaxed);
270        if current_memory + size > self.config.max_memory {
271            // Pool is full - allow direct allocation without tracking in pool
272            // This provides a fallback when pool is exhausted
273            eprintln!(
274                "[MemoryPool] Pool limit reached ({} MB), allocating {:.2} MB directly without pooling",
275                self.config.max_memory / (1024 * 1024),
276                size as f64 / (1024.0 * 1024.0)
277            );
278
279            let buffer = vec![0u8; size];
280            return Ok(PooledBuffer {
281                data: Some(buffer),
282                size,
283                pool: None, // No pool reference - buffer will be dropped normally
284            });
285        }
286
287        let buffer = vec![0u8; size];
288        self.stats
289            .total_buffers_created
290            .fetch_add(1, Ordering::Relaxed);
291        self.stats
292            .current_memory_usage
293            .fetch_add(size, Ordering::Relaxed);
294        self.update_peak_memory();
295
296        Ok(PooledBuffer {
297            data: Some(buffer),
298            size,
299            pool: Some(Arc::clone(self)),
300        })
301    }
302
303    /// Internal method to release a buffer back to the pool
304    fn release_internal(&self, buffer: Vec<u8>, size: usize) {
305        let mut inner = self.inner.lock();
306
307        // IMPORTANT: When buffer is returned to pool, it's no longer "active"
308        // Decrement current_memory_usage as it only tracks ACTIVE allocations
309        // NOTE: This is also done in into_vec() when buffer leaves pool management entirely
310        // Pooled buffers don't count toward the limit for new allocations
311        self.stats
312            .current_memory_usage
313            .fetch_sub(size, Ordering::Relaxed);
314
315        // Check if we should keep this buffer in the pool
316        if inner.buffers.len() >= self.config.max_buffers {
317            // Pool is full, drop the buffer (already decremented memory above)
318            return;
319        }
320
321        // Keep buffer in pool for reuse
322        let entry = BufferEntry::new(buffer, size);
323        inner.buffers.push_back(entry);
324    }
325
326    /// Clean up expired buffers
327    fn cleanup_expired_buffers(&self, inner: &mut MemoryPoolInner) {
328        let timeout = self.config.buffer_timeout;
329
330        // Simply remove expired buffers from pool
331        // No need to adjust current_memory_usage since pooled buffers aren't counted
332        inner.buffers.retain(|entry| !entry.is_expired(timeout));
333    }
334
335    /// Update peak memory usage
336    fn update_peak_memory(&self) {
337        let current = self.stats.current_memory_usage.load(Ordering::Relaxed);
338        let mut peak = self.stats.peak_memory_usage.load(Ordering::Relaxed);
339
340        while current > peak {
341            match self.stats.peak_memory_usage.compare_exchange(
342                peak,
343                current,
344                Ordering::Relaxed,
345                Ordering::Relaxed,
346            ) {
347                Ok(_) => break,
348                Err(x) => peak = x,
349            }
350        }
351    }
352
353    /// Clear all buffers from the pool
354    pub fn clear(&self) {
355        let mut inner = self.inner.lock();
356        // Simply clear pooled buffers
357        // No need to adjust current_memory_usage since pooled buffers aren't counted
358        inner.buffers.clear();
359    }
360
361    /// Get current pool statistics
362    pub fn stats(&self) -> PoolStats {
363        let inner = self.inner.lock();
364
365        let pooled_memory: usize = inner.buffers.iter().map(|e| e.size).sum();
366
367        PoolStats {
368            available_buffers: inner.buffers.len(),
369            total_buffers_created: self.stats.total_buffers_created.load(Ordering::Relaxed),
370            total_memory_allocated: pooled_memory + self.stats.current_memory_usage.load(Ordering::Relaxed),
371            peak_memory_usage: self.stats.peak_memory_usage.load(Ordering::Relaxed),
372            memory_reuse_count: self.stats.memory_reuse_count.load(Ordering::Relaxed),
373            current_memory_usage: self.stats.current_memory_usage.load(Ordering::Relaxed),
374            pooled_memory,
375            buffer_hits: self.stats.buffer_hits.load(Ordering::Relaxed),
376            buffer_misses: self.stats.buffer_misses.load(Ordering::Relaxed),
377        }
378    }
379
380    /// Get hit rate percentage
381    pub fn hit_rate(&self) -> f64 {
382        let hits = self.stats.buffer_hits.load(Ordering::Relaxed);
383        let misses = self.stats.buffer_misses.load(Ordering::Relaxed);
384        let total = hits + misses;
385
386        if total == 0 {
387            0.0
388        } else {
389            (hits as f64 / total as f64) * 100.0
390        }
391    }
392}
393
394impl Default for MemoryPool {
395    fn default() -> Self {
396        Arc::try_unwrap(Self::new()).unwrap_or_else(|arc| (*arc).clone())
397    }
398}
399
400impl Clone for MemoryPool {
401    fn clone(&self) -> Self {
402        Self {
403            inner: Arc::clone(&self.inner),
404            stats: Arc::clone(&self.stats),
405            config: self.config.clone(),
406        }
407    }
408}
409
410/// Global memory pool instance
411static GLOBAL_POOL: once_cell::sync::Lazy<Arc<MemoryPool>> =
412    once_cell::sync::Lazy::new(|| MemoryPool::new());
413
414/// Get the global memory pool instance
415pub fn global_pool() -> Arc<MemoryPool> {
416    Arc::clone(&GLOBAL_POOL)
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_acquire_release() {
        let pool = MemoryPool::new();
        {
            let buf = pool.acquire(1024).unwrap();
            assert_eq!(buf.size(), 1024);
        } // buffer returns to the pool here

        assert_eq!(pool.stats().available_buffers, 1);
    }

    #[test]
    fn test_pool_reuse() {
        let pool = MemoryPool::new();

        // Two sequential same-size acquisitions: the second must reuse the
        // buffer released by the first.
        for _ in 0..2 {
            let buf = pool.acquire(1024).unwrap();
            drop(buf);
        }

        let stats = pool.stats();
        assert_eq!(stats.memory_reuse_count, 1);
        assert_eq!(stats.total_buffers_created, 1);
    }

    #[test]
    fn test_pool_size_matching() {
        let pool = MemoryPool::new();

        drop(pool.acquire(1024).unwrap());
        // The pooled 1024-byte buffer should satisfy a 512-byte request.
        drop(pool.acquire(512).unwrap());

        assert_eq!(pool.stats().memory_reuse_count, 1);
    }

    #[test]
    fn test_hit_rate() {
        let pool = MemoryPool::new();

        drop(pool.acquire(1024).unwrap());
        let _held = pool.acquire(1024).unwrap();

        assert!(pool.hit_rate() > 0.0);
    }
}