use bytes::BytesMut;
use std::cell::RefCell;
use std::collections::VecDeque;
/// Capacity, in bytes, requested by `PooledBuffer::default_size` and the lower
/// bound applied to the capacity of every buffer the pool allocates from scratch.
pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
/// Maximum number of idle buffers a single pool retains; buffers returned while
/// the pool already holds this many are discarded.
pub const MAX_POOL_SIZE: usize = 16;
/// Largest capacity, in bytes, a returned buffer may have and still be kept in
/// the pool; buffers with a larger capacity are discarded on return.
pub const MAX_POOLED_BUFFER_SIZE: usize = 256 * 1024;
// One `BufferPool` per thread, created on that thread's first access.
// `RefCell` provides the interior mutability needed to update the pool through
// the shared reference that `LocalKey::with` hands out.
thread_local! {
static BUFFER_POOL: RefCell<BufferPool> = RefCell::new(BufferPool::new());
}
/// Cache of idle `BytesMut` buffers together with usage counters.
///
/// The counters are only ever incremented by the code in this file;
/// `clear_pool` empties `buffers` but leaves the counters untouched.
struct BufferPool {
/// Idle buffers available for reuse (at most `MAX_POOL_SIZE` are retained).
buffers: VecDeque<BytesMut>,
/// Number of times `get` had to allocate a brand-new buffer.
allocated: usize,
/// Number of times `get` handed out a buffer taken from `buffers`.
reused: usize,
/// Number of buffers `put` discarded instead of retaining.
dropped: usize,
}
impl BufferPool {
    /// Creates an empty pool with storage pre-reserved for `MAX_POOL_SIZE`
    /// buffers and all counters at zero.
    fn new() -> Self {
        Self {
            buffers: VecDeque::with_capacity(MAX_POOL_SIZE),
            allocated: 0,
            reused: 0,
            dropped: 0,
        }
    }

    /// Returns an empty buffer whose capacity is at least `min_capacity`.
    ///
    /// Lookup order:
    /// 1. the first pooled buffer that is already large enough;
    /// 2. otherwise the oldest pooled buffer, grown to `min_capacity`
    ///    (still counted as a reuse);
    /// 3. otherwise a fresh allocation of
    ///    `max(min_capacity, DEFAULT_BUFFER_SIZE)` bytes.
    fn get(&mut self, min_capacity: usize) -> BytesMut {
        let first_fit = self
            .buffers
            .iter()
            .position(|b| b.capacity() >= min_capacity);
        if let Some(idx) = first_fit {
            let mut buf = self
                .buffers
                .remove(idx)
                .expect("index returned by `position` is in bounds");
            buf.clear();
            self.reused += 1;
            return buf;
        }

        if let Some(mut buf) = self.buffers.pop_front() {
            buf.clear();
            // `reserve` is relative to `len()`, which is 0 after `clear()`, so
            // the full `min_capacity` must be requested. Asking only for the
            // difference to the current capacity is usually a no-op and would
            // hand out a buffer that is smaller than requested.
            buf.reserve(min_capacity);
            self.reused += 1;
            return buf;
        }

        self.allocated += 1;
        BytesMut::with_capacity(min_capacity.max(DEFAULT_BUFFER_SIZE))
    }

    /// Offers `buf` back to the pool.
    ///
    /// The buffer is discarded (and `dropped` incremented) when its capacity
    /// exceeds `MAX_POOLED_BUFFER_SIZE` or the pool already holds
    /// `MAX_POOL_SIZE` buffers; otherwise it is appended to the idle queue.
    /// Contents are not cleared here; `get` clears buffers before handing
    /// them out.
    fn put(&mut self, buf: BytesMut) {
        if buf.capacity() > MAX_POOLED_BUFFER_SIZE {
            self.dropped += 1;
            return;
        }
        if self.buffers.len() >= MAX_POOL_SIZE {
            self.dropped += 1;
            return;
        }
        self.buffers.push_back(buf);
    }
}
/// Handle to a `BytesMut` obtained from the thread-local buffer pool.
///
/// When the handle is dropped, the buffer is offered back to the pool of the
/// thread performing the drop, unless it was detached with `take`.
pub struct PooledBuffer {
// `None` only after `take` has moved the buffer out; `Drop` then has
// nothing to return to the pool.
buffer: Option<BytesMut>,
}
impl PooledBuffer {
    /// Obtains a buffer from the current thread's pool, requesting room for at
    /// least `min_capacity` bytes.
    pub fn new(min_capacity: usize) -> Self {
        BUFFER_POOL.with(|cell| {
            let mut pool = cell.borrow_mut();
            Self {
                buffer: Some(pool.get(min_capacity)),
            }
        })
    }

