mod common;
use std::sync::{Arc, Mutex};
use common::{TestBinding, TestRuntime, TestValue};
use graphrefly_core::{
BindingBoundary, Core, EqualsMode, FnId, FnResult, HandleId, NodeId, SetDepsError,
};
// Compile-time guard: `Core` and `SubgraphId` must both be `Send` and `Sync`.
// The closure only has to type-check; nothing in this file calls it.
const _: fn() = || {
    fn assert_send_and_sync<T: Send + Sync>() {}
    assert_send_and_sync::<graphrefly_core::Core>();
    assert_send_and_sync::<graphrefly_core::SubgraphId>();
};
/// Registering one state node on a fresh runtime yields exactly one partition.
#[test]
fn singleton_registration_creates_partition() {
    let runtime = TestRuntime::new();
    let _lone_state = runtime.state(Some(TestValue::Int(0)));

    let partitions = runtime.core.partition_count();
    assert_eq!(partitions, 1);
}
/// A derived node shares its dependency's partition, so state + derived
/// together still count as a single partition.
#[test]
fn derived_unions_with_dep() {
    let runtime = TestRuntime::new();
    let source = runtime.state(Some(TestValue::Int(0)));
    let consumer = runtime.derived(&[source.id], |_| Some(TestValue::Int(0)));

    let core = &runtime.core;
    assert_eq!(core.partition_count(), 1);

    let source_partition = core.partition_of(source.id);
    let consumer_partition = core.partition_of(consumer);
    assert_eq!(source_partition, consumer_partition);
}
/// Two state nodes with no edge between them are counted as two partitions
/// and report different partition ids.
#[test]
fn disjoint_state_nodes_have_distinct_partitions() {
    let runtime = TestRuntime::new();
    let first = runtime.state(Some(TestValue::Int(1)));
    let second = runtime.state(Some(TestValue::Int(2)));

    let core = &runtime.core;
    assert_eq!(core.partition_count(), 2);

    let first_partition = core.partition_of(first.id);
    let second_partition = core.partition_of(second.id);
    assert_ne!(first_partition, second_partition);
}
/// Two state nodes consumed by one shared derived node end up in the same
/// partition.
#[test]
fn diamond_topology_collapses_to_one_partition() {
    let runtime = TestRuntime::new();
    let s1 = runtime.state(Some(TestValue::Int(1)));
    let s2 = runtime.state(Some(TestValue::Int(2)));
    let shared_deps = [s1.id, s2.id];
    let _consumer = runtime.derived(&shared_deps, |_| Some(TestValue::Int(0)));

    let core = &runtime.core;
    assert_eq!(core.partition_count(), 1);

    let s1_partition = core.partition_of(s1.id);
    let s2_partition = core.partition_of(s2.id);
    assert_eq!(
        s1_partition, s2_partition,
        "s1 and s2 connected via shared derived consumer"
    );
}
/// Adding a dependency through `set_deps` merges the new dep's partition
/// into the dependent node's partition.
#[test]
fn set_deps_add_edge_unions_partitions() {
    let runtime = TestRuntime::new();
    let s1 = runtime.state(Some(TestValue::Int(1)));
    let s2 = runtime.state(Some(TestValue::Int(2)));
    let d = runtime.dynamic(&[s1.id], |_| (Some(TestValue::Int(0)), None));
    let core = &runtime.core;

    // Before the rewire: {d, s1} and {s2} are separate.
    assert_eq!(core.partition_count(), 2);
    assert_eq!(core.partition_of(d), core.partition_of(s1.id));
    assert_ne!(core.partition_of(d), core.partition_of(s2.id));

    let widened_deps = [s1.id, s2.id];
    core.set_deps(d, &widened_deps).expect("set_deps");

    // After the rewire: everything is one partition.
    assert_eq!(core.partition_count(), 1);
    let d_partition = core.partition_of(d);
    let s2_partition = core.partition_of(s2.id);
    assert_eq!(
        d_partition, s2_partition,
        "s2 unioned in via set_deps add-edge"
    );
}
/// Dropping the only edge that ties `s2` to the rest of the component splits
/// the partition in two: `{d, s1}` and `{s2}`.
#[test]
fn set_deps_remove_edge_splits_partition_when_disconnects() {
    let runtime = TestRuntime::new();
    let s1 = runtime.state(Some(TestValue::Int(1)));
    let s2 = runtime.state(Some(TestValue::Int(2)));
    let d = runtime.dynamic(&[s1.id, s2.id], |_| (Some(TestValue::Int(0)), None));
    let core = &runtime.core;

    assert_eq!(core.partition_count(), 1, "{{d, s1, s2}} merged");

    // Remove the s2 → d edge by shrinking d's dep list to just s1.
    core.set_deps(d, &[s1.id]).expect("set_deps");

    let partitions_after = core.partition_count();
    assert_eq!(
        partitions_after, 2,
        "Phase F split-eager: edge removal disconnects → split into 2 components"
    );

    let d_partition = core.partition_of(d);
    let s1_partition = core.partition_of(s1.id);
    assert_eq!(
        d_partition, s1_partition,
        "d and s1 stay together (still connected via the s1 → d edge)"
    );

    let d_partition = core.partition_of(d);
    let s2_partition = core.partition_of(s2.id);
    assert_ne!(
        d_partition, s2_partition,
        "s2 lives in its own (orphan) partition after split"
    );
}
/// Removing one edge does not split the partition when the nodes remain
/// reachable through another dependency path (here `s1 → d1 → d2`).
#[test]
fn set_deps_remove_edge_keeps_partition_when_still_connected() {
    let runtime = TestRuntime::new();
    let s1 = runtime.state(Some(TestValue::Int(1)));
    let d1 = runtime.dynamic(&[s1.id], |_| (Some(TestValue::Int(0)), None));
    let d2 = runtime.dynamic(&[s1.id, d1], |_| (Some(TestValue::Int(0)), None));
    let core = &runtime.core;

    assert_eq!(core.partition_count(), 1);

    // Drop the direct s1 → d2 edge; d2 still reaches s1 through d1.
    core.set_deps(d2, &[d1]).expect("set_deps");

    let partitions_after = core.partition_count();
    assert_eq!(
        partitions_after, 1,
        "alternative dep path keeps the component connected — no split"
    );
}
/// A single `set_deps` call that both removes one dep (`s2`) and adds another
/// (`s3`) leaves two partitions: `{d, s1, s3}` and the split-off `{s2}`.
#[test]
fn set_deps_mixed_add_and_remove_in_one_call() {
    let runtime = TestRuntime::new();
    let s1 = runtime.state(Some(TestValue::Int(1)));
    let s2 = runtime.state(Some(TestValue::Int(2)));
    let s3 = runtime.state(Some(TestValue::Int(3)));
    let d = runtime.dynamic(&[s1.id, s2.id], |_| (Some(TestValue::Int(0)), None));
    let core = &runtime.core;

    // Before the rewire: {d, s1, s2} and {s3}.
    assert_eq!(core.partition_count(), 2);
    assert_ne!(core.partition_of(d), core.partition_of(s3.id));

    let swapped_deps = [s1.id, s3.id];
    core.set_deps(d, &swapped_deps).expect("set_deps");

    let partitions_after = core.partition_count();
    assert_eq!(
        partitions_after, 2,
        "Phase F split-eager: post-mixed-set_deps, s2 splits off into \
         its own partition while {{d, s1, s3}} stays merged via the \
         remaining dep edges"
    );
    assert_eq!(core.partition_of(d), core.partition_of(s1.id));
    assert_eq!(core.partition_of(d), core.partition_of(s3.id));

    let d_partition = core.partition_of(d);
    let s2_partition = core.partition_of(s2.id);
    assert_ne!(
        d_partition, s2_partition,
        "s2 is the orphan side of the split"
    );
}
/// Two independent state → derived chains, registered one after the other,
/// occupy two different partitions, and each state node sits in the same
/// partition as its own derived consumer.
#[test]
fn unrelated_subgraphs_stay_disjoint_through_topology_mutation() {
    let runtime = TestRuntime::new();
    let first_source = runtime.state(Some(TestValue::Int(1)));
    let first_derived = runtime.derived(&[first_source.id], |_| Some(TestValue::Int(0)));
    let second_source = runtime.state(Some(TestValue::Int(2)));
    let second_derived = runtime.derived(&[second_source.id], |_| Some(TestValue::Int(0)));
    let core = &runtime.core;

    assert_eq!(core.partition_count(), 2);

    let first_partition = core.partition_of(first_derived).expect("registered");
    let second_partition = core.partition_of(second_derived).expect("registered");
    assert_ne!(first_partition, second_partition);

    let first_source_partition = core.partition_of(first_source.id);
    assert_eq!(first_source_partition, Some(first_partition));
    let second_source_partition = core.partition_of(second_source.id);
    assert_eq!(second_source_partition, Some(second_partition));
}
/// Right after registration returns, `partition_of` must already resolve the
/// new node to a partition.
#[test]
fn p12_register_partition_visible_immediately() {
    let runtime = TestRuntime::new();
    let s = runtime.state(Some(TestValue::Int(1)));

    let membership = runtime.core.partition_of(s.id);
    assert!(
        membership.is_some(),
        "register MUST publish partition membership atomically with the \
         node's appearance in s.nodes — the P12-fixed lock-discipline \
         invariant `state lock → registry mutex` underwrites Y1's \
         lock_for resolution"
    );
}
/// Right after an edge-adding `set_deps` returns, `partition_of` must already
/// report the dependent node and the newly added dep in the same partition.
#[test]
fn p12_set_deps_partition_visible_immediately() {
    let runtime = TestRuntime::new();
    let s1 = runtime.state(Some(TestValue::Int(1)));
    let s2 = runtime.state(Some(TestValue::Int(2)));
    let d = runtime.dynamic(&[s1.id], |_| (Some(TestValue::Int(0)), None));
    let core = &runtime.core;

    let d_partition_before = core.partition_of(d).expect("registered");
    let s2_partition_before = core.partition_of(s2.id).expect("registered");
    assert_ne!(
        d_partition_before, s2_partition_before,
        "pre-set_deps disjoint"
    );

    let widened_deps = [s1.id, s2.id];
    core.set_deps(d, &widened_deps).expect("set_deps");

    let d_partition_after = core.partition_of(d);
    let s2_partition_after = core.partition_of(s2.id);
    assert_eq!(
        d_partition_after, s2_partition_after,
        "set_deps add-edge MUST publish partition union atomically with \
         the children-map mutation in s.nodes — P12 fix invariant"
    );
}
/// Test binding that wraps a `TestBinding` and, when one chosen node's
/// function is invoked, first attempts a `Core::set_deps` call on another
/// node and records the outcome.
///
/// All configuration lives behind `Mutex`es so a test can fill it in after
/// the binding has already been handed to `Core::new`.
struct CrossPartitionRewireBinding {
/// Real binding that every `BindingBoundary` call is forwarded to.
inner: Arc<TestBinding>,
/// Node whose `invoke_fn` triggers the `set_deps` attempt; `None` disables it.
firing_node: Mutex<Option<NodeId>>,
/// Node whose dependencies the attempted `set_deps` rewrites.
target: Mutex<Option<NodeId>>,
/// Dependency list handed to the attempted `set_deps`.
new_deps: Mutex<Vec<NodeId>>,
/// Core handle used to issue `set_deps`; no attempt is made while it is `None`.
core: Mutex<Option<Core>>,
/// Result of the most recent `set_deps` attempt, if one was made.
captured_result: Mutex<Option<Result<(), SetDepsError>>>,
}
// Forwards every `BindingBoundary` call to `inner`. `invoke_fn` additionally
// issues the configured `set_deps` before delegating whenever the invoked
// node is the configured trigger.
impl BindingBoundary for CrossPartitionRewireBinding {
    /// Runs the node function via `inner`, after attempting the configured
    /// rewire when `node_id` equals `firing_node`.
    fn invoke_fn(
        &self,
        node_id: NodeId,
        fn_id: FnId,
        dep_data: &[graphrefly_core::DepBatch],
    ) -> FnResult {
        let is_trigger = *self.firing_node.lock().unwrap() == Some(node_id);
        if is_trigger {
            // Snapshot the configuration one field at a time. Each guard is a
            // temporary released at the end of its statement, so none of this
            // binding's mutexes is held while `set_deps` runs.
            let rewire_target = *self.target.lock().unwrap();
            let replacement_deps = self.new_deps.lock().unwrap().clone();
            let core_handle = self.core.lock().unwrap().clone();

            // Only attempt the rewire when both a target and a core are set.
            if let Some((node, core)) = rewire_target.zip(core_handle) {
                let outcome = core.set_deps(node, &replacement_deps);
                *self.captured_result.lock().unwrap() = Some(outcome);
            }
        }
        self.inner.invoke_fn(node_id, fn_id, dep_data)
    }

