mod common;
use graphrefly_core::Core;
use graphrefly_operators::flow::{
element_at, find, first, last, last_with_default, skip, take, take_while,
};
use common::{OpRuntime, RecordedEvent, TestValue};
#[test]
fn take_emits_first_n_then_self_completes() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let taken = take(rt.core(), source, 2).into_node();
let rec = rt.subscribe_recorder(taken);
rt.emit_int(source, 10);
rt.emit_int(source, 20);
rt.emit_int(source, 30);
assert_eq!(
rec.data_values(),
vec![TestValue::Int(10), TestValue::Int(20)]
);
assert!(
rec.events().contains(&RecordedEvent::Complete),
"expected Complete in {:?}",
rec.events()
);
}
#[test]
fn take_zero_self_completes_on_first_fire_with_no_data() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let taken = take(rt.core(), source, 0).into_node();
let rec = rt.subscribe_recorder(taken);
rt.emit_int(source, 99);
assert!(rec.data_values().is_empty(), "events={:?}", rec.events());
assert!(
rec.events().contains(&RecordedEvent::Complete),
"expected Complete in {:?}",
rec.events()
);
}
#[test]
fn take_propagates_upstream_complete_when_count_not_reached() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let taken = take(rt.core(), source, 5).into_node();
let rec = rt.subscribe_recorder(taken);
rt.emit_int(source, 1);
rt.emit_int(source, 2);
rt.core().complete(source);
assert_eq!(
rec.data_values(),
vec![TestValue::Int(1), TestValue::Int(2)]
);
assert!(rec.events().contains(&RecordedEvent::Complete));
}
#[test]
fn take_resubscribable_resets_counter_on_lifecycle_reset() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let taken = take(rt.core(), source, 2).into_node();
rt.core().set_resubscribable(taken, true);
let rec1 = rt.subscribe_recorder(taken);
rt.emit_int(source, 1);
rt.emit_int(source, 2);
let cycle1_data: Vec<_> = rec1
.data_values()
.into_iter()
.filter(|v| matches!(v, TestValue::Int(n) if *n < 100))
.collect();
assert_eq!(cycle1_data, vec![TestValue::Int(1), TestValue::Int(2)]);
assert!(rec1.events().contains(&RecordedEvent::Complete));
drop(rec1);
rt.core().invalidate(source);
let rec2 = rt.subscribe_recorder(taken);
rt.emit_int(source, 100);
rt.emit_int(source, 200);
rt.emit_int(source, 300); let cycle2_fresh: Vec<_> = rec2
.data_values()
.into_iter()
.filter(|v| matches!(v, TestValue::Int(n) if *n >= 100))
.collect();
assert_eq!(
cycle2_fresh,
vec![TestValue::Int(100), TestValue::Int(200)],
"fresh cycle data wrong; events={:?}",
rec2.events()
);
let complete_count = rec2
.events()
.iter()
.filter(|e| matches!(e, RecordedEvent::Complete))
.count();
assert_eq!(
complete_count, 1,
"cycle 2 must self-complete after 2 fresh emits"
);
}
#[test]
fn skip_drops_first_n_then_emits_remaining() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let skipped = skip(rt.core(), source, 2).into_node();
let rec = rt.subscribe_recorder(skipped);
rt.emit_int(source, 1);
rt.emit_int(source, 2);
rt.emit_int(source, 3);
rt.emit_int(source, 4);
assert_eq!(
rec.data_values(),
vec![TestValue::Int(3), TestValue::Int(4)]
);
}
#[test]
fn skip_full_window_settles_dirty_resolved_per_d018() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let skipped = skip(rt.core(), source, 3).into_node();
let rec = rt.subscribe_recorder(skipped);
rt.emit_int(source, 1);
let events = rec.events();
assert!(
rec.data_values().is_empty(),
"skip window swallowed; no Data expected. events={events:?}"
);
assert!(
events.contains(&RecordedEvent::Resolved),
"expected Resolved settle for full-skip wave: {events:?}"
);
}
#[test]
fn take_while_emits_until_first_false_then_self_completes() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let binding = rt.binding.clone();
let taken = take_while(rt.core(), &rt.op_binding, source, move |h| {
binding.deref(h).int() < 10
})
.into_node();
let rec = rt.subscribe_recorder(taken);
rt.emit_int(source, 3);
rt.emit_int(source, 7);
rt.emit_int(source, 12); rt.emit_int(source, 1);
assert_eq!(
rec.data_values(),
vec![TestValue::Int(3), TestValue::Int(7)],
"events={:?}",
rec.events()
);
assert!(
rec.events().contains(&RecordedEvent::Complete),
"expected Complete, events={:?}",
rec.events()
);
}
#[test]
fn last_emits_buffered_latest_on_upstream_complete() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let n = last(rt.core(), source).into_node();
let rec = rt.subscribe_recorder(n);
rt.emit_int(source, 1);
rt.emit_int(source, 2);
rt.emit_int(source, 3);
assert!(
rec.data_values().is_empty(),
"last should not emit before upstream Complete: {:?}",
rec.events()
);
rt.core().complete(source);
assert_eq!(rec.data_values(), vec![TestValue::Int(3)]);
assert!(rec.events().contains(&RecordedEvent::Complete));
}
#[test]
fn last_no_default_on_empty_stream_emits_only_complete() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let n = last(rt.core(), source).into_node();
let rec = rt.subscribe_recorder(n);
rt.core().complete(source);
assert!(
rec.data_values().is_empty(),
"no default + no DATA → only Complete; events={:?}",
rec.events()
);
assert!(rec.events().contains(&RecordedEvent::Complete));
}
#[test]
fn last_with_default_on_empty_stream_emits_default() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let default = rt.intern_int(42);
let n = last_with_default(rt.core(), source, default)
.unwrap()
.into_node();
let rec = rt.subscribe_recorder(n);
rt.core().complete(source);
assert_eq!(rec.data_values(), vec![TestValue::Int(42)]);
assert!(rec.events().contains(&RecordedEvent::Complete));
}
#[test]
fn last_with_default_prefers_latest_over_default() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let default = rt.intern_int(42);
let n = last_with_default(rt.core(), source, default)
.unwrap()
.into_node();
let rec = rt.subscribe_recorder(n);
rt.emit_int(source, 7);
rt.core().complete(source);
assert_eq!(rec.data_values(), vec![TestValue::Int(7)]);
}
#[test]
fn last_propagates_upstream_error() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let n = last(rt.core(), source).into_node();
let rec = rt.subscribe_recorder(n);
rt.emit_int(source, 1);
let err_h = rt.binding.intern(TestValue::Str("boom".into()));
rt.core().error(source, err_h);
let saw_error = rec
.events()
.iter()
.any(|e| matches!(e, RecordedEvent::Error(TestValue::Str(s)) if s == "boom"));
assert!(
saw_error,
"expected Error('boom'); events={:?}",
rec.events()
);
assert!(
rec.data_values().is_empty(),
"last should not emit Data on Error: {:?}",
rec.events()
);
}
#[test]
fn first_alias_for_take_one() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let n = first(rt.core(), source).into_node();
let rec = rt.subscribe_recorder(n);
rt.emit_int(source, 7);
rt.emit_int(source, 8);
assert_eq!(rec.data_values(), vec![TestValue::Int(7)]);
assert!(rec.events().contains(&RecordedEvent::Complete));
}
#[test]
fn find_emits_first_matching_then_completes() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let binding = rt.binding.clone();
let n = find(rt.core(), &rt.op_binding, source, move |h| {
binding.deref(h).int() > 5
})
.into_node();
let rec = rt.subscribe_recorder(n);
rt.emit_int(source, 1);
rt.emit_int(source, 3);
rt.emit_int(source, 8); rt.emit_int(source, 9);
assert_eq!(rec.data_values(), vec![TestValue::Int(8)]);
assert!(rec.events().contains(&RecordedEvent::Complete));
}
#[test]
fn element_at_emits_indexed_value_then_completes() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let n = element_at(rt.core(), source, 2).into_node();
let rec = rt.subscribe_recorder(n);
rt.emit_int(source, 10);
rt.emit_int(source, 20);
rt.emit_int(source, 30); rt.emit_int(source, 40);
assert_eq!(rec.data_values(), vec![TestValue::Int(30)]);
assert!(rec.events().contains(&RecordedEvent::Complete));
}
#[test]
fn last_with_default_releases_default_on_core_drop() {
let binding;
let default_handle;
{
let rt = OpRuntime::new();
binding = rt.binding.clone();
let source = rt.state_int(None);
default_handle = rt.intern_int(99);
assert_eq!(binding.refcount_of(default_handle), 1);
let _n = last_with_default(rt.core(), source, default_handle)
.unwrap()
.into_node();
assert_eq!(
binding.refcount_of(default_handle),
2,
"register_operator must retain the default handle"
);
}
assert_eq!(
binding.refcount_of(default_handle),
1,
"Core drop must release the default handle's share"
);
}
#[test]
fn last_releases_buffered_latest_on_lifecycle_reset() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let n = last(rt.core(), source).into_node();
rt.core().set_resubscribable(n, true);
let observed_handle = rt.intern_int(5);
assert_eq!(rt.binding.refcount_of(observed_handle), 1);
let rec1 = rt.subscribe_recorder(n);
rt.emit_int(source, 5);
rt.settle(); assert_eq!(
rt.binding.refcount_of(observed_handle),
4,
"after emit + wave drain: 1 (diag) + 1 (source cache) + 1 (prev_data) + 1 (LastState.latest)"
);
rt.core().complete(source);
rt.settle(); assert_eq!(rec1.data_values(), vec![TestValue::Int(5)]);
drop(rec1);
rt.settle();
let _rec2 = rt.subscribe_recorder(n);
rt.settle(); assert_eq!(
rt.binding.refcount_of(observed_handle),
4,
"after Phase G cache-clear + reset + re-activation: Last.cache \
released by Phase G; prev_data + LastState.latest replaced by \
re-activation's fresh retains. (Pre-D119 expected 5 because \
Last.cache survived; D119 clears compute cache on deactivation \
per R2.2.7 / R2.2.8 ROM rule.)"
);
}
#[test]
fn take_after_skip_produces_window() {
let rt = OpRuntime::new();
let source = rt.state_int(None);
let skipped = skip(rt.core(), source, 2).into_node();
let windowed = take(rt.core(), skipped, 3).into_node();
let rec = rt.subscribe_recorder(windowed);
for v in 1..=10 {
rt.emit_int(source, v);
}
assert_eq!(
rec.data_values(),
vec![TestValue::Int(3), TestValue::Int(4), TestValue::Int(5)]
);
assert!(rec.events().contains(&RecordedEvent::Complete));
}
#[test]
fn flow_registration_type_touch() {
let _ = core::marker::PhantomData::<graphrefly_operators::flow::FlowRegistration>;
let _ = core::marker::PhantomData::<Core>;
}