use dfir_rs::dfir_syntax;
use dfir_rs::util::{collect_ready, unbounded_channel};
use multiplatform_test::multiplatform_test;
fn assert_key_order_values_unordered<
K: Eq + std::fmt::Debug + Clone,
V: Ord + std::fmt::Debug + Clone,
>(
actual: &[(K, V)],
expected_keys_in_order: &[K],
expected_values_per_key: &[(K, Vec<V>)],
) {
let actual_keys: Vec<&K> = actual.iter().map(|(k, _)| k).collect();
let mut expected_key_iter = expected_keys_in_order.iter();
let mut cur_expected = expected_key_iter.next();
for actual_key in &actual_keys {
if cur_expected.is_some_and(|k| k != *actual_key) {
cur_expected = expected_key_iter.next();
assert_eq!(
cur_expected,
Some(*actual_key),
"key order mismatch: got {actual_keys:?}, expected order {expected_keys_in_order:?}"
);
}
}
for (key, expected_vals) in expected_values_per_key {
let mut actual_vals: Vec<&V> = actual
.iter()
.filter(|(k, _)| k == key)
.map(|(_, v)| v)
.collect();
actual_vals.sort();
let mut sorted_expected = expected_vals.clone();
sorted_expected.sort();
assert_eq!(
actual_vals,
sorted_expected.iter().collect::<Vec<_>>(),
"values mismatch for key {key:?}"
);
}
}
#[multiplatform_test]
pub fn test_join_multiset_half_basic() {
let (build_send, build_recv) = unbounded_channel::<(&str, char)>();
let (probe_send, probe_recv) = unbounded_channel::<(&str, i32)>();
let (out_send, mut out_recv) = unbounded_channel::<(&str, (i32, char))>();
let mut flow = dfir_syntax! {
source_stream(build_recv) -> [build]my_join;
source_stream(probe_recv) -> [probe]my_join;
my_join = join_multiset_half() -> for_each(|x| out_send.send(x).unwrap());
};
build_send.send(("cat", 'x')).unwrap();
build_send.send(("dog", 'y')).unwrap();
probe_send.send(("cat", 1)).unwrap();
probe_send.send(("dog", 2)).unwrap();
probe_send.send(("cat", 3)).unwrap();
flow.run_tick_sync();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_key_order_values_unordered(
&out,
&["cat", "dog", "cat"],
&[("cat", vec![(1, 'x'), (3, 'x')]), ("dog", vec![(2, 'y')])],
);
}
#[multiplatform_test]
pub fn test_join_multiset_half_preserves_probe_order() {
let (build_send, build_recv) = unbounded_channel::<(i32, char)>();
let (probe_send, probe_recv) = unbounded_channel::<(i32, i32)>();
let (out_send, mut out_recv) = unbounded_channel::<(i32, (i32, char))>();
let mut flow = dfir_syntax! {
source_stream(build_recv) -> [build]my_join;
source_stream(probe_recv) -> [probe]my_join;
my_join = join_multiset_half() -> for_each(|x| out_send.send(x).unwrap());
};
build_send.send((1, 'a')).unwrap();
build_send.send((2, 'b')).unwrap();
probe_send.send((2, 20)).unwrap();
probe_send.send((1, 10)).unwrap();
flow.run_tick_sync();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_eq!(out, vec![(2, (20, 'b')), (1, (10, 'a'))]);
}
#[multiplatform_test]
pub fn test_join_multiset_half_no_match() {
let (build_send, build_recv) = unbounded_channel::<(i32, char)>();
let (probe_send, probe_recv) = unbounded_channel::<(i32, i32)>();
let (out_send, mut out_recv) = unbounded_channel::<(i32, (i32, char))>();
let mut flow = dfir_syntax! {
source_stream(build_recv) -> [build]my_join;
source_stream(probe_recv) -> [probe]my_join;
my_join = join_multiset_half() -> for_each(|x| out_send.send(x).unwrap());
};
build_send.send((1, 'a')).unwrap();
probe_send.send((2, 20)).unwrap();
flow.run_tick_sync();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_eq!(out, vec![]);
}
#[multiplatform_test]
pub fn test_join_multiset_half_multiple_build_values() {
let (build_send, build_recv) = unbounded_channel::<(i32, char)>();
let (probe_send, probe_recv) = unbounded_channel::<(i32, i32)>();
let (out_send, mut out_recv) = unbounded_channel::<(i32, (i32, char))>();
let mut flow = dfir_syntax! {
source_stream(build_recv) -> [build]my_join;
source_stream(probe_recv) -> [probe]my_join;
my_join = join_multiset_half() -> for_each(|x| out_send.send(x).unwrap());
};
build_send.send((1, 'a')).unwrap();
build_send.send((1, 'b')).unwrap();
probe_send.send((1, 10)).unwrap();
flow.run_tick_sync();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_key_order_values_unordered(&out, &[1], &[(1, vec![(10, 'a'), (10, 'b')])]);
}
#[multiplatform_test]
pub fn test_join_multiset_half_tick_static() {
let (build_send, build_recv) = unbounded_channel::<(i32, char)>();
let (probe_send, probe_recv) = unbounded_channel::<(i32, i32)>();
let (out_send, mut out_recv) = unbounded_channel::<(i32, (i32, char))>();
let mut flow = dfir_syntax! {
source_stream(build_recv) -> [build]my_join;
source_stream(probe_recv) -> [probe]my_join;
my_join = join_multiset_half::<'static, 'tick>() -> for_each(|x| out_send.send(x).unwrap());
};
build_send.send((1, 'a')).unwrap();
probe_send.send((1, 10)).unwrap();
flow.run_tick_sync();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_key_order_values_unordered(&out, &[1], &[(1, vec![(10, 'a')])]);
probe_send.send((1, 20)).unwrap();
flow.run_tick_sync();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_key_order_values_unordered(&out, &[1], &[(1, vec![(20, 'a')])]);
}
#[multiplatform_test]
pub fn test_join_multiset_half_probe_does_not_persist() {
let (build_send, build_recv) = unbounded_channel::<(i32, char)>();
let (probe_send, probe_recv) = unbounded_channel::<(i32, i32)>();
let (out_send, mut out_recv) = unbounded_channel::<(i32, (i32, char))>();
let mut flow = dfir_syntax! {
source_stream(build_recv) -> [build]my_join;
source_stream(probe_recv) -> [probe]my_join;
my_join = join_multiset_half::<'static, 'tick>() -> for_each(|x| out_send.send(x).unwrap());
};
build_send.send((1, 'a')).unwrap();
probe_send.send((1, 10)).unwrap();
flow.run_tick_sync();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_eq!(out, vec![(1, (10, 'a'))]);
build_send.send((1, 'b')).unwrap();
flow.run_tick_sync();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_eq!(out, vec![]);
}