1use std::sync::{Arc, Mutex, Weak};
9
/// A pool of reusable byte buffers for frame data.
///
/// Implementors hand out [`PooledBuffer`]s via `acquire` and take backing
/// vectors back via `release`. The trait is object-safe and `Send + Sync`
/// so pools can be shared across threads behind `Arc<dyn FramePool>`.
pub trait FramePool: Send + Sync + std::fmt::Debug {
    /// Returns a buffer of `size` bytes, or `None` if the pool cannot
    /// satisfy the request.
    fn acquire(&self, size: usize) -> Option<PooledBuffer>;

    /// Accepts a buffer's backing storage back into the pool.
    ///
    /// The default implementation simply drops the vector (no reuse).
    fn release(&self, _buffer: Vec<u8>) {
    }
}
68
/// A byte buffer that may be attached to a [`FramePool`].
///
/// On drop, an attached buffer hands its storage back to the pool if the
/// pool is still alive (the reference is weak, so the buffer never keeps
/// the pool alive); a standalone buffer is simply freed.
#[derive(Debug)]
pub struct PooledBuffer {
    // Owned backing storage for the frame bytes.
    data: Vec<u8>,
    // Weak handle to the originating pool; `None` marks a standalone
    // buffer, for which Drop performs no release.
    pool: Option<Weak<dyn FramePool>>,
}
93
94impl PooledBuffer {
95 #[must_use]
102 pub fn new(data: Vec<u8>, pool: Weak<dyn FramePool>) -> Self {
103 Self {
104 data,
105 pool: Some(pool),
106 }
107 }
108
109 #[must_use]
114 pub fn standalone(data: Vec<u8>) -> Self {
115 Self { data, pool: None }
116 }
117
118 #[must_use]
120 #[inline]
121 pub fn data(&self) -> &[u8] {
122 &self.data
123 }
124
125 #[must_use]
127 #[inline]
128 pub fn data_mut(&mut self) -> &mut [u8] {
129 &mut self.data
130 }
131
132 #[must_use]
134 #[inline]
135 pub fn len(&self) -> usize {
136 self.data.len()
137 }
138
139 #[must_use]
141 #[inline]
142 pub fn is_empty(&self) -> bool {
143 self.data.is_empty()
144 }
145
146 #[must_use]
150 pub fn into_inner(mut self) -> Vec<u8> {
151 self.pool = None;
153 std::mem::take(&mut self.data)
154 }
155}
156
157impl Clone for PooledBuffer {
158 fn clone(&self) -> Self {
166 Self {
167 data: self.data.clone(),
168 pool: None, }
170 }
171}
172
173impl Drop for PooledBuffer {
174 fn drop(&mut self) {
175 if let Some(ref weak_pool) = self.pool
176 && let Some(pool) = weak_pool.upgrade()
177 {
178 let data = std::mem::take(&mut self.data);
180 pool.release(data);
181 }
182 }
184}
185
186impl AsRef<[u8]> for PooledBuffer {
187 fn as_ref(&self) -> &[u8] {
188 &self.data
189 }
190}
191
192impl AsMut<[u8]> for PooledBuffer {
193 fn as_mut(&mut self) -> &mut [u8] {
194 &mut self.data
195 }
196}
197
/// A simple [`FramePool`] backed by a mutex-guarded free list of vectors.
#[derive(Debug)]
pub struct VecPool {
    // Free list of reusable backing vectors.
    buffers: Mutex<Vec<Vec<u8>>>,
    // Maximum number of free vectors retained by `release`; extra
    // returns are simply dropped.
    capacity: usize,
    // Weak self-reference handed to acquired buffers so they can return
    // their storage on drop without keeping the pool alive.
    self_ref: Mutex<Weak<Self>>,
}
226
227impl VecPool {
228 #[must_use]
234 pub fn new(capacity: usize) -> Arc<Self> {
235 let pool = Arc::new(Self {
236 buffers: Mutex::new(Vec::with_capacity(capacity)),
237 capacity,
238 self_ref: Mutex::new(Weak::new()),
239 });
240 if let Ok(mut r) = pool.self_ref.lock() {
241 *r = Arc::downgrade(&pool);
242 }
243 pool
244 }
245
246 #[must_use]
248 pub fn capacity(&self) -> usize {
249 self.capacity
250 }
251
252 #[must_use]
263 pub fn available(&self) -> usize {
264 self.buffers.lock().map_or(0, |b| b.len())
265 }
266}
267
268impl FramePool for VecPool {
269 fn acquire(&self, size: usize) -> Option<PooledBuffer> {
270 if let Ok(mut buffers) = self.buffers.lock() {
271 let suitable_idx = buffers
272 .iter()
273 .enumerate()
274 .filter(|(_, b)| b.capacity() >= size)
275 .min_by_key(|(_, b)| b.capacity())
276 .map(|(idx, _)| idx);
277
278 if let Some(idx) = suitable_idx {
279 let mut buf = buffers.swap_remove(idx);
280 buf.resize(size, 0);
281 buf.fill(0);
282
283 let weak_ref = self
284 .self_ref
285 .lock()
286 .ok()
287 .and_then(|r| r.upgrade())
288 .map(|arc| Arc::downgrade(&(arc as Arc<dyn FramePool>)))?;
289
290 return Some(PooledBuffer::new(buf, weak_ref));
291 }
292 }
293 None
294 }
295
296 fn release(&self, buffer: Vec<u8>) {
297 if let Ok(mut buffers) = self.buffers.lock()
298 && buffers.len() < self.capacity
299 {
300 buffers.push(buffer);
301 }
302 }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    #[test]
    fn test_pooled_buffer_standalone() {
        let data = vec![1u8, 2, 3, 4, 5];
        let buffer = PooledBuffer::standalone(data.clone());

        assert_eq!(buffer.len(), 5);
        assert!(!buffer.is_empty());
        assert_eq!(buffer.data(), &[1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_pooled_buffer_data_mut() {
        let mut buffer = PooledBuffer::standalone(vec![0u8; 4]);
        buffer.data_mut()[0] = 42;
        assert_eq!(buffer.data()[0], 42);
    }

    #[test]
    fn test_pooled_buffer_into_inner() {
        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let inner = buffer.into_inner();
        assert_eq!(inner, vec![1, 2, 3]);
    }

    #[test]
    fn test_pooled_buffer_as_ref() {
        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let slice: &[u8] = buffer.as_ref();
        assert_eq!(slice, &[1, 2, 3]);
    }

    #[test]
    fn test_pooled_buffer_as_mut() {
        let mut buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let slice: &mut [u8] = buffer.as_mut();
        slice[0] = 99;
        assert_eq!(buffer.data(), &[99, 2, 3]);
    }

    #[test]
    fn test_empty_buffer() {
        let buffer = PooledBuffer::standalone(vec![]);
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
    }

    /// Dropping a pool-attached buffer must hand its storage back to the
    /// pool exactly once.
    #[test]
    fn test_pool_with_arc_release() {
        use std::sync::Mutex;
        use std::sync::atomic::{AtomicUsize, Ordering};

        #[derive(Debug)]
        struct ArcPool {
            buffers: Mutex<Vec<Vec<u8>>>,
            release_count: AtomicUsize,
        }

        impl FramePool for ArcPool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, buffer: Vec<u8>) {
                if let Ok(mut buffers) = self.buffers.lock() {
                    buffers.push(buffer);
                    self.release_count.fetch_add(1, Ordering::SeqCst);
                }
            }
        }

        let pool = Arc::new(ArcPool {
            buffers: Mutex::new(vec![]),
            release_count: AtomicUsize::new(0),
        });

        {
            // Buffer drops at end of this scope and releases into the pool.
            let _buffer =
                PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
        }

        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
        assert!(pool.buffers.lock().map(|b| b.len() == 1).unwrap_or(false));
    }

    /// A buffer must remain usable after its pool is gone; dropping it
    /// must NOT call `release` (the weak upgrade fails instead of panicking).
    #[test]
    fn test_pool_dropped_before_buffer() {
        #[derive(Debug)]
        struct DroppablePool;

        impl FramePool for DroppablePool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, _buffer: Vec<u8>) {
                panic!("release should not be called on dropped pool");
            }
        }

        let buffer;
        {
            let pool = Arc::new(DroppablePool);
            buffer = PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
        }
        // Pool is dropped here; the buffer's weak reference is now dead.

        assert_eq!(buffer.data(), &[1, 2, 3]);

        // Must not panic: the weak upgrade fails and release is skipped.
        drop(buffer);
    }

    /// Clones detach from the pool, so of two buffers only the original
    /// triggers a release on drop.
    #[test]
    fn test_pooled_buffer_clone_becomes_standalone() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        #[derive(Debug)]
        struct CountingPool {
            release_count: AtomicUsize,
        }

        impl FramePool for CountingPool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, _buffer: Vec<u8>) {
                self.release_count.fetch_add(1, Ordering::SeqCst);
            }
        }

        let pool = Arc::new(CountingPool {
            release_count: AtomicUsize::new(0),
        });

        let buffer1 =
            PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);

        let buffer2 = buffer1.clone();

        assert_eq!(buffer1.data(), &[1, 2, 3]);
        assert_eq!(buffer2.data(), &[1, 2, 3]);

        drop(buffer1);
        drop(buffer2);

        // Only the pool-attached original releases; the clone is standalone.
        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_pooled_buffer_clone_data_independence() {
        let buffer1 = PooledBuffer::standalone(vec![1, 2, 3]);
        let mut buffer2 = buffer1.clone();

        buffer2.data_mut()[0] = 99;

        // Mutating the clone must not affect the original.
        assert_eq!(buffer1.data(), &[1, 2, 3]);
        assert_eq!(buffer2.data(), &[99, 2, 3]);
    }

    #[test]
    fn vec_pool_should_start_empty() {
        let pool = VecPool::new(32);
        assert_eq!(pool.capacity(), 32);
        assert_eq!(pool.available(), 0);
    }

    #[test]
    fn vec_pool_acquire_should_return_none_when_empty() {
        let pool = VecPool::new(8);
        assert!(pool.acquire(1024).is_none());
    }

    #[test]
    fn vec_pool_release_then_acquire_should_reuse_buffer() {
        let pool = VecPool::new(8);

        pool.release(vec![0u8; 1024]);
        assert_eq!(pool.available(), 1);

        let buf = pool.acquire(512).unwrap();
        assert_eq!(buf.len(), 512);
        assert_eq!(pool.available(), 0);
    }

    #[test]
    fn vec_pool_buffer_should_auto_return_on_drop() {
        let pool = VecPool::new(8);
        pool.release(vec![0u8; 2048]);

        {
            let _buf = pool.acquire(1024).unwrap();
            assert_eq!(pool.available(), 0);
        }
        // _buf drops here and its storage returns to the pool.

        assert_eq!(pool.available(), 1);
    }

    #[test]
    fn vec_pool_should_not_exceed_capacity() {
        let pool = VecPool::new(2);

        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 512]);
        // Third release exceeds capacity and is dropped.
        pool.release(vec![0u8; 512]);
        assert_eq!(pool.available(), 2);
    }

    #[test]
    fn vec_pool_acquire_should_choose_smallest_fitting_buffer() {
        let pool = VecPool::new(8);
        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 1024]);
        pool.release(vec![0u8; 2048]);

        // Best fit: 1024 is the smallest capacity >= 1000.
        let buf = pool.acquire(1000).unwrap();
        assert!(buf.len() >= 1000);
        assert_eq!(pool.available(), 2);
    }

    #[test]
    fn vec_pool_acquire_should_return_none_when_no_suitable_buffer() {
        let pool = VecPool::new(4);
        pool.release(vec![0u8; 512]);

        assert!(pool.acquire(1024).is_none());
        // The unsuitable buffer must stay in the pool.
        assert_eq!(pool.available(), 1);
    }

    #[test]
    fn vec_pool_acquired_buffer_should_return_on_drop_when_pool_was_empty() {
        let pool = VecPool::new(4);
        assert_eq!(pool.available(), 0);
        assert!(pool.acquire(1024).is_none());

        // A buffer attached manually via a weak pool handle still returns
        // its storage on drop.
        let pool_dyn: Arc<dyn FramePool> = Arc::clone(&pool) as Arc<dyn FramePool>;
        let buf = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
        drop(buf);

        assert_eq!(pool.available(), 1);
    }

    #[test]
    fn vec_pool_should_grow_from_zero_via_connected_alloc() {
        let pool = VecPool::new(8);
        let pool_dyn: Arc<dyn FramePool> = Arc::clone(&pool) as Arc<dyn FramePool>;

        let b1 = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
        let b2 = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
        let b3 = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
        assert_eq!(pool.available(), 0);

        drop(b1);
        drop(b2);
        drop(b3);

        assert_eq!(pool.available(), 3);

        let buf = pool.acquire(512);
        assert!(buf.is_some());
        assert_eq!(pool.available(), 2);
    }
}
593}