Skip to main content

ff_decode/
pool.rs

1//! Frame pool for memory reuse.
2//!
3//! This module provides the [`SimpleFramePool`] which enables memory pooling
4//! for decoded frames, reducing allocation overhead during video playback.
5//!
6//! # Examples
7//!
8//! ```ignore
9//! use ff_decode::SimpleFramePool;
10//!
11//! // Create a decoder with a simple frame pool (automatically initialized)
12//! let pool = SimpleFramePool::new(32);
13//! let decoder = VideoDecoder::open("video.mp4")?
14//!     .frame_pool(pool)
15//!     .build()?;
16//! ```
17
18use std::sync::{Arc, Mutex, Weak};
19
20// Re-export types from ff-common
21pub use ff_common::{FramePool, PooledBuffer};
22
23/// A simple frame pool implementation with fixed capacity.
24///
25/// This pool stores a fixed number of frame buffers and reuses them
26/// during video decoding. When the pool is empty, callers must allocate
27/// new buffers directly.
28///
29/// # Thread Safety
30///
31/// This implementation uses a [`Mutex`] internally, making it safe to
32/// share across threads.
33///
34/// # Examples
35///
36/// ```ignore
37/// use ff_decode::{VideoDecoder, SimpleFramePool};
38///
39/// // Create a pool with capacity for 32 frames (automatically initialized)
40/// let pool = SimpleFramePool::new(32);
41///
42/// let decoder = VideoDecoder::open("video.mp4")?
43///     .frame_pool(pool)
44///     .build()?;
45///
46/// // Frames are acquired from the pool during decoding
47/// for frame in decoder.frames().take(100) {
48///     let frame = frame?;
49///     // Process frame...
50/// }
51/// ```
52#[derive(Debug)]
53pub struct SimpleFramePool {
54    /// Maximum number of buffers to keep in the pool
55    max_capacity: usize,
56    /// Pool of available buffers
57    buffers: Mutex<Vec<Vec<u8>>>,
58    /// Weak self-reference for creating `PooledBuffers`
59    self_ref: Mutex<Weak<Self>>,
60}
61
62impl SimpleFramePool {
63    /// Creates a new frame pool with the specified maximum capacity.
64    ///
65    /// This function uses RAII (Resource Acquisition Is Initialization) pattern
66    /// and automatically initializes the pool's self-reference, eliminating the
67    /// need for a separate initialization step.
68    ///
69    /// # Arguments
70    ///
71    /// * `max_capacity` - Maximum number of buffers to keep in the pool.
72    ///   When the pool is full, returned buffers are dropped instead of
73    ///   being stored.
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use ff_decode::SimpleFramePool;
79    /// use std::sync::Arc;
80    ///
81    /// // Create a pool for 32 frames - automatically initialized
82    /// let pool = SimpleFramePool::new(32);
83    ///
84    /// // Use with decoder
85    /// # /*
86    /// let decoder = VideoDecoder::open("video.mp4")?
87    ///     .frame_pool(pool)
88    ///     .build()?;
89    /// # */
90    /// ```
91    #[must_use]
92    pub fn new(max_capacity: usize) -> Arc<Self> {
93        let pool = Arc::new(Self {
94            max_capacity,
95            buffers: Mutex::new(Vec::with_capacity(max_capacity)),
96            self_ref: Mutex::new(Weak::new()),
97        });
98
99        // Auto-initialize self-reference using RAII pattern
100        if let Ok(mut self_ref) = pool.self_ref.lock() {
101            *self_ref = Arc::downgrade(&pool);
102        }
103
104        pool
105    }
106
107    /// Returns the maximum capacity of this pool.
108    #[must_use]
109    pub fn max_capacity(&self) -> usize {
110        self.max_capacity
111    }
112
113    /// Returns the current number of buffers available in the pool.
114    ///
115    /// # Examples
116    ///
117    /// ```
118    /// use ff_decode::SimpleFramePool;
119    ///
120    /// let pool = SimpleFramePool::new(32);
121    /// assert_eq!(pool.available(), 0); // Pool starts empty
122    /// ```
123    #[must_use]
124    pub fn available(&self) -> usize {
125        self.buffers.lock().map_or(0, |b| b.len())
126    }
127}
128
129impl FramePool for SimpleFramePool {
130    fn acquire(&self, size: usize) -> Option<PooledBuffer> {
131        // Try to get a buffer from the pool
132        if let Ok(mut buffers) = self.buffers.lock() {
133            // Find a buffer that has sufficient capacity
134            // We prefer to find the smallest buffer that fits to avoid wasting memory
135            let suitable_idx = buffers
136                .iter()
137                .enumerate()
138                .filter(|(_, b)| b.capacity() >= size)
139                .min_by_key(|(_, b)| b.capacity())
140                .map(|(idx, _)| idx);
141
142            if let Some(idx) = suitable_idx {
143                let mut buf = buffers.swap_remove(idx);
144
145                // Resize to the requested size (within existing capacity)
146                buf.resize(size, 0);
147
148                // Zero the entire buffer to ensure clean state
149                // Note: resize() only zeros new elements, not existing ones
150                buf.fill(0);
151
152                // Get weak reference to self for the PooledBuffer
153                let weak_ref = self
154                    .self_ref
155                    .lock()
156                    .ok()
157                    .and_then(|r| r.upgrade())
158                    .map(|arc| Arc::downgrade(&(arc as Arc<dyn FramePool>)))?;
159
160                // Return pooled buffer
161                return Some(PooledBuffer::new(buf, weak_ref));
162            }
163        }
164
165        // Pool is empty or no suitable buffer found - return None
166        // Caller will allocate a new buffer
167        None
168    }
169
170    fn release(&self, buffer: Vec<u8>) {
171        if let Ok(mut buffers) = self.buffers.lock() {
172            // Only keep the buffer if we haven't reached max capacity
173            if buffers.len() < self.max_capacity {
174                buffers.push(buffer);
175            }
176            // Otherwise, drop the buffer (it will be freed)
177        }
178    }
179}
180
181#[cfg(test)]
182#[allow(clippy::panic)]
183mod tests {
184    use super::*;
185    use std::sync::{Arc, Mutex, atomic::AtomicUsize, atomic::Ordering};
186
187    #[derive(Debug)]
188    struct TestPool {
189        buffers: Mutex<Vec<Vec<u8>>>,
190        acquire_count: AtomicUsize,
191        release_count: AtomicUsize,
192    }
193
194    impl TestPool {
195        fn new(count: usize, size: usize) -> Self {
196            let buffers: Vec<Vec<u8>> = (0..count).map(|_| vec![0u8; size]).collect();
197            Self {
198                buffers: Mutex::new(buffers),
199                acquire_count: AtomicUsize::new(0),
200                release_count: AtomicUsize::new(0),
201            }
202        }
203    }
204
205    impl FramePool for TestPool {
206        fn acquire(&self, size: usize) -> Option<PooledBuffer> {
207            let mut buffers = self.buffers.lock().ok()?;
208            // Find a buffer of sufficient size
209            if let Some(idx) = buffers.iter().position(|b| b.len() >= size) {
210                let buf = buffers.swap_remove(idx);
211                self.acquire_count.fetch_add(1, Ordering::SeqCst);
212                // We can't return a proper PooledBuffer here since we need Arc<Self>
213                // For testing, return a standalone buffer
214                Some(PooledBuffer::standalone(buf))
215            } else {
216                None
217            }
218        }
219
220        fn release(&self, buffer: Vec<u8>) {
221            if let Ok(mut buffers) = self.buffers.lock() {
222                buffers.push(buffer);
223                self.release_count.fetch_add(1, Ordering::SeqCst);
224            }
225        }
226    }
227
228    #[test]
229    fn test_pooled_buffer_standalone() {
230        let data = vec![1u8, 2, 3, 4, 5];
231        let buffer = PooledBuffer::standalone(data.clone());
232
233        assert_eq!(buffer.len(), 5);
234        assert!(!buffer.is_empty());
235        assert_eq!(buffer.data(), &[1, 2, 3, 4, 5]);
236    }
237
238    #[test]
239    fn test_pooled_buffer_data_mut() {
240        let mut buffer = PooledBuffer::standalone(vec![0u8; 4]);
241        buffer.data_mut()[0] = 42;
242        assert_eq!(buffer.data()[0], 42);
243    }
244
245    #[test]
246    fn test_pooled_buffer_into_inner() {
247        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
248        let inner = buffer.into_inner();
249        assert_eq!(inner, vec![1, 2, 3]);
250    }
251
252    #[test]
253    fn test_pooled_buffer_as_ref() {
254        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
255        let slice: &[u8] = buffer.as_ref();
256        assert_eq!(slice, &[1, 2, 3]);
257    }
258
259    #[test]
260    fn test_pooled_buffer_as_mut() {
261        let mut buffer = PooledBuffer::standalone(vec![1, 2, 3]);
262        let slice: &mut [u8] = buffer.as_mut();
263        slice[0] = 99;
264        assert_eq!(buffer.data(), &[99, 2, 3]);
265    }
266
267    #[test]
268    fn test_empty_buffer() {
269        let buffer = PooledBuffer::standalone(vec![]);
270        assert!(buffer.is_empty());
271        assert_eq!(buffer.len(), 0);
272    }
273
274    #[test]
275    fn test_pool_acquire() {
276        let pool = TestPool::new(2, 1024);
277        let buffer = pool.acquire(512);
278        assert!(buffer.is_some());
279        assert!(buffer.as_ref().is_some_and(|b| b.len() >= 512));
280    }
281
282    #[test]
283    fn test_pool_acquire_too_large() {
284        let pool = TestPool::new(2, 512);
285        let buffer = pool.acquire(1024);
286        assert!(buffer.is_none());
287    }
288
289    #[test]
290    fn test_pool_with_arc_release() {
291        #[derive(Debug)]
292        struct ArcPool {
293            buffers: Mutex<Vec<Vec<u8>>>,
294            release_count: AtomicUsize,
295        }
296
297        impl FramePool for ArcPool {
298            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
299                None // Not used in this test
300            }
301
302            fn release(&self, buffer: Vec<u8>) {
303                if let Ok(mut buffers) = self.buffers.lock() {
304                    buffers.push(buffer);
305                    self.release_count.fetch_add(1, Ordering::SeqCst);
306                }
307            }
308        }
309
310        let pool = Arc::new(ArcPool {
311            buffers: Mutex::new(vec![]),
312            release_count: AtomicUsize::new(0),
313        });
314
315        // Create a buffer with pool reference
316        {
317            let _buffer =
318                PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
319            // Buffer is dropped here
320        }
321
322        // Verify the buffer was returned to the pool
323        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
324        assert!(pool.buffers.lock().map(|b| b.len() == 1).unwrap_or(false));
325    }
326
327    #[test]
328    fn test_pool_dropped_before_buffer() {
329        #[derive(Debug)]
330        struct DroppablePool;
331
332        impl FramePool for DroppablePool {
333            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
334                None
335            }
336
337            fn release(&self, _buffer: Vec<u8>) {
338                // This should NOT be called if pool is dropped
339                panic!("release should not be called on dropped pool");
340            }
341        }
342
343        let buffer;
344        {
345            let pool = Arc::new(DroppablePool);
346            buffer = PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
347            // Pool is dropped here
348        }
349
350        // Buffer can still be used
351        assert_eq!(buffer.data(), &[1, 2, 3]);
352
353        // Dropping buffer should not panic (pool is already gone)
354        drop(buffer);
355    }
356
357    // SimpleFramePool tests
358    #[test]
359    fn test_simple_frame_pool_new() {
360        let pool = SimpleFramePool::new(32);
361        assert_eq!(pool.max_capacity(), 32);
362        assert_eq!(pool.available(), 0);
363    }
364
365    #[test]
366    fn test_simple_frame_pool_acquire_empty() {
367        let pool = SimpleFramePool::new(8);
368
369        // Pool is empty, should return None
370        let buffer = pool.acquire(1024);
371        assert!(buffer.is_none());
372    }
373
374    #[test]
375    fn test_simple_frame_pool_acquire_and_release() {
376        let pool = SimpleFramePool::new(8);
377
378        // Manually add a buffer to the pool
379        pool.release(vec![0u8; 1024]);
380        assert_eq!(pool.available(), 1);
381
382        // Acquire the buffer
383        let buffer = pool.acquire(512);
384        assert!(buffer.is_some());
385        assert_eq!(pool.available(), 0);
386
387        let buffer = buffer.unwrap();
388        assert_eq!(buffer.len(), 512);
389    }
390
391    #[test]
392    fn test_simple_frame_pool_buffer_auto_return() {
393        let pool = SimpleFramePool::new(8);
394
395        // Add a buffer and acquire it
396        pool.release(vec![0u8; 2048]);
397        assert_eq!(pool.available(), 1);
398
399        {
400            let _buffer = pool.acquire(1024).unwrap();
401            assert_eq!(pool.available(), 0);
402            // Buffer is dropped here
403        }
404
405        // Buffer should be returned to pool
406        assert_eq!(pool.available(), 1);
407    }
408
409    #[test]
410    fn test_simple_frame_pool_max_capacity() {
411        let pool = SimpleFramePool::new(2);
412
413        // Add 3 buffers, but pool only holds 2
414        pool.release(vec![0u8; 512]);
415        pool.release(vec![0u8; 512]);
416        pool.release(vec![0u8; 512]);
417
418        // Pool should only contain 2 buffers
419        assert_eq!(pool.available(), 2);
420    }
421
422    #[test]
423    fn test_simple_frame_pool_buffer_reuse() {
424        let pool = SimpleFramePool::new(4);
425
426        // Add buffer to pool
427        pool.release(vec![42u8; 1024]);
428        assert_eq!(pool.available(), 1);
429
430        // Acquire and check it gets resized
431        let buffer = pool.acquire(512).unwrap();
432        assert_eq!(buffer.len(), 512);
433        assert!(buffer.data().iter().all(|&b| b == 0)); // Should be zeroed by resize
434
435        drop(buffer);
436        assert_eq!(pool.available(), 1);
437
438        // Acquire same-size buffer - should reuse from pool
439        let buffer = pool.acquire(512).unwrap();
440        assert_eq!(buffer.len(), 512);
441        assert!(buffer.data().iter().all(|&b| b == 0));
442
443        drop(buffer);
444        assert_eq!(pool.available(), 1);
445
446        // Acquire larger buffer within capacity - should reuse from pool
447        let buffer = pool.acquire(1024).unwrap();
448        assert_eq!(buffer.len(), 1024);
449    }
450
451    #[test]
452    fn test_simple_frame_pool_find_suitable_buffer() {
453        let pool = SimpleFramePool::new(8);
454
455        // Add buffers of different sizes
456        pool.release(vec![0u8; 512]);
457        pool.release(vec![0u8; 1024]);
458        pool.release(vec![0u8; 2048]);
459        assert_eq!(pool.available(), 3);
460
461        // Request 1000 bytes - should get the 1024 buffer
462        let buffer = pool.acquire(1000).unwrap();
463        assert!(buffer.len() >= 1000);
464        assert_eq!(pool.available(), 2);
465
466        drop(buffer);
467        assert_eq!(pool.available(), 3);
468    }
469
470    #[test]
471    fn test_simple_frame_pool_acquire_too_large() {
472        let pool = SimpleFramePool::new(4);
473
474        // Add small buffer
475        pool.release(vec![0u8; 512]);
476
477        // Request larger buffer than available
478        let buffer = pool.acquire(1024);
479        assert!(buffer.is_none());
480
481        // Original buffer should still be in pool
482        assert_eq!(pool.available(), 1);
483    }
484
485    #[test]
486    fn test_simple_frame_pool_concurrent_access() {
487        use std::thread;
488
489        let pool = SimpleFramePool::new(16);
490
491        // Pre-fill pool with buffers
492        for _ in 0..8 {
493            pool.release(vec![0u8; 1024]);
494        }
495
496        let pool1 = Arc::clone(&pool);
497        let pool2 = Arc::clone(&pool);
498
499        let handle1 = thread::spawn(move || {
500            for _ in 0..10 {
501                if let Some(buffer) = pool1.acquire(512) {
502                    drop(buffer);
503                }
504            }
505        });
506
507        let handle2 = thread::spawn(move || {
508            for _ in 0..10 {
509                if let Some(buffer) = pool2.acquire(512) {
510                    drop(buffer);
511                }
512            }
513        });
514
515        handle1.join().unwrap();
516        handle2.join().unwrap();
517
518        // All buffers should be back in the pool
519        assert!(pool.available() <= 16);
520    }
521}