Skip to main content

ff_common/
pool.rs

1//! Frame buffer pooling abstractions.
2//!
3//! This module provides the [`FramePool`] trait, [`PooledBuffer`] type, and
4//! [`VecPool`] — a ready-to-use `Mutex<Vec<Vec<u8>>>` pool implementation —
5//! which enable memory pooling for decoded frames, reducing allocation
6//! overhead during video playback.
7
8use std::sync::{Arc, Mutex, Weak};
9
10/// A trait for frame buffer pooling.
11///
12/// Implementing this trait allows custom memory management strategies
13/// for decoded video frames. This is useful for reducing allocation
14/// pressure during real-time video playback.
15///
16/// # Thread Safety
17///
18/// Implementations must be `Send + Sync` to allow sharing across threads.
19///
20/// # Example Implementation
21///
22/// ```ignore
23/// use ff_common::{FramePool, PooledBuffer};
24/// use std::sync::{Arc, Mutex};
25///
26/// struct SimplePool {
27///     buffers: Mutex<Vec<Vec<u8>>>,
28///     buffer_size: usize,
29///}
30///
31/// impl FramePool for SimplePool {
32///     fn acquire(&self, size: usize) -> Option<PooledBuffer> {
33///         // Implementation...
34///         None
35///     }
36/// }
37/// ```
38pub trait FramePool: Send + Sync + std::fmt::Debug {
39    /// Acquires a buffer of at least the specified size from the pool.
40    ///
41    /// # Arguments
42    ///
43    /// * `size` - The minimum required buffer size in bytes.
44    ///
45    /// # Returns
46    ///
47    /// Returns `Some(PooledBuffer)` if a buffer is available, or `None` if
48    /// the pool is exhausted. When `None` is returned, the decoder will
49    /// allocate a new buffer directly.
50    ///
51    /// # Thread Safety
52    ///
53    /// This method may be called from multiple threads concurrently.
54    fn acquire(&self, size: usize) -> Option<PooledBuffer>;
55
56    /// Returns a buffer to the pool.
57    ///
58    /// This method is called automatically when a [`PooledBuffer`] is dropped.
59    /// The default implementation does nothing (the buffer is simply dropped).
60    ///
61    /// # Arguments
62    ///
63    /// * `buffer` - The buffer data to return to the pool.
64    fn release(&self, _buffer: Vec<u8>) {
65        // Default: just drop the buffer
66    }
67}
68
69/// A buffer acquired from a [`FramePool`].
70///
71/// When this buffer is dropped, it is automatically returned to its
72/// parent pool if the pool still exists. This enables zero-overhead
73/// buffer reuse during video decoding.
74///
75/// # Memory Management
76///
77/// The buffer holds a weak reference to its parent pool. If the pool
78/// is dropped before the buffer, the buffer's memory is simply freed
79/// rather than being returned to the pool.
80///
81/// # Cloning
82///
83/// When cloned, the new buffer becomes a standalone buffer (no pool reference).
84/// This prevents double-free issues where both the original and cloned buffer
85/// would attempt to return the same memory to the pool.
86#[derive(Debug)]
87pub struct PooledBuffer {
88    /// The actual buffer data
89    data: Vec<u8>,
90    /// Weak reference to the parent pool for returning the buffer
91    pool: Option<Weak<dyn FramePool>>,
92}
93
94impl PooledBuffer {
95    /// Creates a new pooled buffer with a reference to its parent pool.
96    ///
97    /// # Arguments
98    ///
99    /// * `data` - The buffer data.
100    /// * `pool` - A weak reference to the parent pool.
101    #[must_use]
102    pub fn new(data: Vec<u8>, pool: Weak<dyn FramePool>) -> Self {
103        Self {
104            data,
105            pool: Some(pool),
106        }
107    }
108
109    /// Creates a new pooled buffer without a parent pool.
110    ///
111    /// This is useful for buffers allocated outside of a pool context.
112    /// When dropped, the buffer's memory is simply freed.
113    #[must_use]
114    pub fn standalone(data: Vec<u8>) -> Self {
115        Self { data, pool: None }
116    }
117
118    /// Returns a reference to the buffer data.
119    #[must_use]
120    #[inline]
121    pub fn data(&self) -> &[u8] {
122        &self.data
123    }
124
125    /// Returns a mutable reference to the buffer data.
126    #[must_use]
127    #[inline]
128    pub fn data_mut(&mut self) -> &mut [u8] {
129        &mut self.data
130    }
131
132    /// Returns the length of the buffer in bytes.
133    #[must_use]
134    #[inline]
135    pub fn len(&self) -> usize {
136        self.data.len()
137    }
138
139    /// Returns `true` if the buffer is empty.
140    #[must_use]
141    #[inline]
142    pub fn is_empty(&self) -> bool {
143        self.data.is_empty()
144    }
145
146    /// Consumes the buffer and returns the underlying data.
147    ///
148    /// After calling this, the buffer will not be returned to the pool.
149    #[must_use]
150    pub fn into_inner(mut self) -> Vec<u8> {
151        // Take ownership to prevent Drop from returning to pool
152        self.pool = None;
153        std::mem::take(&mut self.data)
154    }
155}
156
157impl Clone for PooledBuffer {
158    /// Clones the buffer data, but the cloned buffer becomes standalone.
159    ///
160    /// The cloned buffer will NOT be returned to the pool when dropped.
161    /// This prevents double-free issues where both buffers would attempt
162    /// to return the same memory to the pool.
163    ///
164    /// Only the original buffer retains its pool reference.
165    fn clone(&self) -> Self {
166        Self {
167            data: self.data.clone(),
168            pool: None, // Cloned buffer is standalone
169        }
170    }
171}
172
173impl Drop for PooledBuffer {
174    fn drop(&mut self) {
175        if let Some(ref weak_pool) = self.pool
176            && let Some(pool) = weak_pool.upgrade()
177        {
178            // Return the buffer to the pool
179            let data = std::mem::take(&mut self.data);
180            pool.release(data);
181        }
182        // If pool is None or has been dropped, data is simply freed
183    }
184}
185
186impl AsRef<[u8]> for PooledBuffer {
187    fn as_ref(&self) -> &[u8] {
188        &self.data
189    }
190}
191
192impl AsMut<[u8]> for PooledBuffer {
193    fn as_mut(&mut self) -> &mut [u8] {
194        &mut self.data
195    }
196}
197
198/// A simple frame pool backed by a `Vec` of reusable buffers.
199///
200/// Stores up to `capacity` buffers and returns them to callers via
201/// [`FramePool::acquire`]. When exhausted, `acquire` returns `None` and
202/// the caller allocates a new buffer directly.
203///
204/// # Thread Safety
205///
206/// All access is protected by a [`Mutex`], making `VecPool` safe to share
207/// across threads via `Arc<VecPool>`.
208///
209/// # Examples
210///
211/// ```
212/// use ff_common::VecPool;
213///
214/// let pool = VecPool::new(32);
215/// assert_eq!(pool.available(), 0); // pool starts empty
216/// ```
217#[derive(Debug)]
218pub struct VecPool {
219    /// Pool of available buffers.
220    buffers: Mutex<Vec<Vec<u8>>>,
221    /// Maximum number of buffers to keep; excess releases are dropped.
222    capacity: usize,
223    /// Weak self-reference used to hand out auto-returning [`PooledBuffer`]s.
224    self_ref: Mutex<Weak<Self>>,
225}
226
227impl VecPool {
228    /// Creates a new pool with the given maximum buffer count.
229    ///
230    /// # Arguments
231    ///
232    /// * `capacity` - Maximum number of buffers to retain in the pool.
233    #[must_use]
234    pub fn new(capacity: usize) -> Arc<Self> {
235        let pool = Arc::new(Self {
236            buffers: Mutex::new(Vec::with_capacity(capacity)),
237            capacity,
238            self_ref: Mutex::new(Weak::new()),
239        });
240        if let Ok(mut r) = pool.self_ref.lock() {
241            *r = Arc::downgrade(&pool);
242        }
243        pool
244    }
245
246    /// Returns the maximum number of buffers this pool will retain.
247    #[must_use]
248    pub fn capacity(&self) -> usize {
249        self.capacity
250    }
251
252    /// Returns the number of buffers currently available in the pool.
253    ///
254    /// # Examples
255    ///
256    /// ```
257    /// use ff_common::VecPool;
258    ///
259    /// let pool = VecPool::new(8);
260    /// assert_eq!(pool.available(), 0);
261    /// ```
262    #[must_use]
263    pub fn available(&self) -> usize {
264        self.buffers.lock().map_or(0, |b| b.len())
265    }
266}
267
268impl FramePool for VecPool {
269    fn acquire(&self, size: usize) -> Option<PooledBuffer> {
270        if let Ok(mut buffers) = self.buffers.lock() {
271            let suitable_idx = buffers
272                .iter()
273                .enumerate()
274                .filter(|(_, b)| b.capacity() >= size)
275                .min_by_key(|(_, b)| b.capacity())
276                .map(|(idx, _)| idx);
277
278            if let Some(idx) = suitable_idx {
279                let mut buf = buffers.swap_remove(idx);
280                buf.resize(size, 0);
281                buf.fill(0);
282
283                let weak_ref = self
284                    .self_ref
285                    .lock()
286                    .ok()
287                    .and_then(|r| r.upgrade())
288                    .map(|arc| Arc::downgrade(&(arc as Arc<dyn FramePool>)))?;
289
290                return Some(PooledBuffer::new(buf, weak_ref));
291            }
292        }
293        None
294    }
295
296    fn release(&self, buffer: Vec<u8>) {
297        if let Ok(mut buffers) = self.buffers.lock()
298            && buffers.len() < self.capacity
299        {
300            buffers.push(buffer);
301        }
302    }
303}
304
305#[cfg(test)]
306mod tests {
307    use super::*;
308    use std::sync::Arc;
309
310    #[test]
311    fn test_pooled_buffer_standalone() {
312        let data = vec![1u8, 2, 3, 4, 5];
313        let buffer = PooledBuffer::standalone(data.clone());
314
315        assert_eq!(buffer.len(), 5);
316        assert!(!buffer.is_empty());
317        assert_eq!(buffer.data(), &[1, 2, 3, 4, 5]);
318    }
319
320    #[test]
321    fn test_pooled_buffer_data_mut() {
322        let mut buffer = PooledBuffer::standalone(vec![0u8; 4]);
323        buffer.data_mut()[0] = 42;
324        assert_eq!(buffer.data()[0], 42);
325    }
326
327    #[test]
328    fn test_pooled_buffer_into_inner() {
329        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
330        let inner = buffer.into_inner();
331        assert_eq!(inner, vec![1, 2, 3]);
332    }
333
334    #[test]
335    fn test_pooled_buffer_as_ref() {
336        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
337        let slice: &[u8] = buffer.as_ref();
338        assert_eq!(slice, &[1, 2, 3]);
339    }
340
341    #[test]
342    fn test_pooled_buffer_as_mut() {
343        let mut buffer = PooledBuffer::standalone(vec![1, 2, 3]);
344        let slice: &mut [u8] = buffer.as_mut();
345        slice[0] = 99;
346        assert_eq!(buffer.data(), &[99, 2, 3]);
347    }
348
349    #[test]
350    fn test_empty_buffer() {
351        let buffer = PooledBuffer::standalone(vec![]);
352        assert!(buffer.is_empty());
353        assert_eq!(buffer.len(), 0);
354    }
355
356    #[test]
357    fn test_pool_with_arc_release() {
358        use std::sync::Mutex;
359        use std::sync::atomic::{AtomicUsize, Ordering};
360
361        #[derive(Debug)]
362        struct ArcPool {
363            buffers: Mutex<Vec<Vec<u8>>>,
364            release_count: AtomicUsize,
365        }
366
367        impl FramePool for ArcPool {
368            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
369                None // Not used in this test
370            }
371
372            fn release(&self, buffer: Vec<u8>) {
373                if let Ok(mut buffers) = self.buffers.lock() {
374                    buffers.push(buffer);
375                    self.release_count.fetch_add(1, Ordering::SeqCst);
376                }
377            }
378        }
379
380        let pool = Arc::new(ArcPool {
381            buffers: Mutex::new(vec![]),
382            release_count: AtomicUsize::new(0),
383        });
384
385        // Create a buffer with pool reference
386        {
387            let _buffer =
388                PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
389            // Buffer is dropped here
390        }
391
392        // Verify the buffer was returned to the pool
393        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
394        assert!(pool.buffers.lock().map(|b| b.len() == 1).unwrap_or(false));
395    }
396
397    #[test]
398    fn test_pool_dropped_before_buffer() {
399        #[derive(Debug)]
400        struct DroppablePool;
401
402        impl FramePool for DroppablePool {
403            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
404                None
405            }
406
407            fn release(&self, _buffer: Vec<u8>) {
408                // This should NOT be called if pool is dropped
409                panic!("release should not be called on dropped pool");
410            }
411        }
412
413        let buffer;
414        {
415            let pool = Arc::new(DroppablePool);
416            buffer = PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
417            // Pool is dropped here
418        }
419
420        // Buffer can still be used
421        assert_eq!(buffer.data(), &[1, 2, 3]);
422
423        // Dropping buffer should not panic (pool is already gone)
424        drop(buffer);
425    }
426
427    #[test]
428    fn test_pooled_buffer_clone_becomes_standalone() {
429        use std::sync::atomic::{AtomicUsize, Ordering};
430
431        #[derive(Debug)]
432        struct CountingPool {
433            release_count: AtomicUsize,
434        }
435
436        impl FramePool for CountingPool {
437            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
438                None
439            }
440
441            fn release(&self, _buffer: Vec<u8>) {
442                self.release_count.fetch_add(1, Ordering::SeqCst);
443            }
444        }
445
446        let pool = Arc::new(CountingPool {
447            release_count: AtomicUsize::new(0),
448        });
449
450        // Create pooled buffer
451        let buffer1 =
452            PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
453
454        // Clone it
455        let buffer2 = buffer1.clone();
456
457        // Both buffers have the same data
458        assert_eq!(buffer1.data(), &[1, 2, 3]);
459        assert_eq!(buffer2.data(), &[1, 2, 3]);
460
461        // Drop both buffers
462        drop(buffer1);
463        drop(buffer2);
464
465        // Only the original buffer should have been returned to pool (count = 1)
466        // The cloned buffer is standalone and should NOT return to pool
467        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
468    }
469
470    #[test]
471    fn test_pooled_buffer_clone_data_independence() {
472        let buffer1 = PooledBuffer::standalone(vec![1, 2, 3]);
473        let mut buffer2 = buffer1.clone();
474
475        // Modify buffer2
476        buffer2.data_mut()[0] = 99;
477
478        // buffer1 should be unaffected (deep copy)
479        assert_eq!(buffer1.data(), &[1, 2, 3]);
480        assert_eq!(buffer2.data(), &[99, 2, 3]);
481    }
482
483    // ── VecPool tests ─────────────────────────────────────────────────────────
484
485    #[test]
486    fn vec_pool_should_start_empty() {
487        let pool = VecPool::new(32);
488        assert_eq!(pool.capacity(), 32);
489        assert_eq!(pool.available(), 0);
490    }
491
492    #[test]
493    fn vec_pool_acquire_should_return_none_when_empty() {
494        let pool = VecPool::new(8);
495        assert!(pool.acquire(1024).is_none());
496    }
497
498    #[test]
499    fn vec_pool_release_then_acquire_should_reuse_buffer() {
500        let pool = VecPool::new(8);
501
502        pool.release(vec![0u8; 1024]);
503        assert_eq!(pool.available(), 1);
504
505        let buf = pool.acquire(512).unwrap();
506        assert_eq!(buf.len(), 512);
507        assert_eq!(pool.available(), 0);
508    }
509
510    #[test]
511    fn vec_pool_buffer_should_auto_return_on_drop() {
512        let pool = VecPool::new(8);
513        pool.release(vec![0u8; 2048]);
514
515        {
516            let _buf = pool.acquire(1024).unwrap();
517            assert_eq!(pool.available(), 0);
518        }
519
520        assert_eq!(pool.available(), 1);
521    }
522
523    #[test]
524    fn vec_pool_should_not_exceed_capacity() {
525        let pool = VecPool::new(2);
526
527        pool.release(vec![0u8; 512]);
528        pool.release(vec![0u8; 512]);
529        pool.release(vec![0u8; 512]); // over capacity — dropped
530
531        assert_eq!(pool.available(), 2);
532    }
533
534    #[test]
535    fn vec_pool_acquire_should_choose_smallest_fitting_buffer() {
536        let pool = VecPool::new(8);
537        pool.release(vec![0u8; 512]);
538        pool.release(vec![0u8; 1024]);
539        pool.release(vec![0u8; 2048]);
540
541        // Request 1000 bytes — should get the 1024 buffer (smallest that fits)
542        let buf = pool.acquire(1000).unwrap();
543        assert!(buf.len() >= 1000);
544        assert_eq!(pool.available(), 2);
545    }
546
547    #[test]
548    fn vec_pool_acquire_should_return_none_when_no_suitable_buffer() {
549        let pool = VecPool::new(4);
550        pool.release(vec![0u8; 512]);
551
552        assert!(pool.acquire(1024).is_none());
553        // Original buffer must still be in the pool
554        assert_eq!(pool.available(), 1);
555    }
556
557    #[test]
558    fn vec_pool_acquired_buffer_should_return_on_drop_when_pool_was_empty() {
559        let pool = VecPool::new(4);
560        assert_eq!(pool.available(), 0);
561        assert!(pool.acquire(1024).is_none());
562
563        // Simulate what allocate_buffer does: attach fresh memory to the pool.
564        let pool_dyn: Arc<dyn FramePool> = Arc::clone(&pool) as Arc<dyn FramePool>;
565        let buf = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
566        drop(buf);
567
568        assert_eq!(pool.available(), 1);
569    }
570
571    #[test]
572    fn vec_pool_should_grow_from_zero_via_connected_alloc() {
573        let pool = VecPool::new(8);
574        let pool_dyn: Arc<dyn FramePool> = Arc::clone(&pool) as Arc<dyn FramePool>;
575
576        // Allocate three buffers with pool reference (pool empty on each call).
577        let b1 = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
578        let b2 = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
579        let b3 = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
580        assert_eq!(pool.available(), 0);
581
582        drop(b1);
583        drop(b2);
584        drop(b3);
585
586        assert_eq!(pool.available(), 3);
587
588        // Next acquire should succeed.
589        let buf = pool.acquire(512);
590        assert!(buf.is_some());
591        assert_eq!(pool.available(), 2);
592    }
593}