use std::cell::Cell;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use futures_executor::block_on;
use async_resource::PoolConfig;
mod utils;
use utils::AtomicCounter;
fn counter_pool_config() -> PoolConfig<usize, ()> {
let source = Arc::new(AtomicCounter::default());
PoolConfig::<usize, ()>::new(move || {
let s = source.clone();
async move { Ok(s.increment()) }
})
}
#[test]
fn test_pool_acquire_order_timeout() {
let disposed = Arc::new(AtomicCounter::default());
let dcopy = disposed.clone();
let pool = counter_pool_config()
.idle_timeout(Duration::from_secs(1))
.dispose(move |_res, _| {
dcopy.increment();
})
.build();
block_on(async move {
let mut fst = pool.acquire().await.unwrap();
let mut snd = pool.acquire().await.unwrap();
assert_eq!(*fst, 1);
assert_eq!(*snd, 2);
drop(snd);
snd = pool.acquire().await.unwrap();
assert_eq!(*snd, 2);
drop(fst);
fst = pool.acquire().await.unwrap();
assert_eq!(*fst, 1);
drop(fst);
drop(snd);
pool.drain(Duration::from_millis(500)).await.unwrap();
assert_eq!(disposed.value(), 2);
})
}
#[test]
fn test_pool_acquire_order_no_timeout() {
let disposed = Arc::new(AtomicCounter::default());
let dcopy = disposed.clone();
let mut pool = counter_pool_config()
.dispose(move |_res, _| {
dcopy.increment();
})
.build();
block_on(async move {
let fst = pool.acquire().await.unwrap();
let snd = pool.acquire().await.unwrap();
assert_eq!(*fst, 1);
assert_eq!(*snd, 2);
drop(snd);
let trd = pool.acquire().await.unwrap();
assert_eq!(*trd, 3);
drop(fst);
assert_eq!(disposed.value(), 2);
pool = pool.drain(Duration::from_millis(50)).await.unwrap_err();
drop(trd);
pool.drain(Duration::from_millis(500)).await.unwrap();
});
}
#[test]
fn test_pool_not_sync() {
let source = Arc::new(AtomicCounter::default());
let pool = PoolConfig::<Cell<usize>, ()>::new(move || {
let s = source.clone();
async move { Ok(Cell::new(s.increment())) }
})
.build();
block_on(async move {
assert_eq!(pool.acquire().await.unwrap().get(), 1);
});
}
#[test]
fn test_pool_waiter() {
let waiting = Arc::new(AtomicCounter::default());
let results = Arc::new(Mutex::new(vec![]));
let pool = counter_pool_config().max_count(1).build();
let p1 = pool.clone();
let mut waiters = 3;
results
.lock()
.unwrap()
.push(block_on(async move { p1.acquire().await.unwrap() }));
for _ in 0..waiters {
let pool = pool.clone();
let results = results.clone();
let waiting = waiting.clone();
thread::spawn(|| {
block_on(async move {
let wait = pool.acquire();
waiting.increment();
let result = wait.await;
results.lock().unwrap().push(result.unwrap());
waiting.decrement();
})
});
}
loop {
if waiting.value() == waiters {
break;
}
thread::yield_now();
}
loop {
assert_eq!(*results.lock().unwrap().pop().unwrap(), 1);
waiters -= 1;
loop {
if waiting.value() == waiters {
break;
}
assert!(results.lock().unwrap().len() <= 1);
thread::yield_now();
}
if waiters == 0 {
break;
}
}
let mut res = results.lock().unwrap();
assert!(*res.pop().unwrap() == 1);
assert!(res.is_empty());
}