mod tests_scondvar {
use crate::core::scondvar::SCondVar;
use crate::core::smutex::SMutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};
// Timeout handed to `SCondVar::wait_for_waiters` and `SCondVar::wait_until` by the tests in
// this module; the boolean those calls return is asserted on by the callers.
const WAIT_TIMEOUT: Duration = Duration::from_secs(2);
/// Test helper: asks `condvar` to wait (up to `WAIT_TIMEOUT`) for `count` waiters and
/// panics with the currently recorded waiter count when that call reports `false`.
fn wait_for_waiters(condvar: &SCondVar, count: usize) {
    let reached = condvar.wait_for_waiters(count, WAIT_TIMEOUT);
    if !reached {
        // Only read the counter on the failure path, purely for the diagnostic message.
        let found = condvar.waiters.load(Ordering::Acquire);
        panic!("expected at least {} waiter(s), found {}", count, found);
    }
}
/// Shared fixture: one mutex and one condition variable, each behind an `Arc` so that
/// spawned threads can hold their own handles.
struct TestCtx {
    mutex: Arc<SMutex>,
    condvar: Arc<SCondVar>,
}

impl TestCtx {
    /// Builds a fixture around a freshly constructed mutex and condition variable.
    fn new() -> Self {
        let mutex = Arc::new(SMutex::new());
        let condvar = Arc::new(SCondVar::new());
        Self { mutex, condvar }
    }

