mod common;
use std::sync::Arc;
use common::{RecordedEvent, TestRuntime, TestValue};
/// A derived node's compute closure posts an emit to another node through the
/// mailbox while the closure itself is being invoked.
///
/// The recorder attached to the observer of `side` must hold exactly
/// `[Int(100)]` before any explicit drain, and still exactly that after
/// `drain_mailbox()` has been called.
#[test]
fn fn_can_reenter_core_emit_during_invoke_fn_runs_nested_wave() {
    let runtime = TestRuntime::new();
    let source = runtime.state(Some(TestValue::Int(1)));

    // `side` starts without a value; the only data it ever receives is the
    // one posted from inside the compute closure below.
    let side = runtime.state(None);
    let side_target = side.id;
    let side_view = runtime.derived(&[side_target], |deps| Some(deps[0].clone()));
    let side_events = runtime.subscribe_recorder(side_view);

    let mailbox = runtime.mailbox();
    let binding = runtime.binding.clone();
    let relay = runtime.derived(&[source.id], move |deps| match &deps[0] {
        TestValue::Int(n) => {
            // Post `n * 100` to `side` from within the compute closure.
            let handle = binding.intern(TestValue::Int(n * 100));
            assert!(mailbox.post_emit(side_target, handle), "Core alive");
            Some(TestValue::Int(*n))
        }
        _ => panic!("type"),
    });
    let _relay_events = runtime.subscribe_recorder(relay);
    assert_eq!(runtime.cache_value(relay), Some(TestValue::Int(1)));

    let expected = vec![TestValue::Int(100)];
    assert_eq!(
        side_events.data_values(),
        expected,
        "re-entrant emit delivered as an in-wave nested wave"
    );

    // Draining afterwards must not add a second delivery.
    runtime.drain_mailbox();
    assert_eq!(
        side_events.data_values(),
        expected,
        "explicit drain is a no-op once the in-wave drain emptied the mailbox"
    );
}
/// A derived node's compute closure posts a deferred job that invalidates a
/// sibling state node.
///
/// After subscribing to the derived node, the recorder on the sibling's
/// observer must contain an `Invalidate` event and the sibling's cache must
/// read as `None`.
#[test]
fn fn_can_reenter_core_invalidate_during_invoke_fn() {
    let runtime = TestRuntime::new();
    let source = runtime.state(Some(TestValue::Int(7)));

    let sibling = runtime.state(Some(TestValue::Int(99)));
    let sibling_id = sibling.id;
    let sibling_view = runtime.derived(&[sibling_id], |deps| Some(deps[0].clone()));
    let sibling_events = runtime.subscribe_recorder(sibling_view);

    let mailbox = runtime.mailbox();
    let relay = runtime.derived(&[source.id], move |deps| {
        // Queue the invalidate from inside the compute closure.
        let accepted = mailbox.post_defer(Box::new(move |cf| {
            cf.invalidate(sibling_id);
        }));
        assert!(accepted, "Core alive");
        Some(deps[0].clone())
    });
    let _relay_events = runtime.subscribe_recorder(relay);
    assert_eq!(runtime.cache_value(relay), Some(TestValue::Int(7)));

    let recorded = sibling_events.snapshot();
    let saw_invalidate = recorded
        .iter()
        .any(|event| matches!(event, RecordedEvent::Invalidate));
    assert!(
        saw_invalidate,
        "re-entrant Defer invalidate delivered as an in-wave nested wave; got {:?}",
        recorded
    );

    assert_eq!(
        runtime.cache_value(sibling_id),
        None,
        "sibling cache cleared by the re-entrant invalidate"
    );
}
/// A custom equality callback posts an emit to another node through the
/// mailbox each time it is called.
///
/// After `s` is set to a new value and the mailbox is drained, the recorder on
/// the observer of `side` must contain `Int(42)`.
#[test]
fn custom_equals_can_reenter_core_during_emission() {
    let runtime = TestRuntime::new();
    let source = runtime.state(Some(TestValue::Int(1)));

    let side = runtime.state(None);
    let side_target = side.id;
    let side_view = runtime.derived(&[side_target], |deps| Some(deps[0].clone()));
    let side_events = runtime.subscribe_recorder(side_view);

    let mailbox = runtime.mailbox();
    let binding = runtime.binding.clone();
    let compared = runtime.derived_with_equals(
        &[source.id],
        |deps| Some(deps[0].clone()),
        move |lhs, rhs| {
            // Post a marker value to `side` from inside the equality check.
            let marker = binding.intern(TestValue::Int(42));
            assert!(mailbox.post_emit(side_target, marker), "Core alive");
            lhs == rhs
        },
    );
    let _compared_events = runtime.subscribe_recorder(compared);
    let _ = side_events.snapshot();

    source.set(TestValue::Int(2));
    // NOTE(review): the explicit drain runs before the assertion, so this test
    // by itself does not distinguish in-wave delivery from delivery performed
    // by this drain — confirm that is intended.
    runtime.drain_mailbox();

    let seen = side_events.data_values();
    assert!(
        seen.contains(&TestValue::Int(42)),
        "custom_equals re-entrant emit delivered as an in-wave nested wave; got {:?}",
        seen
    );
}
/// Subscribing to a state node created without a value must produce a single
/// sink call that carries only `Start`.
#[test]
fn handshake_tier_split_sentinel_state_one_call() {
    let runtime = TestRuntime::new();
    let empty_state = runtime.state(None);
    let recorder = runtime.subscribe_recorder(empty_state.id);

    let calls = recorder.call_count();
    assert_eq!(calls, 1, "sentinel handshake = 1 sink call");

    let boundaries = recorder.call_boundaries();
    assert_eq!(boundaries, vec![1]);

    let events = recorder.snapshot();
    assert_eq!(events, vec![RecordedEvent::Start]);
}
/// Subscribing to a state node created with a value must produce two sink
/// calls of one message each: `Start`, then `Data` with the initial value.
#[test]
fn handshake_tier_split_cached_state_two_calls() {
    let runtime = TestRuntime::new();
    let seeded_state = runtime.state(Some(TestValue::Int(42)));
    let recorder = runtime.subscribe_recorder(seeded_state.id);

    let calls = recorder.call_count();
    assert_eq!(calls, 2, "cached handshake = 2 sink calls");

    let boundaries = recorder.call_boundaries();
    assert_eq!(boundaries, vec![1, 1]);

    let expected_events = vec![
        RecordedEvent::Start,
        RecordedEvent::Data(TestValue::Int(42)),
    ];
    assert_eq!(recorder.snapshot(), expected_events);
}
/// A sink subscribed to a derived node posts, on the first `Data` message it
/// sees, a deferred job that subscribes a second ("late") sink to the source
/// state.
///
/// After two further `set` calls the late sink's log must begin with `Start`
/// and its `Data` values must be exactly `[0, 1, 2]`, with no value repeated.
#[test]
fn late_subscriber_installed_after_first_queue_notify_does_not_double_receive_data() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Mutex;

    /// Simplified view of the messages the late sink receives.
    #[derive(Debug, PartialEq)]
    enum Ev {
        Start,
        Data(i64),
        Other,
    }

    let rt = TestRuntime::new();
    let s = rt.state(Some(TestValue::Int(0)));
    let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));

    // Log filled by the late sink and inspected at the end of the test.
    let late: Arc<Mutex<Vec<Ev>>> = Arc::new(Mutex::new(Vec::new()));
    let late_for_defer = Arc::clone(&late);
    let mailbox = rt.mailbox();
    let binding = rt.binding.clone();
    let s_id = s.id;
    // Flipped by the first `Data` message so the deferred subscribe is posted once.
    let posted = Arc::new(AtomicBool::new(false));

    let trigger: graphrefly_core::Sink =
        Arc::new(move |msgs: &[graphrefly_core::Message]| {
            for m in msgs {
                if !matches!(m, graphrefly_core::Message::Data(_)) {
                    continue;
                }
                // `swap` returns the previous flag: `true` means already posted.
                if posted.swap(true, Ordering::SeqCst) {
                    continue;
                }
                let binding = binding.clone();
                let late = Arc::clone(&late_for_defer);
                let _ = mailbox.post_defer(Box::new(move |cf: &dyn graphrefly_core::CoreFull| {
                    let b = binding.clone();
                    let buf = Arc::clone(&late);
                    let late_sink: graphrefly_core::Sink =
                        Arc::new(move |ms: &[graphrefly_core::Message]| {
                            let mut log = buf.lock().unwrap();
                            for m in ms {
                                let ev = match m {
                                    graphrefly_core::Message::Start => Ev::Start,
                                    graphrefly_core::Message::Data(h) => match b.deref(*h) {
                                        TestValue::Int(n) => Ev::Data(n),
                                        _ => Ev::Other,
                                    },
                                    _ => Ev::Other,
                                };
                                log.push(ev);
                            }
                        });
                    let _ = cf.subscribe(s_id, late_sink);
                }));
            }
        });
    let trig_sub = rt.track_subscribe(d, trigger);

    s.set(TestValue::Int(1));
    rt.drain_mailbox();
    s.set(TestValue::Int(2));
    rt.drain_mailbox();

    // The guard lives only inside this block, so the log is unlocked again
    // before the trigger is unsubscribed.
    {
        let guard = late.lock().unwrap();
        let events: &[Ev] = &guard;

        assert_eq!(
            events.first(),
            Some(&Ev::Start),
            "late subscriber observes Start first; got {:?}",
            events
        );

        let data: Vec<i64> = events
            .iter()
            .filter_map(|e| if let Ev::Data(n) = e { Some(*n) } else { None })
            .collect();
        assert_eq!(
            data,
            vec![0, 1, 2],
            "each Data value delivered EXACTLY ONCE (no double-receive): \
             `Data(0)` from late's handshake replay (D260 in-wave Defer \
             timing), then `Data(1)`/`Data(2)` from post-subscribe emits; \
             full stream {:?}",
            events
        );

        // Uniqueness check: sorting and de-duplicating must not shrink the list.
        let mut unique = data.clone();
        unique.sort_unstable();
        unique.dedup();
        assert_eq!(
            unique.len(),
            data.len(),
            "X4/D2 freeze invariant: every Data value appears EXACTLY ONCE; \
             got data projection {:?} (duplicates after dedup: {:?})",
            data,
            unique
        );
    }

    rt.unsubscribe(d, trig_sub);
}
/// After ten `set` calls through a state → derived chain, dropping the
/// recorder and the state must leave at most two live handles in the binding.
#[test]
fn lock_released_refactor_does_not_leak_handles_under_basic_emit() {
    let runtime = TestRuntime::new();
    let source = runtime.state(Some(TestValue::Int(0)));
    let mirror = runtime.derived(&[source.id], |deps| Some(deps[0].clone()));
    let mirror_events = runtime.subscribe_recorder(mirror);

    for value in 1..=10 {
        source.set(TestValue::Int(value));
    }

    // Drop the recorder first, then the state, before counting handles.
    drop(mirror_events);
    drop(source);

    let remaining = runtime.binding.live_handles();
    assert!(
        remaining <= 2,
        "expected <= 2 live handles after drop (derived cache + maybe a transient), got {}",
        remaining
    );

    drop(runtime);
}
/// A derived node's compute closure posts a deferred job that reads another
/// node's cache through `cache_of` and stores the returned handle.
///
/// After draining the mailbox, the stored handle must dereference to the
/// probe node's cached value, `Int(1000)`.
#[test]
fn reentrant_core_cache_of_read_via_defer_returns_correct_value() {
    use std::sync::Mutex;

    let runtime = TestRuntime::new();

    // Probe node: always `source + 1000`, so its cached value is recognisable.
    let probe_src = runtime.state(Some(TestValue::Int(0)));
    let probe = runtime.derived(&[probe_src.id], |deps| match &deps[0] {
        TestValue::Int(n) => Some(TestValue::Int(n + 1000)),
        _ => panic!("type"),
    });
    let _probe_events = runtime.subscribe_recorder(probe);
    assert_eq!(runtime.cache_value(probe), Some(TestValue::Int(1000)));

    // Slot the deferred job writes the handle it read into.
    let slot: Arc<Mutex<Option<graphrefly_core::HandleId>>> = Arc::new(Mutex::new(None));
    let slot_writer = Arc::clone(&slot);
    let mailbox = runtime.mailbox();
    let probe_id = probe;

    let source = runtime.state(Some(TestValue::Int(1)));
    let reader = runtime.derived(&[source.id], move |deps| {
        let slot_writer = Arc::clone(&slot_writer);
        let accepted = mailbox.post_defer(Box::new(move |cf| {
            *slot_writer.lock().unwrap() = Some(cf.cache_of(probe_id));
        }));
        assert!(accepted, "Core alive");
        Some(deps[0].clone())
    });
    let _reader_events = runtime.subscribe_recorder(reader);
    assert_eq!(runtime.cache_value(reader), Some(TestValue::Int(1)));

    runtime.drain_mailbox();

    // Copy the handle out so the lock is released before dereferencing it.
    let captured = *slot.lock().unwrap();
    let handle = captured.expect("defer ran");
    assert_eq!(
        runtime.binding.deref(handle),
        TestValue::Int(1000),
        "re-entrant cache_of read returned the correct cached value, no deadlock"
    );
}