streamkit_core/
frame_pool.rs

1// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
2//
3// SPDX-License-Identifier: MPL-2.0
4
5//! Generic buffer pooling for frame/sample reuse.
6//!
7//! The pool is intentionally simple:
8//! - fixed size buckets (by element count)
9//! - bounded buffers per bucket
10//! - `PooledFrameData<T>` returns its backing buffer to the pool on drop
11//!
12//! This is primarily used to amortize per-frame allocations in hot paths like Opus decode.
13
14use std::ops::{Deref, DerefMut};
15use std::sync::{Arc, Mutex, Weak};
16
/// Point-in-time snapshot of a [`FramePool`]'s counters and idle buffers.
///
/// Produced by [`FramePool::stats`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoolStats {
    /// Lookups recorded as hits (in [`FramePool::get`]: an idle buffer was reused).
    pub hits: u64,
    /// Lookups recorded as misses (in [`FramePool::get`]: a fresh buffer had to be
    /// allocated, either because the bucket was empty or because no bucket was large enough).
    pub misses: u64,
    /// Per-bucket occupancy, ordered by ascending `bucket_size`.
    pub buckets: Vec<BucketStats>,
}

/// Occupancy of a single pool bucket.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BucketStats {
    /// Storage length, in elements, of every buffer held by this bucket.
    pub bucket_size: usize,
    /// Number of idle buffers currently waiting in this bucket.
    pub available: usize,
    /// Maximum number of idle buffers the bucket keeps; buffers returned to a full bucket
    /// are freed instead.
    pub max_per_bucket: usize,
}
30
31#[derive(Clone)]
32pub struct PoolHandle<T>(Weak<Mutex<PoolInner<T>>>);
33
34impl<T> PoolHandle<T> {
35    fn upgrade(&self) -> Option<Arc<Mutex<PoolInner<T>>>> {
36        self.0.upgrade()
37    }
38}
39
/// Mutable state shared by a `FramePool` (strongly) and its handles (weakly).
///
/// Invariant, established by `FramePool::with_buckets`: `bucket_sizes` is sorted ascending
/// without duplicates, and `buckets.len() == bucket_sizes.len()`.
struct PoolInner<T> {
    /// Element count of the buffers in each bucket, ascending.
    bucket_sizes: Vec<usize>,
    /// Idle buffers; `buckets[i]` only ever holds buffers whose length is `bucket_sizes[i]`.
    buckets: Vec<Vec<Vec<T>>>,
    /// Cap on idle buffers per bucket; a buffer returned to a full bucket is dropped.
    max_per_bucket: usize,
    /// Hit counter reported by `FramePool::stats`.
    hits: u64,
    /// Miss counter reported by `FramePool::stats`.
    misses: u64,
}
47
48impl<T> PoolInner<T> {
49    fn bucket_index_for_min_len(&self, min_len: usize) -> Option<usize> {
50        self.bucket_sizes.iter().position(|&size| size >= min_len)
51    }
52
53    fn bucket_index_for_storage_len(&self, storage_len: usize) -> Option<usize> {
54        self.bucket_sizes.iter().position(|&size| size == storage_len)
55    }
56}
57
58/// Thread-safe pool for `Vec<T>` buffers.
59pub struct FramePool<T> {
60    inner: Arc<Mutex<PoolInner<T>>>,
61}
62
63impl<T> Clone for FramePool<T> {
64    fn clone(&self) -> Self {
65        Self { inner: self.inner.clone() }
66    }
67}
68
69impl<T> FramePool<T> {
70    /// Create a pool with fixed buckets.
71    ///
72    /// `bucket_sizes` should be sorted ascending; this function will sort/dedup defensively.
73    pub fn with_buckets(mut bucket_sizes: Vec<usize>, max_per_bucket: usize) -> Self {
74        bucket_sizes.sort_unstable();
75        bucket_sizes.dedup();
76        let buckets = (0..bucket_sizes.len()).map(|_| Vec::new()).collect();
77        Self {
78            inner: Arc::new(Mutex::new(PoolInner {
79                bucket_sizes,
80                max_per_bucket,
81                buckets,
82                hits: 0,
83                misses: 0,
84            })),
85        }
86    }
87
88    pub fn handle(&self) -> PoolHandle<T> {
89        PoolHandle(Arc::downgrade(&self.inner))
90    }
91
92    pub fn stats(&self) -> PoolStats {
93        let Ok(guard) = self.inner.lock() else {
94            return PoolStats { hits: 0, misses: 0, buckets: Vec::new() };
95        };
96        PoolStats {
97            hits: guard.hits,
98            misses: guard.misses,
99            buckets: guard
100                .bucket_sizes
101                .iter()
102                .enumerate()
103                .map(|(idx, &bucket_size)| BucketStats {
104                    bucket_size,
105                    available: guard.buckets[idx].len(),
106                    max_per_bucket: guard.max_per_bucket,
107                })
108                .collect(),
109        }
110    }
111}
112
113impl<T: Clone + Default> FramePool<T> {
114    pub fn preallocated(bucket_sizes: &[usize], buffers_per_bucket: usize) -> Self {
115        let pool = Self::with_buckets(bucket_sizes.to_vec(), buffers_per_bucket);
116        let Ok(mut guard) = pool.inner.lock() else {
117            return pool;
118        };
119
120        for idx in 0..guard.bucket_sizes.len() {
121            let bucket_size = guard.bucket_sizes[idx];
122            for _ in 0..buffers_per_bucket {
123                guard.buckets[idx].push(vec![T::default(); bucket_size]);
124            }
125        }
126        drop(guard);
127        pool
128    }
129
130    /// Get pooled storage for at least `min_len` elements.
131    ///
132    /// If `min_len` doesn't fit in any bucket, returns a non-pooled buffer of exact size.
133    pub fn get(&self, min_len: usize) -> PooledFrameData<T> {
134        let (handle, bucket_idx, bucket_size, maybe_buf) = {
135            let Ok(mut guard) = self.inner.lock() else {
136                return PooledFrameData::from_vec(vec![T::default(); min_len]);
137            };
138            let Some(bucket_idx) = guard.bucket_index_for_min_len(min_len) else {
139                guard.misses += 1;
140                return PooledFrameData::from_vec(vec![T::default(); min_len]);
141            };
142            let bucket_size = guard.bucket_sizes[bucket_idx];
143            let buf = guard.buckets[bucket_idx].pop();
144            if buf.is_some() {
145                guard.hits += 1;
146            } else {
147                guard.misses += 1;
148            }
149            (self.handle(), bucket_idx, bucket_size, buf)
150        };
151
152        let data = maybe_buf.unwrap_or_else(|| vec![T::default(); bucket_size]);
153        PooledFrameData::from_pool(data, min_len, handle, bucket_idx)
154    }
155}
156
157/// A pooled buffer with a logical length.
158///
159/// For pooled instances, `data.len()` is the bucket size and `len` is the logical slice length.
160pub struct PooledFrameData<T> {
161    data: Vec<T>,
162    len: usize,
163    pool: Option<PoolHandle<T>>,
164    bucket_idx: Option<usize>,
165}
166
167impl<T: std::fmt::Debug> std::fmt::Debug for PooledFrameData<T> {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        f.debug_struct("PooledFrameData")
170            .field("len", &self.len)
171            .field("storage_len", &self.data.len())
172            .field("pooled", &self.pool.is_some())
173            .finish_non_exhaustive()
174    }
175}
176
177impl<T> PooledFrameData<T> {
178    pub const fn from_vec(data: Vec<T>) -> Self {
179        let len = data.len();
180        Self { data, len, pool: None, bucket_idx: None }
181    }
182
183    fn from_pool(data: Vec<T>, len: usize, pool: PoolHandle<T>, bucket_idx: usize) -> Self {
184        let len = len.min(data.len());
185        Self { data, len, pool: Some(pool), bucket_idx: Some(bucket_idx) }
186    }
187
188    #[must_use]
189    pub const fn len(&self) -> usize {
190        self.len
191    }
192
193    #[must_use]
194    pub const fn is_empty(&self) -> bool {
195        self.len == 0
196    }
197
198    #[must_use]
199    pub const fn storage_len(&self) -> usize {
200        self.data.len()
201    }
202
203    pub fn as_slice(&self) -> &[T] {
204        &self.data[..self.len]
205    }
206
207    pub fn as_mut_slice(&mut self) -> &mut [T] {
208        &mut self.data[..self.len]
209    }
210
211    /// Set the logical length (must be <= storage length).
212    pub fn truncate(&mut self, new_len: usize) {
213        self.len = new_len.min(self.data.len());
214    }
215
216    /// Consume into a detached Vec of exactly the logical length.
217    pub fn into_vec(mut self) -> Vec<T> {
218        self.pool = None;
219        self.bucket_idx = None;
220        let logical_len = self.len;
221        let mut data = std::mem::take(&mut self.data);
222        data.truncate(logical_len);
223        data
224    }
225}
226
227impl<T: Clone + Default> Clone for PooledFrameData<T> {
228    fn clone(&self) -> Self {
229        // Fast path: if pooled and pool is alive, try to allocate from a bucket to avoid heap alloc.
230        if let Some(pool) = &self.pool {
231            if let Some(inner) = pool.upgrade() {
232                if let Ok(mut guard) = inner.lock() {
233                    if let Some(bucket_idx) = guard.bucket_index_for_min_len(self.len) {
234                        let bucket_size = guard.bucket_sizes[bucket_idx];
235                        let mut data = guard
236                            .buckets
237                            .get_mut(bucket_idx)
238                            .and_then(std::vec::Vec::pop)
239                            .unwrap_or_else(|| vec![T::default(); bucket_size]);
240                        guard.hits += 1;
241
242                        data[..self.len].clone_from_slice(self.as_slice());
243                        return Self::from_pool(data, self.len, pool.clone(), bucket_idx);
244                    }
245                }
246            }
247        }
248
249        Self::from_vec(self.as_slice().to_vec())
250    }
251}
252
253impl<T> Deref for PooledFrameData<T> {
254    type Target = [T];
255
256    fn deref(&self) -> &Self::Target {
257        self.as_slice()
258    }
259}
260
261impl<T> DerefMut for PooledFrameData<T> {
262    fn deref_mut(&mut self) -> &mut Self::Target {
263        self.as_mut_slice()
264    }
265}
266
267impl<T> Drop for PooledFrameData<T> {
268    fn drop(&mut self) {
269        let Some(pool) = self.pool.take() else { return };
270        let Some(bucket_idx) = self.bucket_idx.take() else { return };
271        let Some(inner) = pool.upgrade() else { return };
272        let Ok(mut guard) = inner.lock() else { return };
273
274        // Only return buffers that match an existing bucket exactly.
275        let Some(expected_bucket_idx) = guard.bucket_index_for_storage_len(self.data.len()) else {
276            return;
277        };
278        if expected_bucket_idx != bucket_idx {
279            return;
280        }
281
282        if guard.buckets[bucket_idx].len() >= guard.max_per_bucket {
283            return;
284        }
285
286        // Restore logical length to full storage, to keep future clones/copies simple.
287        self.len = self.data.len();
288        guard.buckets[bucket_idx].push(std::mem::take(&mut self.data));
289    }
290}
291
292pub type AudioFramePool = FramePool<f32>;
293pub type PooledSamples = PooledFrameData<f32>;
294
295pub const DEFAULT_AUDIO_BUCKET_SIZES: &[usize] = &[960, 1920, 3840, 7680];
296pub const DEFAULT_AUDIO_BUFFERS_PER_BUCKET: usize = 32;
297
298impl FramePool<f32> {
299    pub fn audio_default() -> Self {
300        Self::preallocated(DEFAULT_AUDIO_BUCKET_SIZES, DEFAULT_AUDIO_BUFFERS_PER_BUCKET)
301    }
302}
303
#[cfg(test)]
mod tests {
    use super::*;