    /// Returns new `Arc` handles to the fixture's mutex and condition variable, in that
    /// order, ready to be moved into a spawned thread.
    fn clone_refs(&self) -> (Arc<SMutex>, Arc<SCondVar>) {
        let mutex = Arc::clone(&self.mutex);
        let condvar = Arc::clone(&self.condvar);
        (mutex, condvar)
    }
}
/// One thread waits on the condition variable; the test checks that it has not set its
/// flag before `notify_one` and that it finishes (returning 42) afterwards.
#[test]
fn test_notify_one_single_waiter() {
    let ctx = TestCtx::new();
    let woke_up = Arc::new(AtomicBool::new(false));

    let handle = {
        let (mutex, condvar) = ctx.clone_refs();
        let flag = Arc::clone(&woke_up);
        thread::spawn(move || {
            let held = mutex.lock();
            let _held = condvar.wait(held);
            flag.store(true, Ordering::Release);
            42
        })
    };

    // The waiter must be registered, and still parked, before the notification is sent.
    wait_for_waiters(&ctx.condvar, 1);
    let woke_early = woke_up.load(Ordering::Acquire);
    assert!(!woke_early, "Should still be waiting");

    ctx.condvar.notify_one();

    let returned = handle.join().unwrap();
    assert_eq!(returned, 42);
    assert!(woke_up.load(Ordering::Acquire));
}
/// Parks `N` threads on the condition variable and checks that a single `notify_one`
/// leads to exactly one increment of `wake_count`; the remaining threads are released
/// with `notify_all` so every thread can be joined.
#[test]
fn test_notify_one_wakes_exactly_one() {
    const N: usize = 3;

    let ctx = TestCtx::new();
    let wake_count = Arc::new(AtomicUsize::new(0));
    let waiting_count = Arc::new(AtomicUsize::new(0));

    let mut handles = Vec::with_capacity(N);
    for _ in 0..N {
        let (mutex, condvar) = ctx.clone_refs();
        let woken = Arc::clone(&wake_count);
        let arrived = Arc::clone(&waiting_count);
        handles.push(thread::spawn(move || {
            let held = mutex.lock();
            arrived.fetch_add(1, Ordering::AcqRel);
            let _held = condvar.wait(held);
            woken.fetch_add(1, Ordering::AcqRel);
        }));
    }

    // Every thread has announced itself and is registered as a waiter.
    let all_arrived =
        SCondVar::wait_until(WAIT_TIMEOUT, || waiting_count.load(Ordering::Acquire) == N);
    assert!(all_arrived, "not all waiters reached the wait path");
    wait_for_waiters(&ctx.condvar, N);
    assert_eq!(wake_count.load(Ordering::Acquire), 0);

    ctx.condvar.notify_one();
    let first_woke =
        SCondVar::wait_until(WAIT_TIMEOUT, || wake_count.load(Ordering::Acquire) >= 1);
    assert!(first_woke, "notify_one did not wake a waiter");

    // Give any spuriously released thread a chance to run before checking the count.
    (0..100).for_each(|_| thread::yield_now());
    let woken_after_one_notify = wake_count.load(Ordering::Acquire);
    assert_eq!(
        woken_after_one_notify, 1,
        "notify_one should wake exactly one"
    );

    // Release everyone else so the joins below can complete.
    ctx.condvar.notify_all();
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
    assert_eq!(wake_count.load(Ordering::Acquire), N);
}
/// Parks `N` threads and checks that one `notify_all` lets all of them finish.
#[test]
fn test_notify_all_wakes_all() {
    const N: usize = 5;

    let ctx = TestCtx::new();
    let wake_count = Arc::new(AtomicUsize::new(0));

    let mut handles = Vec::with_capacity(N);
    for _ in 0..N {
        let (mutex, condvar) = ctx.clone_refs();
        let woken = Arc::clone(&wake_count);
        handles.push(thread::spawn(move || {
            let held = mutex.lock();
            let _held = condvar.wait(held);
            woken.fetch_add(1, Ordering::AcqRel);
        }));
    }

    wait_for_waiters(&ctx.condvar, N);
    ctx.condvar.notify_all();

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
    assert_eq!(wake_count.load(Ordering::Acquire), N);
}
/// Smoke test: calling the notify methods on a condition variable nobody waits on must
/// simply return. There are no assertions; the test passes if no call panics or blocks.
#[test]
fn test_notify_without_waiters_is_safe() {
    let condvar = SCondVar::new();

    // notify_one, notify_all, then notify_one again, all without any waiter.
    condvar.notify_one();
    condvar.notify_all();
    condvar.notify_one();
}
/// Checks that `wait` really blocks: a notification sent before anyone waits must not let
/// a later waiter through, and the waiter must stay parked until a notification is sent
/// while it is registered.
///
/// The notifier synchronises with the waiter through `wait_for_waiters` instead of relying
/// on a bare sleep. A notification that arrives before the waiter has registered is not
/// retained (see `test_notify_adds_wake_tokens`), so a sleep-only hand-off could lose the
/// wake-up and hang the test on a slow or heavily loaded machine.
#[test]
fn test_wait_blocks_until_notified() {
    let ctx = TestCtx::new();

    // Sent with no waiter present; the waiter below must not be released by it.
    ctx.condvar.notify_one();

    let (m, cv) = ctx.clone_refs();
    let waiter = thread::spawn(move || {
        let guard = m.lock();
        let start = Instant::now();
        let _guard = cv.wait(guard);
        start.elapsed()
    });

    // Only start the delay once the waiter is registered, so the later notification
    // cannot be lost and the measured time is guaranteed to include the full delay.
    wait_for_waiters(&ctx.condvar, 1);
    thread::sleep(Duration::from_millis(50));
    ctx.condvar.notify_one();

    let elapsed = waiter.join().unwrap();
    assert!(elapsed >= Duration::from_millis(40), "Should have blocked");
}
/// A waiter is notified and its thread is expected to return `true` while the guard
/// returned by `wait` is still bound.
///
/// NOTE(review): the test only observes that `wait` hands back a guard and the thread
/// completes; it does not probe the mutex's lock state directly.
#[test]
fn test_guard_held_after_wait() {
    let ctx = TestCtx::new();

    let handle = {
        let (mutex, condvar) = ctx.clone_refs();
        thread::spawn(move || {
            let held = mutex.lock();
            // Keep the returned guard bound until the closure ends.
            let _held = condvar.wait(held);
            true
        })
    };

    wait_for_waiters(&ctx.condvar, 1);
    ctx.condvar.notify_one();

    assert!(handle.join().unwrap());
}
/// A single thread waits `CYCLES` times in a row, re-using the guard returned by each
/// `wait`; the main thread drives it through every cycle with one `notify_one` each.
#[test]
fn test_repeated_wait_notify_cycles() {
    const CYCLES: usize = 5;

    let ctx = TestCtx::new();
    let woke_count = Arc::new(AtomicUsize::new(0));
    let ready_to_wait = Arc::new(AtomicUsize::new(0));

    let waiter = {
        let (mutex, condvar) = ctx.clone_refs();
        let woken = Arc::clone(&woke_count);
        let announced = Arc::clone(&ready_to_wait);
        thread::spawn(move || {
            let mut held = mutex.lock();
            for _ in 0..CYCLES {
                // Announce the cycle first, then park; count the wake-up afterwards.
                announced.fetch_add(1, Ordering::AcqRel);
                held = condvar.wait(held);
                woken.fetch_add(1, Ordering::AcqRel);
            }
        })
    };

    for cycle in 0..CYCLES {
        let announced =
            SCondVar::wait_until(WAIT_TIMEOUT, || ready_to_wait.load(Ordering::Acquire) > cycle);
        assert!(announced, "waiter never reached cycle {}", cycle);

        wait_for_waiters(&ctx.condvar, 1);
        ctx.condvar.notify_one();

        let resumed =
            SCondVar::wait_until(WAIT_TIMEOUT, || woke_count.load(Ordering::Acquire) > cycle);
        assert!(resumed, "waiter never woke in cycle {}", cycle);
    }

    waiter.join().unwrap();
    assert_eq!(woke_count.load(Ordering::Acquire), CYCLES);
}
/// Runs three consecutive wait/notify sessions against the same fixture, each with a new
/// thread, and checks that every thread returns its own round number.
#[test]
fn test_condvar_reuse_across_sessions() {
    let ctx = TestCtx::new();

    (0..3u32).for_each(|round| {
        let (mutex, condvar) = ctx.clone_refs();
        let session = thread::spawn(move || {
            let held = mutex.lock();
            let _held = condvar.wait(held);
            round
        });

        wait_for_waiters(&ctx.condvar, 1);
        ctx.condvar.notify_one();

        let returned = session.join().unwrap();
        assert_eq!(returned, round);
    });
}
/// Same shape as the `notify_all` test, but each thread enters `wait` with a guard
/// obtained from `lock_group` instead of `lock`.
#[test]
fn test_wait_with_group_lock() {
    const N: usize = 3;

    let ctx = TestCtx::new();
    let wake_count = Arc::new(AtomicUsize::new(0));

    let mut handles = Vec::with_capacity(N);
    for _ in 0..N {
        let (mutex, condvar) = ctx.clone_refs();
        let woken = Arc::clone(&wake_count);
        handles.push(thread::spawn(move || {
            let group_guard = mutex.lock_group();
            let _group_guard = condvar.wait(group_guard);
            woken.fetch_add(1, Ordering::AcqRel);
        }));
    }

    wait_for_waiters(&ctx.condvar, N);
    ctx.condvar.notify_all();

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
    assert_eq!(wake_count.load(Ordering::Acquire), N);
}
/// Parks 20 threads and releases them one `notify_one` at a time, confirming after each
/// notification that the cumulative wake count has reached the expected value.
#[test]
fn test_stress_many_waiters() {
    const N: usize = 20;

    let ctx = TestCtx::new();
    let wake_count = Arc::new(AtomicUsize::new(0));

    let mut handles = Vec::with_capacity(N);
    for _ in 0..N {
        let (mutex, condvar) = ctx.clone_refs();
        let woken = Arc::clone(&wake_count);
        handles.push(thread::spawn(move || {
            let held = mutex.lock();
            let _held = condvar.wait(held);
            woken.fetch_add(1, Ordering::AcqRel);
        }));
    }

    wait_for_waiters(&ctx.condvar, N);

    for already_woken in 0..N {
        // After this notification at least `expected` threads must have woken in total.
        let expected = already_woken + 1;
        ctx.condvar.notify_one();
        let reached =
            SCondVar::wait_until(WAIT_TIMEOUT, || wake_count.load(Ordering::Acquire) >= expected);
        assert!(reached, "waiter {} never woke", expected);
    }

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
    assert_eq!(wake_count.load(Ordering::Acquire), N);
}
/// Repeats a "park a few threads, `notify_all`, join" round several times on the same
/// fixture, checking the per-round and the overall wake counts.
#[test]
fn test_stress_rapid_notify_all() {
    const ROUNDS: usize = 5;
    const THREADS_PER_ROUND: usize = 3;

    let ctx = TestCtx::new();
    let total_wakes = Arc::new(AtomicUsize::new(0));

    for round in 0..ROUNDS {
        let round_wakes = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::with_capacity(THREADS_PER_ROUND);
        for _ in 0..THREADS_PER_ROUND {
            let (mutex, condvar) = ctx.clone_refs();
            let overall = Arc::clone(&total_wakes);
            let this_round = Arc::clone(&round_wakes);
            handles.push(thread::spawn(move || {
                let held = mutex.lock();
                let _held = condvar.wait(held);
                overall.fetch_add(1, Ordering::AcqRel);
                this_round.fetch_add(1, Ordering::AcqRel);
            }));
        }

        wait_for_waiters(&ctx.condvar, THREADS_PER_ROUND);
        ctx.condvar.notify_all();
        handles
            .into_iter()
            .for_each(|handle| handle.join().unwrap());

        let woken_this_round = round_wakes.load(Ordering::Acquire);
        assert_eq!(woken_this_round, THREADS_PER_ROUND, "Round {} failed", round);
    }

    let woken_overall = total_wakes.load(Ordering::Acquire);
    assert_eq!(woken_overall, ROUNDS * THREADS_PER_ROUND);
}
/// Parks `N` threads, then issues `N` `notify_one` calls spaced 5 ms apart and checks
/// that every thread completes.
#[test]
fn test_concurrent_wait_and_notify() {
    const N: usize = 10;

    let ctx = TestCtx::new();
    let completed = Arc::new(AtomicUsize::new(0));

    let mut waiter_handles = Vec::with_capacity(N);
    for _ in 0..N {
        let (mutex, condvar) = ctx.clone_refs();
        let finished = Arc::clone(&completed);
        waiter_handles.push(thread::spawn(move || {
            let held = mutex.lock();
            let _held = condvar.wait(held);
            finished.fetch_add(1, Ordering::AcqRel);
        }));
    }

    wait_for_waiters(&ctx.condvar, N);

    // One notification per waiter, with a short pause after each.
    let pause = Duration::from_millis(5);
    (0..N).for_each(|_| {
        ctx.condvar.notify_one();
        thread::sleep(pause);
    });

    waiter_handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
    assert_eq!(completed.load(Ordering::Acquire), N);
}
/// Each of `N` threads records its own id in a shared set after waking; after `N`
/// `notify_one` calls (20 ms apart) the set must contain all `N` ids.
#[test]
fn test_all_waiters_eventually_wake() {
    const N: usize = 5;

    let ctx = TestCtx::new();
    let wake_set = Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()));

    let mut handles = Vec::with_capacity(N);
    for id in 0..N {
        let (mutex, condvar) = ctx.clone_refs();
        let woken_ids = Arc::clone(&wake_set);
        handles.push(thread::spawn(move || {
            let held = mutex.lock();
            let _held = condvar.wait(held);
            woken_ids.lock().unwrap().insert(id);
        }));
    }

    wait_for_waiters(&ctx.condvar, N);

    let pause = Duration::from_millis(20);
    (0..N).for_each(|_| {
        ctx.condvar.notify_one();
        thread::sleep(pause);
    });

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());

    let woken_ids = wake_set.lock().unwrap();
    let distinct_woken = woken_ids.len();
    assert_eq!(distinct_woken, N, "All threads should have woken");
}
/// White-box check of the `to_wake` counter: it stays at 0 when `notify_one` is called
/// with no waiters recorded, rises by one for `notify_one` once `waiters` is non-zero,
/// and rises by the recorded waiter count for `notify_all`.
#[test]
fn test_notify_adds_wake_tokens() {
    let condvar = SCondVar::new();
    let tokens = || condvar.to_wake.load(Ordering::Relaxed);

    // No waiters recorded: nothing is added.
    condvar.notify_one();
    assert_eq!(tokens(), 0);

    // Pretend five threads are waiting by writing the counter directly.
    condvar.waiters.store(5, Ordering::SeqCst);

    condvar.notify_one();
    assert_eq!(tokens(), 1);

    condvar.notify_all();
    assert_eq!(tokens(), 1 + 5);
}
/// Sends 100 notifications with nobody waiting, then checks that a normal
/// wait/`notify_one` hand-off still completes on the same condition variable.
#[test]
fn test_many_notifications_still_works() {
    let ctx = TestCtx::new();

    // Burst of notifications while there is no waiter.
    (0..100).for_each(|_| {
        ctx.condvar.notify_one();
    });

    let handle = {
        let (mutex, condvar) = ctx.clone_refs();
        thread::spawn(move || {
            let held = mutex.lock();
            let _held = condvar.wait(held);
            true
        })
    };

    wait_for_waiters(&ctx.condvar, 1);
    ctx.condvar.notify_one();

    assert!(handle.join().unwrap());
}
/// Checks that the mutex is available to other threads while a waiter is parked in
/// `wait`: a second thread must be able to lock it before the waiter is notified.
///
/// The checker's success is observed by polling `acquired` with a timeout rather than by
/// joining the checker directly. If the mutex were still held by the parked waiter, a
/// direct join would block forever and the test would hang instead of failing.
#[test]
fn test_mutex_unlocked_while_waiting() {
    let ctx = TestCtx::new();
    let waiting = Arc::new(AtomicBool::new(false));
    let acquired = Arc::new(AtomicBool::new(false));

    let (m, cv) = ctx.clone_refs();
    let w = Arc::clone(&waiting);
    let waiter = thread::spawn(move || {
        let guard = m.lock();
        // Set while the lock is held, just before parking.
        w.store(true, Ordering::Release);
        let _guard = cv.wait(guard);
    });

    assert!(
        SCondVar::wait_until(WAIT_TIMEOUT, || waiting.load(Ordering::Acquire)),
        "waiter never reached the wait path"
    );
    wait_for_waiters(&ctx.condvar, 1);

    let m2 = Arc::clone(&ctx.mutex);
    let acq = Arc::clone(&acquired);
    let checker = thread::spawn(move || {
        let _guard = m2.lock();
        acq.store(true, Ordering::Release);
    });

    // Bounded wait: fails with a message if the checker cannot take the lock in time.
    assert!(
        SCondVar::wait_until(WAIT_TIMEOUT, || acquired.load(Ordering::Acquire)),
        "mutex was not released while the waiter was blocked in wait"
    );
    // The checker has taken the lock, so this join returns once it drops its guard.
    checker.join().unwrap();

    ctx.condvar.notify_one();
    waiter.join().unwrap();
}
/// Calls `notify_all` twice in a row with `N` parked threads and checks that each thread
/// wakes and is counted exactly once.
#[test]
fn test_notify_all_idempotent() {
    const N: usize = 3;

    let ctx = TestCtx::new();
    let wake_count = Arc::new(AtomicUsize::new(0));

    let mut handles = Vec::with_capacity(N);
    for _ in 0..N {
        let (mutex, condvar) = ctx.clone_refs();
        let woken = Arc::clone(&wake_count);
        handles.push(thread::spawn(move || {
            let held = mutex.lock();
            let _held = condvar.wait(held);
            woken.fetch_add(1, Ordering::AcqRel);
        }));
    }

    wait_for_waiters(&ctx.condvar, N);

    // Second call is the redundant one under test.
    ctx.condvar.notify_all();
    ctx.condvar.notify_all();

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
    assert_eq!(wake_count.load(Ordering::Acquire), N);
}
}