mod common;
use std::time::Duration;
use common::{OpRuntime, RecordedEvent, TestValue};
use graphrefly_operators::temporal::{self, ThrottleOpts};
/// Yields to the Tokio scheduler `n` times in a row.
///
/// The tests in this file call it after emitting values or advancing the
/// paused clock, presumably so that tasks driving the temporal operators get
/// polled before the recorder is inspected.
async fn multi_yield(n: usize) {
    let mut remaining = n;
    while remaining > 0 {
        tokio::task::yield_now().await;
        remaining -= 1;
    }
}
/// `sample` forwards the most recent source value when the notifier emits.
///
/// The source receives 10 and then 20 before the notifier emits once; only
/// the latest value, 20, is expected downstream.
#[tokio::test]
async fn sample_emits_source_latest_on_notifier_data() {
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let tick = rt.state_int(None);
    let node = temporal::sample(rt.core(), &rt.producer_binding, src, tick);
    let recorder = rt.subscribe_recorder(node);

    for value in [10, 20] {
        rt.emit_int(src, value);
    }
    rt.emit_int(tick, 1);
    rt.settle();

    assert_eq!(recorder.data_values(), vec![TestValue::Int(20)]);
}
/// After the source has completed, a notifier emission must not produce data.
///
/// The source emits 10 and is then completed; the notifier emits afterwards
/// and the recorder is expected to hold no data values.
#[tokio::test]
async fn sample_no_emit_if_source_completed() {
    let rt = OpRuntime::new();
    let source = rt.state_int(None);
    let notifier = rt.state_int(None);
    let sampled = temporal::sample(rt.core(), &rt.producer_binding, source, notifier);
    let rec = rt.subscribe_recorder(sampled);

    rt.emit_int(source, 10);
    rt.core().complete(source);
    rt.emit_int(notifier, 1);
    // Settle the runtime before reading the recorder, matching the other
    // tests in this file; otherwise the emptiness check could pass merely
    // because nothing has been delivered yet.
    rt.settle();

    let data = rec.data_values();
    assert!(data.is_empty(), "no data after source complete");
}
/// Completing the notifier is expected to complete the sampled node.
#[tokio::test]
async fn sample_completes_on_notifier_complete() {
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let tick = rt.state_int(None);
    let node = temporal::sample(rt.core(), &rt.producer_binding, src, tick);
    let recorder = rt.subscribe_recorder(node);

    rt.emit_int(src, 10);
    rt.core().complete(tick);
    rt.settle();

    let saw_complete = recorder.events().contains(&RecordedEvent::Complete);
    assert!(
        saw_complete,
        "sample should complete when notifier completes"
    );
}
/// A debounced value is withheld until the quiet period has elapsed.
///
/// The debounce argument is 50 (presumably milliseconds, matching the clock
/// advances used here). Nothing may be recorded after 30 ms; the value must
/// be recorded once a further 25 ms has passed.
#[tokio::test]
async fn debounce_emits_after_quiet() {
    tokio::time::pause();
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let node = temporal::debounce(rt.core(), &rt.producer_binding, src, 50);
    let recorder = rt.subscribe_recorder(node);

    rt.emit_int(src, 10);
    multi_yield(5).await;

    // 30 ms on the paused clock: still short of the debounce argument.
    let before_deadline = Duration::from_millis(30);
    tokio::time::advance(before_deadline).await;
    multi_yield(5).await;
    rt.settle();
    assert!(recorder.data_values().is_empty(), "nothing before deadline");

    // 55 ms in total.
    let past_deadline = Duration::from_millis(25);
    tokio::time::advance(past_deadline).await;
    multi_yield(10).await;
    rt.settle();
    assert_eq!(recorder.data_values(), vec![TestValue::Int(10)]);
}
/// A new value arriving during the quiet period restarts the debounce timer.
///
/// 10 is emitted, 30 ms pass, then 20 is emitted. After another 30 ms
/// (60 ms since the first emission) nothing may have been recorded; after a
/// further 25 ms only the latest value, 20, is expected.
#[tokio::test]
async fn debounce_resets_on_new_data() {
    tokio::time::pause();
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let node = temporal::debounce(rt.core(), &rt.producer_binding, src, 50);
    let recorder = rt.subscribe_recorder(node);
    let partial_wait = Duration::from_millis(30);

    rt.emit_int(src, 10);
    multi_yield(5).await;
    tokio::time::advance(partial_wait).await;
    multi_yield(5).await;

    // Second emission lands before the first one's window has run out.
    rt.emit_int(src, 20);
    multi_yield(5).await;
    tokio::time::advance(partial_wait).await;
    multi_yield(5).await;
    rt.settle();
    assert!(
        recorder.data_values().is_empty(),
        "timer reset, not yet fired"
    );

    tokio::time::advance(Duration::from_millis(25)).await;
    multi_yield(10).await;
    rt.settle();
    assert_eq!(
        recorder.data_values(),
        vec![TestValue::Int(20)],
        "emits latest"
    );
}
/// Completing the source flushes the pending debounced value.
///
/// The clock is never advanced, so the value 10 can only be recorded if the
/// completion itself triggers the flush. Both the data event and the
/// completion event must be present.
#[tokio::test]
async fn debounce_flushes_on_complete() {
    tokio::time::pause();
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let node = temporal::debounce(rt.core(), &rt.producer_binding, src, 50);
    let recorder = rt.subscribe_recorder(node);

    rt.emit_int(src, 10);
    multi_yield(5).await;
    rt.core().complete(src);
    multi_yield(10).await;
    rt.settle();

    let recorded = recorder.events();
    let flushed = recorded.contains(&RecordedEvent::Data(TestValue::Int(10)));
    let completed = recorded.contains(&RecordedEvent::Complete);
    assert!(flushed, "pending value flushed on complete");
    assert!(completed, "complete propagated");
}
/// `delay` holds a value back until the configured duration has passed.
///
/// With a delay argument of 100 (presumably milliseconds), nothing may be
/// recorded after 50 ms; the value 42 must be recorded after a further 55 ms.
#[tokio::test]
async fn delay_emits_after_duration() {
    tokio::time::pause();
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let node = temporal::delay(rt.core(), &rt.producer_binding, src, 100);
    let recorder = rt.subscribe_recorder(node);

    rt.emit_int(src, 42);
    multi_yield(5).await;

    // Halfway point: 50 ms elapsed.
    let halfway = Duration::from_millis(50);
    tokio::time::advance(halfway).await;
    multi_yield(5).await;
    rt.settle();
    assert!(recorder.data_values().is_empty());

    // 105 ms elapsed in total.
    let remainder = Duration::from_millis(55);
    tokio::time::advance(remainder).await;
    multi_yield(10).await;
    rt.settle();
    assert_eq!(recorder.data_values(), vec![TestValue::Int(42)]);
}
/// Several delayed values can be pending at once and are released in order.
///
/// Timeline on the paused clock: 1 is emitted at 0 ms and 2 at 30 ms. At
/// 105 ms only 1 is expected; at 135 ms both 1 and 2 are expected, in that
/// order.
#[tokio::test]
async fn delay_multiple_in_flight() {
    tokio::time::pause();
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let node = temporal::delay(rt.core(), &rt.producer_binding, src, 100);
    let recorder = rt.subscribe_recorder(node);

    // t = 0 ms
    rt.emit_int(src, 1);
    multi_yield(5).await;
    tokio::time::advance(Duration::from_millis(30)).await;
    multi_yield(5).await;

    // t = 30 ms
    rt.emit_int(src, 2);
    multi_yield(5).await;
    tokio::time::advance(Duration::from_millis(75)).await;
    multi_yield(10).await;

    // t = 105 ms
    rt.settle();
    let first_only = vec![TestValue::Int(1)];
    assert_eq!(recorder.data_values(), first_only);

    tokio::time::advance(Duration::from_millis(30)).await;
    multi_yield(10).await;

    // t = 135 ms
    rt.settle();
    let both = vec![TestValue::Int(1), TestValue::Int(2)];
    assert_eq!(recorder.data_values(), both);
}
/// `audit` emits the latest value seen once its window closes.
///
/// Timeline on the paused clock: 10 is emitted at 0 ms and 20 at 20 ms. At
/// 30 ms nothing may have been recorded; at 55 ms exactly the value 20 is
/// expected (the audit argument is 50, presumably milliseconds).
#[tokio::test]
async fn audit_emits_latest_after_window() {
    tokio::time::pause();
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let node = temporal::audit(rt.core(), &rt.producer_binding, src, 50);
    let recorder = rt.subscribe_recorder(node);

    // t = 0 ms
    rt.emit_int(src, 10);
    multi_yield(5).await;
    tokio::time::advance(Duration::from_millis(20)).await;
    multi_yield(5).await;

    // t = 20 ms
    rt.emit_int(src, 20);
    multi_yield(5).await;
    tokio::time::advance(Duration::from_millis(10)).await;
    multi_yield(5).await;

    // t = 30 ms
    rt.settle();
    assert!(
        recorder.data_values().is_empty(),
        "window hasn't closed yet"
    );

    tokio::time::advance(Duration::from_millis(25)).await;
    multi_yield(10).await;

    // t = 55 ms
    rt.settle();
    assert_eq!(recorder.data_values(), vec![TestValue::Int(20)]);
}
/// With default options, `throttle` passes the first value and drops the next.
///
/// 1 is recorded immediately, 2 (emitted without advancing the clock) is
/// dropped, and 3 (emitted after the clock has advanced by 105 ms against a
/// throttle argument of 100) is recorded again.
#[tokio::test]
async fn throttle_leading_emits_first_then_drops() {
    tokio::time::pause();
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let opts = ThrottleOpts::default();
    let node = temporal::throttle(rt.core(), &rt.producer_binding, src, 100, opts);
    let recorder = rt.subscribe_recorder(node);

    rt.emit_int(src, 1);
    multi_yield(5).await;
    rt.settle();
    assert_eq!(recorder.data_values(), vec![TestValue::Int(1)]);

    // No clock advance between the first and second emission.
    rt.emit_int(src, 2);
    multi_yield(5).await;
    rt.settle();
    let after_second = recorder.data_values();
    assert_eq!(after_second, vec![TestValue::Int(1)], "second value dropped");

    tokio::time::advance(Duration::from_millis(105)).await;
    multi_yield(10).await;
    rt.emit_int(src, 3);
    multi_yield(5).await;
    rt.settle();
    let after_third = recorder.data_values();
    assert_eq!(after_third, vec![TestValue::Int(1), TestValue::Int(3)]);
}
/// With `leading` and `trailing` both enabled, `throttle` emits the first
/// value immediately and the latest suppressed value after the window.
///
/// 1 is recorded right away; 2 and 3 are emitted without advancing the
/// clock, and after a 105 ms advance the recorder is expected to hold 1
/// followed by 3.
#[tokio::test]
async fn throttle_trailing_emits_at_window_end() {
    tokio::time::pause();
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let opts = ThrottleOpts {
        leading: true,
        trailing: true,
    };
    let node = temporal::throttle(rt.core(), &rt.producer_binding, src, 100, opts);
    let recorder = rt.subscribe_recorder(node);

    rt.emit_int(src, 1);
    multi_yield(5).await;
    rt.settle();
    assert_eq!(recorder.data_values(), vec![TestValue::Int(1)]);

    // Two more values arrive with no clock advance in between.
    for value in [2, 3] {
        rt.emit_int(src, value);
        multi_yield(5).await;
    }

    tokio::time::advance(Duration::from_millis(105)).await;
    multi_yield(10).await;
    rt.settle();

    let expected = vec![TestValue::Int(1), TestValue::Int(3)];
    assert_eq!(recorder.data_values(), expected);
}
#[tokio::test]
async fn interval_emits_incrementing_counter() {
tokio::time::pause();
let rt = OpRuntime::new();
let iv = temporal::interval(rt.core(), &rt.producer_binding, 50);
let emitted = std::sync::Arc::new(std::sync::Mutex::new(
Vec::<graphrefly_core::HandleId>::new(),
));
let em = emitted.clone();
let _sub = rt.core().subscribe(
iv,
std::sync::Arc::new(move |msgs: &[graphrefly_core::Message]| {
for &m in msgs {
if let graphrefly_core::Message::Data(h) = m {
em.lock().unwrap().push(h);
}
}
}),
);
multi_yield(10).await;
tokio::time::advance(Duration::from_millis(55)).await;
multi_yield(20).await;
tokio::time::advance(Duration::from_millis(55)).await;
multi_yield(20).await;
tokio::time::advance(Duration::from_millis(55)).await;
multi_yield(20).await;
rt.settle();
let handles = emitted.lock().unwrap().clone();
assert!(
handles.len() >= 2,
"expected at least 2 interval ticks, got {}",
handles.len()
);
if handles.len() >= 2 {
assert_ne!(handles[0], handles[1], "handles should be distinct");
}
}
#[tokio::test]
async fn debounce_error_releases_pending() {
tokio::time::pause();
let rt = OpRuntime::new();
let source = rt.state_int(None);
let debounced = temporal::debounce(rt.core(), &rt.producer_binding, source, 50);
let rec = rt.subscribe_recorder(debounced);
rt.emit_int(source, 10);
multi_yield(5).await;
rt.core().error(source, rt.intern_int(99));
multi_yield(10).await;
rt.settle();
let events = rec.events();
assert!(
events.contains(&RecordedEvent::Error(TestValue::Int(99))),
"error should propagate"
);
assert!(
!events.contains(&RecordedEvent::Data(TestValue::Int(10))),
"pending should not emit on error"
);
}
#[tokio::test]
async fn delay_error_releases_all_pending() {
tokio::time::pause();
let rt = OpRuntime::new();
let source = rt.state_int(None);
let delayed = temporal::delay(rt.core(), &rt.producer_binding, source, 100);
let rec = rt.subscribe_recorder(delayed);
rt.emit_int(source, 1);
multi_yield(5).await;
rt.emit_int(source, 2);
multi_yield(5).await;
rt.core().error(source, rt.intern_int(99));
multi_yield(10).await;
rt.settle();
let events = rec.events();
assert!(
events.contains(&RecordedEvent::Error(TestValue::Int(99))),
"error should propagate"
);
assert!(
events
.iter()
.filter(|e| matches!(e, RecordedEvent::Data(_)))
.count()
== 0,
"no data should emit after error"
);
}
/// A notifier that emits and completes inside one batch still triggers a
/// sample before the completion is seen.
///
/// The source holds 42; within a single `batch` call the notifier emits and
/// is then completed. Both the sampled data event and the completion event
/// must be recorded.
#[tokio::test]
async fn sample_notifier_data_then_complete_in_batch() {
    let rt = OpRuntime::new();
    let src = rt.state_int(None);
    let tick = rt.state_int(None);
    let node = temporal::sample(rt.core(), &rt.producer_binding, src, tick);
    let recorder = rt.subscribe_recorder(node);

    rt.emit_int(src, 42);
    rt.core().batch(|| {
        rt.emit_int(tick, 1);
        rt.core().complete(tick);
    });
    rt.settle();

    let recorded = recorder.events();
    let sampled = recorded.contains(&RecordedEvent::Data(TestValue::Int(42)));
    let completed = recorded.contains(&RecordedEvent::Complete);
    assert!(sampled, "should emit source latest on notifier DATA");
    assert!(completed, "should complete after notifier completes");
}