#![cfg(test)]
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::cx::Cx;
use crate::lab::LabRuntime;
use crate::sync::{AcquireError, Semaphore};
use crate::types::CancelKind;
use proptest::prelude::*;
#[derive(Debug)]
struct PermitConservation {
initial_permits: usize,
semaphore: Arc<Semaphore>,
live_permits: AtomicUsize,
}
impl PermitConservation {
fn new(initial_permits: usize) -> Self {
Self {
initial_permits,
semaphore: Arc::new(Semaphore::new(initial_permits)),
live_permits: AtomicUsize::new(0),
}
}
fn verify_conservation(&self) {
let available = self.semaphore.available_permits();
let held = self.live_permits.load(Ordering::Acquire);
let total = available + held;
assert_eq!(
total, self.initial_permits,
"Permit conservation violated: initial={}, available={}, held={}, total={}",
self.initial_permits, available, held, total
);
}
}
#[test]
fn mr_idempotent_close() {
let sem = Arc::new(Semaphore::new(5));
let sem1 = sem.clone();
sem1.close();
let closed_once = sem1.is_closed();
let permits_once = sem1.available_permits();
let sem2 = Arc::new(Semaphore::new(5));
sem2.close();
sem2.close();
sem2.close();
let closed_multiple = sem2.is_closed();
let permits_multiple = sem2.available_permits();
assert_eq!(
closed_once, closed_multiple,
"Multiple closes should be idempotent"
);
assert_eq!(
permits_once, permits_multiple,
"Permit count after close should be idempotent"
);
}
#[test]
fn mr_cancellation_equivalence() {
futures_lite::future::block_on(async {
let initial_permits = 10;
let sem_a = Arc::new(Semaphore::new(initial_permits));
let permit_count_a = Arc::new(AtomicUsize::new(0));
let cx_a = Cx::for_testing();
let _permit_a1 = sem_a.acquire(&cx_a, 3).await.unwrap();
permit_count_a.fetch_add(3, Ordering::Relaxed);
let cx_cancel = Cx::for_testing();
cx_cancel.set_cancel_requested(true);
let result_a = sem_a.acquire(&cx_cancel, 5).await;
assert!(result_a.is_err());
let available_a = sem_a.available_permits();
let sem_b = Arc::new(Semaphore::new(initial_permits));
let cx_b = Cx::for_testing();
let _permit_b1 = sem_b.acquire(&cx_b, 3).await.unwrap();
let available_b = sem_b.available_permits();
assert_eq!(
available_a, available_b,
"Equivalent permit usage should yield same available count regardless of cancellation pattern"
);
});
}
#[test]
fn mr_scaling_linearity() {
futures_lite::future::block_on(async {
let base_permits = 6;
let scale_factor = 3;
let sem_base = Arc::new(Semaphore::new(base_permits));
let cx = Cx::for_testing();
let _permit1 = sem_base.acquire(&cx, 2).await.unwrap();
let available_base = sem_base.available_permits();
let sem_scaled = Arc::new(Semaphore::new(base_permits * scale_factor));
let _permit2 = sem_scaled.acquire(&cx, 2 * scale_factor).await.unwrap();
let available_scaled = sem_scaled.available_permits();
assert_eq!(
available_scaled,
available_base * scale_factor,
"Scaling all permits by factor {} should scale results proportionally",
scale_factor
);
});
}
#[test]
fn mr_acquire_release_roundtrip() {
futures_lite::future::block_on(async {
let sem = Arc::new(Semaphore::new(10));
let cx = Cx::for_testing();
let original_permits = sem.available_permits();
let permit = sem.acquire(&cx, 4).await.unwrap();
let after_acquire = sem.available_permits();
assert_eq!(after_acquire, original_permits - 4);
drop(permit);
let after_release = sem.available_permits();
assert_eq!(
after_release, original_permits,
"Acquire-release roundtrip should restore original permit count"
);
});
}
#[test]
fn mr_non_overlapping_commutativity() {
futures_lite::future::block_on(async {
let sem = Arc::new(Semaphore::new(10));
let cx = Cx::for_testing();
let sem_ab = sem.clone();
let permit_a1 = sem_ab.acquire(&cx, 3).await.unwrap();
drop(permit_a1);
let permit_b1 = sem_ab.acquire(&cx, 2).await.unwrap();
drop(permit_b1);
let final_ab = sem_ab.available_permits();
let sem_ba = Arc::new(Semaphore::new(10));
let permit_b2 = sem_ba.acquire(&cx, 2).await.unwrap();
drop(permit_b2);
let permit_a2 = sem_ba.acquire(&cx, 3).await.unwrap();
drop(permit_a2);
let final_ba = sem_ba.available_permits();
assert_eq!(
final_ab, final_ba,
"Non-overlapping acquire/release sequences should be commutative"
);
});
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RetryErrorKind {
Cancelled,
Closed,
}
fn canonical_acquire_error(error: AcquireError) -> RetryErrorKind {
match error {
AcquireError::Cancelled => RetryErrorKind::Cancelled,
AcquireError::Closed | AcquireError::PolledAfterCompletion => RetryErrorKind::Closed,
}
}
fn retry_error_sequence(seed: u64, attempts: usize, closed: bool) -> Vec<RetryErrorKind> {
let mut runtime = LabRuntime::with_seed(seed);
let mut sequence = Vec::with_capacity(attempts);
for _ in 0..attempts {
let sem = Semaphore::new(0);
let cx = Cx::for_testing();
if closed {
sem.close();
} else {
cx.cancel_fast(CancelKind::User);
}
let error = match futures_lite::future::block_on(sem.acquire(&cx, 1)) {
Ok(_) => panic!("retry attempt should produce a canonical acquisition error"),
Err(error) => error,
};
sequence.push(canonical_acquire_error(error));
}
let violations = runtime.check_invariants();
assert!(
violations.is_empty(),
"lab runtime invariant violations during retry MR: {violations:?}"
);
sequence
}
#[test]
fn mr_retry_same_input_replays_same_canonical_error_sequence() {
proptest!(|(
seed in any::<u64>(),
attempts in 1usize..12,
closed in any::<bool>(),
)| {
let first = retry_error_sequence(seed, attempts, closed);
let second = retry_error_sequence(seed, attempts, closed);
prop_assert_eq!(&first, &second,
"retrying the same input under deterministic mode changed the canonical error sequence");
prop_assert_eq!(first.len(), attempts,
"retry sequence length should equal the requested attempt count");
prop_assert!(
first.iter().all(|kind| *kind == first[0]),
"same-input retries should not alternate canonical error kind: {first:?}"
);
});
}
#[test]
fn mr_cancel_monotonicity() {
futures_lite::future::block_on(async {
let sem = Arc::new(Semaphore::new(8));
let cx_no_cancel = Cx::for_testing();
let _permit1 = sem.acquire(&cx_no_cancel, 3).await.unwrap();
let available_no_cancel = sem.available_permits();
let sem_with_cancel = Arc::new(Semaphore::new(8));
let cx_with_cancel = Cx::for_testing();
let _permit2 = sem_with_cancel.acquire(&cx_with_cancel, 3).await.unwrap();
let cx_cancel = Cx::for_testing();
cx_cancel.set_cancel_requested(true);
let _ = sem_with_cancel.acquire(&cx_cancel, 2).await;
let available_with_cancel = sem_with_cancel.available_permits();
assert!(
available_with_cancel <= available_no_cancel + 2,
"Cancellation should not increase available permits beyond what would be released"
);
});
}
#[test]
fn mr_batch_vs_sequential_equivalence() {
futures_lite::future::block_on(async {
let cx = Cx::for_testing();
let sem_batch = Arc::new(Semaphore::new(10));
let _batch_permit = sem_batch.acquire(&cx, 4).await.unwrap();
let available_batch = sem_batch.available_permits();
let sem_sequential = Arc::new(Semaphore::new(10));
let _permit1 = sem_sequential.acquire(&cx, 1).await.unwrap();
let _permit2 = sem_sequential.acquire(&cx, 1).await.unwrap();
let _permit3 = sem_sequential.acquire(&cx, 1).await.unwrap();
let _permit4 = sem_sequential.acquire(&cx, 1).await.unwrap();
let available_sequential = sem_sequential.available_permits();
assert_eq!(
available_batch, available_sequential,
"Batch vs sequential acquire should yield equivalent end state"
);
});
}
proptest! {
#[test]
fn prop_permit_conservation(
initial_permits in 1usize..20,
operations in prop::collection::vec(
(1usize..5, prop::bool::ANY), 0..10
)
) {
futures_lite::future::block_on(async {
let conservation = PermitConservation::new(initial_permits);
conservation.verify_conservation();
let mut permits_held = Vec::new();
for (idx, (count, should_cancel)) in operations.into_iter().enumerate() {
if count > initial_permits { continue; } if !should_cancel && count > conservation.semaphore.available_permits() {
continue;
}
let cx = Cx::for_testing();
if should_cancel {
cx.set_cancel_requested(true);
}
match conservation.semaphore.acquire(&cx, count).await {
Ok(permit) => {
conservation.live_permits.fetch_add(count, Ordering::Release);
permits_held.push((permit, count));
},
Err(_) => {
}
}
conservation.verify_conservation();
if !permits_held.is_empty() && (idx + permits_held.len()) % 2 == 0 {
if let Some((permit, count)) = permits_held.pop() {
conservation.live_permits.fetch_sub(count, Ordering::Release);
drop(permit);
conservation.verify_conservation();
}
}
}
for (permit, count) in permits_held {
conservation.live_permits.fetch_sub(count, Ordering::Release);
drop(permit);
conservation.verify_conservation();
}
});
}
}
#[test]
fn mr_composite_properties() {
futures_lite::future::block_on(async {
let base_permits = 6;
let scale = 2;
let sem = Arc::new(Semaphore::new(base_permits * scale));
let cx = Cx::for_testing();
let permit1 = sem.acquire(&cx, 2 * scale).await.unwrap();
let after_acquire = sem.available_permits();
let cx_cancel = Cx::for_testing();
cx_cancel.set_cancel_requested(true);
let _ = sem.acquire(&cx_cancel, scale).await;
let after_cancel = sem.available_permits();
assert_eq!(
after_acquire, after_cancel,
"Cancelled operation should not change state"
);
drop(permit1);
let final_permits = sem.available_permits();
assert_eq!(
final_permits,
base_permits * scale,
"Final state should conserve all permits"
);
});
}