use noir_compute::operator::source::IteratorSource;
use utils::TestHelper;
mod utils;
#[test]
fn interval_join_keyed_stream() {
TestHelper::local_remote_env(|env| {
let source = IteratorSource::new(0..10);
let source2 = IteratorSource::new(0..10);
let right = env
.stream(source2)
.add_timestamps(|&x| x, |&x, &ts| if x % 2 == 0 { Some(ts) } else { None })
.group_by(|x| x % 2);
let res = env
.stream(source)
.add_timestamps(|&x| x, |&x, &ts| if x % 2 == 0 { Some(ts) } else { None })
.group_by(|x| x % 2)
.interval_join(right, 1, 2)
.collect_vec();
env.execute_blocking();
if let Some(mut res) = res.get() {
let mut expected = Vec::new();
for l in 0..10 {
for r in l - 1..=l + 2 {
if !(0..10).contains(&r) || l % 2 != r % 2 {
continue;
}
expected.push((l % 2, (l, r)));
}
}
expected.sort_unstable();
res.sort_unstable();
assert_eq!(res, expected);
}
});
}
#[test]
fn interval_join_stream() {
TestHelper::local_remote_env(|env| {
let source = IteratorSource::new(0..10);
let source2 = IteratorSource::new(0..10);
let right = env
.stream(source2)
.add_timestamps(|&x| x, |&x, &ts| if x % 2 == 0 { Some(ts) } else { None });
let res = env
.stream(source)
.add_timestamps(|&x| x, |&x, &ts| if x % 2 == 0 { Some(ts) } else { None })
.interval_join(right, 1, 2)
.collect_vec();
env.execute_blocking();
if let Some(mut res) = res.get() {
let mut expected = Vec::new();
for l in 0..10 {
for r in l - 1..=l + 2 {
if !(0..10).contains(&r) {
continue;
}
expected.push((l, r));
}
}
expected.sort_unstable();
res.sort_unstable();
assert_eq!(res, expected);
}
});
}