use dynamo_runtime::utils::pool::{PoolExt, PoolItem, PoolValue, ReturnHandle, Returnable};
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
pub struct IndexedPoolState<T: Returnable + Ord + Eq + PartialEq> {
pool: Arc<Mutex<Vec<PoolValue<T>>>>,
available: Arc<Notify>,
}
impl<T: Returnable + Ord + Eq + PartialEq> ReturnHandle<T> for IndexedPoolState<T> {
fn return_to_pool(&self, value: PoolValue<T>) {
let mut pool = self.pool.lock().unwrap();
pool.push(value);
pool.sort_by(|a, b| a.get().cmp(b.get()));
self.available.notify_one();
}
}
pub struct IndexedPool<T: Returnable + Ord + Eq + PartialEq> {
state: Arc<IndexedPoolState<T>>,
capacity: usize,
}
impl<T: Returnable + Ord + Eq + PartialEq> IndexedPool<T> {
pub fn new(mut initial_elements: Vec<PoolValue<T>>) -> Self {
let capacity = initial_elements.len();
initial_elements.sort_by(|a, b| a.get().cmp(b.get()));
let state = Arc::new(IndexedPoolState {
pool: Arc::new(Mutex::new(initial_elements)),
available: Arc::new(Notify::new()),
});
Self { state, capacity }
}
pub fn new_boxed(initial_elements: Vec<Box<T>>) -> Self {
let initial_values = initial_elements
.into_iter()
.map(PoolValue::from_boxed)
.collect();
Self::new(initial_values)
}
pub fn new_direct(initial_elements: Vec<T>) -> Self {
let initial_values = initial_elements
.into_iter()
.map(PoolValue::from_direct)
.collect();
Self::new(initial_values)
}
pub async fn get_contents(&self) -> Vec<T>
where
T: Clone,
{
let pool = self.state.pool.lock().unwrap();
pool.iter().map(|v| v.get().clone()).collect()
}
async fn try_acquire(&self) -> Option<PoolItem<T>> {
let mut pool = self.state.pool.lock().unwrap();
if pool.is_empty() {
return None;
}
let value = pool.remove(0);
Some(self.create_pool_item(value, self.state.clone()))
}
async fn acquire(&self) -> PoolItem<T> {
loop {
if let Some(guard) = self.try_acquire().await {
return guard;
}
self.state.available.notified().await;
}
}
}
impl<T: Returnable + Ord + Eq + PartialEq + Send + Sync + 'static> PoolExt<T> for IndexedPool<T> {}
impl<T: Returnable + Ord + Eq + PartialEq> Clone for IndexedPool<T> {
fn clone(&self) -> Self {
IndexedPool {
state: self.state.clone(),
capacity: self.capacity,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct NonResettableInt(i32);
impl Returnable for NonResettableInt {
fn on_return(&mut self) {}
}
impl From<i32> for NonResettableInt {
fn from(value: i32) -> Self {
NonResettableInt(value)
}
}
#[tokio::test]
async fn test_indexed_pool_sorting() {
let initial_elements = vec![
PoolValue::Direct(NonResettableInt::from(5)),
PoolValue::Direct(NonResettableInt::from(3)),
PoolValue::Direct(NonResettableInt::from(1)),
PoolValue::Direct(NonResettableInt::from(4)),
PoolValue::Direct(NonResettableInt::from(2)),
];
let pool = IndexedPool::new(initial_elements);
let contents = pool.get_contents().await;
assert_eq!(
contents,
vec![
NonResettableInt(1),
NonResettableInt(2),
NonResettableInt(3),
NonResettableInt(4),
NonResettableInt(5)
]
);
let mut item1 = pool.acquire().await;
assert_eq!(*item1, NonResettableInt(1));
let mut item2 = pool.acquire().await;
assert_eq!(*item2, NonResettableInt(2));
*item1 = NonResettableInt(10);
drop(item1);
let contents = pool.get_contents().await;
assert_eq!(
contents,
vec![
NonResettableInt(3),
NonResettableInt(4),
NonResettableInt(5),
NonResettableInt(10)
]
);
*item2 = NonResettableInt(4);
drop(item2);
let contents = pool.get_contents().await;
assert_eq!(
contents,
vec![
NonResettableInt(3),
NonResettableInt(4),
NonResettableInt(4),
NonResettableInt(5),
NonResettableInt(10)
]
);
}