mod common;
use common::{OpRuntime, RecordedEvent};
use graphrefly_operators::{concat, race, take_until, zip};
/// `zip` over one live source and one source completed beforehand ("Dead").
///
/// The `dead` node is completed before the operator is built. The operator is
/// created and a recorder subscribed inside `with_all_partitions_held`; the
/// recorder must then hold a `Complete` event and no data values.
#[test]
fn zip_self_completes_immediately_when_one_source_is_dead() {
    let rt = OpRuntime::new();
    let live = rt.state_int(Some(1));
    let dead = rt.state_int(Some(2));
    rt.core().complete(dead);
    let pack_fn = rt.register_tuple_packer();

    let recorder = rt.with_all_partitions_held(|held| {
        let zipped = zip(held.core(), &held.producer_binding, vec![live, dead], pack_fn).unwrap();
        held.subscribe_recorder(zipped)
    });

    // No emit/settle step follows: the assertions inspect whatever the
    // recorder holds once the hold closure has returned.
    let events = recorder.events();
    assert!(
        events.contains(&RecordedEvent::Complete),
        "zip with one Dead source must self-Complete synchronously \
         (F2 Dead path). events={events:?}"
    );

    let data = recorder.data_values();
    assert_eq!(
        data.len(),
        0,
        "zip must NOT emit any tuples when one source is Dead at \
         activation. data={:?}",
        data
    );
}
/// `zip` over two sources that were both completed before the operator was
/// built.
///
/// After subscribing a recorder inside `with_all_partitions_held`, the
/// recorder must hold a `Complete` event and no data values. Both assertions
/// dump the recorded state on failure, matching the other tests in this file.
#[test]
fn zip_self_completes_when_all_sources_are_dead() {
    let rt = OpRuntime::new();
    let a = rt.state_int(Some(1));
    let b = rt.state_int(Some(2));
    rt.core().complete(a);
    rt.core().complete(b);
    let pack_fn = rt.register_tuple_packer();
    let rec = rt.with_all_partitions_held(|rt| {
        let z = zip(rt.core(), &rt.producer_binding, vec![a, b], pack_fn).unwrap();
        rt.subscribe_recorder(z)
    });

    let events = rec.events();
    assert!(
        events.contains(&RecordedEvent::Complete),
        "zip with all sources Dead must self-Complete. events={events:?}"
    );

    let data = rec.data_values();
    assert!(
        data.is_empty(),
        "zip must NOT emit any tuples when all sources are Dead. data={data:?}"
    );
}
/// `concat` whose `first` source was completed before the operator was built.
///
/// After the recorder is subscribed, `42` is emitted on `second` and the
/// runtime is settled; the recorded data values must include `42`.
#[test]
fn concat_dead_first_immediately_advances_to_second() {
    let rt = OpRuntime::new();
    let first = rt.state_int(Some(1));
    let second = rt.state_int(None);
    rt.core().complete(first);

    let recorder = rt.with_all_partitions_held(|held| {
        let joined = concat(held.core(), &held.producer_binding, first, second);
        held.subscribe_recorder(joined)
    });

    rt.emit_int(second, 42);
    rt.settle();

    let values = recorder
        .data_values()
        .into_iter()
        .map(|v| v.int())
        .collect::<Vec<i64>>();
    // Membership check only: the test does not pin the full recorded sequence.
    let saw_second = values.iter().any(|&v| v == 42);
    assert!(
        saw_second,
        "concat must forward `second` data after Dead `first` triggered \
         phase transition. events={:?}",
        recorder.events()
    );
}
/// `concat` whose two sources were both completed before the operator was
/// built.
///
/// The recorder subscribed inside `with_all_partitions_held` must hold a
/// `Complete` event.
#[test]
fn concat_dead_first_and_dead_second_self_completes() {
    let rt = OpRuntime::new();
    let first = rt.state_int(Some(1));
    let second = rt.state_int(Some(2));
    rt.core().complete(first);
    rt.core().complete(second);

    let recorder = rt.with_all_partitions_held(|held| {
        let joined = concat(held.core(), &held.producer_binding, first, second);
        held.subscribe_recorder(joined)
    });

    let events = recorder.events();
    assert!(
        events.contains(&RecordedEvent::Complete),
        "concat with both sources Dead must self-Complete. events={:?}",
        events
    );
}
/// `race` over two sources that were both completed before the operator was
/// built.
///
/// The recorder subscribed inside `with_all_partitions_held` must hold a
/// `Complete` event.
#[test]
fn race_all_dead_sources_self_completes() {
    let rt = OpRuntime::new();
    let a = rt.state_int(Some(1));
    let b = rt.state_int(Some(2));
    rt.core().complete(a);
    rt.core().complete(b);

    let recorder = rt.with_all_partitions_held(|held| {
        let raced = race(held.core(), &held.producer_binding, vec![a, b]).unwrap();
        held.subscribe_recorder(raced)
    });

    let events = recorder.events();
    assert!(
        events.contains(&RecordedEvent::Complete),
        "race with all sources Dead must self-Complete. events={:?}",
        events
    );
}
/// `race` over one source completed beforehand and one live source.
///
/// After the recorder is subscribed, `99` is emitted on `live` and the runtime
/// is settled; the recorded data values must be exactly `[99]`.
#[test]
fn race_one_dead_one_live_continues_with_live() {
    let rt = OpRuntime::new();
    let dead = rt.state_int(Some(1));
    let live = rt.state_int(None);
    rt.core().complete(dead);

    let recorder = rt.with_all_partitions_held(|held| {
        let raced = race(held.core(), &held.producer_binding, vec![dead, live]).unwrap();
        held.subscribe_recorder(raced)
    });

    rt.emit_int(live, 99);
    rt.settle();

    let values = recorder
        .data_values()
        .into_iter()
        .map(|v| v.int())
        .collect::<Vec<i64>>();
    assert_eq!(
        values,
        vec![99],
        "race with one Dead + one live: live wins after emitting. \
         events={:?}",
        recorder.events()
    );
}
/// `take_until` whose `source` was completed before the operator was built,
/// with a notifier that is left untouched.
///
/// The recorder subscribed inside `with_all_partitions_held` must hold a
/// `Complete` event.
#[test]
fn take_until_dead_source_self_completes() {
    let rt = OpRuntime::new();
    let source = rt.state_int(Some(1));
    let notifier = rt.state_int(None);
    rt.core().complete(source);

    let recorder = rt.with_all_partitions_held(|held| {
        let gated = take_until(held.core(), &held.producer_binding, source, notifier);
        held.subscribe_recorder(gated)
    });

    let events = recorder.events();
    assert!(
        events.contains(&RecordedEvent::Complete),
        "take_until with Dead source must self-Complete. events={:?}",
        events
    );
}
/// `take_until` whose `notifier` was completed before the operator was built.
///
/// After the recorder is subscribed, `7` is emitted on `source` and the
/// runtime is settled. The recorded data values must be exactly `[7]`, and no
/// `Complete` event may have been recorded.
#[test]
fn take_until_dead_notifier_passes_source_through() {
    let rt = OpRuntime::new();
    let source = rt.state_int(None);
    let notifier = rt.state_int(Some(99));
    rt.core().complete(notifier);

    let recorder = rt.with_all_partitions_held(|held| {
        let gated = take_until(held.core(), &held.producer_binding, source, notifier);
        held.subscribe_recorder(gated)
    });

    rt.emit_int(source, 7);
    rt.settle();

    let values = recorder
        .data_values()
        .into_iter()
        .map(|v| v.int())
        .collect::<Vec<i64>>();
    let events = recorder.events();
    assert_eq!(
        values,
        vec![7],
        "take_until with Dead notifier passes source DATA through \
         (notifier signal will never fire). events={:?}",
        events
    );

    let saw_complete = events.contains(&RecordedEvent::Complete);
    assert!(
        !saw_complete,
        "Dead notifier must NOT trigger take_until's self-Complete \
         (only notifier DATA does)"
    );
}