#![forbid(unsafe_code)]
#![allow(clippy::too_many_lines)]
mod common;
#[cfg(not(loom))]
mod std_thread_tests {
use super::common::{Recorder, TestRuntime, TestValue};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
/// Polls `flag` roughly once per millisecond until it holds exactly `target`.
///
/// `what` is a human-readable label used only in the timeout message.
///
/// # Panics
/// Panics once `secs` seconds have elapsed without `flag` equalling `target`;
/// the message includes `what`, `target`, and the value last observed.
fn wait_until(flag: &AtomicU64, target: u64, secs: u64, what: &str) {
    let budget = Duration::from_secs(secs);
    let began = Instant::now();
    loop {
        if flag.load(Ordering::SeqCst) == target {
            return;
        }
        // Not there yet: fail loudly if the budget is spent, otherwise back off briefly.
        assert!(
            began.elapsed() < budget,
            "timed out waiting for {what} to reach {target} (current = {})",
            flag.load(Ordering::SeqCst),
        );
        thread::sleep(Duration::from_millis(1));
    }
}
/// Joins `handle`, but gives up with a panic if the thread has not finished
/// within `secs` seconds.
///
/// The handle is polled every 5 ms via `is_finished()`, so the final `join()`
/// is only called once the thread has already terminated and cannot block.
/// `what` is a human-readable label for the thread, used in both failure
/// messages.
///
/// # Panics
/// * If the thread is still running after `secs` seconds (reported as a likely
///   deadlock).
/// * If the thread itself panicked. The message names `what` and carries the
///   original panic text when the payload is a `&'static str` or `String`, so
///   a failure can be attributed to a specific thread even though the panic
///   location is always inside this helper.
fn join_with_timeout(handle: thread::JoinHandle<()>, secs: u64, what: &str) {
    let budget = Duration::from_secs(secs);
    let began = Instant::now();
    while !handle.is_finished() {
        assert!(
            began.elapsed() < budget,
            "{what} did not finish within {secs}s — likely deadlock"
        );
        thread::sleep(Duration::from_millis(5));
    }

    if let Err(payload) = handle.join() {
        // Recover the child's panic text; payloads are `&'static str` for
        // literal panics and `String` for formatted ones.
        let detail = payload
            .downcast_ref::<&'static str>()
            .copied()
            .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
            .unwrap_or("(non-string panic payload)");
        // Pre-formatted `String` payload: the message is always rendered,
        // independent of edition-specific single-argument `panic!` rules.
        std::panic::panic_any(format!("thread panicked ({what}): {detail}"));
    }
}
/// Emits on two partitions must return while an emit on a third partition is
/// parked inside a derived fn.
///
/// Choreography:
/// 1. Three states are created and asserted to live in pairwise-distinct
///    partitions.
/// 2. A derived node over `s1` bumps an entry counter and, on its first call
///    only, takes the one-shot receiver and blocks on it.
/// 3. Thread 1 emits on `s1`; the test waits for the entry counter to reach 1.
/// 4. Threads 2 and 3 emit on `s2` / `s3` and then raise their done flags,
///    which must be observed (10 s budget each) before Thread 1 is released.
/// 5. Thread 1 is released through the channel and all three threads are
///    joined with a timeout.
#[test]
fn three_disjoint_partitions_emit_returns_concurrently_while_one_blocked() {
    let rt = TestRuntime::new();
    let s1 = rt.state(None);
    let s2 = rt.state(Some(TestValue::Int(0)));
    let s3 = rt.state(Some(TestValue::Int(0)));

    // Fixture precondition: no two states share a partition.
    let [part_1, part_2, part_3] =
        [s1.id, s2.id, s3.id].map(|id| rt.core.partition_of(id).expect("registered"));
    assert_ne!(part_1, part_2);
    assert_ne!(part_1, part_3);
    assert_ne!(part_2, part_3);

    // One-shot gate: the first fn invocation takes the receiver and parks on it.
    let (release_tx, release_rx) = std::sync::mpsc::channel::<()>();
    let gate = Arc::new(Mutex::new(Some(release_rx)));
    let gate_in_fn = Arc::clone(&gate);
    let entered = Arc::new(AtomicU64::new(0));
    let entered_in_fn = Arc::clone(&entered);
    let d1 = rt.derived(&[s1.id], move |deps| {
        entered_in_fn.fetch_add(1, Ordering::SeqCst);
        // Take the receiver in its own statement so the mutex guard is
        // released before blocking on the channel.
        let parked_on = gate_in_fn.lock().unwrap().take();
        if let Some(receiver) = parked_on {
            // A disconnect (sender dropped) also unparks; the result is ignored.
            let _ = receiver.recv();
        }
        Some(deps[0].clone())
    });
    let _r1: Recorder = rt.subscribe_recorder(d1);

    let thread_1 = {
        let core = rt.core.clone();
        let node = s1.id;
        let binding = rt.binding.clone();
        thread::spawn(move || {
            let handle = binding.intern(TestValue::Int(1));
            core.emit(node, handle);
        })
    };
    wait_until(&entered, 1, 5, "Thread 1 fn entry");

    let t2_done = Arc::new(AtomicU64::new(0));
    let t3_done = Arc::new(AtomicU64::new(0));
    let thread_2 = {
        let finished = Arc::clone(&t2_done);
        let core = rt.core.clone();
        let node = s2.id;
        let binding = rt.binding.clone();
        thread::spawn(move || {
            let handle = binding.intern(TestValue::Int(20));
            core.emit(node, handle);
            finished.store(1, Ordering::SeqCst);
        })
    };
    let thread_3 = {
        let finished = Arc::clone(&t3_done);
        let core = rt.core.clone();
        let node = s3.id;
        let binding = rt.binding.clone();
        thread::spawn(move || {
            let handle = binding.intern(TestValue::Int(30));
            core.emit(node, handle);
            finished.store(1, Ordering::SeqCst);
        })
    };

    // Both emits must return while Thread 1 has not yet been released.
    wait_until(&t2_done, 1, 10, "Thread 2 emit");
    wait_until(&t3_done, 1, 10, "Thread 3 emit");

    release_tx.send(()).expect("release Thread 1");
    join_with_timeout(thread_1, 5, "thread 1");
    join_with_timeout(thread_2, 5, "thread 2");
    join_with_timeout(thread_3, 5, "thread 3");
}
/// A cascade linking partitions A and B through a meta-companion edge must
/// block a concurrent emit on B, yet leave an unrelated partition C free.
///
/// Choreography:
/// 1. `s_a`, `s_b`, `s_c` are asserted to be in distinct partitions;
///    `add_meta_companion(s_a, s_b)` is asserted not to change the partitions
///    of `s_a` and `s_b`.
/// 2. Thread 1 emits on `s_a` and parks inside a derived fn over `s_a`.
/// 3. Thread 2 emits on `s_c` and must finish (10 s budget).
/// 4. Thread 3 emits on `s_b`; 100 ms after it signalled entry, its exit flag
///    must still be clear.
/// 5. Thread 1 is released and all threads are joined with a timeout.
#[test]
fn cross_partition_cascade_does_not_block_disjoint_third_partition() {
    let rt = TestRuntime::new();
    let s_a = rt.state(None);
    let s_b = rt.state(Some(TestValue::Int(0)));
    let s_c = rt.state(Some(TestValue::Int(0)));

    let part_a = rt.core.partition_of(s_a.id).expect("registered");
    let part_b = rt.core.partition_of(s_b.id).expect("registered");
    let part_c = rt.core.partition_of(s_c.id).expect("registered");
    assert_ne!(part_a, part_b);
    assert_ne!(part_a, part_c);
    assert_ne!(part_b, part_c);

    // The meta-companion link must not merge the two partitions.
    rt.core.add_meta_companion(s_a.id, s_b.id);
    assert_eq!(rt.core.partition_of(s_a.id), Some(part_a));
    assert_eq!(rt.core.partition_of(s_b.id), Some(part_b));

    // One-shot gate: the first fn invocation takes the receiver and parks on it.
    let (release_tx, release_rx) = std::sync::mpsc::channel::<()>();
    let gate = Arc::new(Mutex::new(Some(release_rx)));
    let gate_in_fn = Arc::clone(&gate);
    let entered = Arc::new(AtomicU64::new(0));
    let entered_in_fn = Arc::clone(&entered);
    let d_a = rt.derived(&[s_a.id], move |deps| {
        entered_in_fn.fetch_add(1, Ordering::SeqCst);
        // Separate statement: the mutex guard is gone before we block.
        let parked_on = gate_in_fn.lock().unwrap().take();
        if let Some(receiver) = parked_on {
            let _ = receiver.recv();
        }
        Some(deps[0].clone())
    });
    let _rec_a: Recorder = rt.subscribe_recorder(d_a);

    let thread_1 = {
        let core = rt.core.clone();
        let node = s_a.id;
        let binding = rt.binding.clone();
        thread::spawn(move || {
            let handle = binding.intern(TestValue::Int(1));
            core.emit(node, handle);
        })
    };
    wait_until(&entered, 1, 5, "Thread 1 fn entry");

    // Thread 2: unrelated partition C.
    let thread_2_done = Arc::new(AtomicU64::new(0));
    let thread_2 = {
        let finished = Arc::clone(&thread_2_done);
        let core = rt.core.clone();
        let node = s_c.id;
        let binding = rt.binding.clone();
        thread::spawn(move || {
            let handle = binding.intern(TestValue::Int(99));
            core.emit(node, handle);
            finished.store(1, Ordering::SeqCst);
        })
    };

    // Thread 3: partition B, the meta-companion target.
    let t3_entered = Arc::new(AtomicU64::new(0));
    let t3_exited = Arc::new(AtomicU64::new(0));
    let thread_3 = {
        let about_to_emit = Arc::clone(&t3_entered);
        let emit_returned = Arc::clone(&t3_exited);
        let core = rt.core.clone();
        let node = s_b.id;
        let binding = rt.binding.clone();
        thread::spawn(move || {
            let handle = binding.intern(TestValue::Int(77));
            about_to_emit.store(1, Ordering::SeqCst);
            core.emit(node, handle);
            emit_returned.store(1, Ordering::SeqCst);
        })
    };

    wait_until(&thread_2_done, 1, 10, "Thread 2 emit");
    wait_until(&t3_entered, 1, 5, "Thread 3 emit entry");
    // Grace period: give Thread 3 time to (wrongly) slip through its emit.
    thread::sleep(Duration::from_millis(100));
    assert_eq!(
        t3_exited.load(Ordering::SeqCst),
        0,
        "Thread 3's emit on s_b must block on partition(s_b)'s wave_owner \
         held by Thread 1's meta-companion-driven cascade. The exit flag \
         being set early would mean `compute_touched_partitions` did NOT \
         walk the meta-edge — meta-walk regression."
    );

    release_tx.send(()).expect("release Thread 1");
    join_with_timeout(thread_1, 5, "Thread 1");
    join_with_timeout(thread_2, 5, "Thread 2");
    join_with_timeout(thread_3, 5, "Thread 3");
}
/// Two threads hammer emits whose cascades cross partitions in opposite
/// directions; both must finish all iterations within the join timeout.
///
/// Fixture: `a`/`a2` share one partition and `b`/`b2` share another (each pair
/// is a dependency of a common derived node, and the grouping is asserted).
/// Meta-companion links run `a -> b` and `b2 -> a2`, and are asserted not to
/// merge the partitions of `a` and `b`. Thread 1 emits `N` values on `a`,
/// Thread 2 emits `N` values on `b2`.
#[test]
fn reciprocal_cross_partition_cascades_complete_without_deadlock() {
    let rt = TestRuntime::new();
    let a = rt.state(None);
    let b = rt.state(None);
    let a2 = rt.state(None);
    let b2 = rt.state(None);
    let _join_x = rt.derived(&[a.id, a2.id], |_deps| Some(TestValue::Int(0)));
    let _join_y = rt.derived(&[b.id, b2.id], |_deps| Some(TestValue::Int(0)));

    let p_a = rt.core.partition_of(a.id).expect("registered");
    let p_b = rt.core.partition_of(b.id).expect("registered");
    assert_ne!(p_a, p_b);
    assert_eq!(rt.core.partition_of(a2.id), Some(p_a));
    assert_eq!(rt.core.partition_of(b2.id), Some(p_b));

    // Reciprocal meta-companion links between the two partitions.
    rt.core.add_meta_companion(a.id, b.id);
    rt.core.add_meta_companion(b2.id, a2.id);
    assert_ne!(rt.core.partition_of(a.id), rt.core.partition_of(b.id));

    let d_a = rt.derived(&[a.id], |deps| Some(deps[0].clone()));
    let d_b2 = rt.derived(&[b2.id], |deps| Some(deps[0].clone()));
    let _rec_a: Recorder = rt.subscribe_recorder(d_a);
    let _rec_b2: Recorder = rt.subscribe_recorder(d_b2);

    const N: usize = 200;
    let t1_progress = Arc::new(AtomicU64::new(0));
    let t2_progress = Arc::new(AtomicU64::new(0));

    let thread_1 = {
        let completed = Arc::clone(&t1_progress);
        let core = rt.core.clone();
        let node = a.id;
        let binding = rt.binding.clone();
        thread::spawn(move || {
            for i in 0..N {
                let handle = binding.intern(TestValue::Int(i as i64));
                core.emit(node, handle);
                completed.fetch_add(1, Ordering::SeqCst);
            }
        })
    };
    let thread_2 = {
        let completed = Arc::clone(&t2_progress);
        let core = rt.core.clone();
        let node = b2.id;
        let binding = rt.binding.clone();
        thread::spawn(move || {
            for i in 0..N {
                // Offset keeps Thread 2's values disjoint from Thread 1's.
                let handle = binding.intern(TestValue::Int((1_000 + i) as i64));
                core.emit(node, handle);
                completed.fetch_add(1, Ordering::SeqCst);
            }
        })
    };

    join_with_timeout(thread_1, 30, "Thread 1 reciprocal cascade");
    join_with_timeout(thread_2, 30, "Thread 2 reciprocal cascade");

    assert_eq!(
        t1_progress.load(Ordering::SeqCst),
        N as u64,
        "Thread 1 did not complete all {N} emits — partial deadlock \
         or stall (counter shows iter count reached, not full N)"
    );
    assert_eq!(
        t2_progress.load(Ordering::SeqCst),
        N as u64,
        "Thread 2 did not complete all {N} emits — partial deadlock \
         or stall"
    );

    // NOTE(review): `HandleId::new(0)` is presumably the "no cached value"
    // sentinel — confirm against graphrefly_core.
    let cache_a = rt.core.cache_of(a.id);
    let cache_b2 = rt.core.cache_of(b2.id);
    for cached in [cache_a, cache_b2] {
        assert_ne!(cached, graphrefly_core::HandleId::new(0));
    }
}
/// A derived fn that emits into another partition while firing is expected to
/// trip the "Phase H+ ascending-order violation" panic, and the thread-local
/// bookkeeping must be clean once the panic has unwound.
///
/// The fixture runs inside `catch_unwind`. It first asserts that the derived
/// node's partition id is greater than the side state's partition id (so the
/// inner emit goes in descending order), then subscribes a recorder — the last
/// statement in the closure, after which the panic is expected to have fired.
/// Afterwards the test checks the panic text and the two test-only
/// thread-local snapshots exposed by `graphrefly_core`.
#[test]
fn user_fn_cross_partition_emit_during_fire_panics_with_h_plus_diagnostic() {
    let fixture = || {
        let rt = TestRuntime::new();
        let s_in = rt.state(Some(TestValue::Int(0)));
        let s_side = rt.state(Some(TestValue::Int(0)));

        let core = rt.core.clone();
        let s_side_id = s_side.id;
        let binding = rt.binding.clone();
        let d = rt.derived(&[s_in.id], move |deps| {
            // Side-effecting emit into a different partition from inside the fn.
            if let TestValue::Int(n) = deps[0] {
                let handle = binding.intern(TestValue::Int(n * 10));
                core.emit(s_side_id, handle);
            }
            Some(deps[0].clone())
        });

        let p_d = rt.core.partition_of(d).expect("d registered");
        let p_side = rt.core.partition_of(s_side.id).expect("s_side registered");
        assert!(
            p_d.raw() > p_side.raw(),
            "fixture invariant violated: this test requires partition(d) \
             > partition(s_side) so the inner emit is descending order. \
             Got partition(d)={p_d:?}, partition(s_side)={p_side:?}. \
             If union_nodes' tiebreak was changed (e.g., to pick \
             smaller-id-as-root on equal rank), reconstruct the topology \
             to force descending order — DO NOT assume the test still \
             covers the H+ panic path."
        );
        let _rec_d = rt.subscribe_recorder(d);
    };
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(fixture));

