use std::sync::{Arc, Mutex, Weak};
/// A shareable source of reusable byte buffers.
///
/// Implementors must be `Send + Sync` and `Debug`, so a pool can be placed
/// behind an `Arc` and used as a `dyn FramePool` trait object.
pub trait FramePool: Send + Sync + std::fmt::Debug {
/// Requests a buffer of `size` bytes.
///
/// Returns `None` when the implementation cannot supply one.
fn acquire(&self, size: usize) -> Option<PooledBuffer>;
/// Gives a buffer's storage back to the pool.
///
/// The default implementation has an empty body, so the buffer is simply
/// dropped.
fn release(&self, _buffer: Vec<u8>) {
}
}
/// A byte buffer that may be associated with the [`FramePool`] it came from.
#[derive(Debug)]
pub struct PooledBuffer {
/// The buffer contents.
data: Vec<u8>,
/// Weak handle to the associated pool; `None` when the buffer has no pool.
/// Being weak, it does not keep the pool alive.
pool: Option<Weak<dyn FramePool>>,
}
impl PooledBuffer {
    /// Wraps `data` in a buffer associated with `pool`.
    ///
    /// Only the weak handle is stored, so the buffer never keeps the pool
    /// alive on its own.
    #[must_use]
    pub fn new(data: Vec<u8>, pool: Weak<dyn FramePool>) -> Self {
        let pool = Some(pool);
        Self { data, pool }
    }

    /// Wraps `data` in a buffer that has no associated pool.
    #[must_use]
    pub fn standalone(data: Vec<u8>) -> Self {
        let pool = None;
        Self { data, pool }
    }

    /// Read-only view of the buffer contents.
    #[must_use]
    #[inline]
    pub fn data(&self) -> &[u8] {
        self.data.as_slice()
    }

    /// Mutable view of the buffer contents; the length cannot be changed
    /// through the returned slice.
    #[must_use]
    #[inline]
    pub fn data_mut(&mut self) -> &mut [u8] {
        self.data.as_mut_slice()
    }

    /// Number of bytes currently stored in the buffer.
    #[must_use]
    #[inline]
    pub fn len(&self) -> usize {
        self.data().len()
    }

    /// Returns `true` when the buffer holds no bytes.
    #[must_use]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Consumes the buffer and returns the underlying `Vec<u8>`.
    ///
    /// The pool association is cleared before the storage is moved out, so
    /// the bytes are handed to the caller and not to any pool.
    #[must_use]
    pub fn into_inner(mut self) -> Vec<u8> {
        // Detach first: once `pool` is `None` nothing can route the storage
        // back to a pool while `self` is being torn down.
        self.pool = None;
        std::mem::take(&mut self.data)
    }
}
impl Clone for PooledBuffer {
    /// Produces an independent copy of the bytes with no pool association.
    ///
    /// The copy is always detached (`pool` is `None`), so only the buffer it
    /// was cloned from keeps any pool handle.
    fn clone(&self) -> Self {
        Self {
            pool: None,
            data: self.data.to_vec(),
        }
    }
}
impl Drop for PooledBuffer {
fn drop(&mut self) {
if let Some(ref weak_pool) = self.pool
&& let Some(pool) = weak_pool.upgrade()
{
let data = std::mem::take(&mut self.data);
pool.release(data);
}
}
}
impl AsRef<[u8]> for PooledBuffer {
    /// Borrows the buffer contents as a byte slice.
    fn as_ref(&self) -> &[u8] {
        self.data.as_slice()
    }
}
impl AsMut<[u8]> for PooledBuffer {
    /// Mutably borrows the buffer contents as a byte slice.
    fn as_mut(&mut self) -> &mut [u8] {
        self.data.as_mut_slice()
    }
}
/// A [`FramePool`] that keeps returned buffers in a mutex-guarded `Vec`.
#[derive(Debug)]
pub struct VecPool {
/// Buffers currently held by the pool and available for reuse.
buffers: Mutex<Vec<Vec<u8>>>,
/// Maximum number of buffers the pool retains; `release` drops buffers
/// offered beyond this count.
capacity: usize,
/// Weak reference to the `Arc` that owns this pool, used by `acquire` to
/// give each handed-out buffer a handle back to the pool.
self_ref: Mutex<Weak<Self>>,
}
impl VecPool {
    /// Creates an empty pool that retains at most `capacity` returned buffers.
    ///
    /// The pool is returned inside an `Arc` because it stores a weak
    /// reference to itself, which `acquire` uses to link handed-out buffers
    /// back to the pool.
    #[must_use]
    pub fn new(capacity: usize) -> Arc<Self> {
        // `new_cyclic` supplies the weak self-handle while the value is being
        // built, so the pool never exists with an unset self-reference and no
        // lock has to be taken (and possibly fail) to patch it in afterwards.
        Arc::new_cyclic(|self_ref| Self {
            buffers: Mutex::new(Vec::with_capacity(capacity)),
            capacity,
            self_ref: Mutex::new(Weak::clone(self_ref)),
        })
    }

