use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};
/// A pool of reusable byte buffers whose shared state is guarded by a mutex.
///
/// Buffers are handed out by `BufferPool::acquire` wrapped in a
/// `PooledBuffer`; the handle keeps its own `Arc` to the shared state so it
/// can reach the pool again when it is dropped.
pub struct BufferPool {
    /// Capacity, in bytes, requested for every freshly allocated buffer.
    buffer_size: usize,
    /// Upper bound on the number of idle buffers kept around for reuse.
    max_pool_size: usize,
    /// Idle buffers plus usage counters, shared with every outstanding handle.
    pool: Arc<Mutex<BufferPoolInner>>,
}
/// The mutable part of a `BufferPool`, always accessed through its mutex.
struct BufferPoolInner {
    /// Running usage counters. The `pool_size` field is not maintained here;
    /// `BufferPool::stats` fills it in from `free_buffers.len()`.
    stats: PoolStats,
    /// Idle buffers waiting to be handed out again (used as a stack).
    free_buffers: Vec<Vec<u8>>,
}
impl BufferPool {
pub fn new(buffer_size: usize, max_pool_size: usize) -> Self {
Self {
pool: Arc::new(Mutex::new(BufferPoolInner {
free_buffers: Vec::with_capacity(max_pool_size),
stats: PoolStats::default(),
})),
buffer_size,
max_pool_size,
}
}
pub fn acquire(&self) -> PooledBuffer {
let mut inner = self.pool.lock().unwrap();
let buffer = if let Some(mut buf) = inner.free_buffers.pop() {
buf.clear();
inner.stats.reuses += 1;
buf
} else {
inner.stats.allocations += 1;
Vec::with_capacity(self.buffer_size)
};
inner.stats.in_use += 1;
inner.stats.peak_in_use = inner.stats.peak_in_use.max(inner.stats.in_use);
PooledBuffer {
buffer: Some(buffer),
pool: self.pool.clone(),
max_pool_size: self.max_pool_size,
}
}
pub fn stats(&self) -> PoolStats {
let inner = self.pool.lock().unwrap();
PoolStats {
pool_size: inner.free_buffers.len(),
in_use: inner.stats.in_use,
peak_in_use: inner.stats.peak_in_use,
allocations: inner.stats.allocations,
reuses: inner.stats.reuses,
}
}
pub fn buffer_size(&self) -> usize {
self.buffer_size
}
pub fn max_pool_size(&self) -> usize {
self.max_pool_size
}
pub fn clear(&self) {
let mut inner = self.pool.lock().unwrap();
inner.free_buffers.clear();
}
pub fn estimated_memory_bytes(&self) -> usize {
let stats = self.stats();
(stats.pool_size + stats.in_use) * self.buffer_size
}
}
impl Default for BufferPool {
    /// Builds a pool with a 64 KiB buffer size and a maximum pool size of 10.
    fn default() -> Self {
        const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
        const DEFAULT_MAX_POOL_SIZE: usize = 10;

        Self::new(DEFAULT_BUFFER_SIZE, DEFAULT_MAX_POOL_SIZE)
    }
}
/// Handle to a byte buffer obtained from `BufferPool::acquire`.
///
/// The handle dereferences to the wrapped `Vec<u8>`. Dropping it offers the
/// buffer back to the pool, while `PooledBuffer::into_inner` detaches the
/// buffer and returns it to the caller instead.
pub struct PooledBuffer {
    /// The wrapped buffer; `None` once it has been taken by `into_inner` or
    /// by the `Drop` implementation.
    buffer: Option<Vec<u8>>,
    /// Idle-buffer limit copied from the pool that created this handle.
    max_pool_size: usize,
    /// Shared pool state the buffer is returned to.
    pool: Arc<Mutex<BufferPoolInner>>,
}
impl PooledBuffer {
    /// Returns the capacity of the wrapped buffer, or `0` if the buffer has
    /// already been taken out of the handle.
    pub fn capacity(&self) -> usize {
        match &self.buffer {
            Some(vec) => vec.capacity(),
            None => 0,
        }
    }

