use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::{Condvar, Mutex};
use tokio::sync::Notify;
/// A value that can be stored in a pool and handed back after use.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait Returnable: Send + Sync + 'static {
    /// Hook invoked on a value right before it is placed back into its pool.
    ///
    /// The default implementation leaves the value untouched; override it to
    /// reset any per-use state.
    fn on_return(&mut self) {
        // Nothing to reset by default.
    }
}
/// Destination for values that are being handed back by a [`PoolItem`].
///
/// [`PoolItem`] calls [`ReturnHandle::return_to_pool`] from its `Drop`
/// implementation, after running the value's `on_return` hook.
pub trait ReturnHandle<T: Returnable>: Send + Sync + 'static {
/// Takes ownership of `value` so it can be made available again.
fn return_to_pool(&self, value: PoolValue<T>);
}
/// Storage wrapper for a pooled value: either heap-allocated or held inline.
pub enum PoolValue<T: Returnable> {
/// The value lives behind a `Box`.
Boxed(Box<T>),
/// The value is stored directly inside the enum.
Direct(T),
}
impl<T: Returnable> PoolValue<T> {
    /// Wraps an already boxed value without moving it out of its `Box`.
    pub fn from_boxed(value: Box<T>) -> Self {
        Self::Boxed(value)
    }

    /// Wraps a value that is stored inline.
    pub fn from_direct(value: T) -> Self {
        Self::Direct(value)
    }

    /// Returns a shared reference to the wrapped value, whichever variant holds it.
    pub fn get(&self) -> &T {
        match self {
            Self::Direct(inline) => inline,
            Self::Boxed(heap) => &**heap,
        }
    }

    /// Returns an exclusive reference to the wrapped value, whichever variant holds it.
    pub fn get_mut(&mut self) -> &mut T {
        match self {
            Self::Direct(inline) => inline,
            Self::Boxed(heap) => &mut **heap,
        }
    }

    /// Runs the wrapped value's [`Returnable::on_return`] hook.
    pub fn on_return(&mut self) {
        T::on_return(self.get_mut());
    }
}
impl<T: Returnable> Deref for PoolValue<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.get()
}
}
impl<T: Returnable> DerefMut for PoolValue<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.get_mut()
}
}
mod private {
    /// Zero-sized marker stored inside `PoolItem`.
    ///
    /// Its only field is private and its constructor is `pub(super)`, so code
    /// outside the parent module cannot create one.
    #[derive(Copy, Clone)]
    pub struct PoolItemToken(());

