use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use crate::cx::Cx;
use crate::sync::{GenericPool, Pool, PoolConfig};
use crate::types::Time;
#[derive(Debug, Clone)]
struct MockResource {
_id: usize,
_created_at: std::time::Instant,
}
#[derive(Debug)]
struct MockFactory {
counter: Arc<AtomicUsize>,
creation_delay: Duration,
}
impl MockFactory {
fn new(creation_delay: Duration) -> Self {
Self {
counter: Arc::new(AtomicUsize::new(0)),
creation_delay,
}
}
}
impl crate::sync::AsyncResourceFactory for MockFactory {
type Resource = MockResource;
type Error = std::io::Error;
fn create(
&self,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Self::Resource, Self::Error>> + Send + '_>,
> {
let id = self.counter.fetch_add(1, Ordering::AcqRel);
let delay = self.creation_delay;
Box::pin(async move {
crate::time::sleep(Time::ZERO, delay).await;
Ok(MockResource {
_id: id,
_created_at: std::time::Instant::now(),
})
})
}
}
#[derive(Debug, Clone)]
enum PoolOperation {
Acquire { hold_ms: u64 },
TryAcquire { hold_ms: u64 },
AcquireWithCancel { hold_ms: u64, cancel_after_ms: u64 },
}
async fn execute_operations(
pool: &GenericPool<MockResource, MockFactory>,
operations: &[PoolOperation],
) -> OperationResults {
let mut acquired_count = 0;
let mut returned_count = 0;
let mut discarded_count = 0;
let mut cancel_count = 0;
for op in operations {
match op {
PoolOperation::Acquire { hold_ms } => {
let cx = Cx::for_testing();
if let Ok(resource) = pool.acquire(&cx).await {
acquired_count += 1;
let delay = Duration::from_millis(*hold_ms);
crate::time::sleep(Time::ZERO, delay).await;
resource.return_to_pool();
returned_count += 1;
}
}
PoolOperation::TryAcquire { hold_ms } => {
if let Some(resource) = pool.try_acquire() {
acquired_count += 1;
let delay = Duration::from_millis(*hold_ms);
crate::time::sleep(Time::ZERO, delay).await;
if acquired_count % 3 == 0 {
resource.discard();
discarded_count += 1;
} else {
resource.return_to_pool();
returned_count += 1;
}
}
}
PoolOperation::AcquireWithCancel {
hold_ms,
cancel_after_ms,
} => {
let cx = Cx::for_testing();
let cancel_cx = cx.clone();
let acquire_future = pool.acquire(&cancel_cx);
let cancel_future = async {
crate::time::sleep(Time::ZERO, Duration::from_millis(*cancel_after_ms)).await;
cancel_cx.set_cancel_requested(true);
};
futures_lite::future::race(
async {
if let Ok(resource) = acquire_future.await {
acquired_count += 1;
crate::time::sleep(Time::ZERO, Duration::from_millis(*hold_ms)).await;
resource.return_to_pool();
returned_count += 1;
}
},
async {
cancel_future.await;
cancel_count += 1;
},
)
.await;
}
}
crate::time::sleep(Time::ZERO, Duration::from_millis(10)).await;
}
OperationResults {
acquired_count,
returned_count,
discarded_count,
cancel_count,
}
}
#[derive(Debug, Clone, PartialEq)]
struct OperationResults {
acquired_count: usize,
returned_count: usize,
discarded_count: usize,
cancel_count: usize,
}
#[cfg(test)]
mod metamorphic_tests {
use super::*;
#[test]
fn mr_pool_accounting_invariant() {
futures_lite::future::block_on(async {
crate::test_utils::init_test_logging();
crate::test_phase!("mr_pool_accounting_invariant");
let config = PoolConfig::default().max_size(5).min_size(1);
let factory = MockFactory::new(Duration::from_millis(1));
let pool = GenericPool::new(factory, config);
for _ in 0..10 {
let cx = Cx::for_testing();
let initial_stats = pool.stats();
assert_eq!(
initial_stats.active + initial_stats.idle,
initial_stats.total,
"Invariant violated initially: active={}, idle={}, total={}",
initial_stats.active,
initial_stats.idle,
initial_stats.total
);
if let Ok(resource) = pool.acquire(&cx).await {
let during_stats = pool.stats();
assert_eq!(
during_stats.active + during_stats.idle,
during_stats.total,
"Invariant violated during hold: active={}, idle={}, total={}",
during_stats.active,
during_stats.idle,
during_stats.total
);
resource.return_to_pool();
crate::time::sleep(Time::ZERO, Duration::from_millis(10)).await;
let final_stats = pool.stats();
assert_eq!(
final_stats.active + final_stats.idle,
final_stats.total,
"Invariant violated after return: active={}, idle={}, total={}",
final_stats.active,
final_stats.idle,
final_stats.total
);
}
}
crate::test_complete!("mr_pool_accounting_invariant");
});
}
#[test]
fn mr_cancel_safety_invariance() {
futures_lite::future::block_on(async {
crate::test_utils::init_test_logging();
crate::test_phase!("mr_cancel_safety_invariance");
let config = PoolConfig::default().max_size(3);
let factory = MockFactory::new(Duration::from_millis(10));
let pool = GenericPool::new(factory, config);
let cx = Cx::for_testing();
let cancel_cx = cx.clone();
let acquire_future = pool.acquire(&cancel_cx);
let cancel_future = async {
crate::time::sleep(Time::ZERO, Duration::from_millis(5)).await;
cancel_cx.set_cancel_requested(true);
};
futures_lite::future::race(
async {
if let Ok(resource) = acquire_future.await {
resource.return_to_pool();
}
},
async {
cancel_future.await;
},
)
.await;
crate::time::sleep(Time::ZERO, Duration::from_millis(50)).await;
let final_stats = pool.stats();
assert_eq!(
final_stats.active + final_stats.idle,
final_stats.total,
"Pool accounting broken by cancellation"
);
crate::test_complete!("mr_cancel_safety_invariance");
});
}
#[test]
fn mr_return_vs_drop_equivalence() {
futures_lite::future::block_on(async {
crate::test_utils::init_test_logging();
crate::test_phase!("mr_return_vs_drop_equivalence");
let config = PoolConfig::default().max_size(2);
let factory1 = MockFactory::new(Duration::from_millis(1));
let pool1 = GenericPool::new(factory1, config.clone());
let cx = Cx::for_testing();
let resource1 = pool1.acquire(&cx).await.unwrap();
resource1.return_to_pool(); crate::time::sleep(Time::ZERO, Duration::from_millis(10)).await;
let stats1 = pool1.stats();
let factory2 = MockFactory::new(Duration::from_millis(1));
let pool2 = GenericPool::new(factory2, config);
let resource2 = pool2.acquire(&cx).await.unwrap();
drop(resource2); crate::time::sleep(Time::ZERO, Duration::from_millis(10)).await;
let stats2 = pool2.stats();
assert_eq!(
stats1.active, stats2.active,
"Active count differs between return methods"
);
assert_eq!(stats1.active + stats1.idle, stats1.total);
assert_eq!(stats2.active + stats2.idle, stats2.total);
assert_eq!(
stats1.active, 0,
"Resource not returned properly (explicit)"
);
assert_eq!(stats2.active, 0, "Resource not returned properly (drop)");
crate::test_complete!("mr_return_vs_drop_equivalence");
});
}
#[test]
fn mr_discard_vs_return_counting() {
futures_lite::future::block_on(async {
crate::test_utils::init_test_logging();
crate::test_phase!("mr_discard_vs_return_counting");
let config = PoolConfig::default().max_size(3);
let factory1 = MockFactory::new(Duration::from_millis(1));
let pool1 = GenericPool::new(factory1, config.clone());
let cx = Cx::for_testing();
let resource1 = pool1.acquire(&cx).await.unwrap();
resource1.return_to_pool();
crate::time::sleep(Time::ZERO, Duration::from_millis(10)).await;
let stats_return = pool1.stats();
let factory2 = MockFactory::new(Duration::from_millis(1));
let pool2 = GenericPool::new(factory2, config);
let resource2 = pool2.acquire(&cx).await.unwrap();
resource2.discard(); crate::time::sleep(Time::ZERO, Duration::from_millis(10)).await;
let stats_discard = pool2.stats();
assert!(
stats_discard.total <= stats_return.total,
"Discard should not increase total: discard_total={}, return_total={}",
stats_discard.total,
stats_return.total
);
assert_eq!(stats_return.active + stats_return.idle, stats_return.total);
assert_eq!(
stats_discard.active + stats_discard.idle,
stats_discard.total
);
crate::test_complete!("mr_discard_vs_return_counting");
});
}
#[test]
fn mr_hold_duration_invariance() {
futures_lite::future::block_on(async {
crate::test_utils::init_test_logging();
crate::test_phase!("mr_hold_duration_invariance");
let config = PoolConfig::default().max_size(2);
let factory1 = MockFactory::new(Duration::from_millis(1));
let pool1 = GenericPool::new(factory1, config.clone());
let cx = Cx::for_testing();
let resource1 = pool1.acquire(&cx).await.unwrap();
crate::time::sleep(Time::ZERO, Duration::from_millis(5)).await; resource1.return_to_pool();
crate::time::sleep(Time::ZERO, Duration::from_millis(10)).await;
let stats_short = pool1.stats();
let factory2 = MockFactory::new(Duration::from_millis(1));
let pool2 = GenericPool::new(factory2, config);
let resource2 = pool2.acquire(&cx).await.unwrap();
crate::time::sleep(Time::ZERO, Duration::from_millis(50)).await; resource2.return_to_pool();
crate::time::sleep(Time::ZERO, Duration::from_millis(10)).await;
let stats_long = pool2.stats();
assert_eq!(stats_short.active + stats_short.idle, stats_short.total);
assert_eq!(stats_long.active + stats_long.idle, stats_long.total);
assert_eq!(
stats_short.active, 0,
"Resource not returned after short hold"
);
assert_eq!(
stats_long.active, 0,
"Resource not returned after long hold"
);
crate::test_complete!("mr_hold_duration_invariance");
});
}
#[test]
fn mr_composite_operation_sequence() {
futures_lite::future::block_on(async {
crate::test_utils::init_test_logging();
crate::test_phase!("mr_composite_operation_sequence");
let config = PoolConfig::default().max_size(3).min_size(1);
let max_size = config.max_size;
let factory = MockFactory::new(Duration::from_millis(2));
let pool = GenericPool::new(factory, config);
let operations = vec![
PoolOperation::Acquire { hold_ms: 10 },
PoolOperation::TryAcquire { hold_ms: 5 },
PoolOperation::AcquireWithCancel {
hold_ms: 20,
cancel_after_ms: 5,
},
PoolOperation::Acquire { hold_ms: 15 },
];
let initial_stats = pool.stats();
assert_eq!(
initial_stats.active + initial_stats.idle,
initial_stats.total
);
let _results = execute_operations(&pool, &operations).await;
crate::time::sleep(Time::ZERO, Duration::from_millis(100)).await;
let final_stats = pool.stats();
assert_eq!(
final_stats.active + final_stats.idle,
final_stats.total,
"Pool accounting violated after composite operations"
);
assert!(
final_stats.total <= max_size,
"Pool exceeded max_size: total={}, max={}",
final_stats.total,
max_size
);
crate::test_complete!("mr_composite_operation_sequence");
});
}
}