use super::config::PoolConfig;
use super::stats::FastPoolStats;
use crate::buffer::Buffer;
use std::cell::RefCell;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// MPMC queue built on crossbeam's `SegQueue`, paired with an explicit
/// length counter (SegQueue has no O(1) `len` of its own).
struct LockFreeQueue<T> {
    items: crossbeam::queue::SegQueue<T>,
    /// Approximate element count. Updated with relaxed atomics after the
    /// queue operation, so a concurrent reader may see a briefly stale value.
    size: AtomicUsize,
}

impl<T> LockFreeQueue<T> {
    /// Empty queue with a zeroed counter.
    fn new() -> Self {
        LockFreeQueue {
            items: crossbeam::queue::SegQueue::new(),
            size: AtomicUsize::new(0),
        }
    }

    /// Enqueue `item`, then bump the counter.
    #[inline]
    fn push(&self, item: T) {
        self.items.push(item);
        self.size.fetch_add(1, Ordering::Relaxed);
    }

    /// Dequeue one item if any; the counter is decremented only on success.
    #[inline]
    fn pop(&self) -> Option<T> {
        let item = self.items.pop();
        if item.is_some() {
            self.size.fetch_sub(1, Ordering::Relaxed);
        }
        item
    }

    /// Best-effort current length (relaxed load; may be momentarily stale).
    #[inline]
    fn len(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }
}
/// Shared, atomically updated counters behind the pool's stats snapshots.
///
/// All counters use relaxed ordering at their update sites, so reads are
/// best-effort tallies rather than a mutually consistent snapshot.
#[derive(Debug, Default)]
pub(crate) struct FastPoolStatsInner {
    /// Buffers freshly heap-allocated because both the thread cache and the
    /// global pool missed.
    pub(crate) allocated: AtomicUsize,
    /// Total number of `acquire()` calls.
    pub(crate) acquired: AtomicUsize,
    /// Buffers handed back to the pool when a `FastPooledBuffer` dropped.
    pub(crate) returned: AtomicUsize,
    /// Acquisitions served from the thread-local cache (fast path).
    pub(crate) cache_hits: AtomicUsize,
}