    let payload = outcome.expect_err(
        "expected the H+ ascending-order check to panic when a user fn \
         does a cross-partition emit DESCENDING during fire — instead the \
         closure completed without panic. H+ enforcement regression?",
    );

    // Panic payloads are `&'static str` for literals, `String` when formatted.
    let msg: &str = if let Some(text) = payload.downcast_ref::<&'static str>() {
        text
    } else if let Some(text) = payload.downcast_ref::<String>() {
        text.as_str()
    } else {
        "(non-string panic payload)"
    };
    assert!(
        msg.contains("Phase H+ ascending-order violation"),
        "expected H+ ascending-order panic; got: {msg}"
    );

    // Post-unwind hygiene: nothing may be left behind in the thread-locals.
    let held = graphrefly_core::held_snapshot_for_tests();
    assert!(
        held.is_empty(),
        "Phase H+ thread-local `HELD` is dirty post-panic: {held:?}. \
         The scope-guard in `partition_wave_owner_lock_arc` (/qa A1) \
         must release the refcount on every exit path including \
         panic unwinds; a non-empty held set here means a leak."
    );
    let in_pb = graphrefly_core::in_producer_build_for_tests();
    assert_eq!(
        in_pb, 0,
        "Phase H+ thread-local `IN_PRODUCER_BUILD` is dirty \
         post-panic: refcount={in_pb}. `FiringGuard::Drop` must \
         pair every `producer_build_enter()` with a `producer_build_exit()`."
    );
}
}
#[cfg(loom)]
mod loom_tests {
use loom::sync::atomic::{AtomicUsize, Ordering};
use loom::sync::{Arc, Mutex};
/// Minimal stand-in for a partition's lock, used only by the loom models in
/// this module.
struct PartitionLock {
/// Ordering key: `acquire_two_in_ascending_order` locks the smaller `id` first.
id: usize,
/// The mutex being modelled; it protects no data (`()`).
guard: Mutex<()>,
}
/// Locks both partitions, always taking the one with the smaller `id` first,
/// regardless of the order in which the arguments are passed.
///
/// Returns `(guard_of_lower_id, guard_of_higher_id)`. On equal ids, `a` is
/// locked before `b`.
///
/// Callers are expected to pass two distinct locks: handing in the same lock
/// twice would make this function try to lock one mutex twice.
///
/// # Panics
/// Panics if either `lock()` call returns an error.
fn acquire_two_in_ascending_order<'a>(
    a: &'a Arc<PartitionLock>,
    b: &'a Arc<PartitionLock>,
) -> (
    loom::sync::MutexGuard<'a, ()>,
    loom::sync::MutexGuard<'a, ()>,
) {
    // Swap only when `b` is strictly lower, so ties keep argument order.
    let (first, second) = if b.id < a.id { (b, a) } else { (a, b) };
    let lower_guard = first.guard.lock().unwrap();
    let upper_guard = second.guard.lock().unwrap();
    (lower_guard, upper_guard)
}
/// Loom model: two threads request the same pair of partition locks with the
/// arguments in opposite order. Both must complete, and each lock must have
/// been counted as acquired by both threads.
#[test]
fn cross_partition_reciprocal_arg_order_acquire_no_deadlock() {
    loom::model(|| {
        let part_x = Arc::new(PartitionLock {
            id: 1,
            guard: Mutex::new(()),
        });
        let part_y = Arc::new(PartitionLock {
            id: 2,
            guard: Mutex::new(()),
        });
        let acquired_x = Arc::new(AtomicUsize::new(0));
        let acquired_y = Arc::new(AtomicUsize::new(0));

        // Thread A passes (X, Y).
        let h1 = {
            let (px, py) = (Arc::clone(&part_x), Arc::clone(&part_y));
            let (count_x, count_y) = (Arc::clone(&acquired_x), Arc::clone(&acquired_y));
            loom::thread::spawn(move || {
                let (gx, gy) = acquire_two_in_ascending_order(&px, &py);
                count_x.fetch_add(1, Ordering::SeqCst);
                count_y.fetch_add(1, Ordering::SeqCst);
                drop(gy);
                drop(gx);
            })
        };

        // Thread B passes (Y, X); the helper still hands back X's guard first
        // because X has the smaller id.
        let h2 = {
            let (px, py) = (Arc::clone(&part_x), Arc::clone(&part_y));
            let (count_x, count_y) = (Arc::clone(&acquired_x), Arc::clone(&acquired_y));
            loom::thread::spawn(move || {
                let (gx, gy) = acquire_two_in_ascending_order(&py, &px);
                count_x.fetch_add(1, Ordering::SeqCst);
                count_y.fetch_add(1, Ordering::SeqCst);
                drop(gy);
                drop(gx);
            })
        };

        h1.join().unwrap();
        h2.join().unwrap();

        let x_count = acquired_x.load(Ordering::SeqCst);
        let y_count = acquired_y.load(Ordering::SeqCst);
        assert_eq!(x_count, 2, "partition X acquired by both threads");
        assert_eq!(y_count, 2, "partition Y acquired by both threads");
    });
}
/// Loom model: same reciprocal-argument-order scenario as the counting
/// variant, with non-adjacent ids (7 and 13) and no counters — the model
/// passes as long as both threads can be joined.
#[test]
fn reciprocal_cross_partition_acquisitions_no_deadlock() {
    loom::model(|| {
        let part_x = Arc::new(PartitionLock {
            id: 7,
            guard: Mutex::new(()),
        });
        let part_y = Arc::new(PartitionLock {
            id: 13,
            guard: Mutex::new(()),
        });

        // Thread A passes (X, Y).
        let h1 = {
            let (px, py) = (Arc::clone(&part_x), Arc::clone(&part_y));
            loom::thread::spawn(move || {
                let (lower, upper) = acquire_two_in_ascending_order(&px, &py);
                drop(upper);
                drop(lower);
            })
        };

        // Thread B passes (Y, X).
        let h2 = {
            let (px, py) = (Arc::clone(&part_x), Arc::clone(&part_y));
            loom::thread::spawn(move || {
                let (lower, upper) = acquire_two_in_ascending_order(&py, &px);
                drop(upper);
                drop(lower);
            })
        };

        h1.join().unwrap();
        h2.join().unwrap();
    });
}
}