use std::collections::HashSet;
use hydroflow::util::collect_ready;
use hydroflow_macro::hydroflow_syntax;
use lattices::collections::SingletonMap;
use lattices::DeepReveal;
use multiplatform_test::multiplatform_test;
#[multiplatform_test]
pub fn test_lattice_join_fused_join_reducing_behavior() {
let mut df = hydroflow_syntax! {
use lattices::Min;
use lattices::Max;
source_iter([(7, Min::new(5)), (7, Min::new(6))]) -> [0]my_join;
source_iter([(7, Max::new(5)), (7, Max::new(6))]) -> [1]my_join;
my_join = _lattice_join_fused_join::<Min<usize>, Max<usize>>()
-> map(DeepReveal::deep_reveal)
-> assert_eq([
SingletonMap(7, (5, 6))
]);
};
df.run_available();
}
#[multiplatform_test]
pub fn test_lattice_join_fused_join_set_union() {
let mut df = hydroflow_syntax! {
use lattices::set_union::SetUnionSingletonSet;
use lattices::set_union::SetUnionHashSet;
source_iter([(7, SetUnionHashSet::new_from([5])), (7, SetUnionHashSet::new_from([6]))]) -> [0]my_join;
source_iter([(7, SetUnionSingletonSet::new_from(5)), (7, SetUnionSingletonSet::new_from(6))]) -> [1]my_join;
my_join = _lattice_join_fused_join::<SetUnionHashSet<usize>, SetUnionHashSet<usize>>()
-> map(DeepReveal::deep_reveal)
-> assert_eq([SingletonMap(7, (HashSet::from_iter([5, 6]), HashSet::from_iter([5, 6])))]);
};
df.run_available();
}
#[multiplatform_test]
pub fn test_lattice_join_fused_join_map_union() {
let mut df = hydroflow_syntax! {
use lattices::map_union::MapUnionSingletonMap;
use lattices::map_union::MapUnionHashMap;
use hydroflow::lattices::Min;
source_iter([(7, MapUnionHashMap::new_from([(3, Min::new(4))])), (7, MapUnionHashMap::new_from([(3, Min::new(5))]))]) -> [0]my_join;
source_iter([(7, MapUnionSingletonMap::new_from((3, Min::new(5)))), (7, MapUnionSingletonMap::new_from((3, Min::new(4))))]) -> [1]my_join;
my_join = _lattice_join_fused_join::<MapUnionHashMap<usize, Min<usize>>, MapUnionHashMap<usize, Min<usize>>>()
-> map(DeepReveal::deep_reveal)
-> assert(|SingletonMap(_k, (mu1, mu2))| mu1.len() == 1 && mu2.len() == 1)
-> map(|SingletonMap(k, (mu1, mu2))| {
let v1 = mu1.into_iter().next().unwrap();
let v2 = mu2.into_iter().next().unwrap();
(k, (v1, v2))
})
-> assert_eq([
(7, ((3, 4), (3, 4)))
]);
};
df.run_available();
}
#[multiplatform_test]
pub fn test_lattice_join_fused_join() {
use hydroflow::lattices::Max;
{
let (out_tx, mut out_rx) =
hydroflow::util::unbounded_channel::<SingletonMap<usize, (usize, usize)>>();
let (lhs_tx, lhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max<usize>)>();
let (rhs_tx, rhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max<usize>)>();
let mut df = hydroflow_syntax! {
my_join = _lattice_join_fused_join::<'static, 'tick, Max<usize>, Max<usize>>();
source_stream(lhs_rx) -> [0]my_join;
source_stream(rhs_rx) -> [1]my_join;
my_join
-> map(DeepReveal::deep_reveal)
-> for_each(|v| out_tx.send(v).unwrap());
};
lhs_tx.send((7, Max::new(3))).unwrap();
lhs_tx.send((7, Max::new(4))).unwrap();
rhs_tx.send((7, Max::new(6))).unwrap();
rhs_tx.send((7, Max::new(5))).unwrap();
df.run_tick();
let out: Vec<_> = collect_ready(&mut out_rx);
assert_eq!(out, [SingletonMap(7, (4, 6))]);
rhs_tx.send((7, Max::new(6))).unwrap();
rhs_tx.send((7, Max::new(5))).unwrap();
df.run_tick();
let out: Vec<_> = collect_ready(&mut out_rx);
assert_eq!(out, [SingletonMap(7, (4, 6))]);
}
{
let (out_tx, mut out_rx) =
hydroflow::util::unbounded_channel::<SingletonMap<usize, (usize, usize)>>();
let (lhs_tx, lhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max<usize>)>();
let (rhs_tx, rhs_rx) = hydroflow::util::unbounded_channel::<(usize, Max<usize>)>();
let mut df = hydroflow_syntax! {
my_join = _lattice_join_fused_join::<'static, 'static, Max<usize>, Max<usize>>();
source_stream(lhs_rx) -> [0]my_join;
source_stream(rhs_rx) -> [1]my_join;
my_join
-> map(DeepReveal::deep_reveal)
-> for_each(|v| out_tx.send(v).unwrap());
};
lhs_tx.send((7, Max::new(3))).unwrap();
lhs_tx.send((7, Max::new(4))).unwrap();
rhs_tx.send((7, Max::new(6))).unwrap();
rhs_tx.send((7, Max::new(5))).unwrap();
df.run_tick();
let out: Vec<_> = collect_ready(&mut out_rx);
assert_eq!(out, [SingletonMap(7, (4, 6))]);
lhs_tx.send((7, Max::new(4))).unwrap();
rhs_tx.send((7, Max::new(5))).unwrap();
df.run_tick();
let out: Vec<_> = collect_ready(&mut out_rx);
assert_eq!(out, [SingletonMap(7, (4, 6)), SingletonMap(7, (4, 6))]);
}
}