Skip to main content

ff_common/
pool.rs

1//! Frame buffer pooling abstractions.
2//!
3//! This module provides the [`FramePool`] trait, [`PooledBuffer`] type, and
4//! [`VecPool`] — a ready-to-use `Mutex<Vec<Vec<u8>>>` pool implementation —
5//! which enable memory pooling for decoded frames, reducing allocation
6//! overhead during video playback.
7
8use std::sync::{Arc, Mutex, Weak};
9
10/// A trait for frame buffer pooling.
11///
12/// Implementing this trait allows custom memory management strategies
13/// for decoded video frames. This is useful for reducing allocation
14/// pressure during real-time video playback.
15///
16/// # Thread Safety
17///
18/// Implementations must be `Send + Sync` to allow sharing across threads.
19///
20/// # Example Implementation
21///
22/// ```ignore
23/// use ff_common::{FramePool, PooledBuffer};
24/// use std::sync::{Arc, Mutex};
25///
26/// struct SimplePool {
27///     buffers: Mutex<Vec<Vec<u8>>>,
28///     buffer_size: usize,
29///}
30///
31/// impl FramePool for SimplePool {
32///     fn acquire(&self, size: usize) -> Option<PooledBuffer> {
33///         // Implementation...
34///         None
35///     }
36/// }
37/// ```
38pub trait FramePool: Send + Sync + std::fmt::Debug {
39    /// Acquires a buffer of at least the specified size from the pool.
40    ///
41    /// # Arguments
42    ///
43    /// * `size` - The minimum required buffer size in bytes.
44    ///
45    /// # Returns
46    ///
47    /// Returns `Some(PooledBuffer)` if a buffer is available, or `None` if
48    /// the pool is exhausted. When `None` is returned, the decoder will
49    /// allocate a new buffer directly.
50    ///
51    /// # Thread Safety
52    ///
53    /// This method may be called from multiple threads concurrently.
54    fn acquire(&self, size: usize) -> Option<PooledBuffer>;
55
56    /// Returns a buffer to the pool.
57    ///
58    /// This method is called automatically when a [`PooledBuffer`] is dropped.
59    /// The default implementation does nothing (the buffer is simply dropped).
60    ///
61    /// # Arguments
62    ///
63    /// * `buffer` - The buffer data to return to the pool.
64    fn release(&self, _buffer: Vec<u8>) {
65        // Default: just drop the buffer
66    }
67}
68
69/// A buffer acquired from a [`FramePool`].
70///
71/// When this buffer is dropped, it is automatically returned to its
72/// parent pool if the pool still exists. This enables zero-overhead
73/// buffer reuse during video decoding.
74///
75/// # Memory Management
76///
77/// The buffer holds a weak reference to its parent pool. If the pool
78/// is dropped before the buffer, the buffer's memory is simply freed
79/// rather than being returned to the pool.
80///
81/// # Cloning
82///
83/// When cloned, the new buffer becomes a standalone buffer (no pool reference).
84/// This prevents double-free issues where both the original and cloned buffer
85/// would attempt to return the same memory to the pool.
86#[derive(Debug)]
87pub struct PooledBuffer {
88    /// The actual buffer data
89    data: Vec<u8>,
90    /// Weak reference to the parent pool for returning the buffer
91    pool: Option<Weak<dyn FramePool>>,
92}
93
94impl PooledBuffer {
95    /// Creates a new pooled buffer with a reference to its parent pool.
96    ///
97    /// # Arguments
98    ///
99    /// * `data` - The buffer data.
100    /// * `pool` - A weak reference to the parent pool.
101    #[must_use]
102    pub fn new(data: Vec<u8>, pool: Weak<dyn FramePool>) -> Self {
103        Self {
104            data,
105            pool: Some(pool),
106        }
107    }
108
109    /// Creates a new pooled buffer without a parent pool.
110    ///
111    /// This is useful for buffers allocated outside of a pool context.
112    /// When dropped, the buffer's memory is simply freed.
113    #[must_use]
114    pub fn standalone(data: Vec<u8>) -> Self {
115        Self { data, pool: None }
116    }
117
118    /// Returns a reference to the buffer data.
119    #[must_use]
120    #[inline]
121    pub fn data(&self) -> &[u8] {
122        &self.data
123    }
124
125    /// Returns a mutable reference to the buffer data.
126    #[must_use]
127    #[inline]
128    pub fn data_mut(&mut self) -> &mut [u8] {
129        &mut self.data
130    }
131
132    /// Returns the length of the buffer in bytes.
133    #[must_use]
134    #[inline]
135    pub fn len(&self) -> usize {
136        self.data.len()
137    }
138
139    /// Returns `true` if the buffer is empty.
140    #[must_use]
141    #[inline]
142    pub fn is_empty(&self) -> bool {
143        self.data.is_empty()
144    }
145
146    /// Consumes the buffer and returns the underlying data.
147    ///
148    /// After calling this, the buffer will not be returned to the pool.
149    #[must_use]
150    pub fn into_inner(mut self) -> Vec<u8> {
151        // Take ownership to prevent Drop from returning to pool
152        self.pool = None;
153        std::mem::take(&mut self.data)
154    }
155}
156
157impl Clone for PooledBuffer {
158    /// Clones the buffer data, but the cloned buffer becomes standalone.
159    ///
160    /// The cloned buffer will NOT be returned to the pool when dropped.
161    /// This prevents double-free issues where both buffers would attempt
162    /// to return the same memory to the pool.
163    ///
164    /// Only the original buffer retains its pool reference.
165    fn clone(&self) -> Self {
166        Self {
167            data: self.data.clone(),
168            pool: None, // Cloned buffer is standalone
169        }
170    }
171}
172
173impl Drop for PooledBuffer {
174    fn drop(&mut self) {
175        if let Some(ref weak_pool) = self.pool
176            && let Some(pool) = weak_pool.upgrade()
177        {
178            // Return the buffer to the pool
179            let data = std::mem::take(&mut self.data);
180            pool.release(data);
181        }
182        // If pool is None or has been dropped, data is simply freed
183    }
184}
185
186impl AsRef<[u8]> for PooledBuffer {
187    fn as_ref(&self) -> &[u8] {
188        &self.data
189    }
190}
191
192impl AsMut<[u8]> for PooledBuffer {
193    fn as_mut(&mut self) -> &mut [u8] {
194        &mut self.data
195    }
196}
197
198/// A simple frame pool backed by a `Vec` of reusable buffers.
199///
200/// Stores up to `capacity` buffers and returns them to callers via
201/// [`FramePool::acquire`]. When exhausted, `acquire` returns `None` and
202/// the caller allocates a new buffer directly.
203///
204/// # Thread Safety
205///
206/// All access is protected by a [`Mutex`], making `VecPool` safe to share
207/// across threads via `Arc<VecPool>`.
208///
209/// # Examples
210///
211/// ```
212/// use ff_common::VecPool;
213///
214/// let pool = VecPool::new(32);
215/// assert_eq!(pool.available(), 0); // pool starts empty
216/// ```
217#[derive(Debug)]
218pub struct VecPool {
219    /// Pool of available buffers.
220    buffers: Mutex<Vec<Vec<u8>>>,
221    /// Maximum number of buffers to keep; excess releases are dropped.
222    capacity: usize,
223    /// Weak self-reference used to hand out auto-returning [`PooledBuffer`]s.
224    self_ref: Mutex<Weak<Self>>,
225}
226
227impl VecPool {
228    /// Creates a new pool with the given maximum buffer count.
229    ///
230    /// # Arguments
231    ///
232    /// * `capacity` - Maximum number of buffers to retain in the pool.
233    #[must_use]
234    pub fn new(capacity: usize) -> Arc<Self> {
235        let pool = Arc::new(Self {
236            buffers: Mutex::new(Vec::with_capacity(capacity)),
237            capacity,
238            self_ref: Mutex::new(Weak::new()),
239        });
240        if let Ok(mut r) = pool.self_ref.lock() {
241            *r = Arc::downgrade(&pool);
242        }
243        pool
244    }
245
246    /// Returns the maximum number of buffers this pool will retain.
247    #[must_use]
248    pub fn capacity(&self) -> usize {
249        self.capacity
250    }
251
252    /// Returns the number of buffers currently available in the pool.
253    ///
254    /// # Examples
255    ///
256    /// ```
257    /// use ff_common::VecPool;
258    ///
259    /// let pool = VecPool::new(8);
260    /// assert_eq!(pool.available(), 0);
261    /// ```
262    #[must_use]
263    pub fn available(&self) -> usize {
264        self.buffers.lock().map_or(0, |b| b.len())
265    }
266}
267
268impl FramePool for VecPool {
269    fn acquire(&self, size: usize) -> Option<PooledBuffer> {
270        if let Ok(mut buffers) = self.buffers.lock() {
271            let suitable_idx = buffers
272                .iter()
273                .enumerate()
274                .filter(|(_, b)| b.capacity() >= size)
275                .min_by_key(|(_, b)| b.capacity())
276                .map(|(idx, _)| idx);
277
278            if let Some(idx) = suitable_idx {
279                let mut buf = buffers.swap_remove(idx);
280                buf.resize(size, 0);
281                buf.fill(0);
282
283                let weak_ref = self
284                    .self_ref
285                    .lock()
286                    .ok()
287                    .and_then(|r| r.upgrade())
288                    .map(|arc| Arc::downgrade(&(arc as Arc<dyn FramePool>)))?;
289
290                return Some(PooledBuffer::new(buf, weak_ref));
291            }
292        }
293        None
294    }
295
296    fn release(&self, buffer: Vec<u8>) {
297        if let Ok(mut buffers) = self.buffers.lock()
298            && buffers.len() < self.capacity
299        {
300            buffers.push(buffer);
301        }
302    }
303}
304
305/// Alias for [`VecPool`] kept for backwards compatibility with `ff-decode`.
306///
307/// Prefer [`VecPool`] for new code.
308pub type SimpleFramePool = VecPool;
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313    use std::sync::Arc;
314
315    #[test]
316    fn test_pooled_buffer_standalone() {
317        let data = vec![1u8, 2, 3, 4, 5];
318        let buffer = PooledBuffer::standalone(data.clone());
319
320        assert_eq!(buffer.len(), 5);
321        assert!(!buffer.is_empty());
322        assert_eq!(buffer.data(), &[1, 2, 3, 4, 5]);
323    }
324
325    #[test]
326    fn test_pooled_buffer_data_mut() {
327        let mut buffer = PooledBuffer::standalone(vec![0u8; 4]);
328        buffer.data_mut()[0] = 42;
329        assert_eq!(buffer.data()[0], 42);
330    }
331
332    #[test]
333    fn test_pooled_buffer_into_inner() {
334        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
335        let inner = buffer.into_inner();
336        assert_eq!(inner, vec![1, 2, 3]);
337    }
338
339    #[test]
340    fn test_pooled_buffer_as_ref() {
341        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
342        let slice: &[u8] = buffer.as_ref();
343        assert_eq!(slice, &[1, 2, 3]);
344    }
345
346    #[test]
347    fn test_pooled_buffer_as_mut() {
348        let mut buffer = PooledBuffer::standalone(vec![1, 2, 3]);
349        let slice: &mut [u8] = buffer.as_mut();
350        slice[0] = 99;
351        assert_eq!(buffer.data(), &[99, 2, 3]);
352    }
353
354    #[test]
355    fn test_empty_buffer() {
356        let buffer = PooledBuffer::standalone(vec![]);
357        assert!(buffer.is_empty());
358        assert_eq!(buffer.len(), 0);
359    }
360
361    #[test]
362    fn test_pool_with_arc_release() {
363        use std::sync::Mutex;
364        use std::sync::atomic::{AtomicUsize, Ordering};
365
366        #[derive(Debug)]
367        struct ArcPool {
368            buffers: Mutex<Vec<Vec<u8>>>,
369            release_count: AtomicUsize,
370        }
371
372        impl FramePool for ArcPool {
373            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
374                None // Not used in this test
375            }
376
377            fn release(&self, buffer: Vec<u8>) {
378                if let Ok(mut buffers) = self.buffers.lock() {
379                    buffers.push(buffer);
380                    self.release_count.fetch_add(1, Ordering::SeqCst);
381                }
382            }
383        }
384
385        let pool = Arc::new(ArcPool {
386            buffers: Mutex::new(vec![]),
387            release_count: AtomicUsize::new(0),
388        });
389
390        // Create a buffer with pool reference
391        {
392            let _buffer =
393                PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
394            // Buffer is dropped here
395        }
396
397        // Verify the buffer was returned to the pool
398        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
399        assert!(pool.buffers.lock().map(|b| b.len() == 1).unwrap_or(false));
400    }
401
402    #[test]
403    fn test_pool_dropped_before_buffer() {
404        #[derive(Debug)]
405        struct DroppablePool;
406
407        impl FramePool for DroppablePool {
408            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
409                None
410            }
411
412            fn release(&self, _buffer: Vec<u8>) {
413                // This should NOT be called if pool is dropped
414                panic!("release should not be called on dropped pool");
415            }
416        }
417
418        let buffer;
419        {
420            let pool = Arc::new(DroppablePool);
421            buffer = PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
422            // Pool is dropped here
423        }
424
425        // Buffer can still be used
426        assert_eq!(buffer.data(), &[1, 2, 3]);
427
428        // Dropping buffer should not panic (pool is already gone)
429        drop(buffer);
430    }
431
432    #[test]
433    fn test_pooled_buffer_clone_becomes_standalone() {
434        use std::sync::atomic::{AtomicUsize, Ordering};
435
436        #[derive(Debug)]
437        struct CountingPool {
438            release_count: AtomicUsize,
439        }
440
441        impl FramePool for CountingPool {
442            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
443                None
444            }
445
446            fn release(&self, _buffer: Vec<u8>) {
447                self.release_count.fetch_add(1, Ordering::SeqCst);
448            }
449        }
450
451        let pool = Arc::new(CountingPool {
452            release_count: AtomicUsize::new(0),
453        });
454
455        // Create pooled buffer
456        let buffer1 =
457            PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
458
459        // Clone it
460        let buffer2 = buffer1.clone();
461
462        // Both buffers have the same data
463        assert_eq!(buffer1.data(), &[1, 2, 3]);
464        assert_eq!(buffer2.data(), &[1, 2, 3]);
465
466        // Drop both buffers
467        drop(buffer1);
468        drop(buffer2);
469
470        // Only the original buffer should have been returned to pool (count = 1)
471        // The cloned buffer is standalone and should NOT return to pool
472        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
473    }
474
475    #[test]
476    fn test_pooled_buffer_clone_data_independence() {
477        let buffer1 = PooledBuffer::standalone(vec![1, 2, 3]);
478        let mut buffer2 = buffer1.clone();
479
480        // Modify buffer2
481        buffer2.data_mut()[0] = 99;
482
483        // buffer1 should be unaffected (deep copy)
484        assert_eq!(buffer1.data(), &[1, 2, 3]);
485        assert_eq!(buffer2.data(), &[99, 2, 3]);
486    }
487
488    // ── VecPool tests ─────────────────────────────────────────────────────────
489
490    #[test]
491    fn vec_pool_should_start_empty() {
492        let pool = VecPool::new(32);
493        assert_eq!(pool.capacity(), 32);
494        assert_eq!(pool.available(), 0);
495    }
496
497    #[test]
498    fn vec_pool_acquire_should_return_none_when_empty() {
499        let pool = VecPool::new(8);
500        assert!(pool.acquire(1024).is_none());
501    }
502
503    #[test]
504    fn vec_pool_release_then_acquire_should_reuse_buffer() {
505        let pool = VecPool::new(8);
506
507        pool.release(vec![0u8; 1024]);
508        assert_eq!(pool.available(), 1);
509
510        let buf = pool.acquire(512).unwrap();
511        assert_eq!(buf.len(), 512);
512        assert_eq!(pool.available(), 0);
513    }
514
515    #[test]
516    fn vec_pool_buffer_should_auto_return_on_drop() {
517        let pool = VecPool::new(8);
518        pool.release(vec![0u8; 2048]);
519
520        {
521            let _buf = pool.acquire(1024).unwrap();
522            assert_eq!(pool.available(), 0);
523        }
524
525        assert_eq!(pool.available(), 1);
526    }
527
528    #[test]
529    fn vec_pool_should_not_exceed_capacity() {
530        let pool = VecPool::new(2);
531
532        pool.release(vec![0u8; 512]);
533        pool.release(vec![0u8; 512]);
534        pool.release(vec![0u8; 512]); // over capacity — dropped
535
536        assert_eq!(pool.available(), 2);
537    }
538
539    #[test]
540    fn vec_pool_acquire_should_choose_smallest_fitting_buffer() {
541        let pool = VecPool::new(8);
542        pool.release(vec![0u8; 512]);
543        pool.release(vec![0u8; 1024]);
544        pool.release(vec![0u8; 2048]);
545
546        // Request 1000 bytes — should get the 1024 buffer (smallest that fits)
547        let buf = pool.acquire(1000).unwrap();
548        assert!(buf.len() >= 1000);
549        assert_eq!(pool.available(), 2);
550    }
551
552    #[test]
553    fn vec_pool_acquire_should_return_none_when_no_suitable_buffer() {
554        let pool = VecPool::new(4);
555        pool.release(vec![0u8; 512]);
556
557        assert!(pool.acquire(1024).is_none());
558        // Original buffer must still be in the pool
559        assert_eq!(pool.available(), 1);
560    }
561
562    #[test]
563    fn simple_frame_pool_alias_should_behave_identically_to_vec_pool() {
564        let pool = SimpleFramePool::new(4);
565        assert_eq!(pool.capacity(), 4);
566        assert_eq!(pool.available(), 0);
567    }
568}