    /// Maximum number of buffers the pool will retain.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Number of buffers currently held by the pool.
    ///
    /// Reports `0` if the internal lock is poisoned.
    #[must_use]
    pub fn available(&self) -> usize {
        self.buffers.lock().map_or(0, |free| free.len())
    }
}
impl FramePool for VecPool {
    /// Hands out a zero-filled buffer of exactly `size` bytes backed by
    /// pooled storage.
    ///
    /// The pooled buffer with the smallest capacity that is still at least
    /// `size` is chosen. Returns `None` when no pooled buffer is large
    /// enough, when a lock is poisoned, or when the pool's self-reference
    /// cannot be upgraded. No new buffer is ever created here.
    fn acquire(&self, size: usize) -> Option<PooledBuffer> {
        // Resolve the back-reference before touching the free list, so a
        // failure here cannot drop a buffer that was already removed.
        let owner: Weak<dyn FramePool> = {
            let strong: Arc<dyn FramePool> = self.self_ref.lock().ok()?.upgrade()?;
            Arc::downgrade(&strong)
        };

        let mut free = self.buffers.lock().ok()?;
        let best_fit = free
            .iter()
            .enumerate()
            .filter(|(_, candidate)| candidate.capacity() >= size)
            .min_by_key(|(_, candidate)| candidate.capacity())
            .map(|(idx, _)| idx)?;
        // Order of the free list is irrelevant, so the O(1) removal is fine.
        let mut recycled = free.swap_remove(best_fit);
        // Release the lock before zeroing; other threads need not wait on it.
        drop(free);

        // Clearing first makes `resize` write every byte exactly once and
        // guarantees no stale contents survive. Capacity already covers
        // `size`, so this does not reallocate.
        recycled.clear();
        recycled.resize(size, 0);
        Some(PooledBuffer::new(recycled, owner))
    }

