mod common;
use common::{RecordedEvent, TestRuntime, TestValue};
use graphrefly_core::SubscribeError;
/// A state node that is completed without having been marked resubscribable
/// must reject `try_subscribe` with `SubscribeError::TornDown` carrying the
/// id of that same node.
#[test]
fn subscribe_to_non_resubscribable_completed_returns_torn_down_error() {
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(42)));
    rt.core().complete(state.id);

    // The sink is never expected to receive anything; it only has to exist.
    let noop_sink: graphrefly_core::Sink = std::sync::Arc::new(|_msgs| {});
    match rt.core().try_subscribe(state.id, noop_sink) {
        Ok(_) => panic!("expected TornDown, got Ok(_)"),
        Err(SubscribeError::TornDown { node }) => {
            assert_eq!(node, state.id);
        }
        Err(e) => panic!("expected TornDown, got Err({e:?})"),
    }
}
/// Subscribing through `subscribe_recorder` to a completed node that was not
/// marked resubscribable is expected to panic (R2.2.7.b).
#[test]
fn subscribe_to_non_resubscribable_completed_panics() {
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(42)));
    rt.core().complete(state.id);

    // The closure captures the runtime and node by reference, so unwind
    // safety is asserted explicitly for `catch_unwind`.
    let attempt = std::panic::AssertUnwindSafe(|| {
        let _rec = rt.subscribe_recorder(state.id);
    });
    let outcome = std::panic::catch_unwind(attempt);

    assert!(
        outcome.is_err(),
        "subscribe to non-resubscribable completed node must panic (R2.2.7.b)"
    );
}
/// A state node terminated through `error` (and never marked resubscribable)
/// must reject `try_subscribe` with `SubscribeError::TornDown` for that node.
#[test]
fn subscribe_to_non_resubscribable_errored_returns_torn_down_error() {
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(42)));

    // Intern the error payload through the binding, then terminate with it.
    let failure = rt.binding.intern(TestValue::Str("boom".into()));
    rt.core().error(state.id, failure);

    let noop_sink: graphrefly_core::Sink = std::sync::Arc::new(|_msgs| {});
    match rt.core().try_subscribe(state.id, noop_sink) {
        Ok(_) => panic!("expected TornDown, got Ok(_)"),
        Err(SubscribeError::TornDown { node }) => {
            assert_eq!(node, state.id);
        }
        Err(e) => panic!("expected TornDown, got Err({e:?})"),
    }
}
/// A state node that has been torn down (and never marked resubscribable)
/// must reject `try_subscribe` with `SubscribeError::TornDown` for that node.
#[test]
fn subscribe_to_non_resubscribable_torndown_returns_torn_down_error() {
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(5)));
    rt.core().teardown(state.id);

    let noop_sink: graphrefly_core::Sink = std::sync::Arc::new(|_msgs| {});
    match rt.core().try_subscribe(state.id, noop_sink) {
        Ok(_) => panic!("expected TornDown, got Ok(_)"),
        Err(SubscribeError::TornDown { node }) => {
            assert_eq!(node, state.id);
        }
        Err(e) => panic!("expected TornDown, got Err({e:?})"),
    }
}
/// A subscriber that arrives after a resubscribable state node has completed
/// must not observe the earlier `Complete`, but must still receive the cached
/// value (`Int(42)`) as data.
#[test]
fn resubscribable_late_subscriber_resets_terminal_state() {
    let rt = TestRuntime::new();
    let s = rt.state(Some(TestValue::Int(42)));
    rt.core().set_resubscribable(s.id, true);
    rt.core().complete(s.id);

    // Sanity check before subscribing: lock id 0 is not held on this node.
    assert!(!rt
        .core()
        .holds_pause_lock(s.id, graphrefly_core::LockId::new(0)));

    let recorder = rt.subscribe_recorder(s.id);
    let events = recorder.snapshot();

    let no_complete_seen = events
        .iter()
        .all(|e| !matches!(e, RecordedEvent::Complete));
    assert!(
        no_complete_seen,
        "resubscribable: terminal cleared on resubscribe"
    );

    let cached_value_seen = events
        .iter()
        .any(|e| matches!(e, RecordedEvent::Data(TestValue::Int(42))));
    assert!(cached_value_seen, "state cache survives reset");
}
/// After a resubscribable state node is completed and then re-subscribed, a
/// subsequent `set` must reach the new subscriber as a data event.
#[test]
fn resubscribable_resumes_after_reset_accepts_new_emits() {
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(42)));
    rt.core().set_resubscribable(state.id, true);
    rt.core().complete(state.id);

    let recorder = rt.subscribe_recorder(state.id);
    // Remember how many events the subscribe itself produced so that only
    // events recorded after the new emit are inspected below.
    let already_recorded = recorder.snapshot().len();

    state.set(TestValue::Int(100));

    let events = recorder.snapshot();
    let fresh_events = &events[already_recorded..];
    let saw_new_value = fresh_events
        .iter()
        .any(|e| matches!(e, RecordedEvent::Data(TestValue::Int(100))));
    assert!(saw_new_value, "post-reset emit should propagate");
}
/// Re-subscribing to a completed, resubscribable derived node must run its
/// compute fn again: the call counter is 1 after the first subscribe and at
/// least 2 after the second.
///
/// NOTE(review): the test name attributes the re-fire to dep handles being
/// cleared; only the re-fire itself is observed here.
#[test]
fn resubscribable_clears_dep_handles_for_compute_nodes() {
    let rt = TestRuntime::new();
    let source = rt.state(Some(TestValue::Int(10)));

    // Shared counter, incremented once per invocation of the compute fn.
    let fn_calls = std::sync::Arc::new(std::sync::Mutex::new(0u32));
    let counter = std::sync::Arc::clone(&fn_calls);
    let b = rt.derived(&[source.id], move |deps| {
        *counter.lock().unwrap() += 1;
        // Pass integers through unchanged; anything else yields no value.
        if let TestValue::Int(n) = &deps[0] {
            Some(TestValue::Int(*n))
        } else {
            None
        }
    });
    rt.core().set_resubscribable(b, true);

    let first = rt.subscribe_recorder(b);
    let calls_after_first = *fn_calls.lock().unwrap();
    assert_eq!(calls_after_first, 1);

    rt.unsub_recorder(&first);
    rt.core().complete(b);

    let _second = rt.subscribe_recorder(b);
    let calls_after_second = *fn_calls.lock().unwrap();
    assert!(
        calls_after_second >= 2,
        "fn re-fires post-resubscribe-reset on compute node"
    );
}
/// Calling `set_resubscribable` on a node that already has a subscriber is
/// expected to panic.
#[test]
fn set_resubscribable_after_subscribe_panics() {
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(0)));
    // Keep the recorder alive so the node is subscribed during the call.
    let _rec = rt.subscribe_recorder(state.id);

    let attempt = std::panic::AssertUnwindSafe(|| {
        rt.core().set_resubscribable(state.id, true);
    });
    let outcome = std::panic::catch_unwind(attempt);

    assert!(
        outcome.is_err(),
        "set_resubscribable after first subscribe should panic"
    );
}
/// Values set while a resubscribable node is paused must not be replayed to
/// a subscriber that arrives after the node completed: the new subscriber
/// sees exactly one data event and the node is no longer paused.
#[test]
fn resubscribable_resets_pause_state_drops_buffer() {
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(0)));
    rt.core().set_resubscribable(state.id, true);

    let first = rt.subscribe_recorder(state.id);
    let pause_lock = rt.core().alloc_lock_id();
    rt.core().pause(state.id, pause_lock).expect("pause");

    // Emit twice while the pause lock is held.
    for value in [1, 2] {
        state.set(TestValue::Int(value));
    }

    rt.core().complete(state.id);
    rt.unsub_recorder(&first);

    let second = rt.subscribe_recorder(state.id);
    let data_event_count = second
        .snapshot()
        .iter()
        .filter(|e| matches!(e, RecordedEvent::Data(_)))
        .count();
    assert_eq!(
        data_event_count, 1,
        "exactly one DATA from cache; buffered DATAs not replayed post-reset"
    );

    let still_paused = rt.core().is_paused(state.id);
    assert!(!still_paused, "pause state cleared on reset");
}
/// A resubscribable state node that was torn down must give a late
/// subscriber a fresh lifecycle: no `Complete`, no `Teardown`, the cached
/// `Int(42)` delivered as data, and later `set` calls still update the cache.
#[test]
fn resubscribable_resets_after_teardown() {
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(42)));
    rt.core().set_resubscribable(state.id, true);
    rt.core().teardown(state.id);

    let recorder = rt.subscribe_recorder(state.id);
    let events = recorder.snapshot();

    let no_complete = events
        .iter()
        .all(|e| !matches!(e, RecordedEvent::Complete));
    let no_teardown = events
        .iter()
        .all(|e| !matches!(e, RecordedEvent::Teardown));
    assert!(
        no_complete,
        "resubscribable + torn-down: reset clears COMPLETE; subscriber sees fresh lifecycle"
    );
    assert!(
        no_teardown,
        "resubscribable + torn-down: reset clears TEARDOWN; subscriber sees fresh lifecycle"
    );

    let cached_value_seen = events
        .iter()
        .any(|e| matches!(e, RecordedEvent::Data(TestValue::Int(42))));
    assert!(cached_value_seen, "state cache survives resubscribable reset");

    // The node must accept new values after the reset.
    state.set(TestValue::Int(999));
    let cached_now = rt.cache_value(state.id);
    assert_eq!(
        cached_now,
        Some(TestValue::Int(999)),
        "post-reset emit advances cache in the fresh lifecycle"
    );
}
/// Runs the late-subscriber check for two resubscribable nodes on the same
/// runtime: one ended with `complete`, one with `teardown` (D118). In both
/// cases the new subscriber must not see the earlier terminal events.
#[test]
fn resubscribable_resets_after_any_terminal_including_teardown() {
    let rt = TestRuntime::new();

    // Scenario 1: node ended via `complete`.
    let completed = rt.state(Some(TestValue::Int(1)));
    rt.core().set_resubscribable(completed.id, true);
    rt.core().complete(completed.id);
    let completed_rec = rt.subscribe_recorder(completed.id);
    let completed_events = completed_rec.snapshot();
    let completed_is_clean = completed_events
        .iter()
        .all(|e| !matches!(e, RecordedEvent::Complete));
    assert!(completed_is_clean, "complete: reset clears terminal");

    // Scenario 2: node ended via `teardown`.
    let torn_down = rt.state(Some(TestValue::Int(1)));
    rt.core().set_resubscribable(torn_down.id, true);
    rt.core().teardown(torn_down.id);
    let torn_down_rec = rt.subscribe_recorder(torn_down.id);
    let torn_down_events = torn_down_rec.snapshot();
    let no_complete = torn_down_events
        .iter()
        .all(|e| !matches!(e, RecordedEvent::Complete));
    let no_teardown = torn_down_events
        .iter()
        .all(|e| !matches!(e, RecordedEvent::Teardown));
    assert!(
        no_complete,
        "teardown: reset clears COMPLETE (D118 — TEARDOWN no longer blocks reset)"
    );
    assert!(
        no_teardown,
        "teardown: reset clears TEARDOWN (D118 — TEARDOWN no longer blocks reset)"
    );
}
/// A non-resubscribable state node that is invalidated and then completed
/// must still reject `try_subscribe` with `SubscribeError::TornDown` for
/// that node.
#[test]
fn subscribe_to_non_resubscribable_terminated_with_invalidated_cache_returns_torn_down() {
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(42)));

    // Order matters: invalidate first, then terminate.
    rt.core().invalidate(state.id);
    rt.core().complete(state.id);

    let noop_sink: graphrefly_core::Sink = std::sync::Arc::new(|_msgs| {});
    match rt.core().try_subscribe(state.id, noop_sink) {
        Ok(_) => panic!("expected TornDown, got Ok(_)"),
        Err(SubscribeError::TornDown { node }) => {
            assert_eq!(node, state.id);
        }
        Err(e) => panic!("expected TornDown, got Err({e:?})"),
    }
}