use parking_lot::Mutex;
use std::sync::Arc;
#[cfg(feature = "pool-metrics")]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "pool-metrics")]
#[derive(Debug)]
pub struct PoolMetrics {
pub total_acquires: AtomicUsize,
pub total_cache_hits: AtomicUsize,
pub peak_items_stored: AtomicUsize,
pub total_creations: AtomicUsize,
}
#[cfg(feature = "pool-metrics")]
impl PoolMetrics {
pub fn new() -> Self {
PoolMetrics {
total_acquires: AtomicUsize::new(0),
total_cache_hits: AtomicUsize::new(0),
peak_items_stored: AtomicUsize::new(0),
total_creations: AtomicUsize::new(0),
}
}
pub fn hit_rate(&self) -> f64 {
let acquires = self.total_acquires.load(Ordering::Relaxed);
if acquires == 0 {
return 0.0;
}
(self.total_cache_hits.load(Ordering::Relaxed) as f64 / acquires as f64) * 100.0
}
pub fn snapshot(&self) -> PoolMetricsSnapshot {
PoolMetricsSnapshot {
total_acquires: self.total_acquires.load(Ordering::Relaxed),
total_cache_hits: self.total_cache_hits.load(Ordering::Relaxed),
peak_items_stored: self.peak_items_stored.load(Ordering::Relaxed),
total_creations: self.total_creations.load(Ordering::Relaxed),
}
}
pub fn reset(&self) {
self.total_acquires.store(0, Ordering::Relaxed);
self.total_cache_hits.store(0, Ordering::Relaxed);
self.peak_items_stored.store(0, Ordering::Relaxed);
self.total_creations.store(0, Ordering::Relaxed);
}
}
#[cfg(feature = "pool-metrics")]
impl Default for PoolMetrics {
fn default() -> Self {
Self::new()
}
}
#[cfg(feature = "pool-metrics")]
#[derive(Debug, Clone, Copy)]
pub struct PoolMetricsSnapshot {
pub total_acquires: usize,
pub total_cache_hits: usize,
pub peak_items_stored: usize,
pub total_creations: usize,
}
#[derive(Clone)]
pub struct Pool<T: Recyclable> {
factory: Arc<dyn Fn() -> T + Send + Sync>,
objects: Arc<Mutex<Vec<T>>>,
max_size: usize,
#[cfg(feature = "pool-metrics")]
metrics: Arc<PoolMetrics>,
}
pub trait Recyclable: Send + 'static {
fn reset(&mut self);
}
impl<T: Recyclable> Pool<T> {
pub fn new<F>(factory: F, max_size: usize) -> Self
where
F: Fn() -> T + Send + Sync + 'static,
{
Pool {
factory: Arc::new(factory),
objects: Arc::new(Mutex::new(Vec::with_capacity(max_size))),
max_size,
#[cfg(feature = "pool-metrics")]
metrics: Arc::new(PoolMetrics::new()),
}
}
pub fn acquire(&self) -> Result<PoolGuard<T>, PoolError> {
#[cfg(feature = "pool-metrics")]
self.metrics.total_acquires.fetch_add(1, Ordering::Relaxed);
let mut objects = self.objects.lock();
let object = if let Some(mut obj) = objects.pop() {
#[cfg(feature = "pool-metrics")]
self.metrics.total_cache_hits.fetch_add(1, Ordering::Relaxed);
obj.reset();
obj
} else {
#[cfg(feature = "pool-metrics")]
self.metrics.total_creations.fetch_add(1, Ordering::Relaxed);
(self.factory)()
};
Ok(PoolGuard {
object: Some(object),
pool: Pool {
factory: Arc::clone(&self.factory),
objects: Arc::clone(&self.objects),
max_size: self.max_size,
#[cfg(feature = "pool-metrics")]
metrics: Arc::clone(&self.metrics),
},
})
}
pub fn size(&self) -> usize {
self.objects.lock().len()
}
pub fn clear(&self) -> Result<(), PoolError> {
self.objects.lock().clear();
Ok(())
}
#[cfg(feature = "pool-metrics")]
pub fn metrics(&self) -> &PoolMetrics {
&self.metrics
}
}
pub struct PoolGuard<T: Recyclable> {
object: Option<T>,
pool: Pool<T>,
}
impl<T: Recyclable> std::ops::Deref for PoolGuard<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.object.as_ref().expect("object should never be None")
}
}
impl<T: Recyclable> std::ops::DerefMut for PoolGuard<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.object.as_mut().expect("object should never be None")
}
}
impl<T: Recyclable> Drop for PoolGuard<T> {
fn drop(&mut self) {
if let Some(object) = self.object.take() {
let mut objects = self.pool.objects.lock();
if objects.len() < self.pool.max_size {
objects.push(object);
#[cfg(feature = "pool-metrics")]
{
let current_size = objects.len();
let peak = self.pool.metrics.peak_items_stored.load(Ordering::Relaxed);
if current_size > peak {
self.pool
.metrics
.peak_items_stored
.store(current_size, Ordering::Relaxed);
}
}
}
}
}
}
impl Recyclable for String {
fn reset(&mut self) {
self.clear();
}
}
impl Recyclable for Vec<u8> {
fn reset(&mut self) {
self.clear();
}
}
#[derive(Debug, Clone)]
pub enum PoolError {
LockPoisoned,
}
impl std::fmt::Display for PoolError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PoolError::LockPoisoned => write!(f, "pool lock poisoned"),
}
}
}
impl std::error::Error for PoolError {}
pub type StringBufferPool = Pool<String>;
pub type ByteBufferPool = Pool<Vec<u8>>;
pub fn create_string_buffer_pool(pool_size: usize, buffer_capacity: usize) -> StringBufferPool {
Pool::new(move || String::with_capacity(buffer_capacity), pool_size)
}
pub fn create_byte_buffer_pool(pool_size: usize, buffer_capacity: usize) -> ByteBufferPool {
Pool::new(move || Vec::with_capacity(buffer_capacity), pool_size)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pool_acquire_and_reuse() {
let pool = Pool::new(String::new, 5);
{
let mut s1 = pool.acquire().unwrap();
s1.push_str("hello");
assert_eq!(s1.len(), 5);
}
{
let s2 = pool.acquire().unwrap();
assert_eq!(s2.len(), 0, "string should be cleared when reused");
}
}
#[test]
fn test_pool_respects_max_size() {
let pool = Pool::new(String::new, 2);
let guard1 = pool.acquire().unwrap();
let guard2 = pool.acquire().unwrap();
let guard3 = pool.acquire().unwrap();
drop(guard1);
drop(guard2);
drop(guard3);
assert!(pool.size() <= 2, "pool size should not exceed max_size");
}
#[test]
fn test_pool_clear() {
let pool = Pool::new(String::new, 5);
let _g1 = pool.acquire().unwrap();
let _g2 = pool.acquire().unwrap();
drop(_g1);
drop(_g2);
assert!(pool.size() > 0, "pool should have items");
pool.clear().unwrap();
assert_eq!(pool.size(), 0, "pool should be empty after clear");
}
#[test]
fn test_byte_buffer_pool() {
let pool = Pool::new(Vec::new, 5);
{
let mut buf = pool.acquire().unwrap();
buf.extend_from_slice(b"hello");
assert_eq!(buf.len(), 5);
}
{
let buf = pool.acquire().unwrap();
assert_eq!(buf.len(), 0, "buffer should be cleared");
}
}
#[test]
fn test_pool_deref() {
let pool = Pool::new(String::new, 5);
let mut guard = pool.acquire().unwrap();
let _len = guard.len();
guard.push_str("test");
assert_eq!(&*guard, "test");
}
#[test]
fn test_concurrent_access() {
use std::sync::Arc;
use std::thread;
let pool = Arc::new(Pool::new(String::new, 10));
let mut handles = vec![];
for i in 0..5 {
let pool_clone = Arc::clone(&pool);
let handle = thread::spawn(move || {
let mut buf = pool_clone.acquire().unwrap();
buf.push_str(&format!("thread_{}", i));
std::thread::sleep(std::time::Duration::from_millis(10));
drop(buf);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
assert!(pool.size() <= 10);
}
#[test]
#[cfg(feature = "pool-metrics")]
fn test_pool_metrics_tracking() {
let pool = Pool::new(String::new, 5);
{
let _s1 = pool.acquire().unwrap();
}
{
let _s2 = pool.acquire().unwrap();
}
let metrics = pool.metrics();
assert_eq!(metrics.total_acquires.load(Ordering::Relaxed), 2);
assert_eq!(metrics.total_cache_hits.load(Ordering::Relaxed), 1);
assert_eq!(metrics.total_creations.load(Ordering::Relaxed), 1);
let hit_rate = metrics.hit_rate();
assert!(hit_rate > 49.0 && hit_rate < 51.0);
}
#[test]
#[cfg(feature = "pool-metrics")]
fn test_pool_metrics_peak_tracking() {
let pool = Pool::new(String::new, 5);
let g1 = pool.acquire().unwrap();
let g2 = pool.acquire().unwrap();
let g3 = pool.acquire().unwrap();
drop(g1);
drop(g2);
drop(g3);
let metrics = pool.metrics();
assert!(metrics.peak_items_stored.load(Ordering::Relaxed) >= 3);
}
}