    impl PoolItemToken {
        /// Creates a token; callable only from the parent module.
        pub(super) fn new() -> Self {
            Self(())
        }
    }
}
/// Extension point that lets an implementor build [`PoolItem`]s.
///
/// `PoolItem::new` is private to this module, so this default method is the
/// way a `PoolItem` gets constructed through the trait.
pub trait PoolExt<T: Returnable>: Send + Sync + 'static {
/// Wraps `value` in a [`PoolItem`] that hands it to `handle` when dropped.
fn create_pool_item(
&self,
value: PoolValue<T>,
handle: Arc<dyn ReturnHandle<T>>,
) -> PoolItem<T> {
PoolItem::new(value, handle)
}
}
/// Guard around a value checked out of a pool.
///
/// Dropping the guard runs the value's `on_return` hook and passes it to `handle`.
pub struct PoolItem<T: Returnable> {
/// The checked-out value; set to `None` only while the guard is being dropped.
value: Option<PoolValue<T>>,
/// Where the value is sent on drop.
handle: Arc<dyn ReturnHandle<T>>,
/// Marker that can only be created inside this module.
_token: private::PoolItemToken,
}
impl<T: Returnable> PoolItem<T> {
fn new(value: PoolValue<T>, handle: Arc<dyn ReturnHandle<T>>) -> Self {
Self {
value: Some(value),
handle,
_token: private::PoolItemToken::new(),
}
}
pub fn into_shared(self) -> SharedPoolItem<T> {
SharedPoolItem {
inner: Arc::new(self),
}
}
pub fn has_value(&self) -> bool {
self.value.is_some()
}
}
impl<T: Returnable> Deref for PoolItem<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.value.as_ref().unwrap().get()
}
}
impl<T: Returnable> DerefMut for PoolItem<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.value.as_mut().unwrap().get_mut()
}
}
/// Hands the value back to its return handle when the guard goes away.
impl<T: Returnable> Drop for PoolItem<T> {
    fn drop(&mut self) {
        // Nothing to do if the value was already taken.
        let Some(mut returning) = self.value.take() else {
            return;
        };
        // Reset first, so the handle only ever receives a cleaned value.
        returning.on_return();
        self.handle.return_to_pool(returning);
    }
}
/// Cloneable, read-only handle to a checked-out value.
///
/// All clones share one [`PoolItem`] through an `Arc`.
pub struct SharedPoolItem<T: Returnable> {
/// The shared guard; its `Drop` runs when the last clone goes away.
inner: Arc<PoolItem<T>>,
}
impl<T: Returnable> Clone for SharedPoolItem<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: Returnable> SharedPoolItem<T> {
pub fn get(&self) -> &T {
self.inner.value.as_ref().unwrap().get()
}
pub fn strong_count(&self) -> usize {
Arc::strong_count(&self.inner)
}
}
impl<T: Returnable> Deref for SharedPoolItem<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.inner.value.as_ref().unwrap().get()
}
}
/// Async pool of reusable values.
///
/// Clones share the same underlying state.
pub struct Pool<T: Returnable> {
/// Queue and notifier shared by every clone and every checked-out item.
state: Arc<PoolState<T>>,
/// Number of values the pool was created with.
capacity: usize,
}
/// Shared state behind a [`Pool`]: the idle values plus a notifier for waiters.
///
/// NOTE(review): both fields carry their own `Arc` although the whole state is
/// already shared as `Arc<PoolState<T>>` — looks redundant; confirm before removing.
struct PoolState<T: Returnable> {
/// Idle values, handed out from the front and returned to the back.
pool: Arc<Mutex<VecDeque<PoolValue<T>>>>,
/// Signalled each time a value is returned.
available: Arc<Notify>,
}
/// Returning a value re-queues it and signals one pending `acquire`.
impl<T: Returnable> ReturnHandle<T> for PoolState<T> {
    /// Pushes `value` to the back of the idle queue and notifies one waiter.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    fn return_to_pool(&self, value: PoolValue<T>) {
        let mut queue = self.pool.lock().unwrap();
        queue.push_back(value);
        // Signalled while the lock is still held, as the queue guard lives
        // until the end of this function.
        self.available.notify_one();
    }
}
impl<T: Returnable> Pool<T> {
    /// Creates a pool that initially holds `initial_elements`, in order.
    ///
    /// The pool's capacity is the number of elements supplied.
    pub fn new(initial_elements: Vec<PoolValue<T>>) -> Self {
        let capacity = initial_elements.len();
        // `VecDeque::from(Vec)` reuses the vector's buffer and keeps the order.
        let pool = VecDeque::from(initial_elements);
        let state = Arc::new(PoolState {
            pool: Arc::new(Mutex::new(pool)),
            available: Arc::new(Notify::new()),
        });
        Self { state, capacity }
    }

    /// Creates a pool from already boxed values.
    pub fn new_boxed(initial_elements: Vec<Box<T>>) -> Self {
        Self::new(
            initial_elements
                .into_iter()
                .map(PoolValue::from_boxed)
                .collect(),
        )
    }

    /// Creates a pool from values stored inline.
    pub fn new_direct(initial_elements: Vec<T>) -> Self {
        Self::new(
            initial_elements
                .into_iter()
                .map(PoolValue::from_direct)
                .collect(),
        )
    }