    /// Detaches the buffer from the pool and returns it to the caller.
    ///
    /// The pool's `in_use` counter is decremented and the buffer is never put
    /// back on the free list.
    ///
    /// # Panics
    ///
    /// Panics if the pool mutex is poisoned.
    pub fn into_inner(mut self) -> Vec<u8> {
        let detached = self.buffer.take().unwrap();
        {
            // Scoped so the guard is released before `self` is dropped.
            let mut state = self.pool.lock().unwrap();
            state.stats.in_use -= 1;
        }
        detached
    }
}
impl Deref for PooledBuffer {
    type Target = Vec<u8>;

    /// Borrows the wrapped buffer immutably.
    ///
    /// # Panics
    ///
    /// Panics if the buffer has already been taken out of the handle.
    fn deref(&self) -> &Self::Target {
        let wrapped: Option<&Vec<u8>> = self.buffer.as_ref();
        wrapped.unwrap()
    }
}
impl DerefMut for PooledBuffer {
    /// Borrows the wrapped buffer mutably.
    ///
    /// # Panics
    ///
    /// Panics if the buffer has already been taken out of the handle.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let wrapped: Option<&mut Vec<u8>> = self.buffer.as_mut();
        wrapped.unwrap()
    }
}
impl Drop for PooledBuffer {
    /// Offers the buffer back to the pool and records that it is no longer in
    /// use.
    ///
    /// The buffer is kept for reuse only while the free list holds fewer than
    /// `max_pool_size` buffers; otherwise it is deallocated. Nothing happens
    /// when the buffer was already detached by `into_inner`.
    fn drop(&mut self) {
        if let Some(buffer) = self.buffer.take() {
            let surplus = {
                // A destructor must not panic: if this handle is dropped while
                // its thread is already unwinding, a second panic aborts the
                // process. The guarded state is only a free list and counters,
                // so it is safe to keep using it after poisoning.
                let mut inner = self
                    .pool
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());

                let stats = &mut inner.stats;
                stats.in_use = stats.in_use.saturating_sub(1);

                if inner.free_buffers.len() < self.max_pool_size {
                    inner.free_buffers.push(buffer);
                    None
                } else {
                    Some(buffer)
                }
            };
            // A buffer the pool has no room for is freed only after the lock
            // has been released, keeping the critical section short.
            drop(surplus);
        }
    }
}
/// Point-in-time snapshot of a buffer pool's usage counters.
#[derive(Debug, Clone, Copy, Default)]
pub struct PoolStats {
    /// Number of idle buffers currently held for reuse.
    pub pool_size: usize,
    /// Number of buffers currently handed out.
    pub in_use: usize,
    /// Highest value `in_use` has reached.
    pub peak_in_use: usize,
    /// Number of acquisitions that had to allocate a new buffer.
    pub allocations: usize,
    /// Number of acquisitions served by an idle buffer.
    pub reuses: usize,
}

impl PoolStats {
    /// Returns the fraction of acquisitions served by a reused buffer, in the
    /// range `0.0..=1.0`, or `0.0` when nothing has been acquired yet.
    pub fn reuse_rate(&self) -> f64 {
        // The total is summed in floating point because `allocations + reuses`
        // as `usize` can overflow for very large counters: that panics in
        // debug builds and wraps in release builds. The `as f64` casts may
        // round counters above 2^53, which is irrelevant for a ratio.
        let reuses = self.reuses as f64;
        let total = self.allocations as f64 + reuses;
        if total == 0.0 {
            0.0
        } else {
            reuses / total
        }
    }

