mod common;
use std::sync::{Arc, Mutex};
use common::{RecordedEvent, TestBinding, TestRuntime, TestValue};
use graphrefly_core::{
BindingBoundary, Core, EqualsMode, FnId, FnResult, HandleId, LockId, Message, NodeId,
PausableMode,
};
#[test]
fn a2_max_batch_drain_iterations_default_is_10000() {
let rt = TestRuntime::new();
rt.core().set_max_batch_drain_iterations(5);
}
#[test]
#[should_panic(expected = "max_batch_drain_iterations must be > 0")]
fn a2_zero_cap_rejected() {
let rt = TestRuntime::new();
rt.core().set_max_batch_drain_iterations(0);
}
#[test]
fn a4_alloc_lock_id_starts_above_user_range() {
let rt = TestRuntime::new();
let allocated = rt.core().alloc_lock_id();
assert!(
allocated.raw() >= (1u64 << 31),
"alloc_lock_id should start at >= 2^31; got raw={}",
allocated.raw()
);
assert!(
allocated.raw() <= u64::from(u32::MAX),
"alloc_lock_id seed must fit in u32 for napi round-trip; got raw={}",
allocated.raw()
);
}
#[test]
fn a4_user_supplied_low_lock_id_does_not_collide_with_alloc() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
let user_lock = LockId::new(42);
rt.core().pause(s.id, user_lock).expect("pause");
assert!(rt.core().is_paused(s.id));
assert!(rt.core().holds_pause_lock(s.id, user_lock));
let alloc_lock = rt.core().alloc_lock_id();
assert_ne!(user_lock, alloc_lock);
rt.core().pause(s.id, alloc_lock).expect("alloc pause");
assert_eq!(rt.core().pause_lock_count(s.id), 2);
rt.core().resume(s.id, alloc_lock).expect("resume alloc");
assert!(rt.core().is_paused(s.id));
let report = rt
.core()
.resume(s.id, user_lock)
.expect("resume user")
.expect("final");
assert!(!rt.core().is_paused(s.id));
assert_eq!(report.replayed, 0);
}
#[test]
fn a7_handshake_panic_removes_sink_and_does_not_re_fire_on_next_wave() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(99)));
let panicked = Arc::new(Mutex::new(false));
let panicked_clone = panicked.clone();
let bad_sink: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
for m in msgs {
if matches!(m, Message::Data(_)) {
*panicked_clone.lock().unwrap() = true;
panic!("intentional sink panic on Data during handshake");
}
}
});
let panic_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = rt.core().subscribe(s.id, bad_sink);
}));
assert!(panic_result.is_err(), "subscribe should have unwound");
assert!(*panicked.lock().unwrap(), "the sink did get called");
let counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
let counter_clone = counter.clone();
let good_sink: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
for _ in msgs {
counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
});
let _sub = rt.core().subscribe(s.id, good_sink);
s.set(TestValue::Int(100));
let n = counter.load(std::sync::atomic::Ordering::SeqCst);
assert!(
n > 0,
"good sink received messages — subscribe didn't deadlock against the orphaned sink"
);
}
#[test]
fn a8_post_invalidate_compute_node_cache_is_sentinel() {
let rt = TestRuntime::new();
let s1 = rt.state(Some(TestValue::Int(7)));
let n = rt.derived(&[s1.id], |deps| match deps[0] {
TestValue::Int(v) => Some(TestValue::Int(v * 10)),
_ => None,
});
let _rec = rt.subscribe_recorder(n);
assert!(rt.core().has_fired_once(n));
let cache_pre = rt.core().cache_of(n);
assert_ne!(cache_pre, HandleId::new(0), "n cached before invalidate");
rt.core().invalidate(n);
let cache_post = rt.core().cache_of(n);
assert_eq!(
cache_post,
HandleId::new(0),
"INVALIDATE clears compute cache to NO_HANDLE"
);
}
struct OverflowBinding {
inner: Arc<TestBinding>,
captured: Mutex<Vec<(NodeId, u32, usize, u64)>>,
}
impl BindingBoundary for OverflowBinding {
fn invoke_fn(
&self,
node_id: NodeId,
fn_id: FnId,
dep_data: &[graphrefly_core::DepBatch],
) -> FnResult {
self.inner.invoke_fn(node_id, fn_id, dep_data)
}
fn release_handle(&self, handle: HandleId) {
self.inner.release_handle(handle);
}
fn retain_handle(&self, handle: HandleId) {
self.inner.retain_handle(handle);
}
fn custom_equals(&self, fn_id: FnId, a: HandleId, b: HandleId) -> bool {
self.inner.custom_equals(fn_id, a, b)
}
fn synthesize_pause_overflow_error(
&self,
node_id: NodeId,
dropped_count: u32,
configured_max: usize,
lock_held_duration_ms: u64,
) -> Option<HandleId> {
self.captured.lock().unwrap().push((
node_id,
dropped_count,
configured_max,
lock_held_duration_ms,
));
Some(self.inner.intern(TestValue::Str(format!(
"overflow:n={}:dropped={}:cap={}:held={}ms",
node_id.raw(),
dropped_count,
configured_max,
lock_held_duration_ms
))))
}
}
#[test]
fn a3_pause_overflow_synthesizes_error_once_per_cycle() {
let inner = TestBinding::new();
let ovr = Arc::new(OverflowBinding {
inner: inner.clone(),
captured: Mutex::new(Vec::new()),
});
let core = Core::new(ovr.clone() as Arc<dyn BindingBoundary>);
core.set_pause_buffer_cap(Some(2));
let initial = inner.intern(TestValue::Int(0));
let s = core.register_state(initial, false).unwrap();
core.set_pausable_mode(s, PausableMode::ResumeAll).unwrap();
let events = Arc::new(Mutex::new(Vec::<RecordedEvent>::new()));
let events_clone = events.clone();
let inner_clone = inner.clone();
let sink: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
let mut g = events_clone.lock().unwrap();
for m in msgs {
g.push(match m {
Message::Start => RecordedEvent::Start,
Message::Dirty => RecordedEvent::Dirty,
Message::Resolved => RecordedEvent::Resolved,
Message::Data(h) => RecordedEvent::Data(inner_clone.deref(*h)),
Message::Invalidate => RecordedEvent::Invalidate,
Message::Pause(l) => RecordedEvent::Pause(*l),
Message::Resume(l) => RecordedEvent::Resume(*l),
Message::Complete => RecordedEvent::Complete,
Message::Error(h) => RecordedEvent::Error(inner_clone.deref(*h)),
Message::Teardown => RecordedEvent::Teardown,
});
}
});
let _sub = core.subscribe(s, sink);
let lock = core.alloc_lock_id();
core.pause(s, lock).expect("pause");
for v in 1..=5 {
let h = inner.intern(TestValue::Int(v));
core.emit(s, h);
}
let captured = ovr.captured.lock().unwrap().clone();
assert_eq!(
captured.len(),
1,
"synthesize_pause_overflow_error called once per overflow event; got {captured:?}"
);
let (node_id, dropped, cap, _held_ms) = captured[0];
assert_eq!(node_id, s);
assert_eq!(
dropped, 1,
"first overflow event captured at the moment synthesis schedules — \
post-terminal emits are silent"
);
assert_eq!(cap, 2);
let evs = events.lock().unwrap().clone();
let saw_error = evs.iter().any(|e| matches!(e, RecordedEvent::Error(_)));
assert!(saw_error, "subscriber received the synthesized ERROR");
let h = inner.intern(TestValue::Int(99));
core.emit(s, h);
let saw_data_99 = events
.lock()
.unwrap()
.iter()
.any(|e| matches!(e, RecordedEvent::Data(TestValue::Int(99))));
assert!(!saw_data_99, "post-terminal emits dropped");
}
#[test]
fn a3_overflow_silently_dropped_when_binding_returns_none() {
let rt = TestRuntime::new();
rt.core().set_pause_buffer_cap(Some(2));
let s = rt.state(Some(TestValue::Int(0)));
rt.core()
.set_pausable_mode(s.id, PausableMode::ResumeAll)
.unwrap();
let rec = rt.subscribe_recorder(s.id);
let lock = rt.core().alloc_lock_id();
rt.core().pause(s.id, lock).expect("pause");
for v in 1..=5 {
s.set(TestValue::Int(v));
}
let report = rt
.core()
.resume(s.id, lock)
.expect("resume")
.expect("final");
assert_eq!(report.dropped, 3, "dropped count surfaced via ResumeReport");
let saw_error = rec
.snapshot()
.iter()
.any(|e| matches!(e, RecordedEvent::Error(_)));
assert!(
!saw_error,
"no ERROR synthesized when binding returns None (backward-compat fallback)"
);
}
#[test]
fn item4_register_rejects_non_resubscribable_terminal_dep() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let _rec = rt.subscribe_recorder(s.id);
rt.core().complete(s.id);
let fn_id = rt
.binding
.register_fn(|deps: &[TestValue]| Some(deps[0].clone()));
let result = rt
.core()
.register_derived(&[s.id], fn_id, EqualsMode::Identity, false);
assert_eq!(
result,
Err(graphrefly_core::RegisterError::TerminalDep(s.id))
);
}
#[test]
fn item4_register_accepts_resubscribable_terminal_dep() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
rt.core().set_resubscribable(s.id, true);
let _rec = rt.subscribe_recorder(s.id);
rt.core().complete(s.id);
let fn_id = rt
.binding
.register_fn(|deps: &[TestValue]| Some(deps[0].clone()));
let _ok = rt
.core()
.register_derived(&[s.id], fn_id, EqualsMode::Identity, false)
.unwrap();
}
#[test]
fn item5_default_mode_consolidates_to_one_fn_fire_on_resume() {
let rt = TestRuntime::new();
let a = rt.state(Some(TestValue::Int(1)));
let calls = std::sync::Arc::new(std::sync::Mutex::new(0u32));
let calls_inner = calls.clone();
let n = rt.derived(&[a.id], move |deps| {
*calls_inner.lock().unwrap() += 1;
match deps[0] {
TestValue::Int(v) => Some(TestValue::Int(v * 10)),
_ => None,
}
});
rt.core()
.set_pausable_mode(n, PausableMode::Default)
.unwrap();
let rec = rt.subscribe_recorder(n);
assert_eq!(*calls.lock().unwrap(), 1);
let baseline_calls = 1u32;
let lock = rt.core().alloc_lock_id();
rt.core().pause(n, lock).expect("pause n");
let baseline_data = rec.data_values().len();
a.set(TestValue::Int(10));
a.set(TestValue::Int(20));
a.set(TestValue::Int(30));
assert_eq!(
*calls.lock().unwrap(),
baseline_calls,
"fn fire suppressed during pause (default mode)"
);
let mid_data = rec.data_values().len() - baseline_data;
assert_eq!(
mid_data, 0,
"no DATA emitted from n while paused-default-mode"
);
let report = rt.core().resume(n, lock).expect("resume").expect("final");
assert_eq!(
*calls.lock().unwrap(),
baseline_calls + 1,
"exactly one consolidated fn fire on RESUME"
);
assert_eq!(report.replayed, 0, "default mode has no buffered messages");
assert_eq!(rt.cache_value(n), Some(TestValue::Int(300)));
let post_data: Vec<TestValue> = rec.data_values().into_iter().skip(baseline_data).collect();
assert_eq!(
post_data,
vec![TestValue::Int(300)],
"subscriber sees single consolidated DATA"
);
}
#[test]
fn item5_default_mode_no_emit_during_pause_means_no_fire_on_resume() {
let rt = TestRuntime::new();
let a = rt.state(Some(TestValue::Int(1)));
let calls = std::sync::Arc::new(std::sync::Mutex::new(0u32));
let calls_inner = calls.clone();
let n = rt.derived(&[a.id], move |deps| {
*calls_inner.lock().unwrap() += 1;
match deps[0] {
TestValue::Int(v) => Some(TestValue::Int(v * 10)),
_ => None,
}
});
let _rec = rt.subscribe_recorder(n);
let baseline = *calls.lock().unwrap();
let lock = rt.core().alloc_lock_id();
rt.core().pause(n, lock).expect("pause");
let report = rt.core().resume(n, lock).expect("resume").expect("final");
assert_eq!(
*calls.lock().unwrap(),
baseline,
"no fire — no pending wave"
);
assert_eq!(report.replayed, 0);
}
#[test]
fn item5_off_mode_pause_is_no_op() {
let rt = TestRuntime::new();
let a = rt.state(Some(TestValue::Int(1)));
let calls = std::sync::Arc::new(std::sync::Mutex::new(0u32));
let calls_inner = calls.clone();
let n = rt.derived(&[a.id], move |deps| {
*calls_inner.lock().unwrap() += 1;
match deps[0] {
TestValue::Int(v) => Some(TestValue::Int(v * 10)),
_ => None,
}
});
rt.core().set_pausable_mode(n, PausableMode::Off).unwrap();
let _rec = rt.subscribe_recorder(n);
let baseline = *calls.lock().unwrap();
let lock = rt.core().alloc_lock_id();
rt.core().pause(n, lock).expect("pause"); assert!(
!rt.core().is_paused(n),
"Off mode treats pause() as a no-op"
);
a.set(TestValue::Int(5));
assert_eq!(
*calls.lock().unwrap(),
baseline + 1,
"fn fired immediately even after pause() — Off mode"
);
}
#[test]
fn r2_6_0_default_leaf_source_self_emit_delivers_immediately_while_self_paused() {
let rt = TestRuntime::new();
let n = rt.state(Some(TestValue::Int(1)));
assert_eq!(rt.cache_value(n.id), Some(TestValue::Int(1)));
let rec = rt.subscribe_recorder(n.id);
let lock = rt.core().alloc_lock_id();
rt.core().pause(n.id, lock).expect("pause leaf source");
assert!(rt.core().is_paused(n.id), "leaf source holds its own lock");
let baseline = rec.snapshot().len();
let baseline_data = rec.data_values().len();
n.set(TestValue::Int(42));
let post_pause: Vec<RecordedEvent> = rec.snapshot().into_iter().skip(baseline).collect();
let data_after: Vec<TestValue> = rec.data_values().into_iter().skip(baseline_data).collect();
assert_eq!(
data_after,
vec![TestValue::Int(42)],
"leaf source self-emit must be delivered immediately while self-paused (R2.6.0)"
);
assert_eq!(
rt.cache_value(n.id),
Some(TestValue::Int(42)),
"cache must advance to the self-emitted value immediately (R2.6.0)"
);
assert!(
!post_pause
.iter()
.any(|e| matches!(e, RecordedEvent::Pause(_))),
"no PAUSE surfaced to the sink for a self-paused leaf source (R2.6.0)"
);
let report = rt
.core()
.resume(n.id, lock)
.expect("resume")
.expect("final resume");
assert_eq!(
report.replayed, 0,
"RESUME of a self-paused leaf source replays nothing (R2.6.0)"
);
let total_data: Vec<TestValue> = rec.data_values().into_iter().skip(baseline_data).collect();
assert_eq!(
total_data,
vec![TestValue::Int(42)],
"exactly one DATA=42 across the whole pause/resume cycle — no duplicate, no reorder (R2.6.0)"
);
assert_eq!(
rt.cache_value(n.id),
Some(TestValue::Int(42)),
"cache unchanged by RESUME (R2.6.0)"
);
}
#[test]
fn item5_set_pausable_mode_rejects_when_paused() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
let lock = rt.core().alloc_lock_id();
rt.core().pause(s.id, lock).expect("pause");
let result = rt.core().set_pausable_mode(s.id, PausableMode::ResumeAll);
assert_eq!(
result,
Err(graphrefly_core::SetPausableModeError::WhilePaused)
);
}
#[test]
fn f2_up_rejects_tier3_data() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let n = rt.derived(&[s.id], |deps| deps.first().cloned());
let h = rt.binding.intern(TestValue::Int(99));
let result = rt.core().up(n, Message::Data(h));
assert!(
matches!(
result,
Err(graphrefly_core::UpError::TierForbidden { tier: 3 })
),
"tier 3 (Data) must be rejected by up(); got {result:?}"
);
}
#[test]
fn f2_up_rejects_tier3_resolved() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let n = rt.derived(&[s.id], |deps| deps.first().cloned());
let result = rt.core().up(n, Message::Resolved);
assert!(matches!(
result,
Err(graphrefly_core::UpError::TierForbidden { tier: 3 })
));
}
#[test]
fn f2_up_rejects_tier5_complete() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let n = rt.derived(&[s.id], |deps| deps.first().cloned());
let result = rt.core().up(n, Message::Complete);
assert!(matches!(
result,
Err(graphrefly_core::UpError::TierForbidden { tier: 5 })
));
}
#[test]
fn f2_up_rejects_tier5_error() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let n = rt.derived(&[s.id], |deps| deps.first().cloned());
let h = rt.binding.intern(TestValue::Int(0));
let result = rt.core().up(n, Message::Error(h));
assert!(matches!(
result,
Err(graphrefly_core::UpError::TierForbidden { tier: 5 })
));
}
#[test]
fn f2_up_invalidate_clears_dep_cache() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(7)));
let n = rt.derived(&[s.id], |deps| match deps[0] {
TestValue::Int(v) => Some(TestValue::Int(v * 2)),
_ => None,
});
let _rec = rt.subscribe_recorder(n);
assert_ne!(rt.core().cache_of(s.id), HandleId::new(0));
rt.core().up(n, Message::Invalidate).expect("up ok");
assert_eq!(rt.core().cache_of(s.id), HandleId::new(0));
}
#[test]
fn f2_up_pause_routes_to_each_dep() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let n = rt.derived(&[s.id], |deps| deps.first().cloned());
let _rec = rt.subscribe_recorder(n);
let lock = LockId::new(7);
rt.core().up(n, Message::Pause(lock)).expect("up pause");
assert!(
rt.core().is_paused(s.id),
"up(Pause) should pause each dep of n"
);
rt.core().up(n, Message::Resume(lock)).expect("up resume");
assert!(
!rt.core().is_paused(s.id),
"up(Resume) should resume each dep of n"
);
}
#[test]
fn f2_up_unknown_node_rejected() {
let rt = TestRuntime::new();
let bogus = graphrefly_core::NodeId::new(99_999);
let result = rt.core().up(bogus, Message::Invalidate);
assert!(matches!(
result,
Err(graphrefly_core::UpError::UnknownNode(_))
));
}
#[test]
fn f2_up_dirty_and_start_are_no_ops() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(5)));
let n = rt.derived(&[s.id], |deps| deps.first().cloned());
let _rec = rt.subscribe_recorder(n);
let cache_pre = rt.core().cache_of(s.id);
let paused_pre = rt.core().is_paused(s.id);
rt.core().up(n, Message::Dirty).expect("up dirty");
rt.core().up(n, Message::Start).expect("up start");
assert_eq!(rt.core().cache_of(s.id), cache_pre);
assert_eq!(rt.core().is_paused(s.id), paused_pre);
}
#[test]
fn slice_g_batch_multi_same_value_emit_does_not_produce_multi_resolved() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(42)));
let rec = rt.subscribe_recorder(s.id);
let baseline = rec.snapshot().len();
rt.core().batch(|| {
s.set(TestValue::Int(42));
s.set(TestValue::Int(42));
});
let post: Vec<_> = rec.snapshot()[baseline..].to_vec();
let resolved_count = post
.iter()
.filter(|e| matches!(e, RecordedEvent::Resolved))
.count();
let data_count = post
.iter()
.filter(|e| matches!(e, RecordedEvent::Data(_)))
.count();
assert!(
resolved_count <= 1,
"R1.3.3.a: at most one Resolved per wave at one node; got {resolved_count} in {post:?}"
);
assert!(
data_count >= 1,
"expected at least one Data after multi-emit batch; got 0 in {post:?}"
);
}
#[test]
fn slice_e1_replay_buffer_disabled_by_default() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
s.set(TestValue::Int(2));
s.set(TestValue::Int(3));
let rec = rt.subscribe_recorder(s.id);
let data: Vec<TestValue> = rec.data_values();
assert_eq!(data, vec![TestValue::Int(3)]);
}
#[test]
fn slice_e1_replay_buffer_replays_recent_data_to_late_subscriber() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
rt.core().set_replay_buffer_cap(s.id, Some(3));
s.set(TestValue::Int(2));
s.set(TestValue::Int(3));
s.set(TestValue::Int(4));
s.set(TestValue::Int(5));
let rec = rt.subscribe_recorder(s.id);
let data: Vec<TestValue> = rec.data_values();
assert!(
data.contains(&TestValue::Int(3))
&& data.contains(&TestValue::Int(4))
&& data.contains(&TestValue::Int(5)),
"buffered DATA replayed; got {data:?}"
);
assert!(
!data.contains(&TestValue::Int(2)),
"evicted Int(2) should NOT appear; got {data:?}"
);
}
#[test]
fn slice_e1_replay_buffer_evicts_oldest_when_cap_exceeded() {
let rt = TestRuntime::new();
let s = rt.state(None);
rt.core().set_replay_buffer_cap(s.id, Some(2));
for v in 1..=5 {
s.set(TestValue::Int(v));
}
let rec = rt.subscribe_recorder(s.id);
let data: Vec<TestValue> = rec.data_values();
assert!(data.contains(&TestValue::Int(4)));
assert!(data.contains(&TestValue::Int(5)));
assert!(!data.contains(&TestValue::Int(1)));
assert!(!data.contains(&TestValue::Int(2)));
assert!(!data.contains(&TestValue::Int(3)));
}
#[test]
fn slice_e1_set_replay_buffer_cap_to_none_drains_existing() {
let rt = TestRuntime::new();
let s = rt.state(None);
rt.core().set_replay_buffer_cap(s.id, Some(3));
s.set(TestValue::Int(1));
s.set(TestValue::Int(2));
rt.core().set_replay_buffer_cap(s.id, None);
let rec = rt.subscribe_recorder(s.id);
let data: Vec<TestValue> = rec.data_values();
assert_eq!(data, vec![TestValue::Int(2)]);
}
#[test]
fn slice_g_single_emit_equals_match_still_produces_resolved() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(5)));
let rec = rt.subscribe_recorder(s.id);
let baseline = rec.snapshot().len();
s.set(TestValue::Int(5));
let post: Vec<_> = rec.snapshot()[baseline..].to_vec();
let resolved_count = post
.iter()
.filter(|e| matches!(e, RecordedEvent::Resolved))
.count();
assert_eq!(
resolved_count, 1,
"single-emit equals-match → exactly 1 Resolved (R1.3.2.a)"
);
}