use std::cell::RefCell;
use std::rc::Rc;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::multiset::HashMultiSet;
use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use multiplatform_test::multiplatform_test;
use tokio::sync::mpsc::error::SendError;
#[multiplatform_test]
pub fn test_difference_a() {
let output = <Rc<RefCell<HashMultiSet<usize>>>>::default();
let output_inner = Rc::clone(&output);
let mut df: Hydroflow = hydroflow_syntax! {
a = difference();
source_iter([1, 2, 3, 4]) -> [pos]a;
source_iter([1, 3, 5, 7]) -> [neg]a;
a -> for_each(|x| output_inner.borrow_mut().insert(x));
};
assert_graphvis_snapshots!(df);
df.run_available();
assert_eq!(HashMultiSet::from_iter([2, 4]), output.take());
}
// Checks `difference()` when its `neg` input is its own output routed through
// `defer_tick()`. Three rounds of input are sent over a channel, one tick is
// run per round, and the collected output of each tick is compared against
// the expected multiset for that round.
//
// Returns `Err` only if sending on the input channel fails.
#[multiplatform_test]
pub fn test_difference_b() -> Result<(), SendError<&'static str>> {
    let (inp_send, inp_recv) = hydroflow::util::unbounded_channel::<&'static str>();

    // Sink shared between the dataflow's `for_each` closure and the assertions below.
    let output: Rc<RefCell<HashMultiSet<&'static str>>> = Rc::default();
    let output_inner = Rc::clone(&output);

    // NOTE(review): the macro body is kept token-for-token; the snapshot
    // assertion below presumably records the operator text - confirm before
    // editing anything inside the braces.
    let mut df: Hydroflow = hydroflow_syntax! {
        a = difference();
        source_stream(inp_recv) -> [pos]a;
        b = a -> tee();
        b[0] -> defer_tick() -> [neg]a;
        b[1] -> for_each(|x| output_inner.borrow_mut().insert(x));
    };
    assert_graphvis_snapshots!(df);

    // Each round: (items sent before the tick, items expected out of the tick).
    // Round 2 drops "02" (it was output in round 1); round 3 drops "11" and
    // "12" (output in round 2) but lets "02" through again.
    let rounds: [([&'static str; 3], &[&'static str]); 3] = [
        (["01", "02", "03"], &["01", "02", "03"]),
        (["02", "11", "12"], &["11", "12"]),
        (["02", "11", "12"], &["02"]),
    ];

    for (inputs, expected) in rounds {
        for item in inputs {
            inp_send.send(item)?;
        }
        df.run_tick();
        // `take()` drains the sink so every round is checked in isolation.
        assert_eq!(
            HashMultiSet::from_iter(expected.iter().copied()),
            output.take()
        );
    }

    Ok(())
}
// Checks a feedback loop that passes through a single `defer_tick()` and a
// doubling `map`: starting from [1, 3], each subsequent tick is expected to
// emit the previous tick's values multiplied by two.
#[multiplatform_test]
pub fn test_tick_loop_1() {
    // Sink shared between the dataflow's `for_each` closure and the assertions below.
    let output: Rc<RefCell<Vec<usize>>> = Rc::default();
    let output_inner = Rc::clone(&output);

    // NOTE(review): the macro body is kept token-for-token; the snapshot
    // assertion below presumably records the operator text - confirm before
    // editing anything inside the braces.
    let mut df: Hydroflow = hydroflow_syntax! {
        a = union() -> tee();
        source_iter([1, 3]) -> [0]a;
        a[0] -> defer_tick() -> map(|x| 2 * x) -> [1]a;
        a[1] -> for_each(|x| output_inner.borrow_mut().push(x));
    };
    assert_graphvis_snapshots!(df);

    // One entry per tick, in the order the ticks are run.
    let expected_per_tick: [[usize; 2]; 4] = [[1, 3], [2, 6], [4, 12], [8, 24]];
    for expected in expected_per_tick {
        df.run_tick();
        // `take()` drains the sink so each tick is checked in isolation.
        assert_eq!(&expected, &*output.take());
    }
}
// Checks a feedback loop that passes through two chained `defer_tick()`s and
// a doubling `map`: output is expected on every second tick, with an empty
// tick in between.
#[multiplatform_test]
pub fn test_tick_loop_2() {
    // Sink shared between the dataflow's `for_each` closure and the assertions below.
    let output: Rc<RefCell<Vec<usize>>> = Rc::default();
    let output_inner = Rc::clone(&output);

    // NOTE(review): the macro body is kept token-for-token; the snapshot
    // assertion below presumably records the operator text - confirm before
    // editing anything inside the braces.
    let mut df: Hydroflow = hydroflow_syntax! {
        a = union() -> tee();
        source_iter([1, 3]) -> [0]a;
        a[0] -> defer_tick() -> defer_tick() -> map(|x| 2 * x) -> [1]a;
        a[1] -> for_each(|x| output_inner.borrow_mut().push(x));
    };
    assert_graphvis_snapshots!(df);

    // One entry per tick: `Some(values)` for a tick that must emit exactly
    // those values, `None` for a tick that must emit nothing.
    let expected_per_tick: [Option<[usize; 2]>; 5] =
        [Some([1, 3]), None, Some([2, 6]), None, Some([4, 12])];

    for expected in expected_per_tick {
        df.run_tick();
        // `take()` drains the sink so each tick is checked in isolation.
        let emitted = output.take();
        match expected {
            Some(values) => assert_eq!(&values, &*emitted),
            None => assert!(emitted.is_empty()),
        }
    }
}
// Checks a feedback loop that passes through three chained `defer_tick()`s
// and a doubling `map`: output is expected on every third tick, with two
// empty ticks in between.
#[multiplatform_test]
pub fn test_tick_loop_3() {
    // Sink shared between the dataflow's `for_each` closure and the assertions below.
    let output: Rc<RefCell<Vec<usize>>> = Rc::default();
    let output_inner = Rc::clone(&output);

    // NOTE(review): the macro body is kept token-for-token; the snapshot
    // assertion below presumably records the operator text - confirm before
    // editing anything inside the braces.
    let mut df: Hydroflow = hydroflow_syntax! {
        a = union() -> tee();
        source_iter([1, 3]) -> [0]a;
        a[0] -> defer_tick() -> defer_tick() -> defer_tick() -> map(|x| 2 * x) -> [1]a;
        a[1] -> for_each(|x| output_inner.borrow_mut().push(x));
    };
    assert_graphvis_snapshots!(df);

    // One entry per tick: `Some(values)` for a tick that must emit exactly
    // those values, `None` for a tick that must emit nothing.
    let expected_per_tick: [Option<[usize; 2]>; 5] =
        [Some([1, 3]), None, None, Some([2, 6]), None];

    for expected in expected_per_tick {
        df.run_tick();
        // `take()` drains the sink so each tick is checked in isolation.
        let emitted = output.take();
        match expected {
            Some(values) => assert_eq!(&values, &*emitted),
            None => assert!(emitted.is_empty()),
        }
    }
}
// Builds a graph-reachability dataflow and runs it against a small edge set.
// Vertices that appear in the edge stream but are not derived as reached from
// vertex 0 are printed as "Not reached: ...".
//
// The printed output is not asserted on; the test only checks the graph
// snapshot and that both `run_available()` calls complete.
#[multiplatform_test]
pub fn test_surface_syntax_graph_unreachability() {
// Edges are fed in as (src, dst) pairs over this channel.
let (pairs_send, pairs_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();
// The two `map(|x| x)` stages below are intentional (see the `reason`), so
// the identity-map lint is expected rather than fixed.
#[expect(clippy::map_identity, reason = "stratification topology testing")]
let mut df = hydroflow_syntax! {
// Reached set: seeded with vertex 0 (port 0) and extended by the join
// output fed back in on port 1. Each vertex is paired with `()`.
reached_vertices = union() -> map(|v| (v, ()));
source_iter(vec![0]) -> [0]reached_vertices;
edges = source_stream(pairs_recv) -> tee();
// Joins reached vertices with edges and keeps only the destination;
// presumably the join is keyed on the first tuple element - confirm.
my_join_tee = join() -> map(|(_src, ((), dst))| dst) -> map(|x| x) -> map(|x| x) -> tee();
reached_vertices -> [0]my_join_tee;
edges[1] -> [1]my_join_tee;
my_join_tee[0] -> [1]reached_vertices;
// pos: every endpoint mentioned in any edge; neg: destinations produced
// by the join. Whatever `difference()` emits is printed.
diff = difference() -> for_each(|x| println!("Not reached: {}", x));
edges[0] -> flat_map(|(a, b)| [a, b]) -> [pos]diff;
my_join_tee[1] -> [neg]diff;
};
assert_graphvis_snapshots!(df);
// First run happens before any edge has been sent.
df.run_available();
// Marker separating the output of the first run from the second.
println!("A");
pairs_send.send((0, 1)).unwrap();
pairs_send.send((2, 4)).unwrap();
pairs_send.send((3, 5)).unwrap();
pairs_send.send((1, 2)).unwrap();
// Second run processes the four edges sent above.
df.run_available();
}
// Chains four `union()` stages (`a`, `b`, `c` with a `tee()`, `d` with the
// sink) where both tee outputs of each stage feed both inputs of the next.
// Expects 16 items after the first `run_available()` and nothing new after
// the second.
#[multiplatform_test]
pub fn test_subgraph_stratum_consolidation() {
    // Sink shared between the dataflow's `for_each` closure and the assertions below.
    let output: Rc<RefCell<Vec<usize>>> = Rc::default();
    let output_inner = Rc::clone(&output);

    // NOTE(review): the macro body is kept token-for-token; the snapshot
    // assertion below presumably records the operator text - confirm before
    // editing anything inside the braces.
    let mut df: Hydroflow = hydroflow_syntax! {
        a = union() -> tee();
        b = union() -> tee();
        c = union() -> tee();
        d = union() -> for_each(|x| output_inner.borrow_mut().push(x));
        source_iter([0]) -> [0]a[0] -> [0]b[0] -> [0]c[0] -> [0]d;
        source_iter([1]) -> [1]a[1] -> [1]b[1] -> [1]c[1] -> [1]d;
    };
    assert_graphvis_snapshots!(df);

    // Two source items, each presumably duplicated at the three tee -> union
    // hops (a -> b, b -> c, c -> d): 2 * 2^3 = 16.
    let expected_len = 2 * 2_usize.pow(3);

    df.run_available();
    let first_run = output.take();
    assert_eq!(expected_len, first_run.len());

    // The sink was drained above, so anything seen here would be new output.
    df.run_available();
    let second_run = output.take();
    assert!(second_run.is_empty());
}
// Checks a feedback loop that passes through `defer_tick_lazy()` and a
// doubling `map`, driven with `run_available()` rather than `run_tick()`:
// starting from [1, 3], each subsequent `run_available()` call is expected to
// emit the previous values multiplied by two.
#[multiplatform_test]
pub fn test_defer_lazy() {
    // Sink shared between the dataflow's `for_each` closure and the assertions below.
    let output: Rc<RefCell<Vec<usize>>> = Rc::default();
    let output_inner = Rc::clone(&output);

    // NOTE(review): the macro body is kept token-for-token; the snapshot
    // assertion below presumably records the operator text - confirm before
    // editing anything inside the braces.
    let mut df: Hydroflow = hydroflow_syntax! {
        a = union() -> tee();
        source_iter([1, 3]) -> [0]a;
        a[0] -> defer_tick_lazy() -> map(|x| 2 * x) -> [1]a;
        a[1] -> for_each(|x| output_inner.borrow_mut().push(x));
    };

    // Print the Mermaid rendering of the graph to stdout.
    let mermaid = df.meta_graph().unwrap().to_mermaid(&Default::default());
    println!("{}", mermaid);
    assert_graphvis_snapshots!(df);

    // One entry per `run_available()` call, in call order.
    let expected_per_run: [[usize; 2]; 4] = [[1, 3], [2, 6], [4, 12], [8, 24]];
    for expected in expected_per_run {
        df.run_available();
        // `take()` drains the sink so each run is checked in isolation.
        assert_eq!(&expected, &*output.take());
    }
}