    /// Takes the value at the front of the queue, or returns `None` if the
    /// pool is currently empty. Never waits.
    ///
    /// If values remain after this one is taken, one notification is
    /// forwarded. `Notify::notify_one` stores at most one permit, so several
    /// returns that happen before the waiters have registered would otherwise
    /// wake only one of them and leave the rest asleep while values are idle.
    /// A forwarded notification that nobody needs only causes a harmless
    /// extra loop iteration in `acquire`.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    ///
    /// NOTE(review): this and `acquire`/`capacity` are private, unlike their
    /// `SyncPool` counterparts — confirm that is intended.
    async fn try_acquire(&self) -> Option<PoolItem<T>> {
        let mut pool = self.state.pool.lock().unwrap();
        let value = pool.pop_front()?;
        if !pool.is_empty() {
            // Pass the wakeup along so no waiter sleeps next to idle values.
            self.state.available.notify_one();
        }
        // Release the lock before building the guard.
        drop(pool);
        Some(PoolItem::new(value, self.state.clone()))
    }

    /// Waits until a value is available and checks it out.
    ///
    /// The queue is re-checked after every wakeup because another caller may
    /// have taken the value first.
    async fn acquire(&self) -> PoolItem<T> {
        loop {
            if let Some(item) = self.try_acquire().await {
                return item;
            }
            self.state.available.notified().await;
        }
    }

    /// Signals one waiter without returning a value.
    fn notify_return(&self) {
        self.state.available.notify_one();
    }

