use noir_compute::operator::source::IteratorSource;
use noir_compute::operator::window::EventTimeWindow;
use super::utils::TestHelper;
#[test]
fn window_join() {
TestHelper::local_remote_env(|env| {
let source1 = IteratorSource::new(0..10);
let source2 = IteratorSource::new(0..10);
let stream1 = env
.stream(source1)
.add_timestamps(|&x| x, |&x, &ts| if x % 2 == 1 { Some(ts) } else { None })
.shuffle()
.group_by(|x| x % 2);
let stream2 = env
.stream(source2)
.add_timestamps(|&x| x, |&x, &ts| if x % 2 == 0 { Some(ts) } else { None })
.shuffle()
.group_by(|x| x % 2)
.map(|(_, x)| ('a'..='z').nth(x as usize).unwrap());
let res = stream1
.window(EventTimeWindow::tumbling(3))
.join(stream2)
.collect_vec();
env.execute_blocking();
if let Some(mut res) = res.get() {
res.sort_unstable();
let windows = vec![
vec![0, 2],
vec![4],
vec![6, 8],
vec![1],
vec![3, 5],
vec![7],
vec![9],
];
let mut expected = Vec::new();
for window in windows.into_iter() {
for &x in window.iter() {
for &y in window.iter() {
expected.push((x % 2, (x, ('a'..='z').nth(y as usize).unwrap())));
}
}
}
expected.sort_unstable();
assert_eq!(res, expected);
}
});
}
#[test]
fn window_all_join() {
TestHelper::local_remote_env(|env| {
let source1 = IteratorSource::new(0..10);
let source2 = IteratorSource::new(0..10);
let stream1 = env
.stream(source1)
.add_timestamps(|&x| x, |&x, &ts| if x % 2 == 1 { Some(ts) } else { None })
.shuffle();
let stream2 = env
.stream(source2)
.add_timestamps(|&x| x, |&x, &ts| if x % 2 == 0 { Some(ts) } else { None })
.shuffle()
.map(|x| ('a'..='z').nth(x as usize).unwrap());
let res = stream1
.window_all(EventTimeWindow::tumbling(3))
.join(stream2)
.collect_vec();
env.execute_blocking();
if let Some(mut res) = res.get() {
res.sort_unstable();
let windows = vec![vec![0, 1, 2], vec![3, 4, 5], vec![6, 7, 8], vec![9]];
let mut expected = Vec::new();
for window in windows.into_iter() {
for &x in window.iter() {
for &y in window.iter() {
expected.push((x, ('a'..='z').nth(y as usize).unwrap()));
}
}
}
expected.sort_unstable();
assert_eq!(res, expected);
}
});
}
#[test]
fn session_window_join() {
TestHelper::local_remote_env(|env| {
let source1 = IteratorSource::new(vec![0, 1, 2, 6, 7, 8].into_iter());
let source2 = IteratorSource::new(vec![1, 3, 6, 9, 10, 11].into_iter());
let stream1 = env
.stream(source1)
.add_timestamps(|&x| x, |&x, &ts| if x % 2 == 1 { Some(ts) } else { None })
.shuffle()
.group_by(|x| x % 2);
let stream2 = env
.stream(source2)
.add_timestamps(|&x| x, |&x, &ts| if x % 2 == 0 { Some(ts) } else { None })
.shuffle()
.group_by(|x| x % 2);
let res = stream1
.window(EventTimeWindow::session(3))
.join(stream2)
.collect_vec();
env.execute_blocking();
if let Some(mut res) = res.get() {
let mut expected = vec![
(0, (6, 6)),
(0, (6, 10)),
(0, (8, 6)),
(0, (8, 10)),
(1, (1, 1)),
(1, (1, 3)),
(1, (7, 9)),
(1, (7, 11)),
];
expected.sort_unstable();
res.sort_unstable();
assert_eq!(res, expected);
}
});
}