mod common;
use common::{OpRuntime, RecordedEvent, TestValue};
use graphrefly_operators::{concat, race, take_until, zip};
#[test]
fn zip_pairs_data_from_two_sources() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let pack_fn = rt.register_tuple_packer();
let z = zip(rt.core(), &rt.producer_binding, vec![s1, s2], pack_fn).unwrap();
let rec = rt.subscribe_recorder(z);
rt.emit_int(s1, 1);
rt.emit_int(s2, 10);
rt.emit_int(s1, 2);
rt.emit_int(s2, 20);
rt.settle();
let data = rec.data_values();
assert_eq!(data.len(), 2, "should emit 2 tuples; got {data:?}");
assert_eq!(
data[0].clone().tuple(),
vec![TestValue::Int(1), TestValue::Int(10)]
);
assert_eq!(
data[1].clone().tuple(),
vec![TestValue::Int(2), TestValue::Int(20)]
);
}
#[test]
fn zip_buffers_until_all_sources_have_data() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let pack_fn = rt.register_tuple_packer();
let z = zip(rt.core(), &rt.producer_binding, vec![s1, s2], pack_fn).unwrap();
let rec = rt.subscribe_recorder(z);
rt.emit_int(s1, 1);
rt.emit_int(s1, 2);
rt.emit_int(s1, 3);
rt.settle(); assert!(rec.data_values().is_empty(), "no tuples until s2 emits");
rt.emit_int(s2, 100);
rt.settle(); let data = rec.data_values();
assert_eq!(data.len(), 1);
assert_eq!(
data[0].clone().tuple(),
vec![TestValue::Int(1), TestValue::Int(100)]
);
}
#[test]
fn zip_completes_when_one_source_completes_with_empty_queue() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let pack_fn = rt.register_tuple_packer();
let z = zip(rt.core(), &rt.producer_binding, vec![s1, s2], pack_fn).unwrap();
let rec = rt.subscribe_recorder(z);
rt.emit_int(s1, 1);
rt.emit_int(s2, 10);
rt.core().complete(s1);
rt.settle();
let events = rec.events();
let has_complete = events.iter().any(|e| matches!(e, RecordedEvent::Complete));
assert!(has_complete, "zip should complete; got events {events:?}");
}
#[test]
fn zip_with_three_sources() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let s3 = rt.state_int(None);
let pack_fn = rt.register_tuple_packer();
let z = zip(rt.core(), &rt.producer_binding, vec![s1, s2, s3], pack_fn).unwrap();
let rec = rt.subscribe_recorder(z);
rt.emit_int(s1, 1);
rt.emit_int(s2, 10);
rt.settle(); assert!(rec.data_values().is_empty(), "need s3 too");
rt.emit_int(s3, 100);
rt.settle();
let data = rec.data_values();
assert_eq!(data.len(), 1);
assert_eq!(
data[0].clone().tuple(),
vec![TestValue::Int(1), TestValue::Int(10), TestValue::Int(100)]
);
}
#[test]
fn zip_propagates_error_from_any_source() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let pack_fn = rt.register_tuple_packer();
let z = zip(rt.core(), &rt.producer_binding, vec![s1, s2], pack_fn).unwrap();
let rec = rt.subscribe_recorder(z);
let err_h = rt.intern(TestValue::Str("boom".into()));
rt.core().error(s1, err_h);
rt.settle();
let events = rec.events();
let errored = events.iter().any(|e| matches!(e, RecordedEvent::Error(_)));
assert!(errored, "expected ERROR; got {events:?}");
}
#[test]
fn concat_forwards_first_then_second() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let c = concat(rt.core(), &rt.producer_binding, s1, s2);
let rec = rt.subscribe_recorder(c);
rt.emit_int(s1, 1);
rt.emit_int(s1, 2);
rt.core().complete(s1);
rt.emit_int(s2, 10);
rt.emit_int(s2, 20);
rt.settle();
let data = rec.data_values();
assert_eq!(
data,
vec![
TestValue::Int(1),
TestValue::Int(2),
TestValue::Int(10),
TestValue::Int(20),
]
);
}
#[test]
fn concat_buffers_second_data_during_phase_zero() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let c = concat(rt.core(), &rt.producer_binding, s1, s2);
let rec = rt.subscribe_recorder(c);
rt.emit_int(s1, 1);
rt.emit_int(s2, 99);
rt.emit_int(s1, 2);
rt.settle(); assert_eq!(
rec.data_values(),
vec![TestValue::Int(1), TestValue::Int(2)],
"s2 data must be buffered until s1 completes"
);
rt.core().complete(s1);
rt.settle(); let data_after_handoff = rec.data_values();
assert_eq!(
data_after_handoff,
vec![
TestValue::Int(1),
TestValue::Int(2),
TestValue::Int(99), ]
);
}
#[test]
fn concat_completes_when_second_completes() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let c = concat(rt.core(), &rt.producer_binding, s1, s2);
let rec = rt.subscribe_recorder(c);
rt.emit_int(s1, 1);
rt.core().complete(s1);
rt.emit_int(s2, 10);
rt.core().complete(s2);
rt.settle();
let events = rec.events();
let has_complete = events.iter().any(|e| matches!(e, RecordedEvent::Complete));
assert!(has_complete, "expected COMPLETE; got {events:?}");
}
#[test]
fn concat_propagates_error_from_first() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let c = concat(rt.core(), &rt.producer_binding, s1, s2);
let rec = rt.subscribe_recorder(c);
let err = rt.intern(TestValue::Str("first-err".into()));
rt.core().error(s1, err);
rt.settle();
let events = rec.events();
let errored = events.iter().any(|e| matches!(e, RecordedEvent::Error(_)));
assert!(errored);
}
#[test]
fn concat_propagates_error_from_second_after_handoff() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let c = concat(rt.core(), &rt.producer_binding, s1, s2);
let rec = rt.subscribe_recorder(c);
rt.core().complete(s1);
let err = rt.intern(TestValue::Str("second-err".into()));
rt.core().error(s2, err);
rt.settle();
let events = rec.events();
let errored = events.iter().any(|e| matches!(e, RecordedEvent::Error(_)));
assert!(errored);
}
#[test]
fn concat_completes_when_second_completes_before_first_in_phase_zero() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let c = concat(rt.core(), &rt.producer_binding, s1, s2);
let rec = rt.subscribe_recorder(c);
rt.emit_int(s1, 1);
rt.emit_int(s2, 99);
rt.core().complete(s2);
rt.settle();
let pre_handoff_events = rec.events();
let completed_pre = pre_handoff_events
.iter()
.any(|e| matches!(e, RecordedEvent::Complete));
assert!(
!completed_pre,
"concat must not complete before s1 completes; got {pre_handoff_events:?}"
);
rt.core().complete(s1);
rt.settle();
let events = rec.events();
let data: Vec<i64> = events
.iter()
.filter_map(|e| match e {
RecordedEvent::Data(TestValue::Int(n)) => Some(*n),
_ => None,
})
.collect();
assert_eq!(
data,
vec![1, 99],
"expected pending(99) drained on handoff; got {data:?}"
);
let completed = events.iter().any(|e| matches!(e, RecordedEvent::Complete));
assert!(
completed,
"concat must self-complete after handoff when second already completed in phase 0; got {events:?}"
);
}
#[test]
fn race_winner_emits_subsequent_data() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let r = race(rt.core(), &rt.producer_binding, vec![s1, s2]).unwrap();
let rec = rt.subscribe_recorder(r);
rt.emit_int(s1, 1); rt.emit_int(s2, 99); rt.emit_int(s1, 2); rt.settle();
assert_eq!(
rec.data_values(),
vec![TestValue::Int(1), TestValue::Int(2)]
);
}
#[test]
fn race_loser_data_is_silently_ignored() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let s3 = rt.state_int(None);
let r = race(rt.core(), &rt.producer_binding, vec![s1, s2, s3]).unwrap();
let rec = rt.subscribe_recorder(r);
rt.emit_int(s2, 50); rt.emit_int(s1, 1);
rt.emit_int(s3, 100);
rt.emit_int(s2, 60); rt.settle();
assert_eq!(
rec.data_values(),
vec![TestValue::Int(50), TestValue::Int(60)]
);
}
#[test]
fn race_winner_complete_terminates_producer() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let r = race(rt.core(), &rt.producer_binding, vec![s1, s2]).unwrap();
let rec = rt.subscribe_recorder(r);
rt.emit_int(s1, 1);
rt.core().complete(s1); rt.settle();
let events = rec.events();
let has_complete = events.iter().any(|e| matches!(e, RecordedEvent::Complete));
assert!(
has_complete,
"winner complete should terminate; got {events:?}"
);
}
#[test]
fn race_loser_complete_does_not_terminate() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let r = race(rt.core(), &rt.producer_binding, vec![s1, s2]).unwrap();
let rec = rt.subscribe_recorder(r);
rt.emit_int(s1, 1); rt.core().complete(s2);
let events = rec.events();
let producer_complete = events.iter().any(|e| matches!(e, RecordedEvent::Complete));
assert!(
!producer_complete,
"loser complete must not terminate producer; got {events:?}"
);
}
#[test]
fn race_pre_winner_error_cascades() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let r = race(rt.core(), &rt.producer_binding, vec![s1, s2]).unwrap();
let rec = rt.subscribe_recorder(r);
let err = rt.intern(TestValue::Str("err".into()));
rt.core().error(s1, err); rt.settle();
let events = rec.events();
let errored = events.iter().any(|e| matches!(e, RecordedEvent::Error(_)));
assert!(errored);
}
#[test]
fn take_until_forwards_source_until_notifier_emits() {
let rt = OpRuntime::new();
let src = rt.state_int(None);
let notif = rt.state_int(None);
let t = take_until(rt.core(), &rt.producer_binding, src, notif);
let rec = rt.subscribe_recorder(t);
rt.emit_int(src, 1);
rt.emit_int(src, 2);
rt.emit_int(notif, 999); rt.emit_int(src, 3);
assert_eq!(
rec.data_values(),
vec![TestValue::Int(1), TestValue::Int(2)]
);
let events = rec.events();
let has_complete = events.iter().any(|e| matches!(e, RecordedEvent::Complete));
assert!(has_complete);
}
#[test]
fn take_until_does_not_forward_notifier_value() {
let rt = OpRuntime::new();
let src = rt.state_int(None);
let notif = rt.state_int(None);
let t = take_until(rt.core(), &rt.producer_binding, src, notif);
let rec = rt.subscribe_recorder(t);
rt.emit_int(notif, 999);
let data = rec.data_values();
assert!(
!data.contains(&TestValue::Int(999)),
"notifier value must NOT be forwarded; got {data:?}"
);
}
#[test]
fn take_until_source_complete_propagates() {
let rt = OpRuntime::new();
let src = rt.state_int(None);
let notif = rt.state_int(None);
let t = take_until(rt.core(), &rt.producer_binding, src, notif);
let rec = rt.subscribe_recorder(t);
rt.emit_int(src, 1);
rt.core().complete(src); rt.settle();
let events = rec.events();
let has_complete = events.iter().any(|e| matches!(e, RecordedEvent::Complete));
assert!(has_complete);
}
#[test]
fn take_until_source_error_propagates() {
let rt = OpRuntime::new();
let src = rt.state_int(None);
let notif = rt.state_int(None);
let t = take_until(rt.core(), &rt.producer_binding, src, notif);
let rec = rt.subscribe_recorder(t);
let err = rt.intern(TestValue::Str("src-err".into()));
rt.core().error(src, err);
rt.settle();
let events = rec.events();
let errored = events.iter().any(|e| matches!(e, RecordedEvent::Error(_)));
assert!(errored);
}
#[test]
fn producer_storage_cleared_on_deactivation() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let pack_fn = rt.register_tuple_packer();
let z = zip(rt.core(), &rt.producer_binding, vec![s1, s2], pack_fn).unwrap();
{
let _rec = rt.subscribe_recorder(z);
let storage = rt.binding.producer_storage().lock();
let entry = storage
.get(&z)
.expect("producer storage entry should exist");
assert_eq!(entry.subs.len(), 2, "zip should subscribe to both sources");
}
rt.settle();
let storage = rt.binding.producer_storage().lock();
assert!(
storage.get(&z).is_none(),
"producer storage entry should be cleared on deactivation"
);
}
#[test]
fn producer_re_subscribe_re_runs_build_closure() {
let rt = OpRuntime::new();
let s1 = rt.state_int(None);
let s2 = rt.state_int(None);
let pack_fn = rt.register_tuple_packer();
let z = zip(rt.core(), &rt.producer_binding, vec![s1, s2], pack_fn).unwrap();
{
let rec = rt.subscribe_recorder(z);
rt.emit_int(s1, 1);
rt.emit_int(s2, 10);
rt.settle(); assert_eq!(
rec.data_values(),
vec![TestValue::Tuple(vec![
TestValue::Int(1),
TestValue::Int(10)
])]
);
}
rt.core().invalidate(s1);
rt.core().invalidate(s2);
{
let rec = rt.subscribe_recorder(z);
rt.emit_int(s1, 2);
rt.emit_int(s2, 20);
rt.settle(); let data = rec.data_values();
assert!(
data.contains(&TestValue::Tuple(vec![
TestValue::Int(2),
TestValue::Int(20)
])),
"fresh activation should produce Tuple(2, 20) from new emissions; got {data:?}"
);
}
}