use fast_pool::{Manager, Pool};
use std::fmt::Display;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug)]
pub struct TestManager {}
impl TestManager {
pub fn new() -> TestManager {
TestManager {}
}
pub fn hello(&self) {
println!("hello")
}
}
#[derive(Debug, Clone)]
pub struct TestConnection {
pub inner: String,
}
impl TestConnection {
pub fn new() -> TestConnection {
println!("new Connection");
TestConnection {
inner: "".to_string(),
}
}
}
impl Display for TestConnection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.inner)
}
}
impl Deref for TestConnection {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for TestConnection {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl Drop for TestConnection {
fn drop(&mut self) {
println!("drop Connection");
}
}
impl Manager for TestManager {
type Connection = TestConnection;
type Error = String;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
Ok(TestConnection::new())
}
async fn check(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
if conn.inner != "" {
return Err(Self::Error::from(&conn.to_string()));
}
Ok(())
}
}
#[tokio::test]
async fn test_debug() {
let p = Pool::new(TestManager {});
println!("{:?}", p);
}
#[tokio::test]
async fn test_clone() {
let p = Pool::new(TestManager {});
let p2 = p.clone();
assert_eq!(p.state(), p2.state());
}
#[tokio::test]
async fn test_pool_get() {
let p = Pool::new(TestManager {});
p.set_max_open(10);
let mut arr = vec![];
for i in 0..10 {
let v = p.get().await.unwrap();
println!("{},{}", i, v.deref());
arr.push(v);
}
}
#[tokio::test]
async fn test_pool_get2() {
let p = Pool::new(TestManager {});
p.set_max_open(10);
for i in 0..3 {
let v = p.get().await.unwrap();
println!("{},{}", i, v.deref().inner.as_str());
}
assert_eq!(p.state().idle, 3);
}
#[tokio::test]
async fn test_pool_get_timeout() {
let p = Pool::new(TestManager {});
p.set_max_open(10);
let mut arr = vec![];
for i in 0..10 {
let v = p.get().await.unwrap();
println!("{},{}", i, v.deref());
arr.push(v);
}
assert_eq!(
p.get_timeout(Some(Duration::from_secs(0))).await.is_err(),
true
);
}
#[tokio::test]
async fn test_pool_check() {
let p = Pool::new(TestManager {});
p.set_max_open(10);
let mut v = p.get().await.unwrap();
*v.inner.as_mut().unwrap() = TestConnection {
inner: "error".to_string(),
};
for _i in 0..10 {
let v = p.get().await.unwrap();
assert_eq!(v.deref().inner == "error", false);
}
}
#[tokio::test]
async fn test_pool_resize() {
let p = Pool::new(TestManager {});
p.set_max_open(10);
let mut arr = vec![];
for i in 0..10 {
let v = p.get().await.unwrap();
println!("{},{}", i, v.deref());
arr.push(v);
}
assert_eq!(
p.get_timeout(Some(Duration::from_secs(0))).await.is_err(),
true
);
p.set_max_open(11);
assert_eq!(
p.get_timeout(Some(Duration::from_secs(0))).await.is_err(),
false
);
arr.push(p.get().await.unwrap());
assert_eq!(
p.get_timeout(Some(Duration::from_secs(0))).await.is_err(),
true
);
}
#[tokio::test]
async fn test_pool_resize2() {
let p = Pool::new(TestManager {});
p.set_max_open(2);
let mut arr = vec![];
for _i in 0..2 {
let v = p.get().await.unwrap();
arr.push(v);
}
p.set_max_open(1);
drop(arr);
println!("{:?}", p.state());
assert_eq!(
p.get_timeout(Some(Duration::from_secs(0))).await.is_err(),
false
);
}
#[tokio::test]
async fn test_concurrent_access() {
let p = Pool::new(TestManager {});
p.set_max_open(10);
let mut handles = vec![];
for _ in 0..10 {
let pool = p.clone();
let handle = tokio::spawn(async move {
let _ = pool.get().await.unwrap();
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
assert_eq!(p.state().connections, 10);
}
#[tokio::test]
async fn test_invalid_connection() {
let p = Pool::new(TestManager {});
p.set_max_open(1);
let mut conn = p.get().await.unwrap();
*conn.inner.as_mut().unwrap() = TestConnection {
inner: "error".to_string(),
};
drop(conn);
println!("pool state: {}", p.state());
let new_conn = p.get().await.unwrap();
assert_ne!(new_conn.deref().inner, "error".to_string());
}
#[tokio::test]
async fn test_connection_lifetime() {
let p = Pool::new(TestManager {});
p.set_max_open(10);
let conn = p.get().await.unwrap();
drop(conn);
assert_eq!(p.state().in_use, 0);
let new_conn = p.get().await.unwrap();
assert_ne!(new_conn.deref().inner, "error".to_string());
}
#[tokio::test]
async fn test_boundary_conditions() {
let p = Pool::new(TestManager {});
p.set_max_open(2);
let conn_1 = p.get().await.unwrap();
let _conn_2 = p.get().await.unwrap();
println!("{}", p.state());
assert_eq!(p.state().in_use, 2);
assert!(p.get_timeout(Some(Duration::from_secs(0))).await.is_err());
drop(conn_1);
assert_eq!(p.state().in_use, 1);
let _conn_3 = p.get().await.unwrap();
assert_eq!(p.state().in_use, 2);
p.set_max_open(3);
let _conn_4 = p.get().await.unwrap();
assert_eq!(p.state().in_use, 3);
}
#[tokio::test]
async fn test_pool_wait() {
let p = Pool::new(TestManager {});
p.set_max_open(1);
let v = p.get().await.unwrap();
let p1 = p.clone();
tokio::spawn(async move {
p1.get().await.unwrap();
drop(p1);
});
let p1 = p.clone();
tokio::spawn(async move {
p1.get().await.unwrap();
drop(p1);
});
tokio::time::sleep(Duration::from_secs(1)).await;
println!("{:?}", p.state());
assert_eq!(p.state().waits, 2);
drop(v);
}
#[tokio::test]
async fn test_high_concurrency_with_timeout() {
let p = Pool::new(TestManager {});
p.set_max_open(5);
let success_count = Arc::new(AtomicUsize::new(0));
let timeout_count = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
let task_count = 50;
for _ in 0..task_count {
let pool = p.clone();
let success = success_count.clone();
let timeout = timeout_count.clone();
let handle = tokio::spawn(async move {
match pool.get_timeout(Some(Duration::from_millis(50))).await {
Ok(conn) => {
success.fetch_add(1, Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(20)).await;
drop(conn);
}
Err(_) => {
timeout.fetch_add(1, Ordering::SeqCst);
}
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
println!("Final pool state: {:?}", p.state());
println!(
"Successful connections: {}",
success_count.load(Ordering::SeqCst)
);
println!("Timeouts: {}", timeout_count.load(Ordering::SeqCst));
assert_eq!(
success_count.load(Ordering::SeqCst) + timeout_count.load(Ordering::SeqCst),
task_count
);
assert!(p.state().connections <= p.state().max_open);
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(p.state().in_use, 0);
assert!(p.state().idle <= p.state().max_open);
}
#[tokio::test]
async fn test_concurrent_create_connection() {
let p = Pool::new(TestManager {});
let max_connections = 10;
p.set_max_open(max_connections);
p.set_max_open(0);
p.set_max_open(max_connections);
let tasks = 30;
let mut handles = vec![];
for i in 0..tasks {
let pool = p.clone();
let handle = tokio::spawn(async move {
let result = pool.get().await;
println!("Task {} get connection: {}", i, result.is_ok());
result
});
handles.push(handle);
}
let mut success_count = 0;
for handle in handles {
if handle.await.unwrap().is_ok() {
success_count += 1;
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
println!("Pool state: {:?}", p.state());
println!("Successfully created connections: {}", success_count);
assert!(p.state().connections <= max_connections);
assert_eq!(p.state().in_use, 0);
assert!(p.state().idle <= max_connections);
}
#[tokio::test]
async fn test_high_concurrency_long_connections() {
let p = Pool::new(TestManager {});
let max_connections = 20; p.set_max_open(max_connections);
p.set_max_open(0);
p.set_max_open(max_connections);
let task_count = 200; let connection_duration = Duration::from_secs(3);
let success_count = Arc::new(AtomicUsize::new(0));
let timeout_count = Arc::new(AtomicUsize::new(0));
let in_progress = Arc::new(AtomicUsize::new(0));
let max_in_progress = Arc::new(AtomicUsize::new(0));
let update_max = |current: usize, max_tracker: &Arc<AtomicUsize>| {
let mut current_max = max_tracker.load(Ordering::Relaxed);
while current > current_max {
match max_tracker.compare_exchange_weak(
current_max,
current,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(val) => current_max = val,
}
}
};
println!("Starting high concurrency test with long-lived connections");
println!(
"Max connections: {}, Tasks: {}, Connection duration: {:?}",
max_connections, task_count, connection_duration
);
let mut handles = vec![];
for id in 0..task_count {
let pool = p.clone();
let success = success_count.clone();
let timeout = timeout_count.clone();
let in_progress_counter = in_progress.clone();
let max_in_progress_counter = max_in_progress.clone();
let handle = tokio::spawn(async move {
match pool.get_timeout(Some(Duration::from_secs(1))).await {
Ok(conn) => {
success.fetch_add(1, Ordering::SeqCst);
let current = in_progress_counter.fetch_add(1, Ordering::SeqCst) + 1;
update_max(current, &max_in_progress_counter);
println!("Task {} got connection, in-progress: {}", id, current);
tokio::time::sleep(connection_duration).await;
let remaining = in_progress_counter.fetch_sub(1, Ordering::SeqCst) - 1;
println!("Task {} completed, in-progress: {}", id, remaining);
drop(conn);
}
Err(_) => {
timeout.fetch_add(1, Ordering::SeqCst);
println!("Task {} timed out waiting for connection", id);
}
}
});
handles.push(handle);
tokio::time::sleep(Duration::from_millis(20)).await;
}
let p_status = p.clone();
let in_progress_status = in_progress.clone();
let status_handle = tokio::spawn(async move {
for _ in 0..20 {
tokio::time::sleep(Duration::from_secs(1)).await;
println!(
"Pool status: {:?}, In-progress: {}",
p_status.state(),
in_progress_status.load(Ordering::SeqCst)
);
}
});
for handle in handles {
handle.await.unwrap();
}
let _ = status_handle.await;
println!("Connection pool stats:");
println!(" Max connections setting: {}", max_connections);
println!(" Total tasks: {}", task_count);
println!(
" Successful connections: {}",
success_count.load(Ordering::SeqCst)
);
println!(
" Connection timeouts: {}",
timeout_count.load(Ordering::SeqCst)
);
println!(
" Max concurrent connections: {}",
max_in_progress.load(Ordering::SeqCst)
);
println!(" Final pool state: {:?}", p.state());
assert!(max_in_progress.load(Ordering::SeqCst) <= max_connections as usize);
assert!(p.state().connections <= max_connections);
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(p.state().in_use, 0);
}
#[tokio::test]
async fn test_concurrent_create_connection_less_for_max_open() {
let p = Pool::new(TestManager {});
p.set_max_open(10);
for _ in 0..1000 {
let p1 = p.clone();
tokio::spawn(async move {
loop {
let result = p1.get().await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
drop(result);
}
});
}
for _ in 0..5 {
let state = p.state();
println!("{}", state);
assert_eq!(state.connections <= state.max_open, true);
assert_eq!(state.in_use <= state.max_open, true);
assert_eq!(state.idle <= state.max_open, true);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
#[tokio::test]
async fn test_change_max_open() {
let p = Pool::new(TestManager {});
p.set_max_open(4);
let c1 = p.get().await.unwrap();
let c2 = p.get().await.unwrap();
let c3 = p.get().await.unwrap();
let c4 = p.get().await.unwrap();
p.set_max_open(2);
drop(c1);
drop(c2);
drop(c3);
drop(c4);
println!("{}", p.state());
println!("len {}", p.idle_send.len());
}
#[tokio::test]
async fn test_change_max_open2() {
let p = Pool::new(TestManager {});
p.set_max_open(4);
let c1 = p.get().await.unwrap();
let c2 = p.get().await.unwrap();
let c3 = p.get().await.unwrap();
let c4 = p.get().await.unwrap();
drop(c1);
drop(c2);
drop(c3);
drop(c4);
p.set_max_open(2);
println!("{}", p.state());
println!("len {}", p.idle_send.len());
}
#[tokio::test]
async fn test_tokio_cancel() {
let p = Pool::new(TestManager {});
p.set_max_open(2);
let p1 = p.clone();
let task = tokio::spawn(async move {
let c1 = p1.get().await.unwrap();
let c2 = p1.get().await.unwrap();
tokio::time::sleep(Duration::from_secs(10)).await;
drop(c1);
drop(c2);
});
tokio::time::sleep(Duration::from_secs(1)).await;
task.abort();
tokio::time::sleep(Duration::from_secs(1)).await;
println!("{}", p.state());
assert_eq!(p.state().in_use, 0);
}
#[tokio::test]
async fn test_tokio_panic() {
let p = Pool::new(TestManager {});
p.set_max_open(2);
let p1 = p.clone();
let _task = tokio::spawn(async move {
let _c1 = p1.get().await.unwrap();
let _c2 = p1.get().await.unwrap();
panic!("test_tokio_panic");
});
tokio::time::sleep(Duration::from_secs(3)).await;
println!("{}", p.state());
assert_eq!(p.state().in_use, 0);
}
#[tokio::test]
async fn test_timeout_zero() {
let p = Pool::new(TestManager {});
p.set_max_open(1);
p.set_timeout_check(None);
let v = p.get().await.unwrap();
println!("{:?}", v.inner);
}
#[tokio::test]
async fn test_pool_drop() {
let p = Pool::new(TestManager {});
p.set_max_open(1);
let v = p.get().await.unwrap();
println!("{:?}", v.inner);
drop(v);
drop(p);
}
#[tokio::test]
async fn test_connection_check_success_path() {
#[derive(Debug, Default)]
struct CheckManager {
connection_count: std::sync::atomic::AtomicU64,
}
#[derive(Debug)]
struct CheckConnection {
valid: bool,
}
impl Manager for CheckManager {
type Connection = CheckConnection;
type Error = String;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
self.connection_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(CheckConnection { valid: true })
}
async fn check(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
if conn.valid {
Ok(())
} else {
Err("Invalid connection".to_string())
}
}
}
let manager = CheckManager::default();
let pool = Pool::new(manager);
let _guard = pool.get().await.unwrap();
}
#[tokio::test]
async fn test_downcast() {
let p = Pool::new(TestManager {});
p.set_max_open(1);
let manager = p.downcast_manager::<TestManager>().unwrap();
manager.hello();
}
#[tokio::test]
async fn test_check_timeout_should_reduce_connections() {
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
#[derive(Debug)]
struct FailCheckManager {
connect_count: Arc<AtomicUsize>,
check_count: Arc<AtomicUsize>,
}
struct TestConnection;
impl Manager for FailCheckManager {
type Connection = TestConnection;
type Error = String;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
self.connect_count.fetch_add(1, Ordering::SeqCst);
Ok(TestConnection)
}
async fn check(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
self.check_count.fetch_add(1, Ordering::SeqCst);
Err("check failed".to_string()) }
}
let connect_count = Arc::new(AtomicUsize::new(0));
let check_count = Arc::new(AtomicUsize::new(0));
let manager = FailCheckManager {
connect_count: connect_count.clone(),
check_count: check_count.clone(),
};
let pool = Pool::new(manager);
pool.set_max_open(2);
println!("Pool state: {:?}", pool.state());
let result = pool.get().await;
println!("First get result: {:?}", result.is_ok());
println!("Pool state: {:?}", pool.state());
let result2 = pool.get_timeout(Some(Duration::from_secs(1))).await;
println!("Second get result: {:?}", result2.is_ok());
println!("Pool state: {:?}", pool.state());
assert_eq!(pool.state().connections, 0, "connections should be 0 after failed checks");
}
#[tokio::test]
async fn test_multiple_check_timeouts_should_not_deadlock() {
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
#[derive(Debug)]
struct SlowCheckManager {
connect_count: Arc<AtomicUsize>,
}
struct SlowConnection;
impl Manager for SlowCheckManager {
type Connection = SlowConnection;
type Error = String;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
self.connect_count.fetch_add(1, Ordering::SeqCst);
Ok(SlowConnection)
}
async fn check(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
tokio::time::sleep(Duration::from_secs(3600)).await;
Ok(())
}
}
let connect_count = Arc::new(AtomicUsize::new(0));
let manager = SlowCheckManager {
connect_count: connect_count.clone(),
};
let pool = Pool::new(manager);
pool.set_max_open(3);
pool.set_timeout_check(Some(Duration::from_millis(100)));
let mut handles = vec![];
let conn_count = connect_count.clone();
for i in 0..5 {
let pool = pool.clone();
let handle = tokio::spawn(async move {
let start = std::time::Instant::now();
let result = pool.get_timeout(Some(Duration::from_secs(2))).await;
let elapsed = start.elapsed();
println!("Request {} took {:?}, result: {:?}", i, elapsed, result.is_ok());
(i, result)
});
handles.push(handle);
}
let mut results = vec![];
for handle in handles {
results.push(handle.await.unwrap());
}
let total_connects = conn_count.load(Ordering::SeqCst);
println!("Total connections created: {}", total_connects);
println!("Final pool state: {:?}", pool.state());
assert_eq!(results.len(), 5);
assert!(pool.state().connections <= pool.state().max_open);
}