streamkit_core/
frame_pool.rs1use std::ops::{Deref, DerefMut};
15use std::sync::{Arc, Mutex, Weak};
16
17#[derive(Debug, Clone)]
18pub struct PoolStats {
19 pub hits: u64,
20 pub misses: u64,
21 pub buckets: Vec<BucketStats>,
22}
23
24#[derive(Debug, Clone)]
25pub struct BucketStats {
26 pub bucket_size: usize,
27 pub available: usize,
28 pub max_per_bucket: usize,
29}
30
31#[derive(Clone)]
32pub struct PoolHandle<T>(Weak<Mutex<PoolInner<T>>>);
33
34impl<T> PoolHandle<T> {
35 fn upgrade(&self) -> Option<Arc<Mutex<PoolInner<T>>>> {
36 self.0.upgrade()
37 }
38}
39
40struct PoolInner<T> {
41 bucket_sizes: Vec<usize>,
42 max_per_bucket: usize,
43 buckets: Vec<Vec<Vec<T>>>,
44 hits: u64,
45 misses: u64,
46}
47
48impl<T> PoolInner<T> {
49 fn bucket_index_for_min_len(&self, min_len: usize) -> Option<usize> {
50 self.bucket_sizes.iter().position(|&size| size >= min_len)
51 }
52
53 fn bucket_index_for_storage_len(&self, storage_len: usize) -> Option<usize> {
54 self.bucket_sizes.iter().position(|&size| size == storage_len)
55 }
56}
57
58pub struct FramePool<T> {
60 inner: Arc<Mutex<PoolInner<T>>>,
61}
62
63impl<T> Clone for FramePool<T> {
64 fn clone(&self) -> Self {
65 Self { inner: self.inner.clone() }
66 }
67}
68
69impl<T> FramePool<T> {
70 pub fn with_buckets(mut bucket_sizes: Vec<usize>, max_per_bucket: usize) -> Self {
74 bucket_sizes.sort_unstable();
75 bucket_sizes.dedup();
76 let buckets = (0..bucket_sizes.len()).map(|_| Vec::new()).collect();
77 Self {
78 inner: Arc::new(Mutex::new(PoolInner {
79 bucket_sizes,
80 max_per_bucket,
81 buckets,
82 hits: 0,
83 misses: 0,
84 })),
85 }
86 }
87
88 pub fn handle(&self) -> PoolHandle<T> {
89 PoolHandle(Arc::downgrade(&self.inner))
90 }
91
92 pub fn stats(&self) -> PoolStats {
93 let Ok(guard) = self.inner.lock() else {
94 return PoolStats { hits: 0, misses: 0, buckets: Vec::new() };
95 };
96 PoolStats {
97 hits: guard.hits,
98 misses: guard.misses,
99 buckets: guard
100 .bucket_sizes
101 .iter()
102 .enumerate()
103 .map(|(idx, &bucket_size)| BucketStats {
104 bucket_size,
105 available: guard.buckets[idx].len(),
106 max_per_bucket: guard.max_per_bucket,
107 })
108 .collect(),
109 }
110 }
111}
112
113impl<T: Clone + Default> FramePool<T> {
114 pub fn preallocated(bucket_sizes: &[usize], buffers_per_bucket: usize) -> Self {
115 let pool = Self::with_buckets(bucket_sizes.to_vec(), buffers_per_bucket);
116 let Ok(mut guard) = pool.inner.lock() else {
117 return pool;
118 };
119
120 for idx in 0..guard.bucket_sizes.len() {
121 let bucket_size = guard.bucket_sizes[idx];
122 for _ in 0..buffers_per_bucket {
123 guard.buckets[idx].push(vec![T::default(); bucket_size]);
124 }
125 }
126 drop(guard);
127 pool
128 }
129
130 pub fn get(&self, min_len: usize) -> PooledFrameData<T> {
134 let (handle, bucket_idx, bucket_size, maybe_buf) = {
135 let Ok(mut guard) = self.inner.lock() else {
136 return PooledFrameData::from_vec(vec![T::default(); min_len]);
137 };
138 let Some(bucket_idx) = guard.bucket_index_for_min_len(min_len) else {
139 guard.misses += 1;
140 return PooledFrameData::from_vec(vec![T::default(); min_len]);
141 };
142 let bucket_size = guard.bucket_sizes[bucket_idx];
143 let buf = guard.buckets[bucket_idx].pop();
144 if buf.is_some() {
145 guard.hits += 1;
146 } else {
147 guard.misses += 1;
148 }
149 (self.handle(), bucket_idx, bucket_size, buf)
150 };
151
152 let data = maybe_buf.unwrap_or_else(|| vec![T::default(); bucket_size]);
153 PooledFrameData::from_pool(data, min_len, handle, bucket_idx)
154 }
155}
156
157pub struct PooledFrameData<T> {
161 data: Vec<T>,
162 len: usize,
163 pool: Option<PoolHandle<T>>,
164 bucket_idx: Option<usize>,
165}
166
167impl<T: std::fmt::Debug> std::fmt::Debug for PooledFrameData<T> {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 f.debug_struct("PooledFrameData")
170 .field("len", &self.len)
171 .field("storage_len", &self.data.len())
172 .field("pooled", &self.pool.is_some())
173 .finish_non_exhaustive()
174 }
175}
176
177impl<T> PooledFrameData<T> {
178 pub const fn from_vec(data: Vec<T>) -> Self {
179 let len = data.len();
180 Self { data, len, pool: None, bucket_idx: None }
181 }
182
183 fn from_pool(data: Vec<T>, len: usize, pool: PoolHandle<T>, bucket_idx: usize) -> Self {
184 let len = len.min(data.len());
185 Self { data, len, pool: Some(pool), bucket_idx: Some(bucket_idx) }
186 }
187
188 #[must_use]
189 pub const fn len(&self) -> usize {
190 self.len
191 }
192
193 #[must_use]
194 pub const fn is_empty(&self) -> bool {
195 self.len == 0
196 }
197
198 #[must_use]
199 pub const fn storage_len(&self) -> usize {
200 self.data.len()
201 }
202
203 pub fn as_slice(&self) -> &[T] {
204 &self.data[..self.len]
205 }
206
207 pub fn as_mut_slice(&mut self) -> &mut [T] {
208 &mut self.data[..self.len]
209 }
210
211 pub fn truncate(&mut self, new_len: usize) {
213 self.len = new_len.min(self.data.len());
214 }
215
216 pub fn into_vec(mut self) -> Vec<T> {
218 self.pool = None;
219 self.bucket_idx = None;
220 let logical_len = self.len;
221 let mut data = std::mem::take(&mut self.data);
222 data.truncate(logical_len);
223 data
224 }
225}
226
227impl<T: Clone + Default> Clone for PooledFrameData<T> {
228 fn clone(&self) -> Self {
229 if let Some(pool) = &self.pool {
231 if let Some(inner) = pool.upgrade() {
232 if let Ok(mut guard) = inner.lock() {
233 if let Some(bucket_idx) = guard.bucket_index_for_min_len(self.len) {
234 let bucket_size = guard.bucket_sizes[bucket_idx];
235 let mut data = guard
236 .buckets
237 .get_mut(bucket_idx)
238 .and_then(std::vec::Vec::pop)
239 .unwrap_or_else(|| vec![T::default(); bucket_size]);
240 guard.hits += 1;
241
242 data[..self.len].clone_from_slice(self.as_slice());
243 return Self::from_pool(data, self.len, pool.clone(), bucket_idx);
244 }
245 }
246 }
247 }
248
249 Self::from_vec(self.as_slice().to_vec())
250 }
251}
252
253impl<T> Deref for PooledFrameData<T> {
254 type Target = [T];
255
256 fn deref(&self) -> &Self::Target {
257 self.as_slice()
258 }
259}
260
261impl<T> DerefMut for PooledFrameData<T> {
262 fn deref_mut(&mut self) -> &mut Self::Target {
263 self.as_mut_slice()
264 }
265}
266
267impl<T> Drop for PooledFrameData<T> {
268 fn drop(&mut self) {
269 let Some(pool) = self.pool.take() else { return };
270 let Some(bucket_idx) = self.bucket_idx.take() else { return };
271 let Some(inner) = pool.upgrade() else { return };
272 let Ok(mut guard) = inner.lock() else { return };
273
274 let Some(expected_bucket_idx) = guard.bucket_index_for_storage_len(self.data.len()) else {
276 return;
277 };
278 if expected_bucket_idx != bucket_idx {
279 return;
280 }
281
282 if guard.buckets[bucket_idx].len() >= guard.max_per_bucket {
283 return;
284 }
285
286 self.len = self.data.len();
288 guard.buckets[bucket_idx].push(std::mem::take(&mut self.data));
289 }
290}
291
292pub type AudioFramePool = FramePool<f32>;
293pub type PooledSamples = PooledFrameData<f32>;
294
295pub const DEFAULT_AUDIO_BUCKET_SIZES: &[usize] = &[960, 1920, 3840, 7680];
296pub const DEFAULT_AUDIO_BUFFERS_PER_BUCKET: usize = 32;
297
298impl FramePool<f32> {
299 pub fn audio_default() -> Self {
300 Self::preallocated(DEFAULT_AUDIO_BUCKET_SIZES, DEFAULT_AUDIO_BUFFERS_PER_BUCKET)
301 }
302}
303
304#[cfg(test)]
305#[allow(clippy::unwrap_used)]
306mod tests {
307 use super::*;
308
309 #[test]
310 fn returns_to_pool_on_drop() {
311 let pool = FramePool::<u8>::preallocated(&[10], 1);
312 let handle = pool.handle();
313 assert_eq!(pool.stats().buckets[0].available, 1);
314
315 {
316 let mut buf = pool.get(5);
317 assert_eq!(buf.len(), 5);
318 assert_eq!(buf.storage_len(), 10);
319 buf.as_mut_slice().fill(7);
320 assert_eq!(pool.stats().buckets[0].available, 0);
321 drop(handle);
322 }
323
324 assert_eq!(pool.stats().buckets[0].available, 1);
326 }
327
328 #[test]
329 fn clone_prefers_pool_when_available() {
330 let pool = FramePool::<u8>::preallocated(&[4], 2);
331 let mut a = pool.get(3);
332 a.as_mut_slice().copy_from_slice(&[1, 2, 3]);
333 let b = a.clone();
334 assert_eq!(b.as_slice(), &[1, 2, 3]);
335 drop(a);
337 drop(b);
338 assert_eq!(pool.stats().buckets[0].available, 2);
339 }
340}