mod common;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use common::{CleanupClosure, TestNodeFnCleanup, TestRuntime, TestValue};
use graphrefly_core::CleanupTrigger;
fn counter_hook(counter: Arc<AtomicU64>) -> CleanupClosure {
Arc::new(move || {
counter.fetch_add(1, Ordering::SeqCst);
})
}
#[test]
fn r2_4_5_on_rerun_fires_before_second_fn() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n * 10)),
_ => panic!("type"),
});
let _rec = rt.subscribe_recorder(derived_node);
let on_rerun_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_rerun: Some(counter_hook(on_rerun_count.clone())),
..Default::default()
},
);
assert_eq!(
on_rerun_count.load(Ordering::SeqCst),
0,
"OnRerun must not fire before any prior fn run"
);
s.set(TestValue::Int(2));
assert_eq!(
on_rerun_count.load(Ordering::SeqCst),
1,
"OnRerun fires before second invoke_fn"
);
let calls = rt.binding.cleanup_calls_for(CleanupTrigger::OnRerun);
assert_eq!(calls, vec![derived_node]);
}
#[test]
fn r2_4_5_on_rerun_skipped_on_first_fire() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(7)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n + 1)),
_ => panic!("type"),
});
let on_rerun_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_rerun: Some(counter_hook(on_rerun_count.clone())),
..Default::default()
},
);
let _rec = rt.subscribe_recorder(derived_node);
assert_eq!(
on_rerun_count.load(Ordering::SeqCst),
0,
"first fire skips OnRerun (no prior run to clean up)"
);
assert!(rt
.binding
.cleanup_calls_for(CleanupTrigger::OnRerun)
.is_empty());
}
#[test]
fn r2_4_5_on_deactivation_fires_on_last_unsub() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
let on_deact_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_deactivation: Some(counter_hook(on_deact_count.clone())),
..Default::default()
},
);
let rec = rt.subscribe_recorder(derived_node);
drop(rec);
assert_eq!(
on_deact_count.load(Ordering::SeqCst),
1,
"OnDeactivation fires once on last-sub-drop"
);
}
#[test]
fn r2_4_5_on_deactivation_precedes_producer_deactivate() {
use graphrefly_core::{BindingBoundary, CleanupTrigger as CT, NodeId};
use std::sync::Mutex;
struct OrderingBinding {
inner: Arc<common::TestBinding>,
events: Mutex<Vec<String>>,
}
impl BindingBoundary for OrderingBinding {
fn invoke_fn(
&self,
n: NodeId,
f: graphrefly_core::FnId,
d: &[graphrefly_core::DepBatch],
) -> graphrefly_core::FnResult {
self.inner.invoke_fn(n, f, d)
}
fn custom_equals(
&self,
f: graphrefly_core::FnId,
a: graphrefly_core::HandleId,
b: graphrefly_core::HandleId,
) -> bool {
self.inner.custom_equals(f, a, b)
}
fn release_handle(&self, h: graphrefly_core::HandleId) {
self.inner.release_handle(h);
}
fn retain_handle(&self, h: graphrefly_core::HandleId) {
self.inner.retain_handle(h);
}
fn cleanup_for(&self, n: NodeId, t: CT) {
self.events
.lock()
.unwrap()
.push(format!("cleanup_for({n:?}, {t:?})"));
self.inner.cleanup_for(n, t);
}
fn producer_deactivate(&self, n: NodeId) {
self.events
.lock()
.unwrap()
.push(format!("producer_deactivate({n:?})"));
self.inner.producer_deactivate(n);
}
fn wipe_ctx(&self, n: NodeId) {
self.inner.wipe_ctx(n);
}
}
let test_binding = common::TestBinding::new();
let ordering = Arc::new(OrderingBinding {
inner: test_binding.clone(),
events: Mutex::new(Vec::new()),
});
let core = graphrefly_core::Core::new(ordering.clone() as Arc<dyn BindingBoundary>);
let producer_fn = test_binding.register_fn(|_| Some(TestValue::Int(42)));
let prod_id = core.register_producer(producer_fn).unwrap();
let on_deact_count = Arc::new(AtomicU64::new(0));
test_binding.register_cleanup(
prod_id,
TestNodeFnCleanup {
on_deactivation: Some(counter_hook(on_deact_count.clone())),
..Default::default()
},
);
let sink: graphrefly_core::Sink = Arc::new(|_| {});
let sub = core.subscribe(prod_id, sink);
drop(sub);
let events = ordering.events.lock().unwrap().clone();
let cleanup_idx = events
.iter()
.position(|s| s.starts_with("cleanup_for"))
.expect("cleanup_for fired");
let prod_deact_idx = events
.iter()
.position(|s| s.starts_with("producer_deactivate"))
.expect("producer_deactivate fired");
assert!(
cleanup_idx < prod_deact_idx,
"OnDeactivation must fire BEFORE producer_deactivate (D056); got {events:?}"
);
assert_eq!(on_deact_count.load(Ordering::SeqCst), 1);
}
#[test]
fn r2_4_5_on_deactivation_skipped_on_never_fired() {
let rt = TestRuntime::new();
let s = rt.state(None);
let on_deact_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
s.id,
TestNodeFnCleanup {
on_deactivation: Some(counter_hook(on_deact_count.clone())),
..Default::default()
},
);
let rec = rt.subscribe_recorder(s.id);
drop(rec);
assert_eq!(
on_deact_count.load(Ordering::SeqCst),
0,
"never-fired node skips OnDeactivation"
);
assert!(rt
.binding
.cleanup_calls_for(CleanupTrigger::OnDeactivation)
.is_empty());
}
#[test]
fn r1_3_9_b_on_invalidate_dedup_diamond() {
let rt = TestRuntime::new();
let a = rt.state(Some(TestValue::Int(10)));
let b = rt.derived(&[a.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n + 1)),
_ => panic!("type"),
});
let c = rt.derived(&[a.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n + 100)),
_ => panic!("type"),
});
let d = rt.derived(&[b, c], |deps| {
if let (TestValue::Int(x), TestValue::Int(y)) = (&deps[0], &deps[1]) {
Some(TestValue::Int(x + y))
} else {
panic!("type");
}
});
let _rec = rt.subscribe_recorder(d);
let d_invalidate_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
d,
TestNodeFnCleanup {
on_invalidate: Some(counter_hook(d_invalidate_count.clone())),
..Default::default()
},
);
rt.core.invalidate(a.id);
assert_eq!(
d_invalidate_count.load(Ordering::SeqCst),
1,
"OnInvalidate fires exactly once on D despite diamond fan-in (R1.3.9.b)"
);
let invalidate_targets = rt.binding.cleanup_calls_for(CleanupTrigger::OnInvalidate);
let d_count = invalidate_targets.iter().filter(|n| **n == d).count();
assert_eq!(d_count, 1, "Core fired cleanup_for(d, OnInvalidate) once");
}
#[test]
fn r1_3_9_c_on_invalidate_skipped_on_never_populated() {
let rt = TestRuntime::new();
let s = rt.state(None);
let _rec = rt.subscribe_recorder(s.id);
let on_inv_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
s.id,
TestNodeFnCleanup {
on_invalidate: Some(counter_hook(on_inv_count.clone())),
..Default::default()
},
);
rt.core.invalidate(s.id);
assert_eq!(
on_inv_count.load(Ordering::SeqCst),
0,
"OnInvalidate skipped when cache is never-populated sentinel (R1.3.9.c)"
);
assert!(rt
.binding
.cleanup_calls_for(CleanupTrigger::OnInvalidate)
.is_empty());
}
#[test]
fn r2_4_6_store_persists_across_deactivation() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
{
let _rec = rt.subscribe_recorder(derived_node);
rt.binding
.store_set(derived_node, "counter", TestValue::Int(7));
}
assert_eq!(
rt.binding.store_get(derived_node, "counter"),
Some(TestValue::Int(7)),
"store persists across deactivation by default per R2.4.6"
);
assert!(
rt.binding.has_ctx(derived_node),
"NodeCtxState entry remains after deactivation"
);
{
let _rec2 = rt.subscribe_recorder(derived_node);
assert_eq!(
rt.binding.store_get(derived_node, "counter"),
Some(TestValue::Int(7)),
"second activation reads prior store value"
);
}
}
#[test]
fn r2_4_6_store_wiped_on_resubscribable_reset() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
rt.core.set_resubscribable(s.id, true);
let _rec = rt.subscribe_recorder(s.id);
rt.binding.store_set(s.id, "scratch", TestValue::Int(99));
assert_eq!(
rt.binding.store_get(s.id, "scratch"),
Some(TestValue::Int(99))
);
drop(_rec);
rt.core.complete(s.id);
let _rec2 = rt.subscribe_recorder(s.id);
assert_eq!(
rt.binding.store_get(s.id, "scratch"),
None,
"store wiped on resubscribable terminal reset per R2.4.6"
);
assert!(
!rt.binding.has_ctx(s.id),
"NodeCtxState entry removed entirely on wipe_ctx"
);
let wipes = rt.binding.wipe_calls();
assert!(
wipes == vec![s.id] || wipes == vec![s.id, s.id],
"wipe_ctx fired for the resubscribable node (eagerly via D069 \
and/or via subscribe reset path); got: {wipes:?}"
);
}
#[test]
fn on_deactivation_runs_before_wipe_on_terminal_reset() {
use std::sync::Mutex;
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
rt.core.set_resubscribable(s.id, true);
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
rt.core.set_resubscribable(derived_node, true);
let observed_store_at_deact: Arc<Mutex<Option<TestValue>>> = Arc::new(Mutex::new(None));
let binding = rt.binding.clone();
let observed = observed_store_at_deact.clone();
let on_deact: CleanupClosure = Arc::new(move || {
let v = binding.store_get(derived_node, "k");
*observed.lock().unwrap() = v;
});
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_deactivation: Some(on_deact),
..Default::default()
},
);
let rec = rt.subscribe_recorder(derived_node);
rt.binding
.store_set(derived_node, "k", TestValue::Int(2026));
drop(rec);
rt.core.complete(derived_node);
assert_eq!(
*observed_store_at_deact.lock().unwrap(),
Some(TestValue::Int(2026)),
"OnDeactivation fires BEFORE wipe; sees pre-wipe store"
);
let _rec2 = rt.subscribe_recorder(derived_node);
assert!(
!rt.binding.has_ctx(derived_node),
"store wiped after re-subscribe (post-OnDeactivation)"
);
}
#[test]
fn cleanup_can_reenter_core_lock_released() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let other = rt.state(Some(TestValue::Int(0)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
let other_id = other.id;
let other_binding = rt.binding.clone();
let other_core = rt.core.clone();
let on_deact: CleanupClosure = Arc::new(move || {
let h = other_binding.intern(TestValue::Int(42));
other_core.emit(other_id, h);
});
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_deactivation: Some(on_deact),
..Default::default()
},
);
let rec = rt.subscribe_recorder(derived_node);
drop(rec);
assert_eq!(
rt.cache_value(other_id),
Some(TestValue::Int(42)),
"cleanup re-entered Core::emit lock-released; other's cache updated"
);
}
#[test]
fn on_rerun_panic_isolated_does_not_corrupt_wave() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n * 2)),
_ => panic!("type"),
});
let _rec = rt.subscribe_recorder(derived_node);
assert_eq!(rt.cache_value(derived_node), Some(TestValue::Int(2)));
let panicking: CleanupClosure = Arc::new(|| panic!("test panic in OnRerun"));
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_rerun: Some(panicking),
..Default::default()
},
);
s.set(TestValue::Int(3));
assert_eq!(
rt.binding.cleanup_panics(),
vec![(derived_node, CleanupTrigger::OnRerun)],
"panicking OnRerun was caught + recorded"
);
assert_eq!(
rt.cache_value(derived_node),
Some(TestValue::Int(6)),
"wave completed despite OnRerun panic; second fn fire ran"
);
s.set(TestValue::Int(5));
assert_eq!(rt.cache_value(derived_node), Some(TestValue::Int(10)));
}
#[test]
fn on_invalidate_fires_at_cache_clear_not_replay() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
let _rec = rt.subscribe_recorder(derived_node);
let on_inv_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_invalidate: Some(counter_hook(on_inv_count.clone())),
..Default::default()
},
);
let lock_id = rt.core.alloc_lock_id();
rt.core.pause(derived_node, lock_id).unwrap();
rt.core.invalidate(s.id);
assert_eq!(
on_inv_count.load(Ordering::SeqCst),
1,
"OnInvalidate fires at cache-clear time (D058); pause buffering doesn't defer the hook"
);
rt.core.resume(derived_node, lock_id).unwrap();
assert_eq!(
on_inv_count.load(Ordering::SeqCst),
1,
"OnInvalidate did not re-fire on wire replay"
);
}
#[test]
fn non_fn_nodes_skip_on_rerun() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let on_rerun_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
s.id,
TestNodeFnCleanup {
on_rerun: Some(counter_hook(on_rerun_count.clone())),
..Default::default()
},
);
let _rec = rt.subscribe_recorder(s.id);
s.set(TestValue::Int(2));
s.set(TestValue::Int(3));
assert_eq!(
on_rerun_count.load(Ordering::SeqCst),
0,
"state nodes (no fn_id) never reach fire_regular Phase 1.5 OnRerun"
);
assert!(rt
.binding
.cleanup_calls_for(CleanupTrigger::OnRerun)
.is_empty());
}
#[test]
fn r1_3_9_b_strict_dedup_across_repopulate() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
let _rec = rt.subscribe_recorder(derived_node);
let on_inv_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_invalidate: Some(counter_hook(on_inv_count.clone())),
..Default::default()
},
);
let s_id = s.id;
let derived_node_clone = derived_node;
let core = rt.core.clone();
rt.core.batch(|| {
core.invalidate(s_id); let h = rt.binding.intern(TestValue::Int(99));
core.emit(s_id, h); core.invalidate(s_id); let _ = derived_node_clone;
});
assert_eq!(
on_inv_count.load(Ordering::SeqCst),
1,
"OnInvalidate fires AT MOST ONCE per wave per node despite re-populate (D057)"
);
let h = rt.binding.intern(TestValue::Int(7));
rt.core.emit(s_id, h);
rt.core.invalidate(s_id);
assert_eq!(
on_inv_count.load(Ordering::SeqCst),
2,
"next wave re-arms the dedup set; new OnInvalidate fires"
);
}
#[test]
fn d067_set_deps_fires_on_rerun_for_dynamic() {
let rt = TestRuntime::new();
let s1 = rt.state(Some(TestValue::Int(1)));
let s2 = rt.state(Some(TestValue::Int(2)));
let dyn_id = rt.dynamic(&[s1.id], |deps| match &deps[0] {
TestValue::Int(n) => (Some(TestValue::Int(*n)), Some(vec![0])),
_ => panic!("type"),
});
let _rec = rt.subscribe_recorder(dyn_id);
assert_eq!(rt.cache_value(dyn_id), Some(TestValue::Int(1)));
let on_rerun_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
dyn_id,
TestNodeFnCleanup {
on_rerun: Some(counter_hook(on_rerun_count.clone())),
..Default::default()
},
);
rt.core.set_deps(dyn_id, &[s2.id]).unwrap();
assert_eq!(
on_rerun_count.load(Ordering::SeqCst),
1,
"OnRerun fires from set_deps on dynamic-with-prior-fire (D067)"
);
let calls = rt.binding.cleanup_calls_for(CleanupTrigger::OnRerun);
assert_eq!(
calls,
vec![dyn_id],
"Core fired cleanup_for(dyn, OnRerun) once during set_deps"
);
}
#[test]
fn batch_multi_emit_fires_on_rerun_once_per_wave() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
let _rec = rt.subscribe_recorder(derived_node);
let on_rerun_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_rerun: Some(counter_hook(on_rerun_count.clone())),
..Default::default()
},
);
let s_id = s.id;
let core = rt.core.clone();
let binding = rt.binding.clone();
rt.core.batch(|| {
for v in [10, 20, 30] {
let h = binding.intern(TestValue::Int(v));
core.emit(s_id, h);
}
});
assert_eq!(
on_rerun_count.load(Ordering::SeqCst),
1,
"OnRerun fires once per wave despite 3 emits (R1.3.6.b coalescing)"
);
}
#[test]
fn on_deactivation_fires_on_paused_node() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
let on_deact_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_deactivation: Some(counter_hook(on_deact_count.clone())),
..Default::default()
},
);
let rec = rt.subscribe_recorder(derived_node);
let lock_id = rt.core.alloc_lock_id();
rt.core.pause(derived_node, lock_id).unwrap();
drop(rec);
assert_eq!(
on_deact_count.load(Ordering::SeqCst),
1,
"OnDeactivation fires when last sub leaves even on paused node (pause is wire-side, not lifecycle)"
);
}
#[test]
fn teardown_does_not_fire_any_cleanup_slot() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
let _rec = rt.subscribe_recorder(derived_node);
let r = Arc::new(AtomicU64::new(0));
let d = Arc::new(AtomicU64::new(0));
let i = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_rerun: Some(counter_hook(r.clone())),
on_deactivation: Some(counter_hook(d.clone())),
on_invalidate: Some(counter_hook(i.clone())),
},
);
rt.core.teardown(derived_node);
assert_eq!(
r.load(Ordering::SeqCst),
0,
"teardown does not fire OnRerun"
);
assert_eq!(
d.load(Ordering::SeqCst),
0,
"teardown does not fire OnDeactivation"
);
assert_eq!(
i.load(Ordering::SeqCst),
0,
"teardown does not fire OnInvalidate"
);
let calls: Vec<_> = rt
.binding
.cleanup_calls()
.into_iter()
.filter(|(n, _)| *n == derived_node)
.collect();
assert!(
calls.is_empty(),
"Core fired no cleanup_for for derived_node during teardown; got {calls:?}"
);
}
#[test]
fn d068_state_with_initial_value_skips_on_deactivation() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(7)));
let on_deact_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
s.id,
TestNodeFnCleanup {
on_deactivation: Some(counter_hook(on_deact_count.clone())),
..Default::default()
},
);
let rec = rt.subscribe_recorder(s.id);
drop(rec);
assert_eq!(
on_deact_count.load(Ordering::SeqCst),
0,
"state node skips OnDeactivation per D068 (no fn_id)"
);
assert!(rt
.binding
.cleanup_calls_for(CleanupTrigger::OnDeactivation)
.is_empty());
}
#[test]
fn d069_terminate_with_no_subs_fires_eager_wipe() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
rt.core.set_resubscribable(s.id, true);
{
let _rec = rt.subscribe_recorder(s.id);
rt.binding.store_set(s.id, "k", TestValue::Int(99));
}
assert!(
rt.binding.has_ctx(s.id),
"store still present after deactivation (R2.4.6)"
);
rt.core.complete(s.id);
assert!(
!rt.binding.has_ctx(s.id),
"D069: NodeCtxState wiped eagerly when terminate fires with no subs"
);
assert_eq!(
rt.binding.wipe_calls(),
vec![s.id],
"Core fired wipe_ctx exactly once via terminate_node's pending_wipes drain"
);
}
#[test]
fn d069_terminate_then_last_sub_drops_fires_wipe_via_subscription_drop() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
rt.core.set_resubscribable(s.id, true);
let rec = rt.subscribe_recorder(s.id);
rt.binding.store_set(s.id, "k", TestValue::Int(2026));
rt.core.complete(s.id);
assert!(
rt.binding.has_ctx(s.id),
"store still present while sub is alive (terminate didn't queue wipe)"
);
assert!(
rt.binding.wipe_calls().is_empty(),
"no wipe fired yet — terminate_node skipped because subs were live"
);
drop(rec);
assert!(
!rt.binding.has_ctx(s.id),
"D069: Subscription::Drop fired wipe when last sub dropped on terminal-resubscribable node"
);
assert_eq!(
rt.binding.wipe_calls(),
vec![s.id],
"exactly one wipe fired via Subscription::Drop direct-fire path"
);
}
#[test]
fn on_invalidate_dropped_silently_on_panic_discard_wave() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let derived_node = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
let _rec = rt.subscribe_recorder(derived_node);
let on_inv_count = Arc::new(AtomicU64::new(0));
rt.binding.register_cleanup(
derived_node,
TestNodeFnCleanup {
on_invalidate: Some(counter_hook(on_inv_count.clone())),
..Default::default()
},
);
let s_id = s.id;
let core = rt.core.clone();
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
core.batch(|| {
core.invalidate(s_id); panic!("test panic mid-wave");
});
}));
assert!(result.is_err(), "panic propagated out of batch closure");
assert_eq!(
on_inv_count.load(Ordering::SeqCst),
0,
"panic-discard drops queued OnInvalidate silently per D061"
);
let h = rt.binding.intern(TestValue::Int(42));
rt.core.emit(s_id, h);
rt.core.invalidate(s_id);
assert_eq!(
on_inv_count.load(Ordering::SeqCst),
1,
"subsequent wave fires OnInvalidate normally (no leftover state)"
);
}