use crate::config::GlobalExecutor;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::timeout;
struct MockPool {
executors: Vec<Option<MockExecutor>>,
available: Semaphore,
borrow_count: AtomicU32,
return_count: AtomicU32,
}
#[derive(Clone)]
struct MockExecutor {
id: u32,
healthy: bool,
}
impl MockPool {
fn new(size: usize) -> Self {
let executors: Vec<_> = (0..size)
.map(|i| {
Some(MockExecutor {
id: i as u32,
healthy: true,
})
})
.collect();
Self {
executors,
available: Semaphore::new(size),
borrow_count: AtomicU32::new(0),
return_count: AtomicU32::new(0),
}
}
async fn pop_executor(&mut self) -> MockExecutor {
let permit = self.available.acquire().await.unwrap();
permit.forget();
self.borrow_count.fetch_add(1, Ordering::SeqCst);
for slot in &mut self.executors {
if let Some(executor) = slot.take() {
return executor;
}
}
unreachable!("No executors available despite semaphore permit")
}
fn return_executor(&mut self, executor: MockExecutor) {
self.return_count.fetch_add(1, Ordering::SeqCst);
if let Some(empty_slot) = self.executors.iter_mut().find(|slot| slot.is_none()) {
*empty_slot = Some(executor);
self.available.add_permits(1);
} else {
unreachable!("No empty slot found in the pool")
}
}
fn available_permits(&self) -> usize {
self.available.available_permits()
}
}
#[tokio::test]
async fn test_pool_creation_with_various_sizes() {
for size in [1, 2, 4, 8, 16] {
let pool = MockPool::new(size);
assert_eq!(pool.executors.len(), size);
assert_eq!(pool.available.available_permits(), size);
}
}
#[tokio::test]
async fn test_pool_creation_single_executor() {
let pool = MockPool::new(1);
assert_eq!(pool.executors.len(), 1);
assert_eq!(pool.available.available_permits(), 1);
}
#[tokio::test]
async fn test_borrow_and_return_single_executor() {
let mut pool = MockPool::new(1);
let executor = pool.pop_executor().await;
assert_eq!(pool.available_permits(), 0);
pool.return_executor(executor);
assert_eq!(pool.available_permits(), 1);
}
#[tokio::test]
async fn test_borrow_all_executors() {
let mut pool = MockPool::new(4);
let mut borrowed = Vec::new();
for _ in 0..4 {
borrowed.push(pool.pop_executor().await);
}
assert_eq!(pool.available_permits(), 0);
for executor in borrowed {
pool.return_executor(executor);
}
assert_eq!(pool.available_permits(), 4);
}
#[tokio::test]
async fn test_borrow_blocks_when_pool_exhausted() {
let semaphore = Arc::new(Semaphore::new(1));
let permit = semaphore.clone().acquire_owned().await.unwrap();
permit.forget();
assert_eq!(semaphore.available_permits(), 0);
let sem_clone = semaphore.clone();
let acquire_future = async move {
let _permit = sem_clone.acquire().await.unwrap();
};
let result = timeout(Duration::from_millis(50), acquire_future).await;
assert!(result.is_err(), "Should timeout waiting for permit");
semaphore.add_permits(1);
assert_eq!(semaphore.available_permits(), 1);
}
#[tokio::test]
async fn test_semaphore_concurrent_access() {
let semaphore = Arc::new(Semaphore::new(4));
let completed = Arc::new(AtomicU32::new(0));
let max_concurrent = Arc::new(AtomicU32::new(0));
let current = Arc::new(AtomicU32::new(0));
let handles: Vec<_> = (0..8)
.map(|i| {
let semaphore = semaphore.clone();
let completed = completed.clone();
let max_concurrent = max_concurrent.clone();
let current = current.clone();
GlobalExecutor::spawn(async move {
let permit = semaphore.acquire().await.unwrap();
let now_current = current.fetch_add(1, Ordering::SeqCst) + 1;
max_concurrent.fetch_max(now_current, Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(10 + i * 5)).await;
current.fetch_sub(1, Ordering::SeqCst);
completed.fetch_add(1, Ordering::SeqCst);
drop(permit);
})
})
.collect();
for h in handles {
h.await.unwrap();
}
assert_eq!(completed.load(Ordering::SeqCst), 8);
assert!(max_concurrent.load(Ordering::SeqCst) <= 4);
}
#[tokio::test]
async fn test_rapid_borrow_return_cycles() {
let mut pool = MockPool::new(1);
for _ in 0..100 {
let executor = pool.pop_executor().await;
pool.return_executor(executor);
}
assert_eq!(pool.borrow_count.load(Ordering::SeqCst), 100);
assert_eq!(pool.return_count.load(Ordering::SeqCst), 100);
assert_eq!(pool.available_permits(), 1);
}
#[tokio::test]
async fn test_healthy_executor_reused() {
let mut pool = MockPool::new(1);
let executor = pool.pop_executor().await;
assert!(executor.healthy);
let id = executor.id;
pool.return_executor(executor);
let executor2 = pool.pop_executor().await;
assert_eq!(executor2.id, id); assert!(executor2.healthy);
}
#[tokio::test]
async fn test_unhealthy_executor_detected() {
let mut pool = MockPool::new(1);
let mut executor = pool.pop_executor().await;
executor.healthy = false;
pool.return_executor(executor);
let executor = pool.pop_executor().await;
assert!(!executor.healthy);
}
#[tokio::test]
async fn test_health_check_and_replace_pattern() {
let mut pool = MockPool::new(2);
let _broken_executor = pool.pop_executor().await;
let replacement = MockExecutor {
id: 100, healthy: true,
};
pool.return_executor(replacement);
let next = pool.pop_executor().await;
assert_eq!(next.id, 100);
assert!(next.healthy);
}
#[tokio::test]
async fn test_pool_exhaustion_with_timeout() {
let semaphore = Arc::new(Semaphore::new(2));
let permit1 = semaphore.clone().acquire_owned().await.unwrap();
let permit2 = semaphore.clone().acquire_owned().await.unwrap();
permit1.forget();
permit2.forget();
let sem_clone = semaphore.clone();
let borrow_result = timeout(Duration::from_millis(100), async move {
let _permit = sem_clone.acquire().await.unwrap();
})
.await;
assert!(borrow_result.is_err(), "Should timeout");
semaphore.add_permits(2);
assert_eq!(semaphore.available_permits(), 2);
}
#[tokio::test]
async fn test_pool_recovers_after_exhaustion() {
let semaphore = Arc::new(Semaphore::new(1));
let permit = semaphore.clone().acquire_owned().await.unwrap();
permit.forget();
let sem_clone = semaphore.clone();
let waiter = GlobalExecutor::spawn(async move {
let _permit = sem_clone.acquire().await.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
semaphore.add_permits(1);
let result = timeout(Duration::from_millis(100), waiter).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_high_contention_semaphore() {
let semaphore = Arc::new(Semaphore::new(4));
let operations = Arc::new(AtomicU32::new(0));
let handles: Vec<_> = (0..20)
.map(|_| {
let semaphore = semaphore.clone();
let operations = operations.clone();
GlobalExecutor::spawn(async move {
for _ in 0..10 {
let permit = semaphore.acquire().await.unwrap();
tokio::time::sleep(Duration::from_micros(100)).await;
drop(permit);
operations.fetch_add(1, Ordering::SeqCst);
}
})
})
.collect();
for h in handles {
h.await.unwrap();
}
assert_eq!(operations.load(Ordering::SeqCst), 200);
assert_eq!(semaphore.available_permits(), 4);
}
#[tokio::test]
async fn test_return_without_borrow_panics() {
let mut pool = MockPool::new(1);
let executor = pool.pop_executor().await;
pool.return_executor(executor.clone());
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
pool.return_executor(executor);
}));
assert!(result.is_err());
}
#[tokio::test]
async fn test_mixed_healthy_unhealthy_executors() {
let mut pool = MockPool::new(4);
let mut executors = Vec::new();
for _ in 0..4 {
executors.push(pool.pop_executor().await);
}
executors[0].healthy = false;
executors[2].healthy = false;
for e in executors {
pool.return_executor(e);
}
let mut reborrowed = Vec::new();
for _ in 0..4 {
reborrowed.push(pool.pop_executor().await);
}
let healthy_count = reborrowed.iter().filter(|e| e.healthy).count();
let unhealthy_count = reborrowed.iter().filter(|e| !e.healthy).count();
assert_eq!(healthy_count, 2);
assert_eq!(unhealthy_count, 2);
}
#[tokio::test]
async fn test_permit_count_remains_stable() {
let mut pool = MockPool::new(4);
for _ in 0..50 {
let e1 = pool.pop_executor().await;
let e2 = pool.pop_executor().await;
pool.return_executor(e1);
pool.return_executor(e2);
}
assert_eq!(pool.available_permits(), 4);
}
#[tokio::test]
async fn test_permit_count_after_partial_borrow() {
let mut pool = MockPool::new(4);
let e1 = pool.pop_executor().await;
let e2 = pool.pop_executor().await;
assert_eq!(pool.available_permits(), 2);
pool.return_executor(e1);
assert_eq!(pool.available_permits(), 3);
pool.return_executor(e2);
assert_eq!(pool.available_permits(), 4);
}