hydroflow 0.10.0

Hydro's low-level dataflow runtime and IR
Documentation
use std::error::Error;
use std::time::Duration;

use hydroflow::scheduled::ticks::{TickDuration, TickInstant};
use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax, rassert_eq};
use multiplatform_test::multiplatform_test;
use tokio::time::timeout;

#[multiplatform_test(test, wasm, env_tracing)]
pub fn test_stratum_loop() {
    let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<TickInstant>();

    let mut df = hydroflow_syntax! {
        source_iter([TickInstant::new(0)]) -> union_tee;
        union_tee = union() -> tee();
        union_tee -> map(|n| n + TickDuration::SINGLE_TICK) -> filter(|&n| n < TickInstant::new(10)) -> next_stratum() -> union_tee;
        union_tee -> for_each(|v| out_send.send(v).unwrap());
    };
    assert_graphvis_snapshots!(df);
    df.run_available();

    assert_eq!(
        &[
            TickInstant::new(0),
            TickInstant::new(1),
            TickInstant::new(2),
            TickInstant::new(3),
            TickInstant::new(4),
            TickInstant::new(5),
            TickInstant::new(6),
            TickInstant::new(7),
            TickInstant::new(8),
            TickInstant::new(9)
        ],
        &*hydroflow::util::collect_ready::<Vec<_>, _>(&mut out_recv)
    );
    assert_eq!(
        (TickInstant::new(11), 0),
        (df.current_tick(), df.current_stratum())
    );
}

#[multiplatform_test(test, wasm, env_tracing)]
pub fn test_tick_loop() {
    let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<TickInstant>();

    let mut df = hydroflow_syntax! {
        source_iter([TickInstant::new(0)]) -> union_tee;
        union_tee = union() -> tee();
        union_tee -> map(|n| n + TickDuration::SINGLE_TICK) -> filter(|&n| n < TickInstant::new(10)) -> defer_tick() -> union_tee;
        union_tee -> for_each(|v| out_send.send(v).unwrap());
    };
    assert_graphvis_snapshots!(df);
    df.run_available();

    assert_eq!(
        &[
            TickInstant::new(0),
            TickInstant::new(1),
            TickInstant::new(2),
            TickInstant::new(3),
            TickInstant::new(4),
            TickInstant::new(5),
            TickInstant::new(6),
            TickInstant::new(7),
            TickInstant::new(8),
            TickInstant::new(9)
        ],
        &*hydroflow::util::collect_ready::<Vec<_>, _>(&mut out_recv)
    );
    assert_eq!(
        (TickInstant::new(10), 0),
        (df.current_tick(), df.current_stratum())
    );
}

#[multiplatform_test(hydroflow, env_tracing)]
async fn test_persist_stratum_run_available() -> Result<(), Box<dyn Error>> {
    let (out_send, out_recv) = hydroflow::util::unbounded_channel();

    let mut df = hydroflow_syntax! {
        a = source_iter([0])
            -> persist::<'static>()
            -> next_stratum()
            -> for_each(|x| out_send.send(x).unwrap());
    };
    assert_graphvis_snapshots!(df);
    df.run_available();

    let seen: Vec<_> = hydroflow::util::collect_ready_async(out_recv).await;
    rassert_eq!(
        &[0],
        &*seen,
        "Only one tick should have run, actually ran {}",
        seen.len()
    )?;

    Ok(())
}

#[multiplatform_test(hydroflow, env_tracing)]
async fn test_persist_stratum_run_async() -> Result<(), Box<dyn Error>> {
    let (out_send, out_recv) = hydroflow::util::unbounded_channel();

    let mut df = hydroflow_syntax! {
        source_iter([0])
            -> persist::<'static>()
            -> next_stratum()
            -> for_each(|x| out_send.send(x).unwrap());
    };
    assert_graphvis_snapshots!(df);

    timeout(Duration::from_millis(200), df.run_async())
        .await
        .expect_err("Expected time out");

    let seen: Vec<_> = hydroflow::util::collect_ready_async(out_recv).await;
    rassert_eq!(
        &[0],
        &*seen,
        "Only one tick should have run, actually ran {}",
        seen.len()
    )?;

    Ok(())
}

#[multiplatform_test(test, wasm, env_tracing)]
pub fn test_issue_800_1050_persist() {
    let mut df = hydroflow_syntax! {
        in1 = source_iter(0..10) -> map(|i| (i, i));
        in1 -> persist::<'static>() -> my_union_tee;

        my_union_tee = union() -> tee();
        my_union_tee -> filter(|_| false) -> my_union_tee;
        my_union_tee -> for_each(|x| println!("A {} {} {:?}", context.current_tick(), context.current_stratum(), x));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();
}

#[multiplatform_test(test, wasm, env_tracing)]
pub fn test_issue_800_1050_fold_keyed() {
    let mut df = hydroflow_syntax! {
        in1 = source_iter(0..10) -> map(|i| (i, i));
        in1 -> fold_keyed::<'static>(Vec::new, Vec::push) -> my_union_tee;

        my_union_tee = union() -> tee();
        my_union_tee -> filter(|_| false) -> my_union_tee;
        my_union_tee -> for_each(|x| println!("A {} {} {:?}", context.current_tick(), context.current_stratum(), x));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();
}

#[multiplatform_test(test, wasm, env_tracing)]
pub fn test_issue_800_1050_reduce_keyed() {
    let mut df = hydroflow_syntax! {
        in1 = source_iter(0..10) -> map(|i| (i, i));
        in1 -> reduce_keyed::<'static>(std::ops::AddAssign::add_assign) -> my_union_tee;

        my_union_tee = union() -> tee();
        my_union_tee -> filter(|_| false) -> my_union_tee;
        my_union_tee -> for_each(|x| println!("A {} {} {:?}", context.current_tick(), context.current_stratum(), x));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();
}

#[multiplatform_test(hydroflow, env_tracing)]
async fn test_nospin_issue_961() {
    let mut df = hydroflow_syntax! {
        source_iter([1])
            -> next_stratum()
            -> persist::<'static>()
            -> defer_tick_lazy()
            -> null();
    };
    assert_graphvis_snapshots!(df);

    timeout(Duration::from_millis(100), df.run_available_async())
        .await
        .expect("Should not spin.");
}

#[multiplatform_test(hydroflow, env_tracing)]
async fn test_nospin_issue_961_complicated() {
    let mut df = hydroflow_syntax! {
        source_iter([1]) -> items;
        items = union();

        double = items
            -> persist::<'static>()
            -> fold(|| 0, |accum, x| *accum += x)
            -> defer_tick_lazy()
            -> filter(|_| false)
            -> tee();

        double -> null();

        double -> items;
    };
    assert_graphvis_snapshots!(df);

    timeout(Duration::from_millis(100), df.run_available_async())
        .await
        .expect("Should not spin.");
}