hydroflow 0.10.0

Hydro's low-level dataflow runtime and IR
Documentation
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

use hydroflow::lattices::set_union::SetUnionSingletonSet;
use hydroflow::scheduled::ticks::TickInstant;
use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use lattices::set_union::SetUnionHashSet;
use lattices::Merge;
use multiplatform_test::multiplatform_test;

macro_rules! assert_contains_each_by_tick {
    ($results:expr, $tick:expr, $input:expr) => {{
        for v in $input {
            assert!(
                $results.borrow()[&$tick].contains(v),
                "did not contain: {:?} in {:?}",
                v,
                $results.borrow()[&$tick]
            );
        }
    }};
}

#[multiplatform_test]
pub fn tick_tick_lhs_blocking_rhs_streaming() {
    let results = Rc::new(RefCell::new(HashMap::<TickInstant, Vec<_>>::new()));
    let results_inner = Rc::clone(&results);

    let mut df = hydroflow_syntax! {
        source_iter([(7, 1), (7, 2)])
            -> map(|(k, v)| (k, SetUnionSingletonSet::new_from(v)))
            -> [0]my_join;

        source_iter([(7, 0)]) -> unioner;
        source_iter([(7, 1)]) -> defer_tick() -> unioner;
        source_iter([(7, 2)]) -> defer_tick() -> defer_tick() -> unioner;
        unioner = union()
            -> [1]my_join;

        my_join = join_fused_lhs(Fold(SetUnionHashSet::default, Merge::merge))
            -> for_each(|x| results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();

    assert_contains_each_by_tick!(
        results,
        TickInstant::new(0),
        &[(7, (SetUnionHashSet::new_from([1, 2]), 0))]
    );
}

#[multiplatform_test]
pub fn static_tick_lhs_blocking_rhs_streaming() {
    let results = Rc::new(RefCell::new(HashMap::<TickInstant, Vec<_>>::new()));
    let results_inner = Rc::clone(&results);

    let mut df = hydroflow_syntax! {
        source_iter([(7, 1), (7, 2)])
            -> map(|(k, v)| (k, SetUnionSingletonSet::new_from(v)))
            -> [0]my_join;

        source_iter([(7, 0)]) -> unioner;
        source_iter([(7, 1)]) -> defer_tick() -> unioner;
        source_iter([(7, 2)]) -> defer_tick() -> defer_tick() -> unioner;
        unioner = union()
            -> [1]my_join;

        my_join = join_fused_lhs::<'static, 'tick>(Fold(SetUnionHashSet::default, Merge::merge))
            -> for_each(|x| results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();

    assert_contains_each_by_tick!(
        results,
        TickInstant::new(0),
        &[(7, (SetUnionHashSet::new_from([1, 2]), 0))]
    );
    assert_contains_each_by_tick!(
        results,
        TickInstant::new(1),
        &[(7, (SetUnionHashSet::new_from([1, 2]), 1))]
    );
    assert_contains_each_by_tick!(
        results,
        TickInstant::new(2),
        &[(7, (SetUnionHashSet::new_from([1, 2]), 2))]
    );
}

#[multiplatform_test]
pub fn static_static_lhs_blocking_rhs_streaming() {
    let results = Rc::new(RefCell::new(HashMap::<TickInstant, Vec<_>>::new()));
    let results_inner = Rc::clone(&results);

    let mut df = hydroflow_syntax! {
        source_iter([(7, 1), (7, 2)])
            -> map(|(k, v)| (k, SetUnionSingletonSet::new_from(v)))
            -> [0]my_join;

        source_iter([(7, 0)]) -> unioner;
        source_iter([(7, 1)]) -> defer_tick() -> unioner;
        source_iter([(7, 2)]) -> defer_tick() -> defer_tick() -> unioner;
        unioner = union()
            -> [1]my_join;

        my_join = join_fused_lhs::<'static, 'static>(Fold(SetUnionHashSet::default, Merge::merge))
            -> for_each(|x| results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();

    #[rustfmt::skip]
    {
        assert_contains_each_by_tick!(results, TickInstant::new(0), &[(7, (SetUnionHashSet::new_from([1, 2]), 0))]);
        assert_contains_each_by_tick!(results, TickInstant::new(1), &[(7, (SetUnionHashSet::new_from([1, 2]), 0)), (7, (SetUnionHashSet::new_from([1, 2]), 1))]);
        assert_contains_each_by_tick!(results, TickInstant::new(2), &[(7, (SetUnionHashSet::new_from([1, 2]), 0)), (7, (SetUnionHashSet::new_from([1, 2]), 1)), (7, (SetUnionHashSet::new_from([1, 2]), 2))]);
    };
}

#[multiplatform_test]
pub fn tick_tick_lhs_streaming_rhs_blocking() {
    let results = Rc::new(RefCell::new(HashMap::<TickInstant, Vec<_>>::new()));
    let results_inner = Rc::clone(&results);

    let mut df = hydroflow_syntax! {
        source_iter([(7, 1), (7, 2)])
            -> map(|(k, v)| (k, SetUnionSingletonSet::new_from(v)))
            -> [1]my_join;

        source_iter([(7, 0)]) -> unioner;
        source_iter([(7, 1)]) -> defer_tick() -> unioner;
        source_iter([(7, 2)]) -> defer_tick() -> defer_tick() -> unioner;
        unioner = union()
            -> [0]my_join;

        my_join = join_fused_rhs(Fold(SetUnionHashSet::default, Merge::merge))
            -> for_each(|x| results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();

    assert_contains_each_by_tick!(
        results,
        TickInstant::new(0),
        &[(7, (0, (SetUnionHashSet::new_from([1, 2]))))]
    );
}

#[multiplatform_test]
pub fn static_tick_lhs_streaming_rhs_blocking() {
    let results = Rc::new(RefCell::new(HashMap::<TickInstant, Vec<_>>::new()));
    let results_inner = Rc::clone(&results);

    let mut df = hydroflow_syntax! {
        source_iter([(7, 1), (7, 2)])
            -> map(|(k, v)| (k, SetUnionSingletonSet::new_from(v)))
            -> [1]my_join;

        source_iter([(7, 0)]) -> unioner;
        source_iter([(7, 1)]) -> defer_tick() -> unioner;
        source_iter([(7, 2)]) -> defer_tick() -> defer_tick() -> unioner;
        unioner = union()
            -> [0]my_join;

        my_join = join_fused_rhs::<'static, 'tick>(Fold(SetUnionHashSet::default, Merge::merge))
            -> for_each(|x| results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();

    assert_contains_each_by_tick!(
        results,
        TickInstant::new(0),
        &[(7, (0, SetUnionHashSet::new_from([1, 2])))]
    );
    assert_contains_each_by_tick!(
        results,
        TickInstant::new(1),
        &[(7, (1, SetUnionHashSet::new_from([1, 2])))]
    );
    assert_contains_each_by_tick!(
        results,
        TickInstant::new(2),
        &[(7, (2, SetUnionHashSet::new_from([1, 2])))]
    );
}

#[multiplatform_test]
pub fn static_static_lhs_streaming_rhs_blocking() {
    let results = Rc::new(RefCell::new(HashMap::<TickInstant, Vec<_>>::new()));
    let results_inner = Rc::clone(&results);

    let mut df = hydroflow_syntax! {
        source_iter([(7, 1), (7, 2)])
            -> map(|(k, v)| (k, SetUnionSingletonSet::new_from(v)))
            -> [1]my_join;

        source_iter([(7, 0)]) -> unioner;
        source_iter([(7, 1)]) -> defer_tick() -> unioner;
        source_iter([(7, 2)]) -> defer_tick() -> defer_tick() -> unioner;
        unioner = union()
            -> [0]my_join;

        my_join = join_fused_rhs::<'static, 'static>(Fold(SetUnionHashSet::default, Merge::merge))
            -> inspect(|x| println!("{}, {x:?}", context.current_tick()))
            -> for_each(|x| results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();

    #[rustfmt::skip]
    {
        assert_contains_each_by_tick!(results, TickInstant::new(0), &[(7, (0, SetUnionHashSet::new_from([1, 2])))]);
        assert_contains_each_by_tick!(results, TickInstant::new(1), &[(7, (0, SetUnionHashSet::new_from([1, 2]))), (7, (1, SetUnionHashSet::new_from([1, 2])))]);
        assert_contains_each_by_tick!(results, TickInstant::new(2), &[(7, (0, SetUnionHashSet::new_from([1, 2]))), (7, (1, SetUnionHashSet::new_from([1, 2]))), (7, (2, SetUnionHashSet::new_from([1, 2])))]);
    };
}

#[multiplatform_test]
pub fn tick_tick_lhs_fold_rhs_reduce() {
    let results = Rc::new(RefCell::new(HashMap::<TickInstant, Vec<_>>::new()));
    let results_inner = Rc::clone(&results);

    let mut df = hydroflow_syntax! {
        source_iter([(7, 1), (7, 2)])
            -> map(|(k, v)| (k, SetUnionSingletonSet::new_from(v)))
            -> [0]my_join;

        source_iter([(7, 0)]) -> unioner;
        source_iter([(7, 1)]) -> defer_tick() -> unioner;
        source_iter([(7, 2)]) -> defer_tick() -> defer_tick() -> unioner;
        unioner = union()
            -> [1]my_join;

        my_join = join_fused(Fold(SetUnionHashSet::default, Merge::merge), Reduce(std::ops::AddAssign::add_assign))
            -> for_each(|x| results_inner.borrow_mut().entry(context.current_tick()).or_default().push(x));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();

    assert_contains_each_by_tick!(
        results,
        TickInstant::new(0),
        &[(7, (SetUnionHashSet::new_from([1, 2]), 0))]
    );
}