use std::sync::{Arc, Mutex, RwLock};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PressureConfig {
pub high_watermark_free: usize,
pub shrink_to_target: usize,
}
pub struct BufferPool {
free_buffers: RwLock<Vec<Arc<RwLock<Vec<u8>>>>>,
buffer_size: usize,
max_buffers: usize,
in_use_count: Mutex<usize>,
pressure_config: Mutex<Option<PressureConfig>>,
pressure_callback: Mutex<Option<Box<dyn Fn() + Send + Sync + 'static>>>,
}
impl std::fmt::Debug for BufferPool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let free = self.free_buffers.read().map(|v| v.len()).unwrap_or(0);
let in_use = self.in_use_count.lock().map(|g| *g).unwrap_or(0);
f.debug_struct("BufferPool")
.field("buffer_size", &self.buffer_size)
.field("max_buffers", &self.max_buffers)
.field("free_count", &free)
.field("in_use_count", &in_use)
.finish()
}
}
impl BufferPool {
#[must_use]
pub fn new(count: usize, buffer_size: usize) -> Self {
let buffers: Vec<_> = (0..count)
.map(|_| Arc::new(RwLock::new(vec![0u8; buffer_size])))
.collect();
Self {
free_buffers: RwLock::new(buffers),
buffer_size,
max_buffers: count,
in_use_count: Mutex::new(0),
pressure_config: Mutex::new(None),
pressure_callback: Mutex::new(None),
}
}
#[must_use]
pub fn with_capacity(max_buffers: usize, buffer_size: usize) -> Self {
Self {
free_buffers: RwLock::new(Vec::with_capacity(max_buffers)),
buffer_size,
max_buffers,
in_use_count: Mutex::new(0),
pressure_config: Mutex::new(None),
pressure_callback: Mutex::new(None),
}
}
pub fn set_pressure_config(&mut self, config: PressureConfig) -> &mut Self {
if let Ok(mut guard) = self.pressure_config.lock() {
*guard = Some(config);
}
self
}
pub fn on_pressure<F>(&mut self, callback: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
if let Ok(mut guard) = self.pressure_callback.lock() {
*guard = Some(Box::new(callback));
}
self
}
#[must_use]
pub fn acquire(&self) -> Option<Arc<RwLock<Vec<u8>>>> {
let buffer = self.free_buffers.write().ok()?.pop()?;
if let Ok(mut guard) = self.in_use_count.lock() {
*guard = guard.saturating_add(1);
}
Some(buffer)
}
#[must_use]
pub fn acquire_or_alloc(&self) -> Arc<RwLock<Vec<u8>>> {
self.acquire().unwrap_or_else(|| {
if let Ok(mut guard) = self.in_use_count.lock() {
*guard = guard.saturating_add(1);
}
Arc::new(RwLock::new(vec![0u8; self.buffer_size]))
})
}
pub fn release(&self, buffer: Arc<RwLock<Vec<u8>>>) {
if let Ok(mut guard) = self.in_use_count.lock() {
*guard = guard.saturating_sub(1);
}
let returned = if let Ok(mut buffers) = self.free_buffers.write() {
if buffers.len() < self.max_buffers {
if let Ok(mut guard) = buffer.write() {
guard.fill(0);
}
buffers.push(buffer);
true
} else {
false
}
} else {
false
};
if returned {
self.watermark_check();
}
}
pub fn shrink_to(&self, target: usize) {
if let Ok(mut buffers) = self.free_buffers.write() {
while buffers.len() > target {
buffers.pop(); }
}
}
pub fn watermark_check(&self) {
let cfg = match self.pressure_config.lock().ok().and_then(|g| *g) {
Some(c) => c,
None => return,
};
let free_count = self.free_buffers.read().map(|v| v.len()).unwrap_or(0);
if free_count <= cfg.high_watermark_free {
return;
}
if let Ok(guard) = self.pressure_callback.lock() {
if let Some(cb) = guard.as_ref() {
cb();
}
}
self.shrink_to(cfg.shrink_to_target);
}
#[must_use]
pub fn available(&self) -> usize {
self.free_buffers.read().map(|b| b.len()).unwrap_or(0)
}
#[must_use]
pub fn in_use_count(&self) -> usize {
self.in_use_count.lock().map(|g| *g).unwrap_or(0)
}
#[must_use]
pub fn buffer_size(&self) -> usize {
self.buffer_size
}
#[must_use]
pub fn max_buffers(&self) -> usize {
self.max_buffers
}
}
impl Default for BufferPool {
fn default() -> Self {
Self::new(4, 4096)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new() {
let pool = BufferPool::new(4, 1024);
assert_eq!(pool.available(), 4);
assert_eq!(pool.buffer_size(), 1024);
assert_eq!(pool.max_buffers(), 4);
}
#[test]
fn test_with_capacity() {
let pool = BufferPool::with_capacity(8, 2048);
assert_eq!(pool.available(), 0);
assert_eq!(pool.buffer_size(), 2048);
assert_eq!(pool.max_buffers(), 8);
}
#[test]
fn test_acquire_release() {
let pool = BufferPool::new(2, 1024);
assert_eq!(pool.available(), 2);
let buf1 = pool.acquire().expect("acquire should succeed");
assert_eq!(pool.available(), 1);
let buf2 = pool.acquire().expect("acquire should succeed");
assert_eq!(pool.available(), 0);
assert!(pool.acquire().is_none());
pool.release(buf1);
assert_eq!(pool.available(), 1);
pool.release(buf2);
assert_eq!(pool.available(), 2);
}
#[test]
fn test_acquire_or_alloc() {
let pool = BufferPool::new(0, 1024);
assert_eq!(pool.available(), 0);
let buffer = pool.acquire_or_alloc();
assert_eq!(buffer.read().expect("read lock should succeed").len(), 1024);
}
#[test]
fn test_buffer_contents() {
let pool = BufferPool::new(1, 64);
let buffer = pool.acquire().expect("acquire should succeed");
{
let mut guard = buffer.write().expect("write lock should succeed");
guard[0] = 42;
guard[63] = 255;
}
{
let guard = buffer.read().expect("read lock should succeed");
assert_eq!(guard[0], 42);
assert_eq!(guard[63], 255);
}
pool.release(buffer);
let buffer = pool.acquire().expect("acquire should succeed");
{
let guard = buffer.read().expect("read lock should succeed");
assert_eq!(guard[0], 0);
assert_eq!(guard[63], 0);
}
}
#[test]
fn test_default() {
let pool = BufferPool::default();
assert_eq!(pool.available(), 4);
assert_eq!(pool.buffer_size(), 4096);
}
#[test]
fn test_release_at_capacity() {
let pool = BufferPool::new(2, 1024);
let extra_buffer = Arc::new(RwLock::new(vec![0u8; 1024]));
pool.release(extra_buffer);
assert_eq!(pool.available(), 2); }
}
#[cfg(test)]
mod buffer_pool_pressure_tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
fn pool_with_pressure(
count: usize,
buf_size: usize,
watermark: usize,
target: usize,
) -> BufferPool {
let mut pool = BufferPool::new(count, buf_size);
pool.set_pressure_config(PressureConfig {
high_watermark_free: watermark,
shrink_to_target: target,
});
pool
}
#[test]
fn test_pool_shrinks_to_target_on_pressure() {
let pool = pool_with_pressure(10, 64, 5, 3);
assert_eq!(pool.available(), 10);
pool.watermark_check();
assert_eq!(
pool.available(),
3,
"pool should shrink to target=3 when free_count=10 > watermark=5"
);
}
#[test]
fn test_pool_retains_in_use_buffers() {
let pool = BufferPool::with_capacity(20, 64);
let handles: Vec<_> = (0..5).map(|_| pool.acquire_or_alloc()).collect();
assert_eq!(pool.in_use_count(), 5);
for _ in 0..5 {
let buf = Arc::new(RwLock::new(vec![0u8; 64]));
if let Ok(mut v) = pool.free_buffers.write() {
v.push(buf);
}
}
assert_eq!(pool.available(), 5);
pool.shrink_to(0);
assert_eq!(pool.available(), 0, "all free buffers should be dropped");
assert_eq!(
pool.in_use_count(),
5,
"in-use buffers must not be reclaimed by shrink_to"
);
for h in handles {
pool.release(h);
}
assert_eq!(pool.in_use_count(), 0);
}
#[test]
fn test_watermark_auto_shrink_fires_above_threshold() {
let mut pool = BufferPool::with_capacity(20, 64);
pool.set_pressure_config(PressureConfig {
high_watermark_free: 5,
shrink_to_target: 3,
});
let handles: Vec<_> = (0..8).map(|_| pool.acquire_or_alloc()).collect();
assert_eq!(pool.in_use_count(), 8);
assert_eq!(pool.available(), 0);
for h in handles {
pool.release(h);
}
let final_free = pool.available();
assert!(
final_free <= 5,
"auto-shrink must keep free count ≤ watermark after all releases; got {final_free}"
);
}
#[test]
fn test_no_shrink_below_threshold() {
let mut pool = BufferPool::with_capacity(10, 64);
pool.set_pressure_config(PressureConfig {
high_watermark_free: 5,
shrink_to_target: 3,
});
let handles: Vec<_> = (0..3).map(|_| pool.acquire_or_alloc()).collect();
for h in handles {
pool.release(h);
}
assert_eq!(
pool.available(),
3,
"pool must not shrink when free count is below watermark"
);
}
#[test]
fn test_pressure_callback_fires_before_shrink() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let mut pool = pool_with_pressure(10, 64, 5, 3);
pool.on_pressure(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
pool.watermark_check();
assert!(
counter.load(Ordering::SeqCst) >= 1,
"pressure callback must fire at least once"
);
assert_eq!(pool.available(), 3);
}
}