mod common;
use std::thread;
use common::{TestRuntime, TestValue};
use graphrefly_core::{NodeFnOrOp, NodeOpts, NodeRegistration};
const WORKERS: usize = 4;
const EMITS: i64 = 20;
fn run_cascade(rt: &TestRuntime) -> Vec<TestValue> {
let init = rt.binding.intern(TestValue::Int(0));
let s = rt
.core()
.register(NodeRegistration {
deps: vec![],
fn_or_op: None,
opts: NodeOpts {
initial: init,
..Default::default()
},
})
.expect("register state");
let fid = rt
.binding
.register_fn(|vals: &[TestValue]| Some(vals[0].clone()));
let d = rt
.core()
.register(NodeRegistration {
deps: vec![s],
fn_or_op: Some(NodeFnOrOp::Fn(fid)),
opts: NodeOpts::default(),
})
.expect("register derived");
let rec = rt.subscribe_recorder(d);
for k in 1..=EMITS {
let h = rt.binding.intern(TestValue::Int(k));
rt.core().emit(s, h);
}
rt.drain_mailbox();
rec.data_values()
}
#[test]
fn independent_cores_each_serialize_without_deadlock() {
let handles: Vec<_> = (0..WORKERS)
.map(|_| {
thread::spawn(|| {
let rt = TestRuntime::new();
run_cascade(&rt)
})
})
.collect();
let expected: Vec<TestValue> = std::iter::once(TestValue::Int(0))
.chain((1..=EMITS).map(TestValue::Int))
.collect();
for h in handles {
let delivered = h
.join()
.expect("worker Core completed without panic/deadlock");
assert_eq!(
delivered, expected,
"each independent same-group Core serializes its own wave \
in-order, exactly once (cached initial replayed first)"
);
}
}
#[test]
fn independent_cores_both_complete() {
let a = thread::spawn(|| {
let rt = TestRuntime::new();
run_cascade(&rt)
});
let b = thread::spawn(|| {
let rt = TestRuntime::new();
run_cascade(&rt)
});
let expected: Vec<TestValue> = std::iter::once(TestValue::Int(0))
.chain((1..=EMITS).map(TestValue::Int))
.collect();
let av = a.join().expect("Core A completed");
let bv = b.join().expect("Core B completed");
assert_eq!(av, expected, "independent Core A delivered fully");
assert_eq!(bv, expected, "independent Core B delivered fully");
}
#[test]
fn all_none_default_independent_cores_cascade_integrity() {
let handles: Vec<_> = (0..WORKERS)
.map(|_| {
thread::spawn(|| {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
let a = rt.derived(&[s.id], |v| Some(v[0].clone()));
let b = rt.derived(&[s.id], |v| Some(v[0].clone()));
let c = rt.derived(&[a, b], |v| {
let int = |t: &TestValue| match t {
TestValue::Int(n) => *n,
other => panic!("diamond expected Int, got {other:?}"),
};
Some(TestValue::Int(int(&v[0]) + int(&v[1])))
});
let rec = rt.subscribe_recorder(c);
for k in 1..=EMITS {
s.set(TestValue::Int(k));
}
rt.drain_mailbox();
rec.data_values()
})
})
.collect();
let expected: Vec<TestValue> = std::iter::once(TestValue::Int(0))
.chain((1..=EMITS).map(|k| TestValue::Int(2 * k)))
.collect();
for h in handles {
let delivered = h.join().expect("worker diamond Core completed");
assert_eq!(
delivered, expected,
"all-None diamond keeps cascade integrity per independent Core \
(no torn a/b → every settled c is 2*k, in order)"
);
}
}