    /// Stores `buffer` for later reuse.
    ///
    /// The buffer is dropped instead when the pool already holds `capacity`
    /// buffers or when the internal lock is poisoned.
    fn release(&self, buffer: Vec<u8>) {
        let Ok(mut free) = self.buffers.lock() else {
            return;
        };
        if free.len() < self.capacity {
            free.push(buffer);
        }
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `PooledBuffer` and `VecPool`.

    use super::*;
    use std::sync::Arc;

    /// A standalone buffer exposes exactly the bytes it was built from.
    #[test]
    fn test_pooled_buffer_standalone() {
        let buffer = PooledBuffer::standalone(vec![1u8, 2, 3, 4, 5]);
        assert_eq!(buffer.len(), 5);
        assert!(!buffer.is_empty());
        assert_eq!(buffer.data(), &[1, 2, 3, 4, 5]);
    }

    /// Writes through `data_mut` are visible through `data`.
    #[test]
    fn test_pooled_buffer_data_mut() {
        let mut buffer = PooledBuffer::standalone(vec![0u8; 4]);
        buffer.data_mut()[0] = 42;
        assert_eq!(buffer.data()[0], 42);
    }

    /// `into_inner` yields the wrapped vector unchanged.
    #[test]
    fn test_pooled_buffer_into_inner() {
        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let inner = buffer.into_inner();
        assert_eq!(inner, vec![1, 2, 3]);
    }

    #[test]
    fn test_pooled_buffer_as_ref() {
        let buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let slice: &[u8] = buffer.as_ref();
        assert_eq!(slice, &[1, 2, 3]);
    }

    #[test]
    fn test_pooled_buffer_as_mut() {
        let mut buffer = PooledBuffer::standalone(vec![1, 2, 3]);
        let slice: &mut [u8] = buffer.as_mut();
        slice[0] = 99;
        assert_eq!(buffer.data(), &[99, 2, 3]);
    }

    #[test]
    fn test_empty_buffer() {
        let buffer = PooledBuffer::standalone(vec![]);
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
    }

    /// Dropping a pooled buffer calls `release` on its pool exactly once.
    #[test]
    fn test_pool_with_arc_release() {
        use std::sync::Mutex;
        use std::sync::atomic::{AtomicUsize, Ordering};

        #[derive(Debug)]
        struct ArcPool {
            buffers: Mutex<Vec<Vec<u8>>>,
            release_count: AtomicUsize,
        }

        impl FramePool for ArcPool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, buffer: Vec<u8>) {
                if let Ok(mut buffers) = self.buffers.lock() {
                    buffers.push(buffer);
                    self.release_count.fetch_add(1, Ordering::SeqCst);
                }
            }
        }