    /// A pooled buffer goes back to its bucket on drop, and the reuse counts as a hit.
    #[test]
    fn returns_to_pool_on_drop() {
        let pool = FramePool::<u8>::preallocated(&[10], 1);
        assert_eq!(pool.stats().buckets[0].available, 1);

        {
            let mut buf = pool.get(5);
            assert_eq!(buf.len(), 5);
            assert_eq!(buf.storage_len(), 10);
            buf.as_mut_slice().fill(7);
            assert_eq!(pool.stats().buckets[0].available, 0);
        }

        // Returned to pool.
        let stats = pool.stats();
        assert_eq!(stats.buckets[0].available, 1);
        assert_eq!((stats.hits, stats.misses), (1, 0));
    }

    /// Cloning a pooled frame draws from the pool, so both buffers come back on drop.
    #[test]
    fn clone_prefers_pool_when_available() {
        let pool = FramePool::<u8>::preallocated(&[4], 2);
        let mut a = pool.get(3);
        a.as_mut_slice().copy_from_slice(&[1, 2, 3]);
        let b = a.clone();
        assert_eq!(b.as_slice(), &[1, 2, 3]);
        // Dropping both should return 2 buffers back to the pool.
        drop(a);
        drop(b);
        assert_eq!(pool.stats().buckets[0].available, 2);
    }

