mod common;
use std::sync::{Arc, Mutex};
use graphrefly_core::Message;
use common::{TestRuntime, TestValue};
/// Builds a sink that drops every batch until `armed` holds `true`.
///
/// Once armed, `on_data` is called once for each `Message::Data` in the
/// batch, in batch order; all other message variants are skipped.
fn armed_sink<F>(armed: Arc<Mutex<bool>>, on_data: F) -> graphrefly_core::Sink
where
    F: Fn(&Message) + Send + Sync + 'static,
{
    Arc::new(move |msgs: &[Message]| {
        // Copy the flag out so the mutex guard is released before any
        // callback runs.
        let is_armed = *armed.lock().unwrap();
        if is_armed {
            for msg in msgs.iter().filter(|m| matches!(m, Message::Data(_))) {
                on_data(msg);
            }
        }
    })
}
/// Re-entrancy check: a sink subscribed to `s` calls `rt.core.emit` on a
/// different node `t` from inside its callback, and the recorder attached to
/// `t` must then contain `Data(99)`.
#[test]
fn sink_can_reenter_core_via_emit() {
    let rt = Arc::new(TestRuntime::new());

    let source = rt.state(Some(TestValue::Int(0)));
    let s_id = source.id;
    let target = rt.state(Some(TestValue::Int(0)));
    let t_id = target.id;

    let t_rec = rt.subscribe_recorder(t_id);
    // Anything recorded before the emit below is excluded from the check.
    let baseline_t = t_rec.snapshot().len();

    let armed = Arc::new(Mutex::new(false));
    let triggered = Arc::new(Mutex::new(false));

    let sink = {
        let rt_inner = Arc::clone(&rt);
        let triggered_inner = Arc::clone(&triggered);
        armed_sink(Arc::clone(&armed), move |_msg| {
            // Set the flag and fetch its previous value in one locked step;
            // the guard is released at the end of this statement, before
            // the re-entrant call.
            let already = std::mem::replace(&mut *triggered_inner.lock().unwrap(), true);
            if already {
                return;
            }
            let h = rt_inner.binding.intern(TestValue::Int(99));
            rt_inner.core.emit(t_id, h);
        })
    };

    // `forget` means the value returned by `subscribe` is never dropped
    // (presumably keeping the subscription active — confirm against Core).
    std::mem::forget(rt.core.subscribe(s_id, sink));
    // Arm only after subscribing, so messages delivered during `subscribe`
    // itself are ignored by the sink.
    *armed.lock().unwrap() = true;

    let h = rt.binding.intern(TestValue::Int(7));
    rt.core.emit(s_id, h);

    let t_events = t_rec.snapshot();
    let saw_99 = t_events[baseline_t..]
        .iter()
        .any(|e| matches!(e, common::RecordedEvent::Data(TestValue::Int(99))));
    assert!(
        saw_99,
        "expected re-entrant emit to deliver Data(99) to t; got {t_events:?}"
    );
}
/// Re-entrancy check: a sink subscribed to `s` calls `rt.core.pause` and then
/// `rt.core.resume` on a different node `t` from inside its callback. Both
/// calls must return `Ok`, and the callback must have taken that path once.
#[test]
fn sink_can_reenter_core_via_pause_and_resume() {
    let rt = Arc::new(TestRuntime::new());

    let source = rt.state(Some(TestValue::Int(0)));
    let s_id = source.id;
    let target = rt.state(Some(TestValue::Int(0)));
    let t_id = target.id;
    // Bound to a name so the recorder stays alive until the test ends.
    let _t_rec = rt.subscribe_recorder(t_id);

    let lock_id = rt.core.alloc_lock_id();
    let armed = Arc::new(Mutex::new(false));
    let pauses_seen = Arc::new(Mutex::new(0u32));

    let sink = {
        let rt_inner = Arc::clone(&rt);
        let pauses_inner = Arc::clone(&pauses_seen);
        armed_sink(Arc::clone(&armed), move |_msg| {
            // Decide under the lock, then release it before re-entering.
            let first_time = {
                let mut count = pauses_inner.lock().unwrap();
                if *count == 0 {
                    *count = 1;
                    true
                } else {
                    false
                }
            };
            if first_time {
                rt_inner.core.pause(t_id, lock_id).expect("pause from sink");
                rt_inner.core.resume(t_id, lock_id).expect("resume from sink");
            }
        })
    };

    // `forget` means the value returned by `subscribe` is never dropped
    // (presumably keeping the subscription active — confirm against Core).
    std::mem::forget(rt.core.subscribe(s_id, sink));
    // Arm only after subscribing, so messages delivered during `subscribe`
    // itself are ignored by the sink.
    *armed.lock().unwrap() = true;

    let h = rt.binding.intern(TestValue::Int(42));
    rt.core.emit(s_id, h);

    let observed = *pauses_seen.lock().unwrap();
    assert_eq!(
        observed, 1,
        "sink should have re-entered Core via pause/resume"
    );
}
/// Re-entrancy check: a sink subscribed to `s` calls `rt.core.complete` on a
/// different node `t` from inside its callback, and the recorder attached to
/// `t` must then contain a `Complete` event.
#[test]
fn sink_can_complete_another_node_from_callback() {
    let rt = Arc::new(TestRuntime::new());

    let s = rt.state(Some(TestValue::Int(0)));
    let s_id = s.id;
    let t = rt.state(Some(TestValue::Int(0)));
    let t_id = t.id;

    let t_rec = rt.subscribe_recorder(t_id);
    // Anything recorded before the emit below is excluded from the check.
    let baseline = t_rec.snapshot().len();

    let armed = Arc::new(Mutex::new(false));
    let fired = Arc::new(Mutex::new(false));

    let sink = {
        let rt_inner = Arc::clone(&rt);
        let fired_inner = Arc::clone(&fired);
        armed_sink(Arc::clone(&armed), move |_msg| {
            // Set the flag and learn whether it was already set; the guard
            // is released before the re-entrant call.
            let first_call = !std::mem::replace(&mut *fired_inner.lock().unwrap(), true);
            if first_call {
                rt_inner.core.complete(t_id);
            }
        })
    };

    // `forget` means the value returned by `subscribe` is never dropped
    // (presumably keeping the subscription active — confirm against Core).
    std::mem::forget(rt.core.subscribe(s_id, sink));
    // Arm only after subscribing, so messages delivered during `subscribe`
    // itself are ignored by the sink.
    *armed.lock().unwrap() = true;

    let h = rt.binding.intern(TestValue::Int(11));
    rt.core.emit(s_id, h);

    let t_events = t_rec.snapshot();
    let saw_complete = t_events[baseline..]
        .iter()
        .any(|e| matches!(e, common::RecordedEvent::Complete));
    assert!(
        saw_complete,
        "t should observe Complete after re-entrant complete from s's sink; got {t_events:?}"
    );
}
/// Re-entrancy check during subscription: the sink is not gated by an
/// `armed` flag, so it reacts to every `Data` it receives, including any
/// delivered while `subscribe` is still running. The test never emits on `s`
/// itself, so a `Data(99)` recorded on `other` must have come from the sink
/// being invoked as part of `subscribe`.
#[test]
fn handshake_sink_can_reenter_core_emit_on_other_node() {
    use std::sync::Arc;

    let rt = Arc::new(TestRuntime::new());

    let source = rt.state(Some(TestValue::Int(0)));
    let s_id = source.id;
    // `other` is created with `None` as its initial value.
    let other = rt.state(None);
    let other_id = other.id;
    let other_rec = rt.subscribe_recorder(other_id);

    let rt_inner = Arc::clone(&rt);
    let sink: graphrefly_core::Sink = Arc::new(move |msgs: &[graphrefly_core::Message]| {
        let data_msgs = msgs
            .iter()
            .filter(|m| matches!(m, graphrefly_core::Message::Data(_)));
        // One re-entrant emit per `Data` message received.
        for _ in data_msgs {
            let h = rt_inner.binding.intern(TestValue::Int(99));
            rt_inner.core.emit(other_id, h);
        }
    });

    // Bound to a name so the value returned by `subscribe` lives until the
    // end of the test.
    let _sub = rt.core.subscribe(s_id, sink);

    let other_events = other_rec.snapshot();
    assert!(
        other_events
            .iter()
            .any(|e| matches!(e, common::RecordedEvent::Data(TestValue::Int(99)))),
        "other should observe Data(99) emitted from the handshake sink; got {other_events:?}"
    );
}
/// Subscribes a recorder to a node while a second thread is emitting
/// `Int(1)`, `Int(2)`, ... (up to 1000) on it, then checks what the late
/// subscriber recorded.
///
/// Checked properties, in order:
/// 1. the first recorded event is `Start`;
/// 2. recorded `Data(Int)` values are strictly increasing;
/// 3. the emit thread's published counter advanced after the subscription
///    returned (otherwise the overlap was never exercised and the test would
///    be vacuous);
/// 4. at least one `Data` value was recorded, and the largest one is `>= 1`,
///    i.e. it came from the emit thread rather than from the initial
///    `Int(0)`.
///
/// The test is timing-based (two short sleeps); check 3 exists to turn a
/// missed race window into an explicit failure instead of a silent pass.
#[test]
fn concurrent_subscribe_during_emit_observes_monotonic_post_subscribe_emits() {
    use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
    use std::thread;
    use std::time::Duration;

    let rt = Arc::new(TestRuntime::new());
    let s = rt.state(Some(TestValue::Int(0)));
    let s_id = s.id;

    // Stop flag and progress counter shared with the emit thread.
    let stop = Arc::new(AtomicBool::new(false));
    let emit_count = Arc::new(AtomicI64::new(0));

    let rt_emit = Arc::clone(&rt);
    let stop_emit = Arc::clone(&stop);
    let emit_count_inner = Arc::clone(&emit_count);
    let emit_handle = thread::spawn(move || {
        for counter in 1..=1000i64 {
            if stop_emit.load(Ordering::SeqCst) {
                break;
            }
            let h = rt_emit.binding.intern(TestValue::Int(counter));
            rt_emit.core.emit(s_id, h);
            // Published only after `emit` returns, so a reader that sees
            // `counter` knows that emit has completed.
            emit_count_inner.store(counter, Ordering::SeqCst);
        }
    });

    // Give the emit thread a head start, then subscribe mid-stream. The
    // counter is sampled on both sides of the subscription for diagnostics
    // and for the sandwich check below.
    thread::sleep(Duration::from_millis(2));
    let count_before_subscribe = emit_count.load(Ordering::SeqCst);
    let rec = rt.subscribe_recorder(s_id);
    let count_at_subscribe = emit_count.load(Ordering::SeqCst);

    // Let the emitter overlap with the live subscription, then stop it.
    thread::sleep(Duration::from_millis(20));
    stop.store(true, Ordering::SeqCst);
    emit_handle.join().expect("emit thread join");
    let count_after_join = emit_count.load(Ordering::SeqCst);

    let events = rec.snapshot();
    assert!(
        matches!(events.first(), Some(common::RecordedEvent::Start)),
        "first event must be Start; got {:?}",
        events.first()
    );

    // Strict monotonicity of every recorded integer Data value. Because the
    // sequence is strictly increasing, `last` is also the maximum.
    let mut last: Option<i64> = None;
    for e in &events {
        if let common::RecordedEvent::Data(common::TestValue::Int(n)) = e {
            if let Some(prev) = last {
                assert!(
                    *n > prev,
                    "Data values not monotonic: prev={prev}, now={n}; full trace: {events:?}"
                );
            }
            last = Some(*n);
        }
    }

    assert!(
        count_after_join > count_at_subscribe,
        "sandwich check failed: emit thread completed before/at subscribe \
(count before subscribe={count_before_subscribe}, at subscribe={count_at_subscribe}, \
after join={count_after_join}). The race window was not exercised; \
test result is vacuous. Increase the post-subscribe sleep or upper \
counter bound."
    );

    // Fail explicitly when nothing was recorded. The previous form wrapped
    // the assertion in `if let Some(..)`, which skipped it in exactly the
    // case its message described.
    let max_observed = last.unwrap_or_else(|| {
        panic!("no Data values observed at all; full trace: {events:?}")
    });
    assert!(
        max_observed >= 1,
        "largest observed Data value is {max_observed}, expected >= 1 \
(no value from the emit thread reached the subscriber); full trace: {events:?}"
    );
}