        let pool = Arc::new(ArcPool {
            buffers: Mutex::new(vec![]),
            release_count: AtomicUsize::new(0),
        });
        {
            let _buffer =
                PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
        }
        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
        assert!(pool.buffers.lock().map(|b| b.len() == 1).unwrap_or(false));
    }

    /// A buffer that outlives its pool stays usable and drops without
    /// calling `release`.
    #[test]
    fn test_pool_dropped_before_buffer() {
        #[derive(Debug)]
        struct DroppablePool;

        impl FramePool for DroppablePool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, _buffer: Vec<u8>) {
                panic!("release should not be called on dropped pool");
            }
        }

        let buffer;
        {
            let pool = Arc::new(DroppablePool);
            buffer =
                PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
        }
        assert_eq!(buffer.data(), &[1, 2, 3]);
        drop(buffer);
    }

    /// Only the original buffer is returned to the pool; its clone is
    /// detached.
    #[test]
    fn test_pooled_buffer_clone_becomes_standalone() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        #[derive(Debug)]
        struct CountingPool {
            release_count: AtomicUsize,
        }

        impl FramePool for CountingPool {
            fn acquire(&self, _size: usize) -> Option<PooledBuffer> {
                None
            }

            fn release(&self, _buffer: Vec<u8>) {
                self.release_count.fetch_add(1, Ordering::SeqCst);
            }
        }

        let pool = Arc::new(CountingPool {
            release_count: AtomicUsize::new(0),
        });
        let buffer1 =
            PooledBuffer::new(vec![1, 2, 3], Arc::downgrade(&pool) as Weak<dyn FramePool>);
        let buffer2 = buffer1.clone();
        assert_eq!(buffer1.data(), &[1, 2, 3]);
        assert_eq!(buffer2.data(), &[1, 2, 3]);
        drop(buffer1);
        drop(buffer2);
        assert_eq!(pool.release_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_pooled_buffer_clone_data_independence() {
        let buffer1 = PooledBuffer::standalone(vec![1, 2, 3]);
        let mut buffer2 = buffer1.clone();
        buffer2.data_mut()[0] = 99;
        assert_eq!(buffer1.data(), &[1, 2, 3]);
        assert_eq!(buffer2.data(), &[99, 2, 3]);
    }

    #[test]
    fn pooled_buffer_clone_should_be_independent_of_source() {
        let mut original = PooledBuffer::standalone(vec![1, 2, 3]);
        let clone = original.clone();
        original.data_mut()[0] = 99;
        assert_eq!(clone.data(), &[1, 2, 3]);
        assert_eq!(original.data(), &[99, 2, 3]);
    }

    #[test]
    fn vec_pool_should_start_empty() {
        let pool = VecPool::new(32);
        assert_eq!(pool.capacity(), 32);
        assert_eq!(pool.available(), 0);
    }

    #[test]
    fn vec_pool_acquire_should_return_none_when_empty() {
        let pool = VecPool::new(8);
        assert!(pool.acquire(1024).is_none());
    }

    #[test]
    fn vec_pool_release_then_acquire_should_reuse_buffer() {
        let pool = VecPool::new(8);
        pool.release(vec![0u8; 1024]);
        assert_eq!(pool.available(), 1);
        let buf = pool.acquire(512).unwrap();
        assert_eq!(buf.len(), 512);
        assert_eq!(pool.available(), 0);
    }

    #[test]
    fn vec_pool_buffer_should_auto_return_on_drop() {
        let pool = VecPool::new(8);
        pool.release(vec![0u8; 2048]);
        {
            let _buf = pool.acquire(1024).unwrap();
            assert_eq!(pool.available(), 0);
        }
        assert_eq!(pool.available(), 1);
    }

    #[test]
    fn vec_pool_should_not_exceed_capacity() {
        let pool = VecPool::new(2);
        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 512]);
        assert_eq!(pool.available(), 2);
    }

    /// The smallest buffer that fits is chosen, leaving larger ones pooled.
    #[test]
    fn vec_pool_acquire_should_choose_smallest_fitting_buffer() {
        let pool = VecPool::new(8);
        pool.release(vec![0u8; 512]);
        pool.release(vec![0u8; 1024]);
        pool.release(vec![0u8; 2048]);
        let buf = pool.acquire(1000).unwrap();
        assert_eq!(buf.len(), 1000);
        assert_eq!(pool.available(), 2);
        // Had the 2048-byte buffer been handed out for the 1000-byte request,
        // nothing left in the pool could satisfy this one.
        let large = pool.acquire(2000);
        assert!(large.is_some());
        assert_eq!(pool.available(), 1);
    }

    #[test]
    fn vec_pool_acquire_should_return_none_when_no_suitable_buffer() {
        let pool = VecPool::new(4);
        pool.release(vec![0u8; 512]);
        assert!(pool.acquire(1024).is_none());
        assert_eq!(pool.available(), 1);
    }

    /// A buffer built outside the pool but linked to it lands in the pool
    /// when dropped.
    #[test]
    fn vec_pool_acquired_buffer_should_return_on_drop_when_pool_was_empty() {
        let pool = VecPool::new(4);
        assert_eq!(pool.available(), 0);
        assert!(pool.acquire(1024).is_none());
        let pool_dyn: Arc<dyn FramePool> = Arc::clone(&pool) as Arc<dyn FramePool>;
        let buf = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
        drop(buf);
        assert_eq!(pool.available(), 1);
    }

    /// An initially empty pool fills up from linked buffers and can then
    /// serve `acquire`.
    #[test]
    fn vec_pool_should_grow_from_zero_via_connected_alloc() {
        let pool = VecPool::new(8);
        let pool_dyn: Arc<dyn FramePool> = Arc::clone(&pool) as Arc<dyn FramePool>;
        let b1 = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
        let b2 = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
        let b3 = PooledBuffer::new(vec![0u8; 1024], Arc::downgrade(&pool_dyn));
        assert_eq!(pool.available(), 0);
        drop(b1);
        drop(b2);
        drop(b3);
        assert_eq!(pool.available(), 3);
        let buf = pool.acquire(512);
        assert!(buf.is_some());
        assert_eq!(pool.available(), 2);
    }
}