use crate::config::PoolConfig;
use crate::error::Result;
use core::ops::{Deref, DerefMut};
#[cfg(not(feature = "parking_lot"))]
use std::sync::{Arc, Mutex};
#[cfg(feature = "parking_lot")]
use parking_lot::Mutex;
#[cfg(feature = "parking_lot")]
use std::sync::Arc;
/// Owning handle to one value stored in a shared, mutex-protected pool.
///
/// The handle dereferences to the stored `T` and hands its slot back to the
/// pool (via `return_to_pool`) when it is dropped.
pub struct ThreadSafeHandle<T>
where
    T: crate::traits::Poolable,
{
    /// Shared reference to the pool this handle's slot belongs to.
    pool: Arc<Mutex<crate::pool::GrowingPool<T>>>,
    /// Slot index returned by the pool at allocation time; passed back on drop.
    index: usize,
    /// Raw pointer to the slot, captured once at allocation so that
    /// dereferencing the handle does not need to take the pool lock.
    cached_ptr: *mut T,
}
impl<T> Deref for ThreadSafeHandle<T>
where
    T: crate::traits::Poolable,
{
    type Target = T;

    /// Borrows the pooled value through the pointer cached at allocation time.
    ///
    /// The pool mutex is not taken here.
    #[inline]
    fn deref(&self) -> &T {
        let slot: *const T = self.cached_ptr;
        // SAFETY: `cached_ptr` was produced by `get_mut(index)` on the pool
        // when this handle was created.
        // NOTE(review): this is only sound if the pool never moves or frees
        // the slot while the handle is alive (e.g. when it grows) — confirm
        // against `GrowingPool`.
        unsafe { &*slot }
    }
}
impl<T> DerefMut for ThreadSafeHandle<T>
where
    T: crate::traits::Poolable,
{
    /// Mutably borrows the pooled value through the cached slot pointer.
    ///
    /// The pool mutex is not taken here; exclusivity comes from `&mut self`.
    #[inline]
    fn deref_mut(&mut self) -> &mut T {
        let slot: *mut T = self.cached_ptr;
        // SAFETY: `cached_ptr` was produced by `get_mut(index)` on the pool
        // when this handle was created.
        // NOTE(review): this is only sound if the pool never moves or frees
        // the slot while the handle is alive, and nothing else aliases it —
        // confirm against `GrowingPool`.
        unsafe { &mut *slot }
    }
}
impl<T: crate::traits::Poolable> Drop for ThreadSafeHandle<T> {
    /// Locks the owning pool and calls `return_to_pool` with this handle's
    /// slot index.
    ///
    /// With the standard-library mutex, a poisoned lock is recovered instead
    /// of unwrapped, so dropping a handle never panics because of poisoning.
    fn drop(&mut self) {
        // Poisoning only records that some thread panicked while holding the
        // lock. Panicking again from `drop` would abort the whole process if
        // this handle is being dropped during unwinding, so take the guard
        // out of the error and return the slot anyway.
        #[cfg(not(feature = "parking_lot"))]
        let pool = self
            .pool
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // `parking_lot::Mutex::lock` returns the guard directly; there is no
        // poison state to handle.
        #[cfg(feature = "parking_lot")]
        let pool = self.pool.lock();
        pool.return_to_pool(self.index);
    }
}
// The raw `*mut T` field suppresses the automatic `Send` impl, so it is
// asserted manually for `T: Send`.
// SAFETY: NOTE(review) — sound only if the slot behind `cached_ptr` is reached
// exclusively through this handle and stays valid wherever the handle is
// moved; confirm against `GrowingPool`.
unsafe impl<T> Send for ThreadSafeHandle<T> where T: crate::traits::Poolable + Send {}
/// Cloneable pool front-end that wraps a `GrowingPool` in `Arc<Mutex<..>>`.
///
/// Every clone refers to the same underlying pool.
pub struct ThreadSafePool<T> {
// Shared, mutex-protected pool; handles created by `allocate` hold a clone of this `Arc`.
inner: Arc<Mutex<crate::pool::GrowingPool<T>>>,
}
impl<T> ThreadSafePool<T>
where
    T: crate::traits::Poolable,
{
    /// Creates a pool from a builder-produced [`PoolConfig`] with `capacity`.
    ///
    /// # Errors
    /// Propagates any error from building the config or from
    /// [`ThreadSafePool::with_config`].
    pub fn new(capacity: usize) -> Result<Self> {
        let built = PoolConfig::builder().capacity(capacity).build()?;
        Self::with_config(built)
    }

    /// Creates a pool from an explicit configuration.
    ///
    /// # Errors
    /// Propagates any error returned by `GrowingPool::with_config`.
    pub fn with_config(config: PoolConfig<T>) -> Result<Self> {
        let growing = crate::pool::GrowingPool::with_config(config)?;
        let inner = Arc::new(Mutex::new(growing));
        Ok(Self { inner })
    }

    /// Stores `value` in the pool and returns a handle to it.
    ///
    /// The handle keeps the pool alive and caches a raw pointer to the slot.
    ///
    /// # Errors
    /// Propagates any error returned by the pool's `allocate_internal`.
    ///
    /// # Panics
    /// With the standard-library mutex, panics if the lock is poisoned.
    pub fn allocate(&self, value: T) -> Result<ThreadSafeHandle<T>> {
        let mut guard = self.lock_inner();
        let slot = guard.allocate_internal(value)?;
        // Captured while the lock is held; the handle later dereferences this
        // pointer without locking.
        let slot_ptr = guard.get_mut(slot) as *mut T;
        Ok(ThreadSafeHandle {
            pool: Arc::clone(&self.inner),
            index: slot,
            cached_ptr: slot_ptr,
        })
    }

    /// Returns the value reported by the underlying pool's `capacity()`.
    ///
    /// # Panics
    /// With the standard-library mutex, panics if the lock is poisoned.
    pub fn capacity(&self) -> usize {
        let guard = self.lock_inner();
        guard.capacity()
    }

    /// Returns the value reported by the underlying pool's `available()`.
    ///
    /// # Panics
    /// With the standard-library mutex, panics if the lock is poisoned.
    pub fn available(&self) -> usize {
        let guard = self.lock_inner();
        guard.available()
    }

    /// Returns the value reported by the underlying pool's `allocated()`.
    ///
    /// # Panics
    /// With the standard-library mutex, panics if the lock is poisoned.
    pub fn allocated(&self) -> usize {
        let guard = self.lock_inner();
        guard.allocated()
    }

    /// Locks the inner pool (standard-library mutex; panics when poisoned).
    #[cfg(not(feature = "parking_lot"))]
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, crate::pool::GrowingPool<T>> {
        self.inner.lock().unwrap()
    }

    /// Locks the inner pool (`parking_lot` mutex; no poisoning).
    #[cfg(feature = "parking_lot")]
    fn lock_inner(&self) -> parking_lot::MutexGuard<'_, crate::pool::GrowingPool<T>> {
        self.inner.lock()
    }
}
impl<T> Clone for ThreadSafePool<T> {
    /// Returns another front-end to the same underlying pool.
    ///
    /// Only the `Arc` reference count is bumped; no pool state is copied.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
// `Arc<Mutex<X>>` is `Send + Sync` automatically whenever `X: Send`, so these
// manual impls effectively assert that `GrowingPool<T>` may be moved between
// threads when `T: Send`.
// SAFETY: NOTE(review) — that property cannot be checked from this file;
// confirm it against `GrowingPool`'s definition.
unsafe impl<T> Send for ThreadSafePool<T> where T: Send {}
unsafe impl<T> Sync for ThreadSafePool<T> where T: Send {}
/// Pool of boxed objects kept in a shared `crossbeam` `SegQueue`.
///
/// Only compiled when the `lock-free` feature is enabled.
#[cfg(feature = "lock-free")]
#[cfg_attr(docsrs, doc(cfg(feature = "lock-free")))]
pub struct LockFreePool<T> {
// Queue of idle objects; clones of the pool share this same queue.
inner: Arc<crossbeam::queue::SegQueue<Box<T>>>,
// Capacity value supplied at construction. The methods visible in this file
// never consult it when popping or pushing.
capacity: std::sync::atomic::AtomicUsize,
}
#[cfg(feature = "lock-free")]
impl<T> LockFreePool<T> {
    /// Creates a pool with an empty queue and records `capacity`.
    ///
    /// No objects are created; use [`LockFreePool::with_initializer`] to
    /// pre-fill the queue. As written, this constructor always returns `Ok`.
    pub fn new(capacity: usize) -> Result<Self> {
        let inner = Arc::new(crossbeam::queue::SegQueue::new());
        let capacity = std::sync::atomic::AtomicUsize::new(capacity);
        Ok(Self { inner, capacity })
    }

