#![allow(dead_code)]
mod common;
use std::sync::{Arc, Mutex};
use graphrefly_core::{EqualsMode, FnEmission, FnResult, Message, NodeId, Sink};
use smallvec::smallvec;
use common::{RecordedEvent, TestRuntime, TestValue};
#[derive(Clone, Debug, PartialEq)]
enum LogTier {
Dirty,
Tier3, Invalidate,
Complete,
Error,
Teardown,
Start,
Pause,
Resume,
}
#[derive(Clone)]
struct GlobalLog {
inner: Arc<Mutex<Vec<(NodeId, LogTier)>>>,
}
impl GlobalLog {
fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(Vec::new())),
}
}
fn sink_for(&self, node: NodeId) -> Sink {
let inner = self.inner.clone();
Arc::new(move |msgs: &[Message]| {
let mut g = inner.lock().expect("global log");
for m in msgs {
let tier = match m {
Message::Start => LogTier::Start,
Message::Dirty => LogTier::Dirty,
Message::Data(_) | Message::Resolved => LogTier::Tier3,
Message::Invalidate => LogTier::Invalidate,
Message::Complete => LogTier::Complete,
Message::Error(_) => LogTier::Error,
Message::Teardown => LogTier::Teardown,
Message::Pause(_) => LogTier::Pause,
Message::Resume(_) => LogTier::Resume,
};
g.push((node, tier));
}
})
}
fn snapshot(&self) -> Vec<(NodeId, LogTier)> {
self.inner.lock().expect("global log").clone()
}
fn reset(&self) {
self.inner.lock().expect("global log").clear();
}
}
#[track_caller]
fn assert_dirty_before_settle(entries: &[(NodeId, LogTier)]) {
let last_dirty_idx = entries
.iter()
.rposition(|(_, t)| matches!(t, LogTier::Dirty));
let first_settle_idx = entries
.iter()
.position(|(_, t)| matches!(t, LogTier::Tier3 | LogTier::Invalidate));
if let (Some(d), Some(s)) = (last_dirty_idx, first_settle_idx) {
assert!(
d < s,
"R1.3.1.b violation: settle (idx {s}) precedes a DIRTY (idx {d}) in {entries:?}"
);
}
}
struct Diamond {
runtime: TestRuntime,
a: NodeId,
b: NodeId,
c: NodeId,
d: NodeId,
log: GlobalLog,
}
impl Diamond {
fn new() -> Self {
let runtime = TestRuntime::new();
let a_state = runtime.state(Some(TestValue::Int(0)));
let a = a_state.id;
let b = runtime.derived(&[a], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n * 2)),
_ => None,
});
let c = runtime.derived(&[a], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n + 100)),
_ => None,
});
let d = runtime.derived(&[b, c], |deps| match (&deps[0], &deps[1]) {
(TestValue::Int(b), TestValue::Int(c)) => Some(TestValue::Int(b + c)),
_ => None,
});
let log = GlobalLog::new();
for n in [a, b, c, d] {
std::mem::forget(runtime.core.subscribe(n, log.sink_for(n)));
}
log.reset();
Self {
runtime,
a,
b,
c,
d,
log,
}
}
fn emit_a(&self, value: i64) {
let h = self.runtime.binding.intern(TestValue::Int(value));
self.runtime.core.emit(self.a, h);
}
}
#[test]
fn r1_3_1_b_diamond_dirty_first_across_nodes() {
let g = Diamond::new();
g.emit_a(7);
let entries = g.log.snapshot();
let nodes_with_dirty: Vec<NodeId> = entries
.iter()
.filter_map(|(n, t)| {
if matches!(t, LogTier::Dirty) {
Some(*n)
} else {
None
}
})
.collect();
assert_eq!(
nodes_with_dirty.len(),
4,
"expected one DIRTY per node (A,B,C,D); got {entries:?}"
);
assert_dirty_before_settle(&entries);
}
#[test]
fn r2_7_1_diamond_d_fires_once_per_wave() {
let g = Diamond::new();
g.emit_a(5);
let entries = g.log.snapshot();
let d_settles = entries
.iter()
.filter(|(n, t)| *n == g.d && matches!(t, LogTier::Tier3))
.count();
assert_eq!(d_settles, 1, "D should settle exactly once per source emit");
let d_dirty = entries
.iter()
.filter(|(n, t)| *n == g.d && matches!(t, LogTier::Dirty))
.count();
assert_eq!(d_dirty, 1, "D should dirty exactly once per source emit");
}
#[test]
fn r1_3_1_b_diamond_multi_emit_outside_batch_still_orders_dirty_first() {
let g = Diamond::new();
g.emit_a(1);
g.emit_a(2);
let entries = g.log.snapshot();
let mut segments: Vec<Vec<(NodeId, LogTier)>> = vec![Vec::new()];
let mut last_was_settle = false;
for e in &entries {
if last_was_settle && matches!(e.1, LogTier::Dirty) {
segments.push(Vec::new());
}
last_was_settle = matches!(e.1, LogTier::Tier3 | LogTier::Invalidate);
segments.last_mut().unwrap().push(e.clone());
}
for seg in &segments {
assert_dirty_before_settle(seg);
}
}
#[test]
fn r1_3_1_b_complex_topology_dirty_first() {
let runtime = TestRuntime::new();
let a_state = runtime.state(Some(TestValue::Int(0)));
let a = a_state.id;
let mk_unary = |coeff: i64, offset: i64| {
move |deps: &[TestValue]| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n * coeff + offset)),
_ => None,
}
};
let b = runtime.derived(&[a], mk_unary(2, 0));
let c = runtime.derived(&[a], mk_unary(1, 100));
let e = runtime.derived(&[a], mk_unary(3, 1));
let d = runtime.derived(&[b, c], |deps| match (&deps[0], &deps[1]) {
(TestValue::Int(b), TestValue::Int(c)) => Some(TestValue::Int(b + c)),
_ => None,
});
let f = runtime.derived(&[d, e], |deps| match (&deps[0], &deps[1]) {
(TestValue::Int(d), TestValue::Int(e)) => Some(TestValue::Int(d + e)),
_ => None,
});
let log = GlobalLog::new();
for &n in &[a, b, c, d, e, f] {
std::mem::forget(runtime.core.subscribe(n, log.sink_for(n)));
}
log.reset();
let h = runtime.binding.intern(TestValue::Int(10));
runtime.core.emit(a, h);
let entries = log.snapshot();
assert_dirty_before_settle(&entries);
for n in [a, b, c, d, e, f] {
let dirty_count = entries
.iter()
.filter(|(nn, t)| *nn == n && matches!(t, LogTier::Dirty))
.count();
let settle_count = entries
.iter()
.filter(|(nn, t)| *nn == n && matches!(t, LogTier::Tier3))
.count();
assert_eq!(dirty_count, 1, "node {n:?} should DIRTY once");
assert_eq!(settle_count, 1, "node {n:?} should settle once");
}
if let TestValue::Int(v) = runtime.cache_value(f).unwrap() {
assert_eq!(v, 161);
} else {
panic!("expected Int");
}
}
#[test]
fn batch_closure_coalesces_two_state_emits_into_one_wave() {
let runtime = TestRuntime::new();
let a_state = runtime.state(Some(TestValue::Int(0)));
let b_state = runtime.state(Some(TestValue::Int(100)));
let sum = runtime.derived(&[a_state.id, b_state.id], |deps| {
match (&deps[0], &deps[1]) {
(TestValue::Int(a), TestValue::Int(b)) => Some(TestValue::Int(a + b)),
_ => None,
}
});
let rec = runtime.subscribe_recorder(sum);
let baseline_data_count = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
let h_a = runtime.binding.intern(TestValue::Int(7));
let h_b = runtime.binding.intern(TestValue::Int(13));
runtime.core.batch(|| {
runtime.core.emit(a_state.id, h_a);
runtime.core.emit(b_state.id, h_b);
});
let post = rec.snapshot();
let post_data_count = post
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
let new_data = post_data_count - baseline_data_count;
assert_eq!(
new_data, 1,
"sum should DATA exactly once after coalesced batch — got events {post:?}"
);
if let TestValue::Int(v) = runtime.cache_value(sum).unwrap() {
assert_eq!(v, 7 + 13);
} else {
panic!("expected Int");
}
}
#[test]
fn batch_diamond_d_fires_once_when_two_state_emits_coalesce() {
let runtime = TestRuntime::new();
let s_left = runtime.state(Some(TestValue::Int(0)));
let s_right = runtime.state(Some(TestValue::Int(0)));
let b = runtime.derived(&[s_left.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n * 2)),
_ => None,
});
let c = runtime.derived(&[s_right.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n + 100)),
_ => None,
});
let d_fire_count = Arc::new(Mutex::new(0u32));
let d_fire_count_inner = d_fire_count.clone();
let d = runtime.derived(&[b, c], move |deps| {
*d_fire_count_inner.lock().expect("d count") += 1;
match (&deps[0], &deps[1]) {
(TestValue::Int(b), TestValue::Int(c)) => Some(TestValue::Int(b + c)),
_ => None,
}
});
let _rec = runtime.subscribe_recorder(d);
let baseline = *d_fire_count.lock().expect("d count");
let h_l = runtime.binding.intern(TestValue::Int(5));
let h_r = runtime.binding.intern(TestValue::Int(10));
runtime.core.batch(|| {
runtime.core.emit(s_left.id, h_l);
runtime.core.emit(s_right.id, h_r);
});
let after = *d_fire_count.lock().expect("d count");
assert_eq!(
after - baseline,
1,
"D fn should fire exactly once for the coalesced wave (was {baseline}, now {after})"
);
if let TestValue::Int(v) = runtime.cache_value(d).unwrap() {
assert_eq!(v, 10 + 110); }
}
#[test]
fn batch_nested_only_outer_drains() {
let runtime = TestRuntime::new();
let s = runtime.state(Some(TestValue::Int(0)));
let derived = runtime.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n + 1)),
_ => None,
});
let rec = runtime.subscribe_recorder(derived);
let pre = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
let h1 = runtime.binding.intern(TestValue::Int(1));
let h2 = runtime.binding.intern(TestValue::Int(2));
let h3 = runtime.binding.intern(TestValue::Int(3));
runtime.core.batch(|| {
runtime.core.emit(s.id, h1);
runtime.core.batch(|| {
runtime.core.emit(s.id, h2);
});
runtime.core.emit(s.id, h3);
});
let post = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
assert_eq!(
post - pre,
1,
"nested batch should not pre-drain; outer should drain once"
);
if let TestValue::Int(v) = runtime.cache_value(derived).unwrap() {
assert_eq!(v, 4); }
}
#[test]
fn begin_batch_raii_drains_on_drop() {
let runtime = TestRuntime::new();
let s = runtime.state(Some(TestValue::Int(0)));
let derived = runtime.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n * 2)),
_ => None,
});
let rec = runtime.subscribe_recorder(derived);
let pre = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
{
let _g = runtime.core.begin_batch();
let h1 = runtime.binding.intern(TestValue::Int(7));
let h2 = runtime.binding.intern(TestValue::Int(11));
runtime.core.emit(s.id, h1);
runtime.core.emit(s.id, h2);
let mid = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
assert_eq!(
mid,
pre,
"no drain expected mid-batch (BatchGuard alive); got events {:?}",
rec.snapshot()
);
}
let post = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
assert_eq!(post - pre, 1, "BatchGuard drop should drain the wave");
if let TestValue::Int(v) = runtime.cache_value(derived).unwrap() {
assert_eq!(v, 22); }
}
#[test]
fn batch_with_complete_inside_drains_terminal_at_scope_end() {
let runtime = TestRuntime::new();
let s = runtime.state(Some(TestValue::Int(0)));
let rec = runtime.subscribe_recorder(s.id);
let baseline_len = rec.snapshot().len();
runtime.core.batch(|| {
let h = runtime.binding.intern(TestValue::Int(5));
runtime.core.emit(s.id, h);
runtime.core.complete(s.id);
});
let events = rec.snapshot();
let new_events = &events[baseline_len..];
let kinds: Vec<&str> = new_events
.iter()
.map(|e| match e {
common::RecordedEvent::Dirty => "Dirty",
common::RecordedEvent::Data(_) => "Data",
common::RecordedEvent::Resolved => "Resolved",
common::RecordedEvent::Complete => "Complete",
_ => "other",
})
.collect();
let p_dirty = kinds.iter().position(|x| *x == "Dirty");
let p_data = kinds.iter().position(|x| *x == "Data");
let p_complete = kinds.iter().position(|x| *x == "Complete");
assert!(
matches!((p_dirty, p_data, p_complete), (Some(d), Some(da), Some(c)) if d < da && da < c),
"tier ordering violated; new events {new_events:?}"
);
}
#[test]
fn batch_panic_restores_state_node_caches() {
use std::panic;
let runtime = TestRuntime::new();
let s = runtime.state(Some(TestValue::Int(7)));
let s_id = s.id;
let initial_cache = runtime.cache_value(s_id).expect("initial cache");
assert_eq!(initial_cache, TestValue::Int(7));
let core = runtime.core.clone();
let binding = runtime.binding.clone();
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
core.batch(|| {
let h_99 = binding.intern(TestValue::Int(99));
core.emit(s_id, h_99);
let mid = core.cache_of(s_id);
assert_eq!(mid, h_99, "mid-batch cache should hold the new value");
panic!("user code threw mid-batch");
});
}));
assert!(result.is_err(), "panic should propagate out of batch()");
let post_cache = runtime.cache_value(s_id).expect("post-panic cache");
assert_eq!(
post_cache,
TestValue::Int(7),
"BatchGuard panic-discard should restore state-node cache to pre-wave value"
);
}
#[test]
fn batch_panic_restores_multi_emit_state_node_to_first_pre_wave_value() {
use std::panic;
let runtime = TestRuntime::new();
let s = runtime.state(Some(TestValue::Int(0)));
let s_id = s.id;
let core = runtime.core.clone();
let binding = runtime.binding.clone();
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
core.batch(|| {
let h1 = binding.intern(TestValue::Int(1));
let h2 = binding.intern(TestValue::Int(2));
let h3 = binding.intern(TestValue::Int(3));
core.emit(s_id, h1);
core.emit(s_id, h2);
core.emit(s_id, h3);
panic!("user threw");
});
}));
assert_eq!(
runtime.cache_value(s_id).unwrap(),
TestValue::Int(0),
"panic-discard should restore to the value BEFORE the first emit (0), not the partially-committed last value"
);
}
#[test]
fn batch_success_does_not_revert_caches() {
let runtime = TestRuntime::new();
let s = runtime.state(Some(TestValue::Int(0)));
let s_id = s.id;
let core = runtime.core.clone();
let binding = runtime.binding.clone();
core.batch(|| {
let h = binding.intern(TestValue::Int(42));
core.emit(s_id, h);
});
assert_eq!(
runtime.cache_value(s_id).unwrap(),
TestValue::Int(42),
"successful batch should commit the new cache"
);
}
#[test]
fn batch_panic_discards_pending_wave() {
use std::panic;
let runtime = TestRuntime::new();
let s = runtime.state(Some(TestValue::Int(0)));
let derived = runtime.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n + 1)),
_ => None,
});
let rec = runtime.subscribe_recorder(derived);
let pre = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
let core = runtime.core.clone();
let s_id = s.id;
let binding = runtime.binding.clone();
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
core.batch(|| {
let h = binding.intern(TestValue::Int(42));
core.emit(s_id, h);
panic!("user code threw mid-batch");
});
}));
assert!(
result.is_err(),
"expected panic to propagate out of batch()"
);
let post = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
assert_eq!(
post,
pre,
"derived sink should not see tier-3 from a panicked batch; got events {:?}",
rec.snapshot()
);
let h2 = runtime.binding.intern(TestValue::Int(100));
runtime.core.emit(s.id, h2);
let after_recover = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
assert!(
after_recover > post,
"post-panic recovery emit should drive a normal wave; events {:?}",
rec.snapshot()
);
}
#[test]
fn batch_with_pause_resume_buffers_and_replays_at_resume() {
let runtime = TestRuntime::new();
let s = runtime.state(Some(TestValue::Int(0)));
let derived = runtime.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(n) => Some(TestValue::Int(n * 10)),
_ => None,
});
let rec = runtime.subscribe_recorder(derived);
let pre = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
let lock = runtime.core.alloc_lock_id();
runtime.core.batch(|| {
runtime.core.pause(derived, lock).expect("pause");
let h1 = runtime.binding.intern(TestValue::Int(1));
let h2 = runtime.binding.intern(TestValue::Int(2));
runtime.core.emit(s.id, h1);
runtime.core.emit(s.id, h2);
});
let mid = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
assert_eq!(
mid,
pre,
"paused derived should not emit tier-3 mid-batch; got events {:?}",
rec.snapshot()
);
let report = runtime
.core
.resume(derived, lock)
.expect("resume")
.expect("final lock report");
let _ = report;
let post = rec
.snapshot()
.iter()
.filter(|e| matches!(e, common::RecordedEvent::Data(_)))
.count();
assert!(
post > mid,
"resume should replay buffered tier-3; events {:?}",
rec.snapshot()
);
}
#[test]
fn flush_preserves_per_node_message_order_within_tier() {
let runtime = TestRuntime::new();
let s = runtime.state(Some(TestValue::Int(0)));
let rec = runtime.subscribe_recorder(s.id);
let pre_data: Vec<TestValue> = rec.data_values();
let h1 = runtime.binding.intern(TestValue::Int(1));
let h2 = runtime.binding.intern(TestValue::Int(2));
let h3 = runtime.binding.intern(TestValue::Int(3));
runtime.core.batch(|| {
runtime.core.emit(s.id, h1);
runtime.core.emit(s.id, h2);
runtime.core.emit(s.id, h3);
});
let post_data: Vec<TestValue> = rec.data_values();
let new_data: Vec<TestValue> = post_data[pre_data.len()..].to_vec();
assert_eq!(
new_data,
vec![TestValue::Int(1), TestValue::Int(2), TestValue::Int(3),],
"coalesced emits should deliver in emit order"
);
}
#[test]
fn batch_fn_result_multi_data_delivers_all_values() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
let binding = rt.binding.clone();
let fn_id = rt
.binding
.register_raw_fn(move |dep_data: &[graphrefly_core::DepBatch]| {
let input = dep_data[0].latest();
let v = binding.deref(input);
let TestValue::Int(n) = v else {
panic!("expected Int")
};
let h1 = binding.intern(TestValue::Int(n * 10));
let h2 = binding.intern(TestValue::Int(n * 100));
FnResult::Batch {
emissions: smallvec![FnEmission::Data(h1), FnEmission::Data(h2)],
tracked: None,
}
});
let derived = rt
.core
.register_derived(&[s.id], fn_id, EqualsMode::Identity, false)
.unwrap();
let rec = rt.subscribe_recorder(derived);
let events = rec.snapshot();
assert!(
events.contains(&RecordedEvent::Start),
"should see Start from handshake"
);
s.set(TestValue::Int(5));
let data = rec.data_values();
assert_eq!(
data,
vec![
TestValue::Int(0), TestValue::Int(0), TestValue::Int(50), TestValue::Int(500), ],
);
}
#[test]
fn batch_fn_result_data_then_complete() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let binding = rt.binding.clone();
let fn_id = rt
.binding
.register_raw_fn(move |dep_data: &[graphrefly_core::DepBatch]| {
let h = dep_data[0].latest();
let v = binding.deref(h);
let TestValue::Int(n) = v else {
panic!("expected Int")
};
let data_h = binding.intern(TestValue::Int(n * 2));
FnResult::Batch {
emissions: smallvec![FnEmission::Data(data_h), FnEmission::Complete],
tracked: None,
}
});
let derived = rt
.core
.register_derived(&[s.id], fn_id, EqualsMode::Identity, false)
.unwrap();
let rec = rt.subscribe_recorder(derived);
let events = rec.snapshot();
assert!(events.contains(&RecordedEvent::Data(TestValue::Int(2))));
assert!(events.contains(&RecordedEvent::Complete));
}
#[test]
fn batch_fn_result_data_then_error() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let binding = rt.binding.clone();
let fn_id = rt
.binding
.register_raw_fn(move |dep_data: &[graphrefly_core::DepBatch]| {
let h = dep_data[0].latest();
let v = binding.deref(h);
let TestValue::Int(n) = v else {
panic!("expected Int")
};
let data_h = binding.intern(TestValue::Int(n * 3));
let err_h = binding.intern(TestValue::Str("boom".into()));
FnResult::Batch {
emissions: smallvec![FnEmission::Data(data_h), FnEmission::Error(err_h)],
tracked: None,
}
});
let derived = rt
.core
.register_derived(&[s.id], fn_id, EqualsMode::Identity, false)
.unwrap();
let rec = rt.subscribe_recorder(derived);
let events = rec.snapshot();
assert!(events.contains(&RecordedEvent::Data(TestValue::Int(3))));
assert!(events.contains(&RecordedEvent::Error(TestValue::Str("boom".into()))));
}
#[test]
fn batch_fn_result_dirty_queued_once_per_wave() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let binding = rt.binding.clone();
let fn_id = rt
.binding
.register_raw_fn(move |dep_data: &[graphrefly_core::DepBatch]| {
let h = dep_data[0].latest();
let v = binding.deref(h);
let TestValue::Int(n) = v else {
panic!("expected Int")
};
let h1 = binding.intern(TestValue::Int(n));
let h2 = binding.intern(TestValue::Int(n + 1));
let h3 = binding.intern(TestValue::Int(n + 2));
FnResult::Batch {
emissions: smallvec![
FnEmission::Data(h1),
FnEmission::Data(h2),
FnEmission::Data(h3),
],
tracked: None,
}
});
let derived = rt
.core
.register_derived(&[s.id], fn_id, EqualsMode::Identity, false)
.unwrap();
let rec = rt.subscribe_recorder(derived);
s.set(TestValue::Int(10));
let events = rec.snapshot();
let dirty_count = events
.iter()
.filter(|e| matches!(e, RecordedEvent::Dirty))
.count();
let data_count = events
.iter()
.filter(|e| matches!(e, RecordedEvent::Data(_)))
.count();
assert_eq!(
dirty_count, 2,
"expected exactly 2 Dirty events (one per wave)"
);
assert_eq!(
data_count, 6,
"expected 6 Data events (3 per wave × 2 waves)"
);
}
#[test]
fn batch_fn_result_no_equals_substitution() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(42)));
let binding = rt.binding.clone();
let fn_id = rt
.binding
.register_raw_fn(move |_dep_data: &[graphrefly_core::DepBatch]| {
let h = binding.intern(TestValue::Int(999));
FnResult::Batch {
emissions: smallvec![FnEmission::Data(h)],
tracked: None,
}
});
let derived = rt
.core
.register_derived(&[s.id], fn_id, EqualsMode::Identity, false)
.unwrap();
let rec = rt.subscribe_recorder(derived);
s.set(TestValue::Int(99));
let events = rec.snapshot();
let data_count = events
.iter()
.filter(|e| matches!(e, RecordedEvent::Data(_)))
.count();
assert_eq!(
data_count, 2,
"Batch should deliver DATA even when equal to cache"
);
}
#[test]
fn batch_fn_result_propagates_to_grandchild() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let binding = rt.binding.clone();
let fn_id_mid = rt
.binding
.register_raw_fn(move |dep_data: &[graphrefly_core::DepBatch]| {
let h = dep_data[0].latest();
let v = binding.deref(h);
let TestValue::Int(n) = v else {
panic!("expected Int")
};
let h1 = binding.intern(TestValue::Int(n * 10));
let h2 = binding.intern(TestValue::Int(n * 20));
FnResult::Batch {
emissions: smallvec![FnEmission::Data(h1), FnEmission::Data(h2)],
tracked: None,
}
});
let mid = rt
.core
.register_derived(&[s.id], fn_id_mid, EqualsMode::Identity, false)
.unwrap();
let grandchild = rt.derived(&[mid], |vals| Some(vals[0].clone()));
let rec = rt.subscribe_recorder(grandchild);
s.set(TestValue::Int(5));
let data = rec.data_values();
assert!(
data.contains(&TestValue::Int(20)),
"activation: grandchild sees mid's last batch value"
);
assert!(
data.contains(&TestValue::Int(100)),
"wave: grandchild sees mid's last batch value"
);
}
#[test]
fn cross_core_same_thread_batchguard_isolation() {
let rt_a = TestRuntime::new();
let rt_b = TestRuntime::new();
let s_b = rt_b.state(Some(TestValue::Int(0)));
let d_b = rt_b.derived(&[s_b.id], |deps| Some(deps[0].clone()));
let rec_b = rt_b.subscribe_recorder(d_b);
let s_a = rt_a.state(Some(TestValue::Int(0)));
let guard_a = rt_a.core.begin_batch();
let h = rt_b.binding.intern(TestValue::Int(7));
rt_b.core.emit(s_b.id, h);
assert!(
rec_b.data_values().contains(&TestValue::Int(7)),
"Core-B's wave must drain despite a live Core-A BatchGuard on the \
same thread (cross-Core isolation / qa F1); got {:?}",
rec_b.data_values()
);
drop(guard_a);
let d_a = rt_a.derived(&[s_a.id], |deps| Some(deps[0].clone()));
let rec_a = rt_a.subscribe_recorder(d_a);
s_a.set(TestValue::Int(99));
assert!(
rec_a.data_values().contains(&TestValue::Int(99)),
"Core-A must work normally after its BatchGuard drops; got {:?}",
rec_a.data_values()
);
}
#[test]
fn panic_in_drain_phase_releases_wave_ownership_for_next_wave() {
use std::panic;
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
let d = rt.derived(&[s.id], |deps| match &deps[0] {
TestValue::Int(999) => panic!("fn panic during drain phase (success path)"),
v => Some(v.clone()),
});
let rec = rt.subscribe_recorder(d);
let core = rt.core.clone();
let sid = s.id;
let binding = rt.binding.clone();
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
let h = binding.intern(TestValue::Int(999));
core.emit(sid, h);
}));
assert!(
result.is_err(),
"fn panic during the drain phase must propagate out of emit"
);
s.set(TestValue::Int(7));
assert!(
rec.data_values().contains(&TestValue::Int(7)),
"after a drain-phase panic, the next wave on the same (Core, \
thread) must own + drain normally; got {:?}",
rec.data_values()
);
}