mod buffer;
mod config;
mod pool;
mod tls;
mod utils;
pub use buffer::PooledBuffer;
pub use config::{Builder, EvictionPolicy};
pub use pool::BufferPool;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_pool_operations() {
let pool = BufferPool::new();
let buf = pool.get(1024);
assert_eq!(buf.len(), 1024);
drop(buf);
let buf2 = pool.get(1024);
assert_eq!(buf2.len(), 1024);
}
#[test]
fn test_buffer_sizing() {
let pool = BufferPool::builder().min_buffer_size(0).build();
let buf = pool.get(2048);
assert_eq!(buf.len(), 2048);
drop(buf);
let buf2 = pool.get(1024);
assert_eq!(buf2.len(), 1024);
assert!(buf2.capacity() >= 2048);
}
#[test]
fn test_min_size_filtering() {
use std::thread;
let pool = BufferPool::builder()
.min_buffer_size(1024 * 1024)
.max_buffers_per_shard(16)
.build();
let tls_cache_size = pool.config.tls_cache_size;
let pool_clone = pool.clone();
thread::spawn(move || {
for _ in 0..tls_cache_size {
let buf = pool_clone.get(512);
drop(buf); }
let small_buf = pool_clone.get(512);
drop(small_buf); })
.join()
.unwrap();
assert_eq!(pool.len(), 0);
let pool_clone = pool.clone();
thread::spawn(move || {
let mut buffers = Vec::new();
for _ in 0..=tls_cache_size {
buffers.push(pool_clone.get(2 * 1024 * 1024));
}
for buf in buffers {
drop(buf); }
})
.join()
.unwrap();
assert_eq!(pool.len(), 1);
}
#[test]
fn test_max_pool_size() {
let pool = BufferPool::builder().min_buffer_size(0).max_buffers_per_shard(2).build();
let tls_cache_size = pool.config.tls_cache_size;
let num_shards = pool.config.num_shards;
let mut buffers = Vec::new();
for _ in 0..(tls_cache_size + num_shards * 3) {
buffers.push(pool.get(1024));
}
for buf in buffers {
drop(buf); }
assert!(pool.len() <= num_shards * 2);
for shard in pool.shards.iter() {
assert!(shard.buffers.lock().len() <= 2);
}
}
#[test]
fn test_thread_local_cache() {
let pool = BufferPool::new();
let cache_size = pool.config.tls_cache_size;
for _ in 0..cache_size {
let buf = pool.get(1024);
drop(buf); }
assert_eq!(pool.len(), 0);
for _ in 0..cache_size {
let buf = pool.get(1024);
assert_eq!(buf.len(), 1024);
}
assert_eq!(pool.len(), 0);
}
#[test]
fn test_builder_api() {
let pool1 = BufferPool::builder().build();
assert!(!pool1.is_empty() || pool1.is_empty());
let pool2 = BufferPool::builder()
.min_buffer_size(256 * 1024)
.num_shards(8)
.tls_cache_size(4)
.max_buffers_per_shard(16)
.build();
assert_eq!(pool2.config.min_buffer_size, 256 * 1024);
assert_eq!(pool2.config.num_shards, 8);
assert_eq!(pool2.config.tls_cache_size, 4);
assert_eq!(pool2.config.max_buffers_per_shard, 16);
let buf = pool2.get(512 * 1024);
assert_eq!(buf.len(), 512 * 1024);
drop(buf); }
#[test]
fn test_concurrent_access() {
use std::thread;
let pool = BufferPool::new();
let mut handles = vec![];
for _ in 0..8 {
let pool = pool.clone();
handles.push(thread::spawn(move || {
for _ in 0..100 {
let buf = pool.get(4096);
assert_eq!(buf.len(), 4096);
drop(buf); }
}));
}
for handle in handles {
handle.join().unwrap();
}
assert!(pool.len() < 1000); }
#[test]
fn test_clone_shares_state() {
let pool = BufferPool::builder().min_buffer_size(0).tls_cache_size(2).build();
let buf1 = pool.get(1024);
let buf2 = pool.get(1024);
drop(buf1); drop(buf2);
let pool_clone = pool.clone();
let buf3 = pool_clone.get(2048);
let buf4 = pool_clone.get(2048);
let buf5 = pool_clone.get(2048);
drop(buf3); drop(buf4); drop(buf5);
assert!(!pool.is_empty());
}
#[test]
fn test_preallocate() {
let pool = BufferPool::builder().min_buffer_size(512 * 1024).num_shards(4).build();
let initial_len = pool.len();
pool.preallocate(10, 1024 * 1024);
assert!(pool.len() > initial_len);
let buf = pool.get(1024 * 1024);
assert!(buf.capacity() >= 1024 * 1024);
}
#[test]
fn test_edge_cases() {
use std::thread;
let pool = BufferPool::builder().tls_cache_size(2).min_buffer_size(0).build();
assert!(pool.is_empty());
let buf_zero = pool.get(0);
assert_eq!(buf_zero.len(), 0);
drop(buf_zero);
let pool_clone = pool.clone();
thread::spawn(move || {
let b1 = pool_clone.get(1024);
let b2 = pool_clone.get(1024);
drop(b1); drop(b2);
let buf_large = pool_clone.get(100 * 1024 * 1024); assert_eq!(buf_large.len(), 100 * 1024 * 1024);
drop(buf_large); })
.join()
.unwrap();
assert!(!pool.is_empty());
}
#[test]
fn test_shard_distribution() {
use std::thread;
let pool = BufferPool::builder().num_shards(4).min_buffer_size(0).tls_cache_size(2).build();
let mut handles = vec![];
for _ in 0..4 {
let pool_clone = pool.clone();
handles.push(thread::spawn(move || {
let mut buffers = vec![];
for _ in 0..5 {
buffers.push(pool_clone.get(1024));
}
for buf in buffers {
drop(buf); }
}));
}
for handle in handles {
handle.join().unwrap();
}
let mut non_empty_shards = 0;
for shard in pool.shards.iter() {
if !shard.buffers.lock().is_empty() {
non_empty_shards += 1;
}
}
assert!(non_empty_shards >= 2);
}
#[test]
fn test_eviction_policy_hot_buffers() {
let pool = BufferPool::builder()
.eviction_policy(EvictionPolicy::ClockPro)
.max_buffers_per_shard(5)
.num_shards(1)
.tls_cache_size(1) .build();
let sizes: Vec<usize> = (1..=10).map(|i| i * 1024).collect();
for &size in &sizes[0..5] {
let buf = vec![0u8; size];
drop(buf); }
for _ in 0..10 {
for &size in &sizes[0..3] {
let buf = pool.get(size);
drop(buf); }
}
for &size in &sizes[5..10] {
let buf = vec![0u8; size];
drop(buf); }
for &size in &sizes[0..3] {
let buf = pool.get(size);
assert_eq!(buf.capacity(), size, "Hot buffer was evicted!");
drop(buf); }
}
#[test]
fn test_clear() {
let pool = BufferPool::builder().min_buffer_size(0).tls_cache_size(2).build();
let mut buffers = vec![];
for _ in 0..10 {
buffers.push(pool.get(1024));
}
for buf in buffers {
drop(buf); }
assert!(!pool.is_empty());
pool.clear();
assert_eq!(pool.len(), 0);
assert!(pool.is_empty());
let buf = pool.get(1024);
assert_eq!(buf.len(), 1024);
drop(buf); }
}