1use std::sync::{Arc, Mutex, Weak};
19
20pub use ff_common::{FramePool, PooledBuffer};
22
/// A thread-safe pool of reusable byte buffers.
///
/// Idle buffers are stored behind a mutex and handed out through the
/// [`FramePool`] trait; at most `max_capacity` idle buffers are retained,
/// and excess releases are discarded.
#[derive(Debug)]
pub struct SimpleFramePool {
    // Upper bound on the number of idle buffers kept in `buffers`.
    max_capacity: usize,
    // Idle buffers waiting to be reused by `acquire`.
    buffers: Mutex<Vec<Vec<u8>>>,
    // Weak self-reference passed to each `PooledBuffer` so a dropped
    // buffer can return itself to this pool; populated in `new`.
    self_ref: Mutex<Weak<Self>>,
}
61
62impl SimpleFramePool {
63 #[must_use]
92 pub fn new(max_capacity: usize) -> Arc<Self> {
93 let pool = Arc::new(Self {
94 max_capacity,
95 buffers: Mutex::new(Vec::with_capacity(max_capacity)),
96 self_ref: Mutex::new(Weak::new()),
97 });
98
99 if let Ok(mut self_ref) = pool.self_ref.lock() {
101 *self_ref = Arc::downgrade(&pool);
102 }
103
104 pool
105 }
106
107 #[must_use]
109 pub fn max_capacity(&self) -> usize {
110 self.max_capacity
111 }
112
113 #[must_use]
124 pub fn available(&self) -> usize {
125 self.buffers.lock().map_or(0, |b| b.len())
126 }
127}
128
129impl FramePool for SimpleFramePool {
130 fn acquire(&self, size: usize) -> Option<PooledBuffer> {
131 if let Ok(mut buffers) = self.buffers.lock() {
133 let suitable_idx = buffers
136 .iter()
137 .enumerate()
138 .filter(|(_, b)| b.capacity() >= size)
139 .min_by_key(|(_, b)| b.capacity())
140 .map(|(idx, _)| idx);
141
142 if let Some(idx) = suitable_idx {
143 let mut buf = buffers.swap_remove(idx);
144
145 buf.resize(size, 0);
147
148 buf.fill(0);
151
152 let weak_ref = self
154 .self_ref
155 .lock()
156 .ok()
157 .and_then(|r| r.upgrade())
158 .map(|arc| Arc::downgrade(&(arc as Arc<dyn FramePool>)))?;
159
160 return Some(PooledBuffer::new(buf, weak_ref));
162 }
163 }
164
165 None
168 }
169
170 fn release(&self, buffer: Vec<u8>) {
171 if let Ok(mut buffers) = self.buffers.lock() {
172 if buffers.len() < self.max_capacity {
174 buffers.push(buffer);
175 }
176 }
178 }
179}
180
#[cfg(test)]
#[allow(clippy::panic)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex, atomic::AtomicUsize, atomic::Ordering};

    /// Minimal `FramePool` implementation that counts `acquire`/`release`
    /// calls, used to exercise the trait independently of `SimpleFramePool`.
    #[derive(Debug)]
    struct TestPool {
        buffers: Mutex<Vec<Vec<u8>>>,
        acquire_count: AtomicUsize,
        release_count: AtomicUsize,
    }

    impl TestPool {
        /// Pre-populates the pool with `count` zeroed buffers of `size` bytes.
        fn new(count: usize, size: usize) -> Self {
            let buffers: Vec<Vec<u8>> = (0..count).map(|_| vec![0u8; size]).collect();
            Self {
                buffers: Mutex::new(buffers),
                acquire_count: AtomicUsize::new(0),
                release_count: AtomicUsize::new(0),
            }
        }
    }

    impl FramePool for TestPool {
        fn acquire(&self, size: usize) -> Option<PooledBuffer> {
            let mut buffers = self.buffers.lock().ok()?;
            // First fit by length (unlike SimpleFramePool's best fit by capacity).
            if let Some(idx) = buffers.iter().position(|b| b.len() >= size) {
                let buf = buffers.swap_remove(idx);
                self.acquire_count.fetch_add(1, Ordering::SeqCst);
                // Standalone buffers do not return to the pool on drop.
                Some(PooledBuffer::standalone(buf))
            } else {
                None
            }
        }

        fn release(&self, buffer: Vec<u8>) {
            if let Ok(mut buffers) = self.buffers.lock() {
                buffers.push(buffer);
                self.release_count.fetch_add(1, Ordering::SeqCst);
            }
        }
    }

    // A standalone buffer exposes its contents but has no owning pool.
    #[test]
    fn test_pooled_buffer_standalone() {
        let data = vec![1u8, 2, 3, 4, 5];
        let buffer = PooledBuffer::standalone(data.clone());

        assert_eq!(buffer.len(), 5);
        assert!(!buffer.is_empty());
        assert_eq!(buffer.data(), &[1, 2, 3, 4, 5]);
    }

    // Mutations through `data_mut` are visible through `data`.
    #[test]
    fn test_pooled_buffer_data_mut() {
        let mut buffer = PooledBuffer::standalone(vec![0u8; 4]);
        buffer.data_mut()[0] = 42;
        assert_eq!(buffer.data()[0], 42);
    }

    // `into_inner` yields the wrapped Vec unchanged.
    #[test]
    fn test_pooled_buffer_into_inner() {
        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let inner = buffer.into_inner();
        assert_eq!(inner, vec![1, 2, 3]);
    }

    // `AsRef<[u8]>` borrows the underlying bytes.
    #[test]
    fn test_pooled_buffer_as_ref() {
        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let slice: &[u8] = buffer.as_ref();
        assert_eq!(slice, &[1, 2, 3]);
    }

    // `AsMut<[u8]>` allows in-place mutation.
    #[test]
    fn test_pooled_buffer_as_mut() {
        let mut buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let slice: &mut [u8] = buffer.as_mut();
        slice[0] = 99;
        assert_eq!(buffer.data(), &[99, 2, 3]);
    }

    // Zero-length buffers report empty/len consistently.
    #[test]
    fn test_empty_buffer() {
        let buffer = PooledBuffer::standalone(vec![]);
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
    }

    // A request smaller than the pooled buffers succeeds.
    #[test]
    fn test_pool_acquire() {
        let pool = TestPool::new(2, 1024);
        let buffer = pool.acquire(512);
        assert!(buffer.is_some());
        assert!(buffer.as_ref().is_some_and(|b| b.len() >= 512));
    }

    // A request larger than every pooled buffer yields None.
    #[test]
    fn test_pool_acquire_too_large() {
        let pool = TestPool::new(2, 512);
        let buffer = pool.acquire(1024);
        assert!(buffer.is_none());
    }

    // Dropping a PooledBuffer constructed with a live weak pool reference
    // must call `release` on that pool exactly once.
    #[test]
    fn test_pool_with_arc_release() {
        #[derive(Debug)]
        struct ArcPool {
            buffers: Mutex<Vec<Vec<u8>>>,
            release_count: AtomicUsize,
        }

        impl FramePool for ArcPool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, buffer: Vec<u8>) {
                if let Ok(mut buffers) = self.buffers.lock() {
                    buffers.push(buffer);
                    self.release_count.fetch_add(1, Ordering::SeqCst);
                }
            }
        }

        let pool = Arc::new(ArcPool {
            buffers: Mutex::new(vec![]),
            release_count: AtomicUsize::new(0),
        });

        // Scope the buffer so its Drop runs before the assertions.
        {
            let _buffer =
                PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
        }

        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
        assert!(pool.buffers.lock().map(|b| b.len() == 1).unwrap_or(false));
    }

    // If the pool dies before the buffer, dropping the buffer must NOT
    // attempt a release (the weak upgrade fails silently).
    #[test]
    fn test_pool_dropped_before_buffer() {
        #[derive(Debug)]
        struct DroppablePool;

        impl FramePool for DroppablePool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, _buffer: Vec<u8>) {
                panic!("release should not be called on dropped pool");
            }
        }

        let buffer;
        {
            let pool = Arc::new(DroppablePool);
            buffer = PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
        }

        // The buffer remains usable after its pool is gone.
        assert_eq!(buffer.data(), &[1, 2, 3]);

        // Dropping must not panic via DroppablePool::release.
        drop(buffer);
    }

    // A freshly created pool is empty but remembers its capacity.
    #[test]
    fn test_simple_frame_pool_new() {
        let pool = SimpleFramePool::new(32);
        assert_eq!(pool.max_capacity(), 32);
        assert_eq!(pool.available(), 0);
    }

    // Acquiring from an empty pool yields None (no allocation fallback).
    #[test]
    fn test_simple_frame_pool_acquire_empty() {
        let pool = SimpleFramePool::new(8);

        let buffer = pool.acquire(1024);
        assert!(buffer.is_none());
    }

    // Released buffers become available and are resized to the request.
    #[test]
    fn test_simple_frame_pool_acquire_and_release() {
        let pool = SimpleFramePool::new(8);

        pool.release(vec![0u8; 1024]);
        assert_eq!(pool.available(), 1);

        let buffer = pool.acquire(512);
        assert!(buffer.is_some());
        assert_eq!(pool.available(), 0);

        let buffer = buffer.unwrap();
        assert_eq!(buffer.len(), 512);
    }

    // Dropping an acquired buffer returns it to the pool automatically.
    #[test]
    fn test_simple_frame_pool_buffer_auto_return() {
        let pool = SimpleFramePool::new(8);

        pool.release(vec![0u8; 2048]);
        assert_eq!(pool.available(), 1);

        {
            let _buffer = pool.acquire(1024).unwrap();
            assert_eq!(pool.available(), 0);
        }

        assert_eq!(pool.available(), 1);
    }

    // Releases beyond max_capacity are discarded, not stored.
    #[test]
    fn test_simple_frame_pool_max_capacity() {
        let pool = SimpleFramePool::new(2);

        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 512]);

        assert_eq!(pool.available(), 2);
    }

    // Reused buffers come back zeroed and resized, and a buffer released
    // after shrinking still satisfies a request up to its capacity.
    #[test]
    fn test_simple_frame_pool_buffer_reuse() {
        let pool = SimpleFramePool::new(4);

        pool.release(vec![42u8; 1024]);
        assert_eq!(pool.available(), 1);

        let buffer = pool.acquire(512).unwrap();
        assert_eq!(buffer.len(), 512);
        // Old contents (42s) must have been scrubbed to zero.
        assert!(buffer.data().iter().all(|&b| b == 0));
        drop(buffer);
        assert_eq!(pool.available(), 1);

        let buffer = pool.acquire(512).unwrap();
        assert_eq!(buffer.len(), 512);
        assert!(buffer.data().iter().all(|&b| b == 0));

        drop(buffer);
        assert_eq!(pool.available(), 1);

        // Capacity (1024) is retained across the 512-byte round trips.
        let buffer = pool.acquire(1024).unwrap();
        assert_eq!(buffer.len(), 1024);
    }

    // With several candidates, acquire picks one that fits the request.
    #[test]
    fn test_simple_frame_pool_find_suitable_buffer() {
        let pool = SimpleFramePool::new(8);

        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 1024]);
        pool.release(vec![0u8; 2048]);
        assert_eq!(pool.available(), 3);

        let buffer = pool.acquire(1000).unwrap();
        assert!(buffer.len() >= 1000);
        assert_eq!(pool.available(), 2);

        drop(buffer);
        assert_eq!(pool.available(), 3);
    }

    // An oversized request fails without consuming the too-small buffer.
    #[test]
    fn test_simple_frame_pool_acquire_too_large() {
        let pool = SimpleFramePool::new(4);

        pool.release(vec![0u8; 512]);

        let buffer = pool.acquire(1024);
        assert!(buffer.is_none());

        assert_eq!(pool.available(), 1);
    }

    // Two threads acquiring/dropping concurrently must not corrupt the
    // pool or exceed its capacity.
    #[test]
    fn test_simple_frame_pool_concurrent_access() {
        use std::thread;

        let pool = SimpleFramePool::new(16);

        for _ in 0..8 {
            pool.release(vec![0u8; 1024]);
        }

        let pool1 = Arc::clone(&pool);
        let pool2 = Arc::clone(&pool);

        let handle1 = thread::spawn(move || {
            for _ in 0..10 {
                if let Some(buffer) = pool1.acquire(512) {
                    drop(buffer);
                }
            }
        });

        let handle2 = thread::spawn(move || {
            for _ in 0..10 {
                if let Some(buffer) = pool2.acquire(512) {
                    drop(buffer);
                }
            }
        });

        handle1.join().unwrap();
        handle2.join().unwrap();

        assert!(pool.available() <= 16);
    }
}