    /// Number of values the pool was created with.
    fn capacity(&self) -> usize {
        self.capacity
    }
}
// `Pool` relies on the default `create_pool_item` provided by `PoolExt`.
impl<T: Returnable> PoolExt<T> for Pool<T> {}
impl<T: Returnable> Clone for Pool<T> {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
capacity: self.capacity,
}
}
}
/// Blocking (thread-based) pool of reusable values.
///
/// Clones share the same underlying state.
pub struct SyncPool<T: Returnable> {
/// Queue and condition variable shared by every clone and checked-out item.
state: Arc<SyncPoolState<T>>,
/// Number of values the pool was created with.
capacity: usize,
}
/// Shared state behind a [`SyncPool`].
struct SyncPoolState<T: Returnable> {
/// Idle values, handed out from the front and returned to the back.
pool: Mutex<VecDeque<PoolValue<T>>>,
/// Signalled each time a value is returned; waited on by `acquire_blocking`.
available: Condvar,
}
impl<T: Returnable> SyncPool<T> {
pub fn new(initial_elements: Vec<PoolValue<T>>) -> Self {
let capacity = initial_elements.len();
let pool = initial_elements
.into_iter()
.collect::<VecDeque<PoolValue<T>>>();
let state = Arc::new(SyncPoolState {
pool: Mutex::new(pool),
available: Condvar::new(),
});
Self { state, capacity }
}
pub fn new_direct(initial_elements: Vec<T>) -> Self {
let initial_values = initial_elements
.into_iter()
.map(PoolValue::from_direct)
.collect();
Self::new(initial_values)
}
pub fn try_acquire(&self) -> Option<SyncPoolItem<T>> {
let mut pool = self.state.pool.lock().unwrap();
pool.pop_front()
.map(|value| SyncPoolItem::new(value, self.state.clone()))
}
pub fn acquire_blocking(&self) -> SyncPoolItem<T> {
let mut pool = self.state.pool.lock().unwrap();
while pool.is_empty() {
tracing::debug!("SyncPool: waiting for available resource (pool empty)");
pool = self.state.available.wait(pool).unwrap();
tracing::debug!(
"SyncPool: woke up, checking pool again (size: {})",
pool.len()
);
}
let value = pool.pop_front().unwrap();
tracing::debug!("SyncPool: acquired resource, pool size now: {}", pool.len());
SyncPoolItem::new(value, self.state.clone())
}
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl<T: Returnable> Clone for SyncPool<T> {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
capacity: self.capacity,
}
}
}
/// Guard around a value checked out of a [`SyncPool`].
///
/// Dropping the guard resets the value and pushes it back onto the pool's queue.
pub struct SyncPoolItem<T: Returnable> {
/// The checked-out value; set to `None` only while the guard is being dropped.
value: Option<PoolValue<T>>,
/// Shared pool state the value is returned to.
state: Arc<SyncPoolState<T>>,
}
impl<T: Returnable> SyncPoolItem<T> {
fn new(value: PoolValue<T>, state: Arc<SyncPoolState<T>>) -> Self {
Self {
value: Some(value),
state,
}
}
}
impl<T: Returnable> Deref for SyncPoolItem<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.value.as_ref().unwrap().get()
}
}
impl<T: Returnable> DerefMut for SyncPoolItem<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.value.as_mut().unwrap().get_mut()
}
}
/// Puts the value back on the pool's queue and wakes one blocked acquirer.
///
/// # Panics
///
/// Panics if the queue mutex is poisoned.
impl<T: Returnable> Drop for SyncPoolItem<T> {
    fn drop(&mut self) {
        // Nothing to do if the value was already taken.
        let Some(mut returning) = self.value.take() else {
            return;
        };
        // Run the reset hook before taking the lock.
        returning.on_return();
        let mut queue = self.state.pool.lock().unwrap();
        queue.push_back(returning);
        tracing::debug!(
            "SyncPool: returned resource, pool size now: {}, notifying waiters",
            queue.len()
        );
        self.state.available.notify_one();
    }
}
// Unit tests for the async `Pool` and the blocking `SyncPool`.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use tokio::time::{Duration, timeout};
// Test-only `Returnable`: a returned `u32` is reset to 0, which makes
// "this value has been through the pool" observable in assertions.
impl Returnable for u32 {
fn on_return(&mut self) {
*self = 0;
tracing::debug!("Resetting u32 to 0");
}
}
// Covers FIFO hand-out, reset on return, waiting on an empty pool, and
// wakeup once values come back.
#[tokio::test]
async fn test_acquire_release() {
let initial_elements = vec![
PoolValue::Direct(1),
PoolValue::Direct(2),
PoolValue::Direct(3),
PoolValue::Direct(4),
PoolValue::Direct(5),
];
let pool = Pool::new(initial_elements);
// The first value is modified and dropped at the end of this block, so it
// goes to the back of the queue reset to 0.
if let Some(mut item) = pool.try_acquire().await {
assert_eq!(*item, 1);
*item += 10;
assert_eq!(*item, 11);
}
let mut values = Vec::new();
let mut items = Vec::new();
// Drain the pool, keeping the guards alive so nothing is returned yet.
while let Some(item) = pool.try_acquire().await {
values.push(*item);
items.push(item);
}
assert_eq!(values, vec![2, 3, 4, 5, 0]);
let pool_clone = pool.clone();
// With every value checked out, `acquire` must not complete within the timeout.
let task = tokio::spawn(async move {
let first_acquired = pool_clone.acquire().await;
assert_eq!(*first_acquired, 0);
});
timeout(Duration::from_secs(1), task)
.await
.expect_err("Expected timeout");
// Dropping the guards returns all values, each reset to 0.
items.clear();
let pool_clone = pool.clone();
let task = tokio::spawn(async move {
let first_acquired = pool_clone.acquire().await;
assert_eq!(*first_acquired, 0);
});
timeout(Duration::from_secs(1), task)
.await
.expect("Task did not complete in time")
.unwrap();
}
// A shared item keeps its value until the last clone is dropped; only then
// is the value reset and made available again.
#[tokio::test]
async fn test_shared_items() {
let initial_elements = vec![
PoolValue::Direct(1),
];
let pool = Pool::new(initial_elements);
let mut item = pool.acquire().await;
*item += 10; let shared = item.into_shared();
assert_eq!(*shared, 11);
let shared_clone = shared.clone();
assert_eq!(*shared_clone, 11);
drop(shared);
assert_eq!(*shared_clone, 11);
drop(shared_clone);
let item = pool.acquire().await;
assert_eq!(*item, 0); }
// Boxed values behave like inline ones: readable, mutable, reset on return.
#[tokio::test]
async fn test_boxed_values() {
let initial_elements = vec![
PoolValue::Boxed(Box::new(1)),
];
let pool = Pool::new(initial_elements);
let mut item = pool.acquire().await;
assert_eq!(*item, 1);
*item += 10;
drop(item);
let item = pool.acquire().await;
assert_eq!(*item, 0);
}
// Acquiring from a fresh single-element pool yields that element.
#[tokio::test]
async fn test_pool_item_creation() {
let pool = Pool::new(vec![PoolValue::Direct(1)]);
let item = pool.acquire().await;
assert_eq!(*item, 1);
}
// `SyncPool::try_acquire` hands values out in order, reports exhaustion with
// `None`, and returned values come back reset.
#[test]
fn test_sync_pool_basic_acquire_release() {
let initial_elements = vec![1u32, 2, 3];
let pool = SyncPool::new_direct(initial_elements);
let item1 = pool.try_acquire().unwrap();
assert_eq!(*item1, 1);
let item2 = pool.try_acquire().unwrap();
assert_eq!(*item2, 2);
let item3 = pool.try_acquire().unwrap();
assert_eq!(*item3, 3);
assert!(pool.try_acquire().is_none());
drop(item1); drop(item2); drop(item3);
let item = pool.try_acquire().unwrap();
assert_eq!(*item, 0); }
// A thread blocked in `acquire_blocking` resumes once the only value is returned.
// NOTE(review): the `counter == 1` check assumes the spawned thread has
// started within the 10 ms sleep — timing-sensitive on a loaded machine.
#[test]
fn test_sync_pool_blocking_acquire() {
let pool = SyncPool::new_direct(vec![42u32]);
let item = pool.acquire_blocking();
assert_eq!(*item, 42);
let pool_clone = pool.clone();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
let handle = thread::spawn(move || {
counter_clone.store(1, Ordering::SeqCst); let waiting_item = pool_clone.acquire_blocking(); counter_clone.store(2, Ordering::SeqCst); assert_eq!(*waiting_item, 0); });
thread::sleep(Duration::from_millis(10));
assert_eq!(counter.load(Ordering::SeqCst), 1);
drop(item);
handle.join().unwrap();
assert_eq!(counter.load(Ordering::SeqCst), 2); }
// Two threads wait on a single value; each gets it in turn because the first
// one to finish drops its guard and thereby wakes the other.
#[test]
fn test_sync_pool_multiple_waiters() {
let pool = SyncPool::new_direct(vec![1u32]);
let item = pool.acquire_blocking();
let pool_clone1 = pool.clone();
let pool_clone2 = pool.clone();
let completed = Arc::new(AtomicUsize::new(0));
let completed1 = completed.clone();
let completed2 = completed.clone();
let handle1 = thread::spawn(move || {
let _item = pool_clone1.acquire_blocking(); completed1.fetch_add(1, Ordering::SeqCst); });
let handle2 = thread::spawn(move || {
let _item = pool_clone2.acquire_blocking(); completed2.fetch_add(1, Ordering::SeqCst); });
thread::sleep(Duration::from_millis(50));
// Neither waiter can have finished while the value is still held here.
assert_eq!(completed.load(Ordering::SeqCst), 0);
drop(item);
handle1.join().unwrap();
handle2.join().unwrap();
assert_eq!(completed.load(Ordering::SeqCst), 2);
}
// The async and blocking pools can be used side by side in one test.
#[test]
fn test_sync_vs_async_pool_compatibility() {
let async_pool = Pool::new_direct(vec![1u32, 2u32]);
let sync_pool = SyncPool::new_direct(vec![3u32, 4u32]);
let async_rt = tokio::runtime::Runtime::new().unwrap();
let async_item = async_rt.block_on(async { async_pool.acquire().await });
assert_eq!(*async_item, 1);
let sync_item = sync_pool.acquire_blocking();
assert_eq!(*sync_item, 3);
drop(async_item); drop(sync_item); }
// Uncontended acquire/release loop.
// NOTE(review): the 200 ms bound is a wall-clock assertion and may fail on a
// slow or heavily loaded machine.
#[test]
fn test_sync_pool_condvar_performance() {
let pool = SyncPool::new_direct((0..10).collect::<Vec<u32>>());
let start = std::time::Instant::now();
for _ in 0..1000 {
let item = pool.acquire_blocking();
let _ = *item + 1;
drop(item); }
let duration = start.elapsed();
println!("1000 sync pool operations took {:?}", duration);
assert!(duration < Duration::from_millis(200));
}
}