use super::config::PoolConfig;
use super::stats::PoolStats;
use crate::buffer::Buffer;
use std::sync::{Arc, Mutex};
pub(crate) struct PoolInner {
pub(crate) buffers: Vec<Buffer>,
pub(crate) config: PoolConfig,
pub(crate) total_allocated: usize,
pub(crate) total_acquired: usize,
pub(crate) total_returned: usize,
}
pub struct BufferPool {
pub(crate) inner: Arc<Mutex<PoolInner>>,
}
impl Default for BufferPool {
fn default() -> Self {
Self::new(PoolConfig::default())
}
}
impl BufferPool {
pub fn new(config: PoolConfig) -> Self {
let mut buffers = Vec::with_capacity(config.min_pool_size);
for _ in 0..config.min_pool_size {
buffers.push(Buffer::new(config.buffer_size));
}
Self {
inner: Arc::new(Mutex::new(PoolInner {
buffers,
config,
total_allocated: 0,
total_acquired: 0,
total_returned: 0,
})),
}
}
pub fn acquire(&self) -> PooledBuffer {
let mut inner = self.inner.lock().unwrap();
inner.total_acquired += 1;
let buffer = inner.buffers.pop().unwrap_or_else(|| {
inner.total_allocated += 1;
Buffer::new(inner.config.buffer_size)
});
PooledBuffer {
buffer: Some(buffer),
pool: Arc::clone(&self.inner),
}
}
pub fn available(&self) -> usize {
self.inner.lock().unwrap().buffers.len()
}
pub fn stats(&self) -> PoolStats {
let inner = self.inner.lock().unwrap();
PoolStats {
available: inner.buffers.len(),
total_allocated: inner.total_allocated,
total_acquired: inner.total_acquired,
total_returned: inner.total_returned,
buffer_size: inner.config.buffer_size,
max_pool_size: inner.config.max_pool_size,
}
}
pub fn shrink(&self) {
let mut inner = self.inner.lock().unwrap();
let min_size = inner.config.min_pool_size;
inner.buffers.truncate(min_size);
inner.buffers.shrink_to_fit();
}
pub fn clear(&self) {
let mut inner = self.inner.lock().unwrap();
inner.buffers.clear();
}
pub fn grow(&self, target_size: usize) {
let mut inner = self.inner.lock().unwrap();
let max_size = inner.config.max_pool_size;
let buffer_size = inner.config.buffer_size;
let target = target_size.min(max_size);
while inner.buffers.len() < target {
inner.buffers.push(Buffer::new(buffer_size));
}
}
}
pub struct PooledBuffer {
pub(crate) buffer: Option<Buffer>,
pub(crate) pool: Arc<Mutex<PoolInner>>,
}
impl PooledBuffer {
pub fn leak(mut self) -> Buffer {
self.buffer.take().unwrap()
}
pub fn drop_now(mut self) {
if let Some(mut buffer) = self.buffer.take() {
buffer.burn();
drop(buffer);
}
}
#[inline]
pub fn capacity(&self) -> usize {
self.buffer.as_ref().unwrap().capacity()
}
}
impl std::ops::Deref for PooledBuffer {
type Target = Buffer;
fn deref(&self) -> &Self::Target {
self.buffer.as_ref().unwrap()
}
}
impl std::ops::DerefMut for PooledBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
self.buffer.as_mut().unwrap()
}
}
impl Drop for PooledBuffer {
fn drop(&mut self) {
if let Some(mut buffer) = self.buffer.take() {
buffer.burn();
let mut inner = self.pool.lock().unwrap();
inner.total_returned += 1;
if inner.buffers.len() < inner.config.max_pool_size {
inner.buffers.push(buffer);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pool_basic() {
let pool = BufferPool::new(PoolConfig {
buffer_size: 1024,
max_pool_size: 10,
min_pool_size: 2,
});
assert_eq!(pool.available(), 2);
let _buf = pool.acquire();
assert_eq!(pool.stats().total_acquired, 1);
}
#[test]
fn test_returned_buffer_is_clean() {
let pool = BufferPool::new(PoolConfig {
buffer_size: 64,
max_pool_size: 2,
min_pool_size: 0,
});
{
let mut buf = pool.acquire();
buf.put_bytes(&[0xFF; 16]).unwrap();
}
let mut buf2 = pool.acquire();
buf2.set_len(16).unwrap();
assert_eq!(&buf2.as_slice()[..16], &[0u8; 16]);
}
#[test]
fn test_drop_now() {
let pool = BufferPool::new(PoolConfig {
buffer_size: 1024,
max_pool_size: 10,
min_pool_size: 2,
});
let initial_available = pool.available();
{
let mut buf = pool.acquire();
buf.put_u32(12345).unwrap();
buf.drop_now();
}
assert_eq!(pool.available(), initial_available - 1);
}
#[test]
fn test_leak() {
let pool = BufferPool::new(PoolConfig {
buffer_size: 1024,
max_pool_size: 10,
min_pool_size: 2,
});
let pooled = pool.acquire();
let _owned = pooled.leak();
assert_eq!(pool.stats().total_acquired, 1);
}
#[test]
fn test_normal_return() {
let pool = BufferPool::new(PoolConfig {
buffer_size: 1024,
max_pool_size: 10,
min_pool_size: 2,
});
let initial = pool.available();
{ let _buf = pool.acquire(); } assert_eq!(pool.available(), initial);
}
#[test]
fn test_grow_shrink() {
let pool = BufferPool::new(PoolConfig {
buffer_size: 64,
max_pool_size: 20,
min_pool_size: 0,
});
pool.grow(10);
assert_eq!(pool.available(), 10);
pool.shrink();
assert_eq!(pool.available(), 0);
}
}