    /// Returns the total number of acquisitions (allocations plus reuses),
    /// saturating at `usize::MAX` instead of overflowing.
    pub fn total_acquires(&self) -> usize {
        self.allocations.saturating_add(self.reuses)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A new pool reports its configuration and starts with no buffers.
    #[test]
    fn test_pool_creation() {
        let pool = BufferPool::new(1024, 5);
        assert_eq!(pool.buffer_size(), 1024);
        assert_eq!(pool.max_pool_size(), 5);

        let stats = pool.stats();
        assert_eq!(stats.pool_size, 0);
        assert_eq!(stats.in_use, 0);
    }

    /// Acquiring allocates a buffer; dropping the handle returns it to the pool.
    #[test]
    fn test_acquire_and_release() {
        let pool = BufferPool::new(1024, 5);
        {
            let buffer = pool.acquire();
            // `Vec::with_capacity` only guarantees *at least* the requested
            // capacity, so an exact comparison would depend on the allocator.
            assert!(buffer.capacity() >= 1024);

            let stats = pool.stats();
            assert_eq!(stats.in_use, 1);
            assert_eq!(stats.allocations, 1);
        }

        let stats = pool.stats();
        assert_eq!(stats.in_use, 0);
        assert_eq!(stats.pool_size, 1);
    }

    /// A second acquisition after a release reuses the idle buffer.
    #[test]
    fn test_buffer_reuse() {
        let pool = BufferPool::new(1024, 5);
        {
            let _buffer = pool.acquire();
        }
        {
            let _buffer = pool.acquire();
        }

        let stats = pool.stats();
        assert_eq!(stats.allocations, 1);
        assert_eq!(stats.reuses, 1);
        assert_eq!(stats.reuse_rate(), 0.5);
    }

    /// The pool never retains more idle buffers than `max_pool_size`.
    #[test]
    fn test_max_pool_size() {
        let pool = BufferPool::new(1024, 2);
        let buffers: Vec<_> = (0..5).map(|_| pool.acquire()).collect();
        drop(buffers);

        let stats = pool.stats();
        assert_eq!(stats.pool_size, 2);
    }

    /// Several buffers can be outstanding at once and all return on drop.
    #[test]
    fn test_concurrent_buffers() {
        let pool = BufferPool::new(1024, 5);
        let b1 = pool.acquire();
        let b2 = pool.acquire();
        let b3 = pool.acquire();

        let stats = pool.stats();
        assert_eq!(stats.in_use, 3);
        assert_eq!(stats.peak_in_use, 3);

        drop(b1);
        drop(b2);
        drop(b3);

        let stats = pool.stats();
        assert_eq!(stats.in_use, 0);
        assert_eq!(stats.pool_size, 3);
        assert_eq!(stats.peak_in_use, 3);
    }

    /// The handle can be used like a `Vec<u8>` through `Deref`/`DerefMut`.
    #[test]
    fn test_buffer_usage() {
        let pool = BufferPool::new(1024, 5);
        let mut buffer = pool.acquire();

        buffer.extend_from_slice(b"hello");
        assert_eq!(buffer.len(), 5);
        assert_eq!(&buffer[..], b"hello");

        buffer.clear();
        assert_eq!(buffer.len(), 0);
    }

    /// `into_inner` hands the data to the caller without re-pooling the buffer.
    #[test]
    fn test_into_inner() {
        let pool = BufferPool::new(1024, 5);
        let mut buffer = pool.acquire();
        buffer.extend_from_slice(b"data");

        let stats = pool.stats();
        assert_eq!(stats.in_use, 1);

        let inner = buffer.into_inner();
        assert_eq!(&inner[..], b"data");

        let stats = pool.stats();
        assert_eq!(stats.pool_size, 0);
        assert_eq!(stats.in_use, 0);
    }

    /// `clear` discards every idle buffer.
    #[test]
    fn test_clear_pool() {
        let pool = BufferPool::new(1024, 5);
        {
            let _buffers: Vec<_> = (0..3).map(|_| pool.acquire()).collect();
        }

        let stats = pool.stats();
        assert_eq!(stats.pool_size, 3);

        pool.clear();

        let stats = pool.stats();
        assert_eq!(stats.pool_size, 0);
    }

    /// The memory estimate counts in-use buffers at `buffer_size` bytes each.
    #[test]
    fn test_estimated_memory() {
        let pool = BufferPool::new(1024, 5);
        let _b1 = pool.acquire();
        let _b2 = pool.acquire();

        let memory = pool.estimated_memory_bytes();
        assert_eq!(memory, 2 * 1024);
    }

    /// The default pool uses 64 KiB buffers and keeps up to 10 idle buffers.
    #[test]
    fn test_default_pool() {
        let pool = BufferPool::default();
        assert_eq!(pool.buffer_size(), 64 * 1024);
        assert_eq!(pool.max_pool_size(), 10);
    }

    /// `peak_in_use` keeps the high-water mark after buffers are released.
    #[test]
    fn test_peak_tracking() {
        let pool = BufferPool::new(1024, 5);
        let b1 = pool.acquire();
        let b2 = pool.acquire();
        let b3 = pool.acquire();

        let stats = pool.stats();
        assert_eq!(stats.peak_in_use, 3);

        drop(b1);
        drop(b2);

        let stats = pool.stats();
        assert_eq!(stats.in_use, 1);
        assert_eq!(stats.peak_in_use, 3);

        drop(b3);

        let stats = pool.stats();
        assert_eq!(stats.in_use, 0);
        assert_eq!(stats.peak_in_use, 3);
    }

    /// A reused buffer never exposes the previous holder's contents.
    #[test]
    fn test_buffer_cleared_on_acquire() {
        let pool = BufferPool::new(1024, 5);
        {
            let mut buffer = pool.acquire();
            buffer.extend_from_slice(b"old data");
        }
        {
            let buffer = pool.acquire();
            assert_eq!(buffer.len(), 0);
        }
    }

    /// The pool can be shared across threads and ends with nothing in use.
    #[test]
    fn test_concurrent_threads() {
        use std::thread;

        let pool = Arc::new(BufferPool::new(1024, 10));
        let mut handles = vec![];

        for _ in 0..5 {
            let pool = pool.clone();
            handles.push(thread::spawn(move || {
                for _ in 0..10 {
                    let mut buffer = pool.acquire();
                    buffer.extend_from_slice(b"test");
                    assert!(!buffer.is_empty());
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let stats = pool.stats();
        assert_eq!(stats.in_use, 0);
        assert!(stats.allocations > 0);
        // At most five buffers are ever outstanding and the pool keeps up to
        // ten, so at most five of the fifty acquisitions can allocate.
        assert!(stats.reuses > 0);
    }

    /// Reuse statistics add up across two full acquire/release rounds.
    #[test]
    fn test_reuse_rate() {
        let pool = BufferPool::new(1024, 5);
        {
            let _buffers: Vec<_> = (0..5).map(|_| pool.acquire()).collect();
        }

        let stats = pool.stats();
        assert_eq!(stats.allocations, 5);
        assert_eq!(stats.pool_size, 5);

        {
            let _buffers: Vec<_> = (0..5).map(|_| pool.acquire()).collect();
        }

        let stats = pool.stats();
        assert_eq!(stats.allocations, 5);
        assert_eq!(stats.reuses, 5);
        assert_eq!(stats.reuse_rate(), 0.5);
        assert_eq!(stats.total_acquires(), 10);
    }

    /// With `max_pool_size == 0` released buffers are never retained.
    #[test]
    fn test_zero_max_pool_size() {
        let pool = BufferPool::new(1024, 0);
        {
            let _buffer = pool.acquire();
        }

        let stats = pool.stats();
        assert_eq!(stats.pool_size, 0);
        assert_eq!(stats.allocations, 1);
    }
}