hydroflow 0.10.0

Hydro's low-level dataflow runtime and IR
Documentation
use hydroflow::hydroflow_syntax;
use hydroflow::lattices::{IsTop, Max, Merge};
use hydroflow::util::collect_ready;

/// Smoke test for `lattice_fold`: pushes the integers 1 through 5 through a
/// `Max<u32>` fold seeded with `Max(0)` and prints whatever the fold emits.
///
/// Nothing is asserted here; the test passes as long as the graph builds and
/// `run_available` completes without panicking.
#[test]
fn test_basic() {
    let mut flow = hydroflow_syntax! {
        source_iter([1, 2, 3, 4, 5])
            -> map(Max::new)
            -> lattice_fold::<'static>(|| Max::<u32>::new(0))
            -> for_each(|lub: Max<u32>| println!("Least upper bound: {:?}", lub));
    };
    flow.run_available();
}

/// Places a plain `fold` over `Max<u8>` inside a cycle: the fold's output is
/// incremented, filtered with `is_top`, and then tee'd both back into the
/// fold's input and out to a channel.
///
/// After a single `run_tick` the channel is expected to hold exactly `[2]`.
#[test]
fn test_fold_loop() {
    let (tx, rx) = hydroflow::util::unbounded_channel::<u8>();
    let mut flow = hydroflow_syntax! {
        // Seed the cycle with a single value.
        seed = source_iter([1])
            -> map(Max::new)
            -> feedback;

        // Merge everything arriving at the union into one running `Max`, then
        // bump it by one and drop the value once it reports `is_top()`.
        feedback = union()
            -> fold::<'static>(|| Max::<u8>::new(0), |acc, item| { acc.merge(item); })
            -> map(|folded| Max::<u8>::new(folded.into_reveal() + 1))
            -> filter(|bumped| !bumped.is_top())
            -> tee();

        // First tee branch closes the loop; second one logs and exports.
        feedback -> feedback;
        feedback -> inspect(|seen| println!("{:?}", seen))
            -> for_each(|out: Max<u8>| tx.send(out.into_reveal()).unwrap());
    };
    flow.run_tick();

    let received: Vec<u8> = collect_ready::<Vec<_>, _>(rx);
    assert_eq!(&[2], received.as_slice());
}

/// Same cyclic topology as the plain-`fold` loop test, but the accumulator is
/// a `lattice_fold` over `Max<u8>`.
///
/// After a single `run_tick` the channel is expected to hold every value from
/// 2 through 254 in ascending order.
#[test]
fn test_lattice_fold_loop() {
    let (tx, rx) = hydroflow::util::unbounded_channel::<u8>();
    let mut flow = hydroflow_syntax! {
        // Seed the cycle with a single value.
        seed = source_iter([1])
            -> map(Max::<u8>::new)
            -> feedback;

        // Fold into a running `Max`, bump it by one, and stop forwarding once
        // the bumped value reports `is_top()`.
        // NOTE(review): presumably `is_top()` is true at `u8::MAX`, which is
        // why the expected output ends at 254 — confirm against `lattices`.
        feedback = union()
            -> lattice_fold::<'static>(|| Max::<u8>::new(0))
            -> map(|folded| Max::<u8>::new(folded.into_reveal() + 1))
            -> filter(|bumped| !bumped.is_top())
            -> tee();

        // First tee branch closes the loop; second one exports the values.
        feedback -> feedback;
        feedback
            -> for_each(|out: Max<u8>| tx.send(out.into_reveal()).unwrap());
    };
    flow.run_tick();

    let expected: Vec<u8> = (2..=254).collect();
    let received: Vec<u8> = collect_ready::<Vec<_>, _>(rx);
    assert_eq!(&expected, &received);
}