impl FastPoolStatsInner {
    /// All counters start at zero. Equivalent to `Self::default()`;
    /// `AtomicUsize::default()` is 0, so the derive covers every field.
    pub(crate) fn new() -> Self {
        Self::default()
    }
}
/// Maximum buffers each thread keeps in its local cache before returns
/// overflow to the global pool (see `Drop for FastPooledBuffer`).
const THREAD_CACHE_CAPACITY: usize = 16;
thread_local! {
    /// Per-thread stash of returned buffers. Checked first by `acquire()`
    /// (no cross-thread synchronization needed on this path).
    static THREAD_CACHE: RefCell<Vec<Buffer>> =
        RefCell::new(Vec::with_capacity(THREAD_CACHE_CAPACITY));
}
/// Buffer pool combining a lock-free global queue with per-thread caches.
///
/// `acquire()` checks the calling thread's cache, then the global queue,
/// and only then allocates a new `Buffer`.
pub struct FastBufferPool {
    /// Global queue of idle buffers, shared by all threads.
    global_pool: Arc<LockFreeQueue<Buffer>>,
    /// Sizing parameters (`buffer_size`, `min_pool_size`, `max_pool_size`).
    config: PoolConfig,
    /// Counters shared with every outstanding `FastPooledBuffer`.
    stats: Arc<FastPoolStatsInner>,
}
impl FastBufferPool {
pub fn new(config: PoolConfig) -> Self {
let queue = Arc::new(LockFreeQueue::new());
for _ in 0..config.min_pool_size {
queue.push(Buffer::new(config.buffer_size));
}
Self {
global_pool: queue,
config,
stats: Arc::new(FastPoolStatsInner::new()),
}
}
}
impl Default for FastBufferPool {
fn default() -> Self {
Self::new(PoolConfig::default())
}
}
impl FastBufferPool {
#[inline]
pub fn acquire(&self) -> FastPooledBuffer {
self.stats.acquired.fetch_add(1, Ordering::Relaxed);
let buffer = THREAD_CACHE.with(|cache| {
let mut c = cache.borrow_mut();
if let Some(buf) = c.pop() {
self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
Some(buf)
} else {
None
}
});
if let Some(buf) = buffer {
return FastPooledBuffer {
buffer: Some(buf),
pool: Arc::clone(&self.global_pool),
config: self.config.clone(),
stats: Arc::clone(&self.stats),
};
}
if let Some(buf) = self.global_pool.pop() {
return FastPooledBuffer {
buffer: Some(buf),
pool: Arc::clone(&self.global_pool),
config: self.config.clone(),
stats: Arc::clone(&self.stats),
};
}
self.stats.allocated.fetch_add(1, Ordering::Relaxed);
FastPooledBuffer {
buffer: Some(Buffer::new(self.config.buffer_size)),
pool: Arc::clone(&self.global_pool),
config: self.config.clone(),
stats: Arc::clone(&self.stats),
}
}
#[inline]
pub fn available(&self) -> usize {
self.global_pool.len()
}
pub fn stats(&self) -> FastPoolStats {
FastPoolStats {
available: self.global_pool.len(),
allocated: self.stats.allocated.load(Ordering::Relaxed),
acquired: self.stats.acquired.load(Ordering::Relaxed),
returned: self.stats.returned.load(Ordering::Relaxed),
cache_hits: self.stats.cache_hits.load(Ordering::Relaxed),
thread_local_lost: 0,
}
}
pub fn clear(&self) {
while self.global_pool.pop().is_some() {}
}
pub fn warm(&self, target_size: usize) {
let target = target_size.min(self.config.max_pool_size);
let current = self.global_pool.len();
for _ in current..target {
self.global_pool.push(Buffer::new(self.config.buffer_size));
}
}
pub fn clear_thread_cache(&self) {
THREAD_CACHE.with(|cache| {
let mut c = cache.borrow_mut();
while let Some(mut buf) = c.pop() {
if self.global_pool.len() < self.config.max_pool_size {
self.global_pool.push(buf);
} else {
buf.burn();
drop(buf);
}
}
});
}
}
/// RAII handle to a pooled `Buffer`. On drop the buffer is burned and
/// returned to the thread cache or global pool (see the `Drop` impl).
pub struct FastPooledBuffer {
    /// `None` only after `leak()`/`drop_now()` have taken the buffer out.
    buffer: Option<Buffer>,
    /// Global queue to fall back to when the thread cache is full.
    pool: Arc<LockFreeQueue<Buffer>>,
    /// Pool configuration (used for the `max_pool_size` check on return).
    config: PoolConfig,
    /// Shared counters; `returned` is bumped when this handle drops.
    stats: Arc<FastPoolStatsInner>,
}
impl FastPooledBuffer {
    /// Take ownership of the underlying buffer; it will never return to
    /// the pool (the `Drop` impl sees `None` and does nothing).
    pub fn leak(mut self) -> Buffer {
        self.buffer.take().unwrap()
    }

    /// Burn the buffer (see `Buffer::burn`) and free it immediately
    /// instead of recycling it into the pool.
    pub fn drop_now(mut self) {
        let taken = self.buffer.take();
        if let Some(mut inner) = taken {
            inner.burn();
        }
    }

