#![forbid(unsafe_code)]
#![allow(clippy::too_many_lines)]
mod common;
#[cfg(not(loom))]
mod std_thread_tests {
use super::common::{Recorder, TestRuntime, TestValue};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
/// Polls `flag` (sleeping 1 ms between reads) until it holds exactly `target`.
///
/// Panics once `secs` seconds have elapsed without the flag reaching
/// `target`; the message names `what`, the expected value, and the value
/// read at the moment of failure.
fn wait_until(flag: &AtomicU64, target: u64, secs: u64, what: &str) {
    let began = Instant::now();
    let budget = Duration::from_secs(secs);
    loop {
        if flag.load(Ordering::SeqCst) == target {
            return;
        }
        // The flag is read again for the message so the report carries the
        // freshest value.
        assert!(
            began.elapsed() < budget,
            "timed out waiting for {what} to reach {target} (current = {})",
            flag.load(Ordering::SeqCst),
        );
        thread::sleep(Duration::from_millis(1));
    }
}
/// Joins `handle`, polling every 5 ms, and fails the test if the thread has
/// not finished within `secs` seconds.
///
/// # Panics
///
/// * If the deadline passes first (reported as a likely deadlock of `what`).
/// * If the joined thread itself panicked. The message names `what` and
///   carries the worker's panic text when the payload is a string, so the
///   failing thread can be identified from the test output.
fn join_with_timeout(handle: thread::JoinHandle<()>, secs: u64, what: &str) {
    let budget = Duration::from_secs(secs);
    let start = Instant::now();
    while !handle.is_finished() {
        assert!(
            start.elapsed() < budget,
            "{what} did not finish within {secs}s — likely deadlock"
        );
        thread::sleep(Duration::from_millis(5));
    }
    // The thread has finished, so `join` returns without blocking.
    if let Err(payload) = handle.join() {
        // Panic payloads are `&'static str` for literal messages and `String`
        // for formatted ones; anything else has no text to show.
        let detail = payload
            .downcast_ref::<&'static str>()
            .copied()
            .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
            .unwrap_or("(non-string panic payload)");
        panic!("{} panicked: {}", what, detail);
    }
}
/// Three states in pairwise-distinct partitions are emitted to from three
/// threads. Thread 1 is parked inside a derived fn; threads 2 and 3 must
/// still complete their emits before thread 1 is released.
#[test]
fn three_disjoint_partitions_emit_returns_concurrently_while_one_blocked() {
let rt = TestRuntime::new();
// s1 starts without a value; s2 and s3 start at Int(0).
let s1 = rt.state(None);
let s2 = rt.state(Some(TestValue::Int(0)));
let s3 = rt.state(Some(TestValue::Int(0)));
let p1 = rt.core.partition_of(s1.id).expect("registered");
let p2 = rt.core.partition_of(s2.id).expect("registered");
let p3 = rt.core.partition_of(s3.id).expect("registered");
// Fixture precondition: the three states sit in pairwise-distinct partitions.
assert_ne!(p1, p2);
assert_ne!(p1, p3);
assert_ne!(p2, p3);
// One-shot gate: the first invocation of d1's fn takes the receiver out of
// the shared slot and blocks on it until `tx` sends (or is dropped); any
// later invocation finds `None` and does not block.
let (tx, rx) = std::sync::mpsc::channel::<()>();
let rx = Arc::new(Mutex::new(Some(rx)));
let rx_for_fn = rx.clone();
// Counts invocations of d1's fn so the main thread can tell when thread 1
// is inside it.
let entered = Arc::new(AtomicU64::new(0));
let entered_for_fn = entered.clone();
let d1 = rt.derived(&[s1.id], move |deps| {
entered_for_fn.fetch_add(1, Ordering::SeqCst);
let recv = rx_for_fn.lock().unwrap().take();
if let Some(rx) = recv {
let _ = rx.recv();
}
Some(deps[0].clone())
});
let _r1: Recorder = rt.subscribe_recorder(d1);
let core_1 = rt.core.clone();
let s1_id = s1.id;
let binding_1 = rt.binding.clone();
// Thread 1: emit on s1. The wait below confirms it reached d1's fn.
let thread_1 = thread::spawn(move || {
let h = binding_1.intern(TestValue::Int(1));
core_1.emit(s1_id, h);
});
// Do not start the other emitters until thread 1 is inside the fn.
wait_until(&entered, 1, 5, "Thread 1 fn entry");
let t2_done = Arc::new(AtomicU64::new(0));
let t3_done = Arc::new(AtomicU64::new(0));
let done_2 = t2_done.clone();
let done_3 = t3_done.clone();
let core_2 = rt.core.clone();
let core_3 = rt.core.clone();
let s2_id = s2.id;
let s3_id = s3.id;
let binding_2 = rt.binding.clone();
let binding_3 = rt.binding.clone();
// Threads 2 and 3: emit on s2 / s3 and raise a flag once `emit` has returned.
let thread_2 = thread::spawn(move || {
let h = binding_2.intern(TestValue::Int(20));
core_2.emit(s2_id, h);
done_2.store(1, Ordering::SeqCst);
});
let thread_3 = thread::spawn(move || {
let h = binding_3.intern(TestValue::Int(30));
core_3.emit(s3_id, h);
done_3.store(1, Ordering::SeqCst);
});
// Core assertion: both emits return while the gate is still closed (`tx`
// has not been sent yet).
wait_until(&t2_done, 1, 10, "Thread 2 emit");
wait_until(&t3_done, 1, 10, "Thread 3 emit");
// Open the gate and make sure every thread terminates.
tx.send(()).expect("release Thread 1");
join_with_timeout(thread_1, 5, "thread 1");
join_with_timeout(thread_2, 5, "thread 2");
join_with_timeout(thread_3, 5, "thread 3");
}
/// With `s_a` linked to `s_b` as a meta companion and thread 1 parked inside
/// a derived fn on `s_a`, an emit on the unrelated `s_c` must complete while
/// an emit on `s_b` must not return until thread 1 is released.
#[test]
fn cross_partition_cascade_does_not_block_disjoint_third_partition() {
let rt = TestRuntime::new();
let s_a = rt.state(None);
let s_b = rt.state(Some(TestValue::Int(0)));
let s_c = rt.state(Some(TestValue::Int(0)));
let p_a = rt.core.partition_of(s_a.id).expect("registered");
let p_b = rt.core.partition_of(s_b.id).expect("registered");
let p_c = rt.core.partition_of(s_c.id).expect("registered");
// Fixture precondition: three pairwise-distinct partitions.
assert_ne!(p_a, p_b);
assert_ne!(p_a, p_c);
assert_ne!(p_b, p_c);
// Registering the meta companion must leave both partitions unchanged.
rt.core.add_meta_companion(s_a.id, s_b.id);
assert_eq!(rt.core.partition_of(s_a.id), Some(p_a));
assert_eq!(rt.core.partition_of(s_b.id), Some(p_b));
// One-shot gate: the first invocation of d_a's fn takes the receiver and
// blocks on it until `tx` sends (or is dropped).
let (tx, rx) = std::sync::mpsc::channel::<()>();
let rx = Arc::new(Mutex::new(Some(rx)));
let rx_for_fn = rx.clone();
// Counts invocations of d_a's fn.
let entered = Arc::new(AtomicU64::new(0));
let entered_for_fn = entered.clone();
let d_a = rt.derived(&[s_a.id], move |deps| {
entered_for_fn.fetch_add(1, Ordering::SeqCst);
let recv = rx_for_fn.lock().unwrap().take();
if let Some(rx) = recv {
let _ = rx.recv();
}
Some(deps[0].clone())
});
let _rec_a: Recorder = rt.subscribe_recorder(d_a);
let core_a = rt.core.clone();
let s_a_id = s_a.id;
let binding_a = rt.binding.clone();
// Thread 1: emit on s_a; it parks inside d_a's fn.
let thread_1 = thread::spawn(move || {
let h = binding_a.intern(TestValue::Int(1));
core_a.emit(s_a_id, h);
});
wait_until(&entered, 1, 5, "Thread 1 fn entry");
let thread_2_done = Arc::new(AtomicU64::new(0));
let done_for_t2 = thread_2_done.clone();
let core_c = rt.core.clone();
let s_c_id = s_c.id;
let binding_c = rt.binding.clone();
// Thread 2: emit on the unrelated s_c and flag completion.
let thread_2 = thread::spawn(move || {
let h = binding_c.intern(TestValue::Int(99));
core_c.emit(s_c_id, h);
done_for_t2.store(1, Ordering::SeqCst);
});
let t3_entered = Arc::new(AtomicU64::new(0));
let t3_exited = Arc::new(AtomicU64::new(0));
let entered_for_t3 = t3_entered.clone();
let exited_for_t3 = t3_exited.clone();
let core_b = rt.core.clone();
let s_b_id = s_b.id;
let binding_b = rt.binding.clone();
// Thread 3: emit on s_b, raising one flag just before the call and another
// after it returns, so the main thread can observe "called but not returned".
let thread_3 = thread::spawn(move || {
let h = binding_b.intern(TestValue::Int(77));
entered_for_t3.store(1, Ordering::SeqCst);
core_b.emit(s_b_id, h);
exited_for_t3.store(1, Ordering::SeqCst);
});
// The s_c emit must finish while thread 1 is still parked.
wait_until(&thread_2_done, 1, 10, "Thread 2 emit");
wait_until(&t3_entered, 1, 5, "Thread 3 emit entry");
// Grace period: give thread 3 time to return from `emit` if nothing were
// holding it back. This is a timing heuristic, not a hard guarantee.
thread::sleep(Duration::from_millis(100));
assert_eq!(
t3_exited.load(Ordering::SeqCst),
0,
"Thread 3's emit on s_b must block on partition(s_b)'s wave_owner \
held by Thread 1's meta-companion-driven cascade. The exit flag \
being set early would mean `compute_touched_partitions` did NOT \
walk the meta-edge — meta-walk regression."
);
// Open the gate; all three threads must now terminate.
tx.send(()).expect("release Thread 1");
join_with_timeout(thread_1, 5, "Thread 1");
join_with_timeout(thread_2, 5, "Thread 2");
join_with_timeout(thread_3, 5, "Thread 3");
}
/// Drives two reciprocal meta-companion links from two threads at once and
/// requires both emit loops to run to completion.
///
/// Fixture (each point is checked by an assertion below): `a`/`a2` share one
/// partition, `b`/`b2` share another, and the two partitions differ. The
/// companions `a -> b` and `b2 -> a2` point in opposite directions across
/// that boundary. Thread 1 emits `N` values on `a`, thread 2 emits `N` values
/// on `b2`. A deadlock surfaces as a `join_with_timeout` failure.
///
/// Both emitters wait on a shared barrier before their first emit so the two
/// loops start together instead of depending on thread start-up timing.
#[test]
fn reciprocal_cross_partition_cascades_complete_without_deadlock() {
    const N: usize = 200;

    let rt = TestRuntime::new();
    let a = rt.state(None);
    let b = rt.state(None);
    let a2 = rt.state(None);
    let b2 = rt.state(None);

    // Each pair is tied together by a two-dependency derived node; the
    // assertions below check that each pair ends up in one partition.
    let _join_x = rt.derived(&[a.id, a2.id], |_deps| Some(TestValue::Int(0)));
    let _join_y = rt.derived(&[b.id, b2.id], |_deps| Some(TestValue::Int(0)));
    let p_a = rt.core.partition_of(a.id).expect("registered");
    let p_b = rt.core.partition_of(b.id).expect("registered");
    assert_ne!(p_a, p_b);
    assert_eq!(rt.core.partition_of(a2.id), Some(p_a));
    assert_eq!(rt.core.partition_of(b2.id), Some(p_b));

    // Reciprocal companions; they must not merge the two partitions.
    rt.core.add_meta_companion(a.id, b.id);
    rt.core.add_meta_companion(b2.id, a2.id);
    assert_ne!(rt.core.partition_of(a.id), rt.core.partition_of(b.id));

    let d_a = rt.derived(&[a.id], |deps| Some(deps[0].clone()));
    let d_b2 = rt.derived(&[b2.id], |deps| Some(deps[0].clone()));
    let _rec_a: Recorder = rt.subscribe_recorder(d_a);
    let _rec_b2: Recorder = rt.subscribe_recorder(d_b2);

    // Per-thread count of completed emits, used to tell a full run from a
    // partial one.
    let t1_progress = Arc::new(AtomicU64::new(0));
    let t2_progress = Arc::new(AtomicU64::new(0));
    let progress_1 = t1_progress.clone();
    let progress_2 = t2_progress.clone();

    // Start line shared by both emitters. Both threads are spawned before
    // either is joined, so the barrier itself cannot hang the test.
    let start = Arc::new(std::sync::Barrier::new(2));
    let start_1 = start.clone();
    let start_2 = start.clone();

    let core1 = rt.core.clone();
    let a_id = a.id;
    let binding1 = rt.binding.clone();
    let thread_1 = thread::spawn(move || {
        start_1.wait();
        for i in 0..N {
            let h = binding1.intern(TestValue::Int(i as i64));
            core1.emit(a_id, h);
            progress_1.fetch_add(1, Ordering::SeqCst);
        }
    });

    let core2 = rt.core.clone();
    let b2_id = b2.id;
    let binding2 = rt.binding.clone();
    let thread_2 = thread::spawn(move || {
        start_2.wait();
        for i in 0..N {
            // Offset by 1_000 so thread 2's values never collide with thread 1's.
            let h = binding2.intern(TestValue::Int((1_000 + i) as i64));
            core2.emit(b2_id, h);
            progress_2.fetch_add(1, Ordering::SeqCst);
        }
    });

    join_with_timeout(thread_1, 30, "Thread 1 reciprocal cascade");
    join_with_timeout(thread_2, 30, "Thread 2 reciprocal cascade");

    assert_eq!(
        t1_progress.load(Ordering::SeqCst),
        N as u64,
        "Thread 1 did not complete all {N} emits — partial deadlock \
         or stall (counter shows iter count reached, not full N)"
    );
    assert_eq!(
        t2_progress.load(Ordering::SeqCst),
        N as u64,
        "Thread 2 did not complete all {N} emits — partial deadlock \
         or stall"
    );

    // Both emitted-to states must hold a cache handle other than
    // `HandleId::new(0)` after the run.
    let cache_a = rt.core.cache_of(a.id);
    let cache_b2 = rt.core.cache_of(b2.id);
    assert_ne!(cache_a, graphrefly_core::HandleId::new(0));
    assert_ne!(cache_b2, graphrefly_core::HandleId::new(0));
}
/// A derived fn fired by an emit on `s_a` forwards `n * 100` to `s_b` through
/// `emit_or_defer`. Both the derived node and `s_b` must observe their values
/// by the time the outer `emit` returns.
#[test]
fn producer_cross_partition_emit_defers_and_succeeds() {
    let rt = TestRuntime::new();
    // `s_b` is created first; the invariant below expects it to land in the
    // lower-numbered partition.
    let s_b = rt.state(Some(TestValue::Int(0)));
    let s_a = rt.state(None);

    let p_b = rt.core.partition_of(s_b.id).expect("registered");
    let p_a = rt.core.partition_of(s_a.id).expect("registered");
    assert!(
        p_a.raw() > p_b.raw(),
        "fixture invariant: partition(s_a) must be > partition(s_b) \
         for the cross-partition emit to trigger descending-order deferral. \
         Got p_a={p_a:?}, p_b={p_b:?}"
    );

    // The producer: mirrors its input and side-emits a scaled copy to `s_b`.
    let d_producer = {
        let core = rt.core.clone();
        let binding = rt.binding.clone();
        let target = s_b.id;
        rt.derived(&[s_a.id], move |deps| {
            if let TestValue::Int(n) = deps[0] {
                let scaled = binding.intern(TestValue::Int(n * 100));
                core.emit_or_defer(target, scaled);
            }
            Some(deps[0].clone())
        })
    };

    let rec_d: Recorder = rt.subscribe_recorder(d_producer);
    let rec_b: Recorder = rt.subscribe_recorder(s_b.id);

    let seven = rt.binding.intern(TestValue::Int(7));
    rt.core.emit(s_a.id, seven);

    let d_data = rec_d.data_values();
    assert!(
        d_data.contains(&TestValue::Int(7)),
        "d_producer should see s_a's emission; got: {d_data:?}"
    );

    let b_data = rec_b.data_values();
    assert!(
        b_data.contains(&TestValue::Int(700)),
        "s_b should receive the deferred emission (700); got: {b_data:?}"
    );
}
/// A derived fn queues a deferred callback that subscribes to `s_low`; after
/// the triggering emit returns, the sink must have seen `s_low`'s value (42).
#[test]
fn producer_cross_partition_subscribe_defers_and_succeeds() {
    let rt = TestRuntime::new();
    let s_low = rt.state(Some(TestValue::Int(42)));
    let s_trigger = rt.state(None);

    let p_low = rt.core.partition_of(s_low.id).expect("registered");
    let p_trigger = rt.core.partition_of(s_trigger.id).expect("registered");
    assert!(
        p_trigger.raw() > p_low.raw(),
        "fixture invariant: partition(s_trigger) > partition(s_low). \
         Got p_trigger={p_trigger:?}, p_low={p_low:?}"
    );

    // Every Data payload delivered to the deferred subscription lands here.
    let deferred_events: Arc<Mutex<Vec<TestValue>>> = Arc::new(Mutex::new(Vec::new()));

    let d_build = {
        let events_for_sub = deferred_events.clone();
        let binding_for_sub = rt.binding.clone();
        let core_for_fn = rt.core.clone();
        let s_low_id = s_low.id;
        rt.derived(&[s_trigger.id], move |deps| {
            if matches!(deps[0], TestValue::Int(_)) {
                let sink_events = events_for_sub.clone();
                let sink_binding = binding_for_sub.clone();
                let queue_core = core_for_fn.clone();

                // Sink: resolve each Data handle and record the value.
                let sink: graphrefly_core::Sink =
                    Arc::new(move |msgs: &[graphrefly_core::Message]| {
                        for msg in msgs {
                            if let graphrefly_core::Message::Data(handle) = msg {
                                let value = sink_binding.deref(*handle);
                                sink_events.lock().unwrap().push(value);
                            }
                        }
                    });

                let subscribe_core = queue_core.clone();
                let subscribe_later = move || {
                    let sub = subscribe_core.subscribe(s_low_id, sink);
                    // Deliberately never run the subscription's destructor.
                    std::mem::forget(sub);
                };
                queue_core.push_deferred_producer_op(
                    graphrefly_core::DeferredProducerOp::Callback(Box::new(subscribe_later)),
                );
            }
            Some(deps[0].clone())
        })
    };
    let _rec_d: Recorder = rt.subscribe_recorder(d_build);

    let one = rt.binding.intern(TestValue::Int(1));
    rt.core.emit(s_trigger.id, one);

    let events = deferred_events.lock().unwrap();
    assert!(
        events.contains(&TestValue::Int(42)),
        "deferred subscribe should receive s_low's cached value (42); \
         got: {events:?}"
    );
}
/// A derived fn issues three `emit_or_defer` calls (10, 20, 30) against
/// `s_target` in one firing; the recorder must see them in that order.
#[test]
fn deferred_emit_ordering_preserved() {
    let rt = TestRuntime::new();
    let s_target = rt.state(Some(TestValue::Int(0)));
    let s_source = rt.state(None);

    let p_target = rt.core.partition_of(s_target.id).expect("registered");
    let p_source = rt.core.partition_of(s_source.id).expect("registered");
    assert!(
        p_source.raw() > p_target.raw(),
        "fixture invariant: partition(s_source) > partition(s_target). \
         Got p_source={p_source:?}, p_target={p_target:?}"
    );

    let d_multi = {
        let core = rt.core.clone();
        let binding = rt.binding.clone();
        let target = s_target.id;
        rt.derived(&[s_source.id], move |deps| {
            if matches!(deps[0], TestValue::Int(_)) {
                for step in 1..=3i64 {
                    let handle = binding.intern(TestValue::Int(step * 10));
                    core.emit_or_defer(target, handle);
                }
            }
            Some(deps[0].clone())
        })
    };
    let _rec_d: Recorder = rt.subscribe_recorder(d_multi);
    let rec_target: Recorder = rt.subscribe_recorder(s_target.id);

    let one = rt.binding.intern(TestValue::Int(1));
    rt.core.emit(s_source.id, one);

    // Keep only strictly positive integers; this drops the initial Int(0).
    let target_data = rec_target.data_values();
    let deferred_values: Vec<i64> = target_data
        .iter()
        .filter_map(|seen| match seen {
            TestValue::Int(n) if *n > 0 => Some(*n),
            _ => None,
        })
        .collect();
    assert_eq!(
        deferred_values,
        vec![10, 20, 30],
        "deferred emissions must arrive in FIFO order; got: {deferred_values:?}"
    );
}
/// A derived fn that calls plain `emit` on a state in a lower-numbered
/// partition must panic with the "Phase H+ ascending-order violation"
/// diagnostic, and the held-partition snapshot must be empty afterwards.
#[test]
fn user_fn_cross_partition_emit_during_fire_panics_with_h_plus_diagnostic() {
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        let rt = TestRuntime::new();
        let s_in = rt.state(Some(TestValue::Int(0)));
        let s_side = rt.state(Some(TestValue::Int(0)));

        let d = {
            let core = rt.core.clone();
            let binding = rt.binding.clone();
            let side_id = s_side.id;
            rt.derived(&[s_in.id], move |deps| {
                if let TestValue::Int(n) = deps[0] {
                    let handle = binding.intern(TestValue::Int(n * 10));
                    // Direct emit (not `emit_or_defer`) is the call under test.
                    core.emit(side_id, handle);
                }
                Some(deps[0].clone())
            })
        };

        let p_d = rt.core.partition_of(d).expect("d registered");
        let p_side = rt.core.partition_of(s_side.id).expect("s_side registered");
        assert!(
            p_d.raw() > p_side.raw(),
            "fixture invariant violated: this test requires partition(d) \
             > partition(s_side) so the inner emit is descending order. \
             Got partition(d)={p_d:?}, partition(s_side)={p_side:?}. \
             If union_nodes' tiebreak was changed (e.g., to pick \
             smaller-id-as-root on equal rank), reconstruct the topology \
             to force descending order — DO NOT assume the test still \
             covers the H+ panic path."
        );

        // Subscribing is the last step inside the guarded closure; the panic
        // is expected to surface from here.
        let _rec_d = rt.subscribe_recorder(d);
    }));

    let payload = outcome.expect_err(
        "expected the H+ ascending-order check to panic when a user fn \
         does a cross-partition emit DESCENDING during fire — instead the \
         closure completed without panic. H+ enforcement regression?",
    );

    // Panic payloads are `&'static str` or `String`; anything else has no text.
    let msg: &str = if let Some(text) = payload.downcast_ref::<&'static str>() {
        *text
    } else if let Some(text) = payload.downcast_ref::<String>() {
        text.as_str()
    } else {
        "(non-string panic payload)"
    };
    assert!(
        msg.contains("Phase H+ ascending-order violation"),
        "expected H+ ascending-order panic; got: {msg}"
    );

    let held = graphrefly_core::held_snapshot_for_tests();
    assert!(
        held.is_empty(),
        "Phase H+ thread-local `HELD` is dirty post-panic: {held:?}. \
         The scope-guard in `partition_wave_owner_lock_arc` (/qa A1) \
         must release the refcount on every exit path including \
         panic unwinds; a non-empty held set here means a leak."
    );
}
/// A derived fn queues a deferred emit and then panics. The panic must reach
/// the caller and the held-partition snapshot must be empty afterwards.
#[test]
fn panic_in_derived_discards_deferred_handles_without_leaking() {
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        let rt = TestRuntime::new();
        let s_target = rt.state(Some(TestValue::Int(0)));
        let s_trigger = rt.state(None);

        let p_target = rt.core.partition_of(s_target.id).expect("registered");
        let p_trigger = rt.core.partition_of(s_trigger.id).expect("registered");
        assert!(
            p_trigger.raw() > p_target.raw(),
            "fixture invariant: partition(s_trigger) > partition(s_target). \
             Got p_trigger={p_trigger:?}, p_target={p_target:?}"
        );

        let d = {
            let core = rt.core.clone();
            let binding = rt.binding.clone();
            let target = s_target.id;
            rt.derived(&[s_trigger.id], move |deps| {
                if matches!(deps[0], TestValue::Int(_)) {
                    // Queue first, then blow up: the queued op must not survive.
                    let handle = binding.intern(TestValue::Int(999));
                    core.emit_or_defer(target, handle);
                    panic!("intentional panic for discard test");
                }
                Some(deps[0].clone())
            })
        };
        let _rec = rt.subscribe_recorder(d);

        let one = rt.binding.intern(TestValue::Int(1));
        rt.core.emit(s_trigger.id, one);
    }));

    assert!(
        outcome.is_err(),
        "expected panic from derived fn, but closure completed normally"
    );

    let held = graphrefly_core::held_snapshot_for_tests();
    assert!(
        held.is_empty(),
        "held_partitions dirty after panic-discard: {held:?}"
    );
}
/// A deferred callback queued by a derived fn itself calls `emit_or_defer`;
/// the value it emits (777) must still reach `s_target` before the outer
/// `emit` returns.
#[test]
fn chained_deferred_ops_drain_multi_iteration() {
    let rt = TestRuntime::new();
    let s_target = rt.state(Some(TestValue::Int(0)));
    let s_trigger = rt.state(None);

    let p_target = rt.core.partition_of(s_target.id).expect("registered");
    let p_trigger = rt.core.partition_of(s_trigger.id).expect("registered");
    assert!(
        p_trigger.raw() > p_target.raw(),
        "fixture invariant: partition(s_trigger) > partition(s_target)"
    );

    let d = {
        let core = rt.core.clone();
        let binding = rt.binding.clone();
        let target = s_target.id;
        rt.derived(&[s_trigger.id], move |deps| {
            if matches!(deps[0], TestValue::Int(_)) {
                let core_cb = core.clone();
                let binding_cb = binding.clone();
                // Second-stage op: runs later, from the deferred queue.
                let follow_up = move || {
                    let handle = binding_cb.intern(TestValue::Int(777));
                    core_cb.emit_or_defer(target, handle);
                };
                core.push_deferred_producer_op(graphrefly_core::DeferredProducerOp::Callback(
                    Box::new(follow_up),
                ));
            }
            Some(deps[0].clone())
        })
    };
    let _rec_d = rt.subscribe_recorder(d);
    let rec_target = rt.subscribe_recorder(s_target.id);

    let one = rt.binding.intern(TestValue::Int(1));
    rt.core.emit(s_trigger.id, one);

    let target_data = rec_target.data_values();
    assert!(
        target_data.contains(&TestValue::Int(777)),
        "chained deferred op (emit 777 from callback) should drain; \
         got target data: {target_data:?}"
    );
}
/// No explicit `emit` happens here: subscribing to a derived node over a state
/// that already holds Int(0) must be enough for the fn's `emit_or_defer` of
/// `0 + 500` to reach `s_low`.
#[test]
fn subscribe_triggered_drain_of_deferred_ops() {
    let rt = TestRuntime::new();
    let s_low = rt.state(Some(TestValue::Int(100)));
    let s_trigger = rt.state(Some(TestValue::Int(0)));

    let p_low = rt.core.partition_of(s_low.id).expect("registered");
    let p_trigger = rt.core.partition_of(s_trigger.id).expect("registered");
    assert!(
        p_trigger.raw() > p_low.raw(),
        "fixture invariant: partition(s_trigger) > partition(s_low)"
    );

    let d = {
        let core = rt.core.clone();
        let binding = rt.binding.clone();
        let low_id = s_low.id;
        rt.derived(&[s_trigger.id], move |deps| {
            if let TestValue::Int(n) = deps[0] {
                let shifted = binding.intern(TestValue::Int(n + 500));
                core.emit_or_defer(low_id, shifted);
            }
            Some(deps[0].clone())
        })
    };

    // The recorder on `s_low` is attached before the derived node is subscribed.
    let rec_low = rt.subscribe_recorder(s_low.id);
    let _rec_d = rt.subscribe_recorder(d);

    let low_data = rec_low.data_values();
    assert!(
        low_data.contains(&TestValue::Int(500)),
        "subscribe-path drain should fire deferred emit (500); \
         got s_low data: {low_data:?}"
    );
}
/// A derived fn queues an `emit_or_defer(42)` followed by a
/// `complete_or_defer` on the same target; the recorder must see Data(42)
/// strictly before Complete.
#[test]
fn mixed_deferred_op_types_preserve_fifo_order() {
    use super::common::RecordedEvent;

    let rt = TestRuntime::new();
    let s_target = rt.state(Some(TestValue::Int(0)));
    let s_trigger = rt.state(None);

    let p_target = rt.core.partition_of(s_target.id).expect("registered");
    let p_trigger = rt.core.partition_of(s_trigger.id).expect("registered");
    assert!(
        p_trigger.raw() > p_target.raw(),
        "fixture invariant: partition(s_trigger) > partition(s_target)"
    );

    let d = {
        let core = rt.core.clone();
        let binding = rt.binding.clone();
        let target = s_target.id;
        rt.derived(&[s_trigger.id], move |deps| {
            if matches!(deps[0], TestValue::Int(_)) {
                let handle = binding.intern(TestValue::Int(42));
                core.emit_or_defer(target, handle);
                core.complete_or_defer(target);
            }
            Some(deps[0].clone())
        })
    };
    let _rec_d = rt.subscribe_recorder(d);
    let rec_target = rt.subscribe_recorder(s_target.id);

    let one = rt.binding.intern(TestValue::Int(1));
    rt.core.emit(s_trigger.id, one);

    // Locate the first Data(42) and the first Complete in the recorded stream.
    let events = rec_target.snapshot();
    let data_idx = events
        .iter()
        .position(|event| matches!(event, RecordedEvent::Data(TestValue::Int(42))));
    let complete_idx = events
        .iter()
        .position(|event| matches!(event, RecordedEvent::Complete));

    assert!(
        data_idx.is_some(),
        "s_target should receive Data(42) from deferred emit; events: {events:?}"
    );
    assert!(
        complete_idx.is_some(),
        "s_target should receive Complete from deferred complete; events: {events:?}"
    );
    // Both are `Some` here, so comparing the options compares the indices.
    assert!(
        data_idx < complete_idx,
        "FIFO order violation: Data(42) at index {:?} should precede \
         Complete at index {:?}; events: {events:?}",
        data_idx,
        complete_idx
    );
}
/// Two threads, released together by a barrier, each set `0..N` on their own
/// state (the two states are in different partitions). Each derived recorder
/// must see every value in order, and the live-handle count must stay within
/// 8 of its pre-run baseline.
#[test]
fn disjoint_partition_concurrent_emits_each_drain_and_deliver() {
    const N: i64 = 300;

    let rt = TestRuntime::new();
    let s_a = rt.state(None);
    let s_b = rt.state(None);
    let p_a = rt.core.partition_of(s_a.id).expect("registered");
    let p_b = rt.core.partition_of(s_b.id).expect("registered");
    assert_ne!(p_a, p_b, "s_a and s_b must be in disjoint partitions");

    let d_a = rt.derived(&[s_a.id], |deps| Some(deps[0].clone()));
    let d_b = rt.derived(&[s_b.id], |deps| Some(deps[0].clone()));
    let rec_a = rt.subscribe_recorder(d_a);
    let rec_b = rt.subscribe_recorder(d_b);

    // Snapshot taken after wiring, before any value is set.
    let baseline_handles = rt.binding.live_handles();

    let start = Arc::new(std::sync::Barrier::new(2));
    // Spawns a writer that waits at the barrier, then sets 0..N on its state.
    let spawn_writer = |state: super::common::StateHandle, gate: Arc<std::sync::Barrier>| {
        thread::spawn(move || {
            gate.wait();
            for value in 0..N {
                state.set(TestValue::Int(value));
            }
        })
    };
    let t_a = spawn_writer(s_a, start.clone());
    let t_b = spawn_writer(s_b, start.clone());
    join_with_timeout(t_a, 10, "thread A");
    join_with_timeout(t_b, 10, "thread B");

    let want: Vec<TestValue> = (0..N).map(TestValue::Int).collect();
    assert_eq!(
        rec_a.data_values(),
        want,
        "thread A's disjoint wave must deliver every value (got {} of {N})",
        rec_a.data_values().len()
    );
    assert_eq!(
        rec_b.data_values(),
        want,
        "thread B's disjoint wave must deliver every value (got {} of {N})",
        rec_b.data_values().len()
    );

    let after = rt.binding.live_handles();
    assert!(
        after <= baseline_handles + 8,
        "payload retains leaked: live_handles {after} vs baseline \
         {baseline_handles} after {N} emits/thread (skipped drain?)"
    );
}
}
#[cfg(loom)]
mod loom_tests {
use loom::sync::atomic::{AtomicUsize, Ordering};
use loom::sync::{Arc, Mutex};
/// Minimal stand-in for a partition lock, used by the loom models in this
/// module.
struct PartitionLock {
/// Ordering key: `acquire_two_in_ascending_order` locks the smaller `id` first.
id: usize,
/// The lock itself; it guards no data.
guard: Mutex<()>,
}
/// Locks both partitions, always taking the one with the smaller `id` first,
/// whatever order the arguments arrive in.
///
/// Returns `(guard_of_lower_id, guard_of_higher_id)`. On equal ids, `a` is
/// locked before `b`.
fn acquire_two_in_ascending_order<'a>(
    a: &'a Arc<PartitionLock>,
    b: &'a Arc<PartitionLock>,
) -> (
    loom::sync::MutexGuard<'a, ()>,
    loom::sync::MutexGuard<'a, ()>,
) {
    // Swap only when `b` is strictly lower, so ties keep the argument order.
    let (first, second) = if b.id < a.id { (b, a) } else { (a, b) };
    let lower = first.guard.lock().unwrap();
    let upper = second.guard.lock().unwrap();
    (lower, upper)
}
/// Loom model: two threads request the same pair of partition locks with the
/// arguments in opposite order. Both must get through, and each partition
/// must have been acquired exactly twice.
#[test]
fn cross_partition_reciprocal_arg_order_acquire_no_deadlock() {
    loom::model(|| {
        let part_x = Arc::new(PartitionLock {
            id: 1,
            guard: Mutex::new(()),
        });
        let part_y = Arc::new(PartitionLock {
            id: 2,
            guard: Mutex::new(()),
        });
        let acquired_x = Arc::new(AtomicUsize::new(0));
        let acquired_y = Arc::new(AtomicUsize::new(0));

        // Thread A passes (X, Y).
        let forward = {
            let x = part_x.clone();
            let y = part_y.clone();
            let hits_x = acquired_x.clone();
            let hits_y = acquired_y.clone();
            loom::thread::spawn(move || {
                let (gx, gy) = acquire_two_in_ascending_order(&x, &y);
                hits_x.fetch_add(1, Ordering::SeqCst);
                hits_y.fetch_add(1, Ordering::SeqCst);
                drop(gy);
                drop(gx);
            })
        };

        // Thread B passes (Y, X). The helper still returns the lower-id guard
        // first, so `gx` is X's guard here as well.
        let reversed = {
            let x = part_x.clone();
            let y = part_y.clone();
            let hits_x = acquired_x.clone();
            let hits_y = acquired_y.clone();
            loom::thread::spawn(move || {
                let (gx, gy) = acquire_two_in_ascending_order(&y, &x);
                hits_x.fetch_add(1, Ordering::SeqCst);
                hits_y.fetch_add(1, Ordering::SeqCst);
                drop(gy);
                drop(gx);
            })
        };

        forward.join().unwrap();
        reversed.join().unwrap();

        let x_count = acquired_x.load(Ordering::SeqCst);
        let y_count = acquired_y.load(Ordering::SeqCst);
        assert_eq!(x_count, 2, "partition X acquired by both threads");
        assert_eq!(y_count, 2, "partition Y acquired by both threads");
    });
}
/// Loom model: same reciprocal-argument scenario with non-adjacent ids
/// (7 and 13) and no counters. The only requirement is that both threads
/// can be joined.
#[test]
fn reciprocal_cross_partition_acquisitions_no_deadlock() {
    loom::model(|| {
        let part_x = Arc::new(PartitionLock {
            id: 7,
            guard: Mutex::new(()),
        });
        let part_y = Arc::new(PartitionLock {
            id: 13,
            guard: Mutex::new(()),
        });

        // Thread A passes (X, Y).
        let forward = {
            let x = part_x.clone();
            let y = part_y.clone();
            loom::thread::spawn(move || {
                let (low, high) = acquire_two_in_ascending_order(&x, &y);
                drop(high);
                drop(low);
            })
        };

        // Thread B passes (Y, X).
        let reversed = {
            let x = part_x.clone();
            let y = part_y.clone();
            loom::thread::spawn(move || {
                let (low, high) = acquire_two_in_ascending_order(&y, &x);
                drop(high);
                drop(low);
            })
        };

        forward.join().unwrap();
        reversed.join().unwrap();
    });
}
}