    /// Obtains a buffer requesting `DEFAULT_BUFFER_SIZE` bytes of capacity.
    pub fn default_size() -> Self {
        Self::new(DEFAULT_BUFFER_SIZE)
    }

    /// Mutable access to the underlying `BytesMut`.
    ///
    /// # Panics
    ///
    /// Panics if the inner buffer is absent. `take` consumes the handle, so
    /// this is not expected to be reachable through the public API.
    #[inline]
    #[allow(clippy::should_implement_trait)]
    pub fn as_mut(&mut self) -> &mut BytesMut {
        let slot = &mut self.buffer;
        slot.as_mut().expect("buffer already taken")
    }

    /// Shared access to the underlying `BytesMut`.
    ///
    /// # Panics
    ///
    /// Panics if the inner buffer is absent. `take` consumes the handle, so
    /// this is not expected to be reachable through the public API.
    #[inline]
    #[allow(clippy::should_implement_trait)]
    pub fn as_ref(&self) -> &BytesMut {
        let slot = &self.buffer;
        slot.as_ref().expect("buffer already taken")
    }

    /// Detaches and returns the underlying buffer.
    ///
    /// A detached buffer is not handed back to the pool when the handle goes
    /// out of scope.
    pub fn take(mut self) -> BytesMut {
        let detached = self.buffer.take();
        detached.expect("buffer already taken")
    }

    /// Number of bytes currently stored in the buffer.
    #[inline]
    pub fn len(&self) -> usize {
        let inner: &BytesMut = self.as_ref();
        inner.len()
    }

    /// Whether the buffer currently holds no bytes.
    #[inline]
    pub fn is_empty(&self) -> bool {
        let inner: &BytesMut = self.as_ref();
        inner.is_empty()
    }

    /// Capacity of the underlying buffer as reported by `BytesMut`.
    #[inline]
    pub fn capacity(&self) -> usize {
        let inner: &BytesMut = self.as_ref();
        inner.capacity()
    }

    /// Clears the underlying buffer via `BytesMut::clear`.
    pub fn clear(&mut self) {
        let inner: &mut BytesMut = self.as_mut();
        inner.clear();
    }
}
/// Offers the wrapped buffer back to the current thread's pool, if it has not
/// been detached with `take`.
impl Drop for PooledBuffer {
    fn drop(&mut self) {
        if let Some(buf) = self.buffer.take() {
            // `try_with` instead of `with`: a handle may be dropped while (or
            // after) this thread's pool is being destroyed at thread exit, in
            // which case `with` would panic inside a destructor. On failure
            // the closure, and the buffer it owns, is simply dropped, which
            // frees the buffer normally.
            let _ = BUFFER_POOL.try_with(|pool| pool.borrow_mut().put(buf));
        }
    }
}
/// Lets a `PooledBuffer` be used wherever a `&BytesMut` is expected.
impl std::ops::Deref for PooledBuffer {
    type Target = BytesMut;

    /// Borrows the wrapped buffer; panics if it is absent.
    fn deref(&self) -> &Self::Target {
        self.buffer.as_ref().expect("buffer already taken")
    }
}
/// Lets a `PooledBuffer` be used wherever a `&mut BytesMut` is expected.
impl std::ops::DerefMut for PooledBuffer {
    /// Mutably borrows the wrapped buffer; panics if it is absent.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.buffer.as_mut().expect("buffer already taken")
    }
}
impl AsRef<[u8]> for PooledBuffer {
fn as_ref(&self) -> &[u8] {
self.buffer.as_ref().expect("buffer already taken")
}
}
impl AsMut<[u8]> for PooledBuffer {
fn as_mut(&mut self) -> &mut [u8] {
self.buffer.as_mut().expect("buffer already taken")
}
}
pub fn pool_stats() -> PoolStats {
BUFFER_POOL.with(|pool| {
let pool = pool.borrow();
PoolStats {
pooled: pool.buffers.len(),
allocated: pool.allocated,
reused: pool.reused,
dropped: pool.dropped,
}
})
}
/// Discards every idle buffer held by the calling thread's pool.
///
/// The allocation, reuse and discard counters are left unchanged.
pub fn clear_pool() {
    BUFFER_POOL.with(|cell| cell.borrow_mut().buffers.clear());
}
/// Point-in-time counters describing a buffer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Idle buffers held by the pool when the snapshot was taken.
    pub pooled: usize,
    /// Requests that were served by allocating a new buffer.
    pub allocated: usize,
    /// Requests that were served from the pool.
    pub reused: usize,
    /// Returned buffers that were discarded instead of being pooled.
    pub dropped: usize,
}