    /// Sizes are normalized and requests use the smallest bucket that fits.
    #[test]
    fn picks_smallest_fitting_bucket() {
        // Unsorted, duplicated sizes are normalized to [4, 8, 16].
        let pool = FramePool::<u8>::with_buckets(vec![16, 4, 16, 8], 2);
        let first = pool.get(5);
        assert_eq!(first.storage_len(), 8);
        drop(first);
        assert_eq!(pool.stats().buckets[1].available, 1);

        let second = pool.get(6);
        assert_eq!(second.storage_len(), 8);
        let stats = pool.stats();
        assert_eq!((stats.hits, stats.misses), (1, 1));
    }

    /// Requests larger than every bucket get an exact-size, non-pooled buffer.
    #[test]
    fn oversized_request_is_not_pooled() {
        let pool = FramePool::<u8>::preallocated(&[4], 1);
        let big = pool.get(10);
        assert_eq!(big.len(), 10);
        assert_eq!(big.storage_len(), 10);
        drop(big);
        let stats = pool.stats();
        assert_eq!((stats.hits, stats.misses), (0, 1));
        assert_eq!(stats.buckets[0].available, 1);
    }

    /// A bucket never keeps more than `max_per_bucket` idle buffers.
    #[test]
    fn full_bucket_discards_returned_buffer() {
        let pool = FramePool::<u8>::with_buckets(vec![4], 1);
        let a = pool.get(4);
        let b = pool.get(4);
        drop(a);
        drop(b);
        assert_eq!(pool.stats().buckets[0].available, 1);
    }

    /// `into_vec` yields exactly the logical elements and keeps the storage out of the pool.
    #[test]
    fn into_vec_detaches_from_pool() {
        let pool = FramePool::<u8>::preallocated(&[4], 1);
        let v = pool.get(3).into_vec();
        assert_eq!(v, vec![0, 0, 0]);
        assert_eq!(pool.stats().buckets[0].available, 0);
    }

    /// `truncate` clamps to the storage length and can shrink the logical view.
    #[test]
    fn truncate_clamps_to_storage_len() {
        let pool = FramePool::<u8>::preallocated(&[4], 1);
        let mut buf = pool.get(2);
        buf.truncate(100);
        assert_eq!(buf.len(), 4);
        buf.truncate(1);
        assert_eq!(buf.as_slice(), &[0]);
    }

    /// Dropping the pool before its buffers is fine: the buffers are simply freed later.
    #[test]
    fn buffer_outliving_pool_is_freed() {
        let pool = FramePool::<u8>::preallocated(&[4], 1);
        let handle = pool.handle();
        let buf = pool.get(3);
        drop(pool);
        assert!(handle.upgrade().is_none());
        assert_eq!(buf.len(), 3);
        drop(buf);
    }
}