    /// Delegates to `inner`.
    fn release_handle(&self, handle: HandleId) {
        self.inner.release_handle(handle);
    }

    /// Delegates to `inner`.
    fn retain_handle(&self, handle: HandleId) {
        self.inner.retain_handle(handle);
    }

    /// Delegates to `inner`.
    fn custom_equals(&self, fn_id: FnId, a: HandleId, b: HandleId) -> bool {
        self.inner.custom_equals(fn_id, a, b)
    }
}
/// While `n` is firing, a `set_deps` on `m` that would pull `s1` (in `n`'s
/// partition) into `m`'s partition must be rejected with
/// `SetDepsError::PartitionMigrationDuringFire`, leaving `m`'s deps and both
/// partitions unchanged.
#[test]
fn p13_cross_partition_set_deps_during_fire_rejected() {
    let base = TestBinding::new();
    let rewire = Arc::new(CrossPartitionRewireBinding {
        inner: Arc::clone(&base),
        firing_node: Mutex::new(None),
        target: Mutex::new(None),
        new_deps: Mutex::new(Vec::new()),
        core: Mutex::new(None),
        captured_result: Mutex::new(None),
    });
    let boundary: Arc<dyn BindingBoundary> = rewire.clone();
    let core = Core::new(boundary);
    *rewire.core.lock().unwrap() = Some(core.clone());

    // Partition A: s1 → n. Partition B: s2 → m.
    let s1_handle = base.intern(TestValue::Int(10));
    let s2_handle = base.intern(TestValue::Int(20));
    let s1 = core.register_state(s1_handle, false).unwrap();
    let s2 = core.register_state(s2_handle, false).unwrap();

    let double_fn = base.register_fn(|deps: &[TestValue]| {
        if let TestValue::Int(v) = deps[0] {
            Some(TestValue::Int(v * 2))
        } else {
            None
        }
    });
    let n = core
        .register_derived(&[s1], double_fn, EqualsMode::Identity, false)
        .unwrap();

    let increment_fn = base.register_fn(|deps: &[TestValue]| {
        if let TestValue::Int(v) = deps[0] {
            Some(TestValue::Int(v + 1))
        } else {
            None
        }
    });
    let m = core
        .register_dynamic(&[s2], increment_fn, EqualsMode::Identity, false)
        .unwrap();

    let n_partition = core.partition_of(n).expect("registered");
    let m_partition = core.partition_of(m).expect("registered");
    assert_ne!(
        n_partition, m_partition,
        "n and m start in disjoint partitions"
    );

    // Arm the binding: when n's fn is invoked, try set_deps(m, [s2, s1]).
    *rewire.firing_node.lock().unwrap() = Some(n);
    *rewire.target.lock().unwrap() = Some(m);
    *rewire.new_deps.lock().unwrap() = vec![s2, s1];

    // Subscribing to n is what is expected to drive n's invoke_fn.
    let _subscription = core.subscribe(n, Arc::new(|_msgs: &[graphrefly_core::Message]| {}));

    let result = rewire
        .captured_result
        .lock()
        .unwrap()
        .take()
        .expect("set_deps was attempted from inside invoke_fn");

    let rejected_as_expected = matches!(
        result,
        Err(SetDepsError::PartitionMigrationDuringFire { n: rejected_n, firing })
            if rejected_n == m && firing == n
    );
    assert!(
        rejected_as_expected,
        "expected PartitionMigrationDuringFire {{ n: {m:?}, firing: {n:?} }}; got {result:?}"
    );

    let m_deps = core.deps_of(m);
    assert_eq!(m_deps, vec![s2], "m's deps survived the rejected rewire");

    let n_partition_after = core.partition_of(n);
    let m_partition_after = core.partition_of(m);
    assert_ne!(
        n_partition_after, m_partition_after,
        "partitions remained disjoint — no spurious union"
    );
}
/// While `n_fire` is firing, a `set_deps` on `m` that neither merges nor
/// splits partitions must succeed.
///
/// `m` starts with deps `[s1, s2]` and is rewired to `[s1]`. The separate
/// `bridge` node also depends on `[s1, s2]`, so `s2` stays connected to the
/// component and the edge removal causes no split.
///
/// Besides the `Ok` result and the unchanged partition count, the test also
/// checks that `m`'s dependency list really became `[s1]`. The partition
/// count alone cannot show that, because it is 1 whether or not the rewire
/// was applied.
#[test]
fn p13_same_partition_set_deps_during_fire_allowed() {
    let inner = TestBinding::new();
    let bind = Arc::new(CrossPartitionRewireBinding {
        inner: inner.clone(),
        firing_node: Mutex::new(None),
        target: Mutex::new(None),
        new_deps: Mutex::new(Vec::new()),
        core: Mutex::new(None),
        captured_result: Mutex::new(None),
    });
    let core = Core::new(bind.clone() as Arc<dyn BindingBoundary>);
    *bind.core.lock().unwrap() = Some(core.clone());

    let s1_init = inner.intern(TestValue::Int(1));
    let s2_init = inner.intern(TestValue::Int(2));
    let s1 = core.register_state(s1_init, false).unwrap();
    let s2 = core.register_state(s2_init, false).unwrap();

    let fn_id = inner.register_fn(|deps: &[TestValue]| match deps[0] {
        TestValue::Int(v) => Some(TestValue::Int(v * 2)),
        _ => None,
    });
    let n_fire = core
        .register_derived(&[s1], fn_id, EqualsMode::Identity, false)
        .unwrap();

    // `bridge` gives s2 a second path into the component, independent of m.
    let bridge_fn = inner.register_fn(|_: &[TestValue]| Some(TestValue::Int(0)));
    let _bridge = core
        .register_derived(&[s1, s2], bridge_fn, EqualsMode::Identity, false)
        .unwrap();

    let m_fn = inner.register_fn(|_: &[TestValue]| Some(TestValue::Int(0)));
    let m = core
        .register_dynamic(&[s1, s2], m_fn, EqualsMode::Identity, false)
        .unwrap();

    assert_eq!(
        core.partition_count(),
        1,
        "s1, s2, bridge, n_fire, m all unioned in one partition"
    );

    // Arm the binding: when n_fire's fn is invoked, try set_deps(m, [s1]).
    *bind.firing_node.lock().unwrap() = Some(n_fire);
    *bind.target.lock().unwrap() = Some(m);
    *bind.new_deps.lock().unwrap() = vec![s1];

    // Subscribing to n_fire is what is expected to drive its invoke_fn.
    let _sub = core.subscribe(n_fire, Arc::new(|_msgs: &[graphrefly_core::Message]| {}));

    let result = bind
        .captured_result
        .lock()
        .unwrap()
        .take()
        .expect("set_deps was attempted from inside invoke_fn");
    assert!(
        result.is_ok(),
        "no-migration set_deps from inside another node's fire MUST pass \
         — alternative dep path via `bridge` keeps the component \
         connected, so removal triggers no split; got {result:?}"
    );
    // An `Ok` must mean the rewire was applied, not silently dropped.
    assert_eq!(
        core.deps_of(m),
        vec![s1],
        "accepted rewire must be applied: m now depends on s1 only"
    );
    assert_eq!(
        core.partition_count(),
        1,
        "no split — alternative dep path preserves connectivity"
    );
}
/// While `n_fire` is firing, a `set_deps` on `m` that would cut `s2` off from
/// the component (forcing a partition split) must be rejected with
/// `SetDepsError::PartitionMigrationDuringFire`, leaving `m`'s deps and the
/// partition count unchanged.
#[test]
fn p13_split_during_fire_rejected() {
    let base = TestBinding::new();
    let rewire = Arc::new(CrossPartitionRewireBinding {
        inner: Arc::clone(&base),
        firing_node: Mutex::new(None),
        target: Mutex::new(None),
        new_deps: Mutex::new(Vec::new()),
        core: Mutex::new(None),
        captured_result: Mutex::new(None),
    });
    let boundary: Arc<dyn BindingBoundary> = rewire.clone();
    let core = Core::new(boundary);
    *rewire.core.lock().unwrap() = Some(core.clone());

    let s1_handle = base.intern(TestValue::Int(1));
    let s2_handle = base.intern(TestValue::Int(2));
    let s1 = core.register_state(s1_handle, false).unwrap();
    let s2 = core.register_state(s2_handle, false).unwrap();

    let double_fn = base.register_fn(|deps: &[TestValue]| {
        if let TestValue::Int(v) = deps[0] {
            Some(TestValue::Int(v * 2))
        } else {
            None
        }
    });
    let n_fire = core
        .register_derived(&[s1], double_fn, EqualsMode::Identity, false)
        .unwrap();

    // m is the only node that depends on s2.
    let constant_fn = base.register_fn(|_: &[TestValue]| Some(TestValue::Int(0)));
    let m = core
        .register_dynamic(&[s1, s2], constant_fn, EqualsMode::Identity, false)
        .unwrap();

    let partitions_before = core.partition_count();
    assert_eq!(
        partitions_before, 1,
        "single component pre-set_deps: s2's only edge is to m"
    );

    // Arm the binding: when n_fire's fn is invoked, try set_deps(m, [s1]).
    *rewire.firing_node.lock().unwrap() = Some(n_fire);
    *rewire.target.lock().unwrap() = Some(m);
    *rewire.new_deps.lock().unwrap() = vec![s1];

    // Subscribing to n_fire is what is expected to drive its invoke_fn.
    let _subscription =
        core.subscribe(n_fire, Arc::new(|_msgs: &[graphrefly_core::Message]| {}));

    let result = rewire
        .captured_result
        .lock()
        .unwrap()
        .take()
        .expect("set_deps was attempted from inside invoke_fn");

    let rejected_as_expected = matches!(
        result,
        Err(SetDepsError::PartitionMigrationDuringFire { n: rejected_n, firing })
            if rejected_n == m && firing == n_fire
    );
    assert!(
        rejected_as_expected,
        "expected PartitionMigrationDuringFire {{ n: {m:?}, firing: {n_fire:?} }}; got {result:?}"
    );

    let m_deps = core.deps_of(m);
    assert_eq!(
        m_deps,
        vec![s1, s2],
        "m's deps survived the rejected rewire"
    );

    let partitions_after = core.partition_count();
    assert_eq!(partitions_after, 1, "rejected — no split occurred");
}