mod common;
use common::{OpRuntime, RecordedEvent, TestValue};
use graphrefly_operators::{empty, from_iter, never, of, throw_error};
#[test]
fn from_iter_emits_each_handle_then_completes() {
let rt = OpRuntime::new();
let h1 = rt.intern_int(10);
let h2 = rt.intern_int(20);
let h3 = rt.intern_int(30);
let node = from_iter(rt.core(), &rt.producer_binding, vec![h1, h2, h3]);
let rec = rt.subscribe_recorder(node);
let events = rec.events();
assert_eq!(
events,
vec![
RecordedEvent::Start,
RecordedEvent::Dirty,
RecordedEvent::Data(TestValue::Int(10)),
RecordedEvent::Data(TestValue::Int(20)),
RecordedEvent::Data(TestValue::Int(30)),
RecordedEvent::Complete,
]
);
}
#[test]
fn of_emits_values_then_completes() {
let rt = OpRuntime::new();
let h1 = rt.intern_int(42);
let node = of(rt.core(), &rt.producer_binding, vec![h1]);
let rec = rt.subscribe_recorder(node);
let events = rec.events();
assert_eq!(
events,
vec![
RecordedEvent::Start,
RecordedEvent::Dirty,
RecordedEvent::Data(TestValue::Int(42)),
RecordedEvent::Complete,
]
);
}
#[test]
fn from_iter_empty_vec_behaves_like_empty() {
let rt = OpRuntime::new();
let node = from_iter(rt.core(), &rt.producer_binding, vec![]);
let rec = rt.subscribe_recorder(node);
let events = rec.events();
assert_eq!(events, vec![RecordedEvent::Start, RecordedEvent::Complete]);
}
#[test]
fn empty_completes_immediately() {
let rt = OpRuntime::new();
let node = empty(rt.core(), &rt.producer_binding);
let rec = rt.subscribe_recorder(node);
let events = rec.events();
assert_eq!(events, vec![RecordedEvent::Start, RecordedEvent::Complete]);
}
#[test]
fn never_emits_start_only() {
let rt = OpRuntime::new();
let node = never(rt.core(), &rt.producer_binding);
let rec = rt.subscribe_recorder(node);
let events = rec.events();
assert_eq!(events, vec![RecordedEvent::Start]);
}
#[test]
fn never_cleanup_on_last_unsub() {
let rt = OpRuntime::new();
let node = never(rt.core(), &rt.producer_binding);
let rec = rt.subscribe_recorder(node);
assert_eq!(rec.events(), vec![RecordedEvent::Start]);
drop(rec);
let storage = rt.binding.producer_storage();
let guard = storage.lock();
assert!(
!guard.contains_key(&node),
"producer storage should be cleaned up after last unsub"
);
}
#[test]
fn throw_error_emits_error_immediately() {
let rt = OpRuntime::new();
let err_handle = rt.intern_int(999);
let node = throw_error(rt.core(), &rt.producer_binding, err_handle);
let rec = rt.subscribe_recorder(node);
let events = rec.events();
assert_eq!(
events,
vec![
RecordedEvent::Start,
RecordedEvent::Error(TestValue::Int(999)),
]
);
}
#[test]
fn from_iter_data_values_only() {
let rt = OpRuntime::new();
let h1 = rt.intern_int(1);
let h2 = rt.intern_int(2);
let h3 = rt.intern_int(3);
let node = from_iter(rt.core(), &rt.producer_binding, vec![h1, h2, h3]);
let rec = rt.subscribe_recorder(node);
let data = rec.data_values();
assert_eq!(
data,
vec![TestValue::Int(1), TestValue::Int(2), TestValue::Int(3)]
);
}
#[test]
fn from_iter_single_element() {
let rt = OpRuntime::new();
let h = rt.intern_int(7);
let node = from_iter(rt.core(), &rt.producer_binding, vec![h]);
let rec = rt.subscribe_recorder(node);
let data = rec.data_values();
assert_eq!(data, vec![TestValue::Int(7)]);
}
#[test]
fn empty_into_take_completes_with_no_data() {
let rt = OpRuntime::new();
let source = empty(rt.core(), &rt.producer_binding);
let take_reg = graphrefly_operators::take(rt.core(), source, 5);
let rec = rt.subscribe_recorder(take_reg.node);
let events = rec.events();
assert_eq!(events, vec![RecordedEvent::Start, RecordedEvent::Complete]);
}