mod common;
use graphrefly_core::BindingBoundary;
use graphrefly_operators::flow::{last_with_default, take};
use graphrefly_operators::transform::scan;
use common::{OpRuntime, RecordedEvent, TestValue};
#[test]
fn take_resubscribable_non_terminal_deactivate_resets_counter() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let taken = take(rt.core(), source, 3).into_node();
rt.core().set_resubscribable(taken, true);
let rec1 = rt.subscribe_recorder(taken);
rt.emit_int(source, 1);
rt.emit_int(source, 2);
let cycle1: Vec<i64> = rec1.data_values().into_iter().map(|v| v.int()).collect();
assert_eq!(cycle1, vec![1, 2], "cycle 1 should emit 1 and 2");
assert!(
!rec1.events().contains(&RecordedEvent::Complete),
"take(3) should not self-complete after only 2 emits"
);
drop(rec1);
rt.core().invalidate(source);
let rec2 = rt.subscribe_recorder(taken);
rt.emit_int(source, 100);
rt.emit_int(source, 200);
rt.emit_int(source, 300);
rt.emit_int(source, 400); let cycle2: Vec<i64> = rec2
.data_values()
.into_iter()
.map(|v| v.int())
.filter(|n| *n >= 100)
.collect();
assert_eq!(
cycle2,
vec![100, 200, 300],
"post-deactivate cycle must count from 0 again (D-α). Pre-D-α \
this would have emitted only 100 then self-completed."
);
assert!(
rec2.events().contains(&RecordedEvent::Complete),
"take(3) must self-complete after 3 fresh emits"
);
}
#[test]
fn scan_resubscribable_non_terminal_deactivate_resets_acc_to_seed() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let seed = rt.intern_int(10);
let binding_for_fold = rt.binding.clone();
let scanned = scan(
rt.core(),
&rt.op_binding,
source,
move |acc, x| {
let a = binding_for_fold.deref(acc).int();
let b = binding_for_fold.deref(x).int();
binding_for_fold.intern(TestValue::Int(a + b))
},
seed,
)
.node;
rt.core().set_resubscribable(scanned, true);
let rec1 = rt.subscribe_recorder(scanned);
rt.emit_int(source, 5);
rt.emit_int(source, 3);
let cycle1: Vec<i64> = rec1.data_values().into_iter().map(|v| v.int()).collect();
assert_eq!(
cycle1,
vec![15, 18],
"scan(seed=10) folds 10+5=15 then 15+3=18"
);
drop(rec1);
rt.core().invalidate(source);
let rec2 = rt.subscribe_recorder(scanned);
rt.emit_int(source, 7);
let cycle2: Vec<i64> = rec2
.data_values()
.into_iter()
.map(|v| v.int())
.filter(|n| ![15, 18].contains(n)) .collect();
assert_eq!(
cycle2,
vec![17],
"post-deactivate cycle must restart acc from seed=10 (so 10+7=17). \
Pre-D-α this would have emitted 25 (18 + 7 carrying over stale acc)."
);
}
#[test]
fn phase_g_resubscribable_seed_aliasing_acc_does_not_collapse_registry() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let seed = rt.intern_int(42);
let _keep_alive = rt.intern_int(42);
let baseline = rt.binding.refcount_of(seed);
assert_eq!(
baseline, 2,
"pre-scan baseline: user intern + diagnostic intern = 2"
);
let binding_for_fold = rt.binding.clone();
let scanned = scan(
rt.core(),
&rt.op_binding,
source,
move |acc, _x| {
binding_for_fold.intern(binding_for_fold.deref(acc))
},
seed,
)
.node;
rt.core().set_resubscribable(scanned, true);
assert_eq!(
rt.binding.refcount_of(seed),
3,
"post-scan registration: user + diag + ScanState.acc retain = 3"
);
let rec1 = rt.subscribe_recorder(scanned);
rt.emit_int(source, 1);
rt.emit_int(source, 2);
assert!(
rt.binding.refcount_of(seed) > baseline,
"seed retained above baseline during active subscription (acc \
aliases seed via identity fold). got refcount(seed)={}, baseline={baseline}",
rt.binding.refcount_of(seed)
);
drop(rec1);
assert!(
rt.binding.refcount_of(seed) >= baseline,
"seed handle must NOT drop below baseline ({baseline}) after \
Phase G — D-α retain-before-release invariant. got refcount(seed)={}",
rt.binding.refcount_of(seed)
);
rt.core().invalidate(source);
let _rec2 = rt.subscribe_recorder(scanned);
rt.emit_int(source, 3);
assert!(
rt.binding.refcount_of(seed) >= baseline,
"seed handle remains at least at baseline ({baseline}) across \
non-terminal deactivate-reactivate"
);
}
#[test]
fn pending_scratch_release_drains_on_core_drop() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let seed = rt.intern_int(100);
let binding_for_fold = rt.binding.clone();
let scanned = scan(
rt.core(),
&rt.op_binding,
source,
move |acc, x| {
let a = binding_for_fold.deref(acc).int();
let b = binding_for_fold.deref(x).int();
binding_for_fold.intern(TestValue::Int(a + b))
},
seed,
)
.node;
rt.core().set_resubscribable(scanned, true);
let rec = rt.subscribe_recorder(scanned);
rt.emit_int(source, 1);
rt.emit_int(source, 2);
drop(rec);
let observed_103 = rt.binding.intern(TestValue::Int(103));
let refcount_pre_drop = rt.binding.refcount_of(observed_103);
assert!(
refcount_pre_drop >= 2,
"diagnostic intern + queued ScanState.acc share both retain 103. \
Got refcount={refcount_pre_drop}"
);
let binding = rt.binding.clone();
drop(rt);
let refcount_post_drop = binding.refcount_of(observed_103);
assert_eq!(
refcount_post_drop, 1,
"CoreState::drop must drain pending_scratch_release queue \
(D-α catch-all). Pre-drop refcount(103)={refcount_pre_drop} \
(diagnostic + queued ScanState.acc); post-drop expected 1 \
(diagnostic only). Got {refcount_post_drop}."
);
binding.release_handle(observed_103);
let _ = seed; }
#[test]
fn pending_scratch_release_drains_on_reset_for_fresh_lifecycle() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let default_handle = rt.intern_int(0);
let n = last_with_default(rt.core(), source, default_handle)
.expect("last_with_default registers cleanly for live non-terminal source")
.into_node();
rt.core().set_resubscribable(n, true);
let rec1 = rt.subscribe_recorder(n);
rt.emit_int(source, 7);
drop(rec1);
rt.core().invalidate(source);
let rec2 = rt.subscribe_recorder(n);
rt.emit_int(source, 9);
drop(rec2);
rt.core().complete(source);
let live_pre_reset = rt.binding.live_handles();
let _rec3 = rt.subscribe_recorder(n);
let live_post_reset = rt.binding.live_handles();
assert!(
live_post_reset <= live_pre_reset,
"reset_for_fresh_lifecycle Phase 3b must drain pending_scratch_release \
(D-α drain on next-reset path). pre={live_pre_reset}, post={live_post_reset}"
);
let h7 = rt.intern_int(7); assert!(
rt.binding.refcount_of(h7) >= 1,
"intern(7) post-test must be valid"
);
}