#![allow(clippy::arc_with_non_send_sync)]
mod common;
use std::sync::{Arc, Mutex};
use graphrefly_core::Message;
use common::{TestRuntime, TestValue};
#[test]
fn sink_can_reenter_core_via_emit() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));
let other = rt.state(None);
let other_obs = rt.derived(&[other.id], |deps| Some(deps[0].clone()));
let other_rec = rt.subscribe_recorder(other_obs);
let mailbox = rt.mailbox();
let binding = rt.binding.clone();
let other_id = other.id;
let reentrant_sink: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
for m in msgs {
if matches!(m, Message::Data(_)) {
let h = binding.intern(TestValue::Int(55));
let _ = mailbox.post_emit(other_id, h);
}
}
});
let sub = rt.track_subscribe(d, reentrant_sink);
s.set(TestValue::Int(2));
rt.drain_mailbox();
assert!(
other_rec.data_values().contains(&TestValue::Int(55)),
"in-wave sink re-entered Core via mailbox emit → delivered as nested wave"
);
rt.unsubscribe(d, sub);
}
#[test]
fn sink_can_complete_another_node_from_callback() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(1)));
let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));
let target = rt.state(Some(TestValue::Int(0)));
let target_rec = rt.subscribe_recorder(target.id);
let mailbox = rt.mailbox();
let target_id = target.id;
let completing_sink: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
for m in msgs {
if matches!(m, Message::Data(_)) {
let _ = mailbox.post_complete(target_id);
}
}
});
let sub = rt.track_subscribe(d, completing_sink);
s.set(TestValue::Int(2));
rt.drain_mailbox();
assert!(
target_rec
.snapshot()
.iter()
.any(|e| matches!(e, common::RecordedEvent::Complete)),
"in-wave sink completed another node via mailbox → delivered as nested wave"
);
rt.unsubscribe(d, sub);
}
#[test]
fn p7_reentrant_drain_mailbox_applies_nested_waves_in_fifo_order() {
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));
let order = Arc::new(Mutex::new(Vec::<i64>::new()));
let sink_node = rt.state(None);
let order_w = order.clone();
let binding_o = rt.binding.clone();
let order_sink: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
for m in msgs {
if let Message::Data(h) = m {
if let common::TestValue::Int(n) = binding_o.deref(*h) {
order_w.lock().unwrap().push(n);
}
}
}
});
let order_sub = rt.track_subscribe(sink_node.id, order_sink);
let mailbox = rt.mailbox();
let binding = rt.binding.clone();
let sink_id = sink_node.id;
let posted = Arc::new(std::sync::atomic::AtomicBool::new(false));
let reentrant: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
for m in msgs {
if matches!(m, Message::Data(_))
&& !posted.swap(true, std::sync::atomic::Ordering::SeqCst)
{
for v in [10i64, 20, 30] {
let h = binding.intern(common::TestValue::Int(v));
let _ = mailbox.post_emit(sink_id, h);
}
let mailbox2 = mailbox.clone();
let binding2 = binding.clone();
let _ = mailbox.post_defer(Box::new(move |_cf| {
let h = binding2.intern(common::TestValue::Int(40));
let _ = mailbox2.post_emit(sink_id, h);
}));
}
}
});
let d_sub = rt.track_subscribe(d, reentrant);
s.set(TestValue::Int(1));
rt.drain_mailbox();
assert_eq!(
*order.lock().unwrap(),
vec![10, 20, 30, 40],
"re-entrant drain_mailbox applied nested waves in strict FIFO post order"
);
rt.unsubscribe(d, d_sub);
rt.unsubscribe(sink_node.id, order_sub);
}
#[test]
#[allow(clippy::arc_with_non_send_sync)]
fn handshake_sink_can_reenter_core_emit_on_other_node() {
use std::sync::Arc;
let rt = Arc::new(TestRuntime::new());
let s = rt.state(Some(TestValue::Int(0)));
let s_id = s.id;
let other = rt.state(None);
let other_id = other.id;
let other_rec = rt.subscribe_recorder(other_id);
let rt_inner = Arc::clone(&rt);
let sink: graphrefly_core::Sink = Arc::new(move |msgs: &[graphrefly_core::Message]| {
for m in msgs {
if matches!(m, graphrefly_core::Message::Data(_)) {
let h = rt_inner.binding.intern(TestValue::Int(99));
rt_inner.core().emit(other_id, h);
}
}
});
let _sub = rt.core().subscribe(s_id, sink);
let other_events = other_rec.snapshot();
let saw_99 = other_events
.iter()
.any(|e| matches!(e, common::RecordedEvent::Data(TestValue::Int(99))));
assert!(
saw_99,
"other should observe Data(99) emitted from the handshake sink; got {other_events:?}"
);
}
#[test]
fn concurrent_subscribe_during_emit_observes_monotonic_post_subscribe_emits() {
#[derive(Debug, PartialEq, Clone)]
enum Ev {
Start,
Data(i64),
Other,
}
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));
let late: Arc<Mutex<Vec<Ev>>> = Arc::new(Mutex::new(Vec::new()));
let mailbox = rt.mailbox();
let binding = rt.binding.clone();
let s_id = s.id;
let late_for_defer = Arc::clone(&late);
let posted = Arc::new(std::sync::atomic::AtomicBool::new(false));
let trigger: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
for m in msgs {
if matches!(m, Message::Data(_))
&& posted
.compare_exchange(
false,
true,
std::sync::atomic::Ordering::SeqCst,
std::sync::atomic::Ordering::SeqCst,
)
.is_ok()
{
let binding = binding.clone();
let late = Arc::clone(&late_for_defer);
let _ = mailbox.post_defer(Box::new(move |cf: &dyn graphrefly_core::CoreFull| {
let b = binding.clone();
let buf = Arc::clone(&late);
let late_sink: graphrefly_core::Sink = Arc::new(move |ms: &[Message]| {
let mut g = buf.lock().unwrap();
for m in ms {
match m {
Message::Start => g.push(Ev::Start),
Message::Data(h) => match b.deref(*h) {
TestValue::Int(n) => g.push(Ev::Data(n)),
_ => g.push(Ev::Other),
},
_ => g.push(Ev::Other),
}
}
});
let _ = cf.subscribe(s_id, late_sink);
}));
}
}
});
let trig_sub = rt.track_subscribe(d, trigger);
for k in 1..=5 {
s.set(TestValue::Int(k));
rt.drain_mailbox();
}
let got = late.lock().unwrap().clone();
assert_eq!(
got.first(),
Some(&Ev::Start),
"late sink observes Start first"
);
let data: Vec<i64> = got
.iter()
.filter_map(|e| match e {
Ev::Data(n) => Some(*n),
_ => None,
})
.collect();
assert!(!data.is_empty(), "late sink received post-subscribe DATA");
assert!(
data.windows(2).all(|w| w[0] < w[1]),
"post-subscribe DATA is strictly monotonic (no stale/out-of-order): {data:?}"
);
assert_eq!(
*data.last().unwrap(),
5,
"monotonic tail converges to the final emit"
);
assert!(
data.iter().all(|n| (0..=5).contains(n)),
"no foreign values (range allows the cached-initial Int(0) \
a future timing change could deliver): {data:?}"
);
rt.unsubscribe(d, trig_sub);
}
#[test]
fn cross_queue_order_mailbox_then_deferred() {
use graphrefly_core::CoreFull;
let rt = TestRuntime::new();
let s = rt.state(Some(TestValue::Int(0)));
let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));
let target = rt.state(None);
let target_id = target.id;
let target_rec = rt.subscribe_recorder(target_id);
let order: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
let mailbox = rt.mailbox();
let deferred = rt.core().defer_queue();
let binding = rt.binding.clone();
let order_for_sink = Arc::clone(&order);
let posted = Arc::new(std::sync::atomic::AtomicBool::new(false));
let trigger: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
for m in msgs {
if matches!(m, Message::Data(_))
&& !posted.swap(true, std::sync::atomic::Ordering::SeqCst)
{
let order_in_defer = Arc::clone(&order_for_sink);
let _ = deferred.post(Box::new(move |_cf: &dyn CoreFull| {
order_in_defer.lock().unwrap().push("Defer");
}));
let h = binding.intern(TestValue::Int(99));
let _ = mailbox.post_emit(target_id, h);
}
}
});
let trig_sub = rt.track_subscribe(d, trigger);
s.set(TestValue::Int(1));
rt.drain_mailbox();
assert!(
target_rec.data_values().contains(&TestValue::Int(99)),
"the CoreMailbox post_emit's cascade landed (mailbox drained)"
);
assert_eq!(
order.lock().unwrap().as_slice(),
&["Defer"],
"the DeferQueue closure ran exactly once; combined with the \
Emit-landed assertion above, this proves the drain visited \
the CoreMailbox first (Emit applied → target_rec saw Data(99)) \
then the DeferQueue (push 'Defer'). Cross-queue order = \
queue priority, not arrival order (M1 contract)."
);
rt.unsubscribe(d, trig_sub);
}