hydroflow 0.10.0

Hydro's low-level dataflow runtime and IR
Documentation
use std::collections::BTreeSet;

use hydroflow::assert_graphvis_snapshots;
use hydroflow::scheduled::ticks::TickInstant;
use multiplatform_test::multiplatform_test;

#[multiplatform_test]
pub fn test_fold_keyed_infer_basic() {
    pub struct SubordResponse {
        pub xid: &'static str,
        pub mtype: u32,
    }
    let (result_send, mut result_recv) =
        hydroflow::util::unbounded_channel::<(&'static str, u32)>();

    let mut df = hydroflow::hydroflow_syntax! {
        source_iter([
            SubordResponse { xid: "123", mtype: 33 },
            SubordResponse { xid: "123", mtype: 52 },
            SubordResponse { xid: "123", mtype: 72 },
            SubordResponse { xid: "123", mtype: 83 },
            SubordResponse { xid: "123", mtype: 78 },
        ])
            -> map(|m: SubordResponse| (m.xid, m.mtype))
            -> fold_keyed::<'static>(|| 0, |old: &mut u32, val: u32| *old += val)
            -> for_each(|kv| result_send.send(kv).unwrap());
    };
    assert_graphvis_snapshots!(df);
    assert_eq!(
        (TickInstant::new(0), 0),
        (df.current_tick(), df.current_stratum())
    );
    df.run_tick();
    assert_eq!(
        (TickInstant::new(1), 0),
        (df.current_tick(), df.current_stratum())
    );

    df.run_available(); // Should return quickly and not hang

    assert_eq!(
        &[("123", 318), ("123", 318)],
        &*hydroflow::util::collect_ready::<Vec<_>, _>(&mut result_recv)
    );
}

#[multiplatform_test]
pub fn test_fold_keyed_tick() {
    let (items_send, items_recv) = hydroflow::util::unbounded_channel::<(u32, Vec<u32>)>();
    let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<(u32, Vec<u32>)>();

    let mut df = hydroflow::hydroflow_syntax! {
        source_stream(items_recv)
            -> fold_keyed::<'tick>(Vec::new, |old: &mut Vec<u32>, mut x: Vec<u32>| old.append(&mut x))
            -> for_each(|v| result_send.send(v).unwrap());
    };
    assert_graphvis_snapshots!(df);
    assert_eq!(
        (TickInstant::new(0), 0),
        (df.current_tick(), df.current_stratum())
    );
    df.run_tick();
    assert_eq!(
        (TickInstant::new(1), 0),
        (df.current_tick(), df.current_stratum())
    );

    items_send.send((0, vec![1, 2])).unwrap();
    items_send.send((0, vec![3, 4])).unwrap();
    items_send.send((1, vec![1])).unwrap();
    items_send.send((1, vec![1, 2])).unwrap();
    df.run_tick();

    assert_eq!(
        (TickInstant::new(2), 0),
        (df.current_tick(), df.current_stratum())
    );
    assert_eq!(
        [(0, vec![1, 2, 3, 4]), (1, vec![1, 1, 2])]
            .into_iter()
            .collect::<BTreeSet<_>>(),
        hydroflow::util::collect_ready::<BTreeSet<_>, _>(&mut result_recv)
    );

    items_send.send((0, vec![5, 6])).unwrap();
    items_send.send((0, vec![7, 8])).unwrap();
    items_send.send((1, vec![10])).unwrap();
    items_send.send((1, vec![11, 12])).unwrap();
    df.run_tick();

    assert_eq!(
        (TickInstant::new(3), 0),
        (df.current_tick(), df.current_stratum())
    );
    assert_eq!(
        [(0, vec![5, 6, 7, 8]), (1, vec![10, 11, 12])]
            .into_iter()
            .collect::<BTreeSet<_>>(),
        hydroflow::util::collect_ready::<BTreeSet<_>, _>(&mut result_recv)
    );

    df.run_available(); // Should return quickly and not hang
}

#[multiplatform_test]
pub fn test_fold_keyed_static() {
    let (items_send, items_recv) = hydroflow::util::unbounded_channel::<(u32, Vec<u32>)>();
    let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<(u32, Vec<u32>)>();

    let mut df = hydroflow::hydroflow_syntax! {
        source_stream(items_recv)
            -> fold_keyed::<'static>(Vec::new, |old: &mut Vec<u32>, mut x: Vec<u32>| old.append(&mut x))
            -> for_each(|v| result_send.send(v).unwrap());
    };
    assert_graphvis_snapshots!(df);
    assert_eq!(
        (TickInstant::new(0), 0),
        (df.current_tick(), df.current_stratum())
    );
    df.run_tick();
    assert_eq!(
        (TickInstant::new(1), 0),
        (df.current_tick(), df.current_stratum())
    );

    items_send.send((0, vec![1, 2])).unwrap();
    items_send.send((0, vec![3, 4])).unwrap();
    items_send.send((1, vec![1])).unwrap();
    items_send.send((1, vec![1, 2])).unwrap();
    df.run_tick();

    assert_eq!(
        (TickInstant::new(2), 0),
        (df.current_tick(), df.current_stratum())
    );
    assert_eq!(
        [(0, vec![1, 2, 3, 4]), (1, vec![1, 1, 2])]
            .into_iter()
            .collect::<BTreeSet<_>>(),
        hydroflow::util::collect_ready::<BTreeSet<_>, _>(&mut result_recv)
    );

    items_send.send((0, vec![5, 6])).unwrap();
    items_send.send((0, vec![7, 8])).unwrap();
    items_send.send((1, vec![10])).unwrap();
    items_send.send((1, vec![11, 12])).unwrap();
    df.run_tick();

    assert_eq!(
        (TickInstant::new(3), 0),
        (df.current_tick(), df.current_stratum())
    );
    assert_eq!(
        [
            (0, vec![1, 2, 3, 4, 5, 6, 7, 8]),
            (1, vec![1, 1, 2, 10, 11, 12])
        ]
        .into_iter()
        .collect::<BTreeSet<_>>(),
        hydroflow::util::collect_ready::<BTreeSet<_>, _>(&mut result_recv)
    );

    df.run_available(); // Should return quickly and not hang
}