mod common;
use graphrefly_operators::stratify::stratify_branch;
use common::{OpRuntime, RecordedEvent, TestValue};
fn make_modulo_classifier(rt: &OpRuntime, expected_remainder: i64) -> graphrefly_core::FnId {
let binding = rt.binding.clone();
rt.binding
.register_stratify_classifier(Box::new(move |rules_h, value_h| {
let mode = binding.deref(rules_h).int();
let value = binding.deref(value_h).int();
if mode == 0 {
return false; }
value.rem_euclid(mode) == expected_remainder
}))
}
#[test]
fn stratify_routes_matching_values_only() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let rules = rt.state_int(Some(2));
let classifier_evens = make_modulo_classifier(&rt, 0);
let evens = stratify_branch(
rt.core(),
&rt.producer_binding,
source,
rules,
classifier_evens,
);
let rec_evens = rt.subscribe_recorder(evens);
rt.emit_int(source, 1); rt.emit_int(source, 2); rt.emit_int(source, 3); rt.emit_int(source, 4); rt.settle();
assert_eq!(
rec_evens.data_values(),
vec![TestValue::Int(2), TestValue::Int(4)]
);
}
#[test]
fn stratify_multi_branch_independent_routing() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let rules = rt.state_int(Some(3));
let classifier_zeros = make_modulo_classifier(&rt, 0);
let zeros = stratify_branch(
rt.core(),
&rt.producer_binding,
source,
rules,
classifier_zeros,
);
let classifier_ones = make_modulo_classifier(&rt, 1);
let ones = stratify_branch(
rt.core(),
&rt.producer_binding,
source,
rules,
classifier_ones,
);
let classifier_twos = make_modulo_classifier(&rt, 2);
let twos = stratify_branch(
rt.core(),
&rt.producer_binding,
source,
rules,
classifier_twos,
);
let rec_zeros = rt.subscribe_recorder(zeros);
let rec_ones = rt.subscribe_recorder(ones);
let rec_twos = rt.subscribe_recorder(twos);
for n in 0..9 {
rt.emit_int(source, n);
}
rt.settle();
assert_eq!(
rec_zeros.data_values(),
vec![TestValue::Int(0), TestValue::Int(3), TestValue::Int(6)],
);
assert_eq!(
rec_ones.data_values(),
vec![TestValue::Int(1), TestValue::Int(4), TestValue::Int(7)],
);
assert_eq!(
rec_twos.data_values(),
vec![TestValue::Int(2), TestValue::Int(5), TestValue::Int(8)],
);
}
#[test]
fn stratify_reactive_rules_change_future_classification() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let rules = rt.state_int(Some(2));
let classifier_zeros = make_modulo_classifier(&rt, 0);
let zeros = stratify_branch(
rt.core(),
&rt.producer_binding,
source,
rules,
classifier_zeros,
);
let rec = rt.subscribe_recorder(zeros);
rt.emit_int(source, 2); rt.emit_int(source, 5);
rt.emit_int(rules, 3);
rt.emit_int(source, 9); rt.emit_int(source, 7); rt.emit_int(source, 12); rt.settle();
assert_eq!(
rec.data_values(),
vec![TestValue::Int(2), TestValue::Int(9), TestValue::Int(12)],
);
}
#[test]
fn stratify_no_rules_sentinel_drops_all() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let rules = rt.state_int(None);
let classifier = make_modulo_classifier(&rt, 0);
let branch = stratify_branch(rt.core(), &rt.producer_binding, source, rules, classifier);
let rec = rt.subscribe_recorder(branch);
rt.emit_int(source, 2);
rt.emit_int(source, 4);
assert!(
rec.data_values().is_empty(),
"no rules → no classifier fire → empty stream"
);
}
#[test]
fn stratify_forwards_source_complete() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let rules = rt.state_int(Some(2));
let classifier = make_modulo_classifier(&rt, 0);
let branch = stratify_branch(rt.core(), &rt.producer_binding, source, rules, classifier);
let rec = rt.subscribe_recorder(branch);
rt.emit_int(source, 2);
rt.core().complete(source);
rt.settle();
assert_eq!(rec.data_values(), vec![TestValue::Int(2)]);
assert!(rec.events().contains(&RecordedEvent::Complete));
}
#[test]
fn stratify_forwards_source_error() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let rules = rt.state_int(Some(2));
let classifier = make_modulo_classifier(&rt, 0);
let branch = stratify_branch(rt.core(), &rt.producer_binding, source, rules, classifier);
let rec = rt.subscribe_recorder(branch);
let err_h = rt.intern(TestValue::Str("boom".into()));
rt.core().error(source, err_h);
rt.settle();
assert!(
rec.events()
.iter()
.any(|e| matches!(e, RecordedEvent::Error(TestValue::Str(s)) if s == "boom")),
"expected Error(boom) in {:?}",
rec.events()
);
}
#[test]
fn stratify_absorbs_rules_complete_silently() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let rules = rt.state_int(Some(2));
let classifier = make_modulo_classifier(&rt, 0);
let branch = stratify_branch(rt.core(), &rt.producer_binding, source, rules, classifier);
let rec = rt.subscribe_recorder(branch);
rt.emit_int(source, 2);
rt.core().complete(rules); rt.emit_int(source, 4); rt.settle();
assert_eq!(
rec.data_values(),
vec![TestValue::Int(2), TestValue::Int(4)]
);
assert!(
!rec.events().contains(&RecordedEvent::Complete),
"rules COMPLETE must NOT propagate to branch downstream",
);
}
#[test]
fn stratify_forwards_source_teardown() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let rules = rt.state_int(Some(2));
let classifier = make_modulo_classifier(&rt, 0);
let branch = stratify_branch(rt.core(), &rt.producer_binding, source, rules, classifier);
let rec = rt.subscribe_recorder(branch);
rt.emit_int(source, 2);
rt.core().teardown(source);
rt.settle();
assert_eq!(rec.data_values(), vec![TestValue::Int(2)]);
assert!(
rec.events().contains(&RecordedEvent::Teardown),
"source TEARDOWN must propagate downstream (TS parity); got {:?}",
rec.events()
);
}
#[test]
fn stratify_same_wave_gating_uses_new_rules() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let rules = rt.state_int(Some(2));
let classifier = make_modulo_classifier(&rt, 0);
let branch = stratify_branch(rt.core(), &rt.producer_binding, source, rules, classifier);
let rec = rt.subscribe_recorder(branch);
{
let _g = rt.core().begin_batch();
rt.emit_int(rules, 3);
rt.emit_int(source, 3);
}
rt.settle();
assert_eq!(
rec.data_values(),
vec![TestValue::Int(3)],
"same-wave rules+source update must classify under NEW rules \
(mode=3 → 3%3==0 → emit); got {:?}",
rec.data_values()
);
}
#[test]
fn stratify_drop_releases_cached_rules_handle() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let rules = rt.state_int(Some(2));
let baseline_live = rt.binding.live_handles();
let classifier = make_modulo_classifier(&rt, 0);
let branch = stratify_branch(rt.core(), &rt.producer_binding, source, rules, classifier);
{
let rec = rt.subscribe_recorder(branch);
rt.emit_int(source, 2); rt.emit_int(source, 5); rt.emit_int(source, 4); drop(rec);
}
let post_live = rt.binding.live_handles();
assert_eq!(
post_live,
baseline_live + 1,
"expected delta of +1 (source.cache for Int(4)); baseline \
{baseline_live}, post {post_live}. A larger delta means a \
retained handle leaked through StratifyState::Drop or the \
buffered-source-value cleanup path.",
);
}