mod common;
use common::{TestRuntime, TestValue};
#[test]
fn phase_g_compute_node_releases_cache_on_deactivation() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(10)));
let d = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n * 2)),
_ => panic!("type"),
});
let rec = rt.subscribe_recorder(d);
assert_eq!(rt.cache_value(d), Some(TestValue::Int(20)));
let live_during_sub = rt.binding.live_handles();
assert!(live_during_sub > 0, "shares retained while subscribed");
rt.unsub_recorder(&rec);
let h_after = rt.core().cache_of(d);
assert_eq!(
h_after,
graphrefly_core::NO_HANDLE,
"compute node cache cleared on deactivation (R2.2.8)"
);
let live_after_dropoff = rt.binding.live_handles();
assert!(
live_after_dropoff < live_during_sub,
"deactivation must release shares; live before={live_during_sub}, after={live_after_dropoff}"
);
}
#[test]
fn phase_g_state_node_preserves_cache_on_deactivation() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(42)));
let rec = rt.subscribe_recorder(s.id);
assert_eq!(rt.cache_value(s.id), Some(TestValue::Int(42)));
rt.unsub_recorder(&rec);
assert_eq!(
rt.cache_value(s.id),
Some(TestValue::Int(42)),
"state node cache preserved across deactivation (R2.2.8 ROM rule)"
);
}
#[test]
fn phase_g_releases_dep_terminal_error_handles_on_deactivation() {
let rt = TestRuntime::new();
let upstream = rt.state(Some(TestValue::Int(1)));
let consumer = rt.derived(&[upstream.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
});
let err_handle = rt.binding.intern(TestValue::Str("upstream-failed".into()));
let baseline_refcount = rt.binding.refcount_of(err_handle);
let rec = rt.subscribe_recorder(consumer);
rt.core().error(upstream.id, err_handle);
let refcount_during_terminal = rt.binding.refcount_of(err_handle);
assert!(
refcount_during_terminal > baseline_refcount,
"error handle retained while subscribed terminal: baseline={baseline_refcount}, \
during={refcount_during_terminal}"
);
rt.unsub_recorder(&rec);
let refcount_after_drop = rt.binding.refcount_of(err_handle);
assert!(
refcount_after_drop < refcount_during_terminal,
"Phase G must release per-edge dep_terminals Error retain on deactivation: \
during={refcount_during_terminal}, after_drop={refcount_after_drop}"
);
}
#[test]
fn phase_g_resets_has_fired_once_on_deactivation() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(10)));
let calls = std::sync::Arc::new(std::sync::Mutex::new(0u32));
let calls_inner = calls.clone();
let d = rt.derived(&[s.id], move |deps| {
*calls_inner.lock().unwrap() += 1;
match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n)),
_ => panic!("type"),
}
});
let rec = rt.subscribe_recorder(d);
let calls_after_first = *calls.lock().unwrap();
assert_eq!(calls_after_first, 1, "fn fires once on first subscribe");
rt.unsub_recorder(&rec);
let _rec2 = rt.subscribe_recorder(d);
let calls_after_resub = *calls.lock().unwrap();
assert_eq!(
calls_after_resub, 2,
"fn re-fires after deactivate-reactivate cycle (Phase G reset has_fired_once)"
);
}
#[test]
fn phase_g_keeps_per_node_terminal_for_late_subscriber_replay() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(7)));
rt.core().set_resubscribable(s.id, true);
let rec = rt.subscribe_recorder(s.id);
rt.core().complete(s.id);
rt.unsub_recorder(&rec);
let rec2 = rt.subscribe_recorder(s.id);
let snap = rec2.snapshot();
use common::RecordedEvent;
let has_complete = snap.iter().any(|e| matches!(e, RecordedEvent::Complete));
assert!(
!has_complete,
"resubscribable + terminal: re-subscribe resets to fresh lifecycle (R2.2.7.a)"
);
let has_data_7 = snap
.iter()
.any(|e| matches!(e, RecordedEvent::Data(TestValue::Int(7))));
assert!(has_data_7, "state cache preserved across reset (R2.2.8)");
}
#[test]
fn phase_g_does_not_corrupt_state_node_status_on_unsubscribe_resubscribe() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(99)));
for _ in 0..3 {
let rec = rt.subscribe_recorder(s.id);
assert_eq!(rt.cache_value(s.id), Some(TestValue::Int(99)));
rt.unsub_recorder(&rec);
assert_eq!(
rt.cache_value(s.id),
Some(TestValue::Int(99)),
"state cache stable across deactivate cycles"
);
}
}
#[test]
fn phase_g_preserves_terminal_slot_for_non_resubscribable_rejection() {
use graphrefly_core::{Sink, SubscribeError};
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(7)));
let rec = rt.subscribe_recorder(s.id);
rt.core().complete(s.id);
rt.unsub_recorder(&rec);
let sink: Sink = std::sync::Arc::new(|_msgs| {});
match rt.core().try_subscribe(s.id, sink) {
Err(SubscribeError::TornDown { node }) => assert_eq!(node, s.id),
Err(e) => panic!("expected TornDown, got Err({e:?})"),
Ok(_) => panic!(
"expected TornDown rejection — Phase G must NOT clear `rec.terminal` (D121); \
non-resubscribable terminal node must continue to reject after Phase G ran"
),
}
}
#[test]
fn phase_g_skips_cache_clear_when_cleanup_hook_re_subscribes() {
use std::sync::Mutex;
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(10)));
let d = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(*n * 100)),
_ => panic!("type"),
});
let rec1 = rt.subscribe_recorder(d);
assert_eq!(rt.cache_value(d), Some(TestValue::Int(1000)));
let _cache_h = rt.core().cache_of(d);
let live_before_drop = rt.binding.live_handles();
assert!(live_before_drop > 0);
let new_sub_slot: Mutex<Option<graphrefly_core::SubscriptionId>> = Mutex::new(None);
rt.unsub_recorder(&rec1);
let rec2 = rt.subscribe_recorder(d);
*new_sub_slot.lock().unwrap() = None; let _keep_alive = rec2; let new_cache_h = rt.core().cache_of(d);
assert!(new_cache_h != graphrefly_core::NO_HANDLE);
assert_eq!(rt.cache_value(d), Some(TestValue::Int(1000)));
let live_now = rt.binding.live_handles();
assert!(
live_now > 0,
"after re-subscribe: re-activated state holds live shares"
);
}
#[test]
fn r2_2_7_b_rejects_subscribe_between_complete_and_teardown_cascade() {
use graphrefly_core::{Sink, SubscribeError};
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let rec = rt.subscribe_recorder(s.id);
rt.core().complete(s.id);
rt.unsub_recorder(&rec);
let sink: Sink = std::sync::Arc::new(|_msgs| {});
match rt.core().try_subscribe(s.id, sink) {
Err(SubscribeError::TornDown { .. }) => {}
Err(e) => panic!("expected TornDown, got Err({e:?})"),
Ok(_) => panic!(
"R2.2.7.b: non-resubscribable terminal node must reject subscribe \
regardless of whether TEARDOWN already cascaded"
),
}
}