    /// Capacity in bytes of the wrapped buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        let inner = self.buffer.as_ref().unwrap();
        inner.capacity()
    }
}
impl std::ops::Deref for FastPooledBuffer {
    type Target = Buffer;
    // Transparent read access to the wrapped buffer. The unwrap cannot
    // fire on a live handle: `buffer` becomes `None` only inside
    // `leak`/`drop_now`, both of which consume `self`.
    fn deref(&self) -> &Self::Target {
        self.buffer.as_ref().unwrap()
    }
}
impl std::ops::DerefMut for FastPooledBuffer {
    // Transparent mutable access; same invariant as `Deref` — `buffer`
    // is `Some` for every handle that callers can still touch.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.buffer.as_mut().unwrap()
    }
}
impl Drop for FastPooledBuffer {
    /// Recycle the buffer: burn it, try the thread-local cache first,
    /// then fall back to the global pool, otherwise let it deallocate.
    fn drop(&mut self) {
        if let Some(mut buffer) = self.buffer.take() {
            // Burn before the buffer becomes reusable by anyone else
            // (presumably scrubs contents — see Buffer::burn).
            buffer.burn();
            self.stats.returned.fetch_add(1, Ordering::Relaxed);
            // The Option tells us, after the TLS closure runs, whether
            // the thread cache actually kept the buffer.
            let mut buffer_opt = Some(buffer);
            // try_with (not with): the thread-local may already be torn
            // down if we drop during thread exit — skip caching, don't panic.
            let _ = THREAD_CACHE.try_with(|cache| {
                let mut c = cache.borrow_mut();
                if c.len() < THREAD_CACHE_CAPACITY {
                    if let Some(buf) = buffer_opt.take() {
                        c.push(buf);
                    }
                }
            });
            // Cache full or unavailable: return to the global pool unless
            // it is at capacity. NOTE(review): the len check and push are
            // not atomic together, so under contention the pool may briefly
            // exceed max_pool_size — confirm this overshoot is acceptable.
            if let Some(buf) = buffer_opt {
                if self.pool.len() < self.config.max_pool_size {
                    self.pool.push(buf);
                }
                // else: buffer is simply dropped here, freeing its memory.
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // acquire() bumps the `acquired` counter even for freshly allocated buffers.
    #[test]
    fn test_fast_pool_basic() {
        let pool = FastBufferPool::new(PoolConfig {
            buffer_size: 1024,
            max_pool_size: 10,
            min_pool_size: 2,
        });
        let _buf = pool.acquire();
        assert_eq!(pool.stats().acquired, 1);
    }

    // A buffer written to, dropped, and re-acquired must come back zeroed
    // (Drop burns before recycling; clear_thread_cache pushes it to the
    // global pool so the next acquire reuses the same storage).
    #[test]
    fn test_returned_buffer_is_clean() {
        let pool = FastBufferPool::new(PoolConfig {
            buffer_size: 64,
            max_pool_size: 4,
            min_pool_size: 0,
        });
        {
            let mut buf = pool.acquire();
            buf.put_bytes(&[0xAA; 32]).unwrap();
        }
        pool.clear_thread_cache();
        let mut buf2 = pool.acquire();
        buf2.set_len(32).unwrap();
        assert_eq!(&buf2.as_slice()[..32], &[0u8; 32]);
    }

    // Dropping the handle increments `returned` exactly once.
    #[test]
    fn test_buffer_return_to_cache() {
        let pool = FastBufferPool::new(PoolConfig {
            buffer_size: 1024,
            max_pool_size: 100,
            min_pool_size: 10,
        });
        { let _buf = pool.acquire(); }
        assert_eq!(pool.stats().returned, 1);
    }

    // drop_now destroys the buffer without going through the recycle path;
    // `acquired` still reflects the checkout.
    #[test]
    fn test_drop_now() {
        let pool = FastBufferPool::new(PoolConfig {
            buffer_size: 1024,
            max_pool_size: 10,
            min_pool_size: 2,
        });
        let mut buf = pool.acquire();
        buf.put_u32(12345).unwrap();
        buf.drop_now();
        assert_eq!(pool.stats().acquired, 1);
    }

    // leak() detaches the buffer from the pool entirely.
    #[test]
    fn test_leak() {
        let pool = FastBufferPool::new(PoolConfig {
            buffer_size: 1024,
            max_pool_size: 10,
            min_pool_size: 2,
        });
        let pooled = pool.acquire();
        let _owned = pooled.leak();
        assert_eq!(pool.stats().acquired, 1);
    }

    // warm() never pushes the pool past the requested target.
    #[test]
    fn test_warm() {
        let pool = FastBufferPool::new(PoolConfig {
            buffer_size: 512,
            max_pool_size: 20,
            min_pool_size: 0,
        });
        pool.warm(10);
        assert!(pool.available() <= 10);
    }

    // Sequential acquire/drop cycles land buffers in the thread cache;
    // flushing moves at least one into the global pool.
    #[test]
    fn test_clear_thread_cache() {
        let pool = FastBufferPool::new(PoolConfig {
            buffer_size: 128,
            max_pool_size: 20,
            min_pool_size: 0,
        });
        for _ in 0..5 { let _b = pool.acquire(); }
        pool.clear_thread_cache();
        assert!(pool.available() > 0);
    }

    // 4 threads x 100 acquisitions: the acquired counter is exact (it is a
    // plain atomic add), while cache_hit_rate is only sanity-checked.
    #[test]
    fn test_multi_thread() {
        use std::sync::Arc;
        use std::thread;
        let pool = Arc::new(FastBufferPool::new(PoolConfig {
            buffer_size: 256,
            max_pool_size: 64,
            min_pool_size: 4,
        }));
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let p = Arc::clone(&pool);
                thread::spawn(move || {
                    for i in 0..100u32 {
                        let mut buf = p.acquire();
                        buf.put_u32(i).unwrap();
                    }
                    p.clear_thread_cache();
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        let s = pool.stats();
        assert_eq!(s.acquired, 400);
        assert!(s.cache_hit_rate() >= 0.0);
    }
}