impl PoolStats {
    /// Fraction of requests served from the pool:
    /// `reused / (allocated + reused)`.
    ///
    /// Returns `0.0` when no request has been recorded.
    pub fn hit_rate(&self) -> f64 {
        match self.allocated + self.reused {
            0 => 0.0,
            requests => self.reused as f64 / requests as f64,
        }
    }
}
/// Convenience wrapper around `PooledBuffer::new`: obtains a buffer from the
/// calling thread's pool, requesting room for at least `min_capacity` bytes.
#[inline]
pub fn acquire(min_capacity: usize) -> PooledBuffer {
PooledBuffer::new(min_capacity)
}
/// Convenience wrapper around `PooledBuffer::default_size`: obtains a buffer
/// from the calling thread's pool, requesting `DEFAULT_BUFFER_SIZE` bytes.
#[inline]
pub fn acquire_default() -> PooledBuffer {
PooledBuffer::default_size()
}
#[cfg(test)]
mod tests {
    // Counter assertions compare against a snapshot taken at the start of each
    // test: `clear_pool` empties the pool but does not reset the counters, so
    // absolute values would only hold on a thread whose pool was never used.
    use super::*;
    use bytes::BufMut;

    /// A freshly acquired buffer is empty, large enough, and writable.
    #[test]
    fn test_pooled_buffer_basic() {
        let mut buf = acquire(1024);
        assert!(buf.capacity() >= 1024);
        assert!(buf.is_empty());
        buf.put_slice(b"hello");
        assert_eq!(buf.len(), 5);
        assert_eq!(&buf[..], b"hello");
    }

    /// A dropped buffer is pooled and handed out again, emptied.
    #[test]
    fn test_buffer_reuse() {
        clear_pool();
        let before = pool_stats();
        {
            let mut buf = acquire(1024);
            buf.put_slice(b"test data");
        }
        assert_eq!(pool_stats().pooled, 1);
        {
            let buf = acquire(1024);
            assert!(buf.capacity() >= 1024);
            assert!(buf.is_empty());
        }
        assert!(pool_stats().reused > before.reused);
    }

    /// Buffers above `MAX_POOLED_BUFFER_SIZE` are discarded on return.
    #[test]
    fn test_large_buffer_not_pooled() {
        clear_pool();
        let before = pool_stats();
        {
            let mut buf = acquire(MAX_POOLED_BUFFER_SIZE + 1);
            buf.put_slice(b"large data");
        }
        let after = pool_stats();
        assert_eq!(after.dropped - before.dropped, 1);
        assert_eq!(after.pooled, 0);
    }

    /// `take` detaches the buffer so it never goes back to the pool.
    #[test]
    fn test_buffer_take() {
        clear_pool();
        let buf = acquire(1024);
        let taken = buf.take();
        assert!(taken.is_empty());
        assert!(taken.capacity() >= 1024);
        assert_eq!(pool_stats().pooled, 0);
    }

    /// Counters track fresh allocations, and returned buffers are pooled.
    #[test]
    fn test_pool_stats() {
        clear_pool();
        let before = pool_stats();
        let first = acquire(1024);
        let second = acquire(2048);
        let during = pool_stats();
        assert_eq!(during.allocated - before.allocated, 2);
        assert_eq!(during.reused - before.reused, 0);
        assert_eq!(during.pooled, 0);
        drop(first);
        drop(second);
        assert_eq!(pool_stats().pooled, 2);
    }

    /// `hit_rate` is `reused / (allocated + reused)`, and `0.0` with no data.
    #[test]
    fn test_hit_rate() {
        let stats = PoolStats {
            pooled: 5,
            allocated: 10,
            reused: 90,
            dropped: 0,
        };
        assert!((stats.hit_rate() - 0.9).abs() < 0.01);

        let empty = PoolStats {
            pooled: 0,
            allocated: 0,
            reused: 0,
            dropped: 0,
        };
        assert_eq!(empty.hit_rate(), 0.0);
    }

    /// The pool never retains more than `MAX_POOL_SIZE` buffers.
    #[test]
    fn test_pool_max_size() {
        clear_pool();
        let before = pool_stats();
        let buffers: Vec<_> = (0..MAX_POOL_SIZE + 5).map(|_| acquire(1024)).collect();
        drop(buffers);
        let after = pool_stats();
        assert_eq!(after.pooled, MAX_POOL_SIZE);
        assert!(after.dropped - before.dropped >= 5);
    }
}