    /// Creates a pool and fills it with `capacity` objects produced by `init`.
    ///
    /// `init` is called exactly `capacity` times, and each result is boxed
    /// and pushed onto the queue.
    ///
    /// # Errors
    /// Propagates any error from [`LockFreePool::new`].
    pub fn with_initializer<F>(capacity: usize, mut init: F) -> Result<Self>
    where
        F: FnMut() -> T,
    {
        let pool = Self::new(capacity)?;
        (0..capacity).for_each(|_| pool.inner.push(Box::new(init())));
        Ok(pool)
    }

    /// Pops one object from the queue, or returns `None` if the pop yields
    /// nothing.
    pub fn try_allocate(&self) -> Option<Box<T>> {
        self.inner.pop()
    }

    /// Pushes `object` onto the queue.
    ///
    /// The recorded capacity is not checked here.
    pub fn return_object(&self, object: Box<T>) {
        self.inner.push(object);
    }
}
#[cfg(feature = "lock-free")]
impl<T> Clone for LockFreePool<T> {
    /// Returns a pool that shares this pool's queue.
    ///
    /// The clone gets its own capacity counter, initialised from a relaxed
    /// load of the current value; the two counters are independent afterwards.
    fn clone(&self) -> Self {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let snapshot = self.capacity.load(Ordering::Relaxed);
        Self {
            inner: Arc::clone(&self.inner),
            capacity: AtomicUsize::new(snapshot),
        }
    }
}
// SAFETY: the struct holds only an `Arc<SegQueue<Box<T>>>` and an
// `AtomicUsize`; with `T: Send` the boxed objects may move between threads.
// NOTE(review): both fields appear to be `Send + Sync` already for `T: Send`,
// so these manual impls may be redundant — confirm against crossbeam's
// `SegQueue` bounds before removing them.
#[cfg(feature = "lock-free")]
unsafe impl<T> Send for LockFreePool<T> where T: Send {}
#[cfg(feature = "lock-free")]
unsafe impl<T> Sync for LockFreePool<T> where T: Send {}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly allocated handle dereferences to the value that was stored.
    #[test]
    fn thread_safe_pool_basic() {
        let pool: ThreadSafePool<i32> = ThreadSafePool::new(10).unwrap();
        let stored = pool.allocate(42).unwrap();
        assert_eq!(*stored, 42);
    }

    /// Four threads each allocate (and drop) one handle from a shared pool.
    #[test]
    fn thread_safe_pool_concurrent() {
        use std::thread;

        let shared = Arc::new(ThreadSafePool::<i32>::new(100).unwrap());
        // Collect first so that all workers are spawned before any is joined.
        let workers: Vec<_> = (0..4)
            .map(|value| {
                let local = Arc::clone(&shared);
                thread::spawn(move || {
                    let _held = local.allocate(value).unwrap();
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
    }

    /// A pre-filled lock-free pool yields an object that can be handed back.
    #[cfg(feature = "lock-free")]
    #[test]
    fn lock_free_pool_basic() {
        let pool = LockFreePool::<i32>::with_initializer(10, || 0).unwrap();
        let taken = pool.try_allocate();
        assert!(taken.is_some());
        pool.return_object(taken.unwrap());
    }
}