hydroflow 0.10.0

Hydro's low-level dataflow runtime and IR
Documentation
use std::cell::Cell;
use std::rc::Rc;

use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use hydroflow_macro::hydroflow_test;
use multiplatform_test::multiplatform_test;
use web_time::Duration;

#[multiplatform_test]
pub fn test_context_ref() {
    let mut df = hydroflow_syntax! {
        source_iter([()])
            -> for_each(|()| println!("Current tick: {}, stratum: {}", context.current_tick(), context.current_stratum()));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();
}

#[multiplatform_test]
pub fn test_context_mut() {
    // TODO(mingwei): Currently cannot have conflicting (mut) references to `context` in the same
    // subgraph - bit of a leak of the subgraphs abstraction. `next_stratum()` here so it runs.
    let mut df = hydroflow_syntax! {
        source_iter(0..10)
            -> map(|n| context.add_state(n))
            -> next_stratum()
            -> for_each(|handle| println!("{:?}: {}", handle, context.state_ref(handle)));
    };
    assert_graphvis_snapshots!(df);
    df.run_available();
}

#[multiplatform_test(hydroflow)]
pub async fn test_context_current_tick_start_does_not_count_time_between_ticks_async() {
    let time = Rc::new(Cell::new(None));

    let mut df = {
        let time = time.clone();
        hydroflow_syntax! {
            source_iter([()])
                -> persist::<'static>()
                -> for_each(|_| time.set(Some(context.current_tick_start().elapsed().unwrap())));
        }
    };
    assert_graphvis_snapshots!(df);
    tokio::time::sleep(Duration::from_millis(100)).await;
    df.run_tick();
    assert!(time.take().unwrap() < Duration::from_millis(50));

    tokio::time::sleep(Duration::from_millis(100)).await;
    df.run_tick();
    assert!(time.take().unwrap() < Duration::from_millis(50));

    tokio::time::sleep(Duration::from_millis(100)).await;
    df.run_available();
    assert!(time.take().unwrap() < Duration::from_millis(50));

    tokio::time::sleep(Duration::from_millis(100)).await;
    df.run_available_async().await;
    assert!(time.take().unwrap() < Duration::from_millis(50));
}

#[hydroflow_test]
pub async fn test_defered_tick_and_no_io_with_run_async() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

    let mut df = hydroflow_syntax! {
        source_iter([()])
            -> defer_tick()
            -> for_each(|_| tx.send(()).unwrap());
    };

    tokio::select! {
        _ = df.run_async() => {},
        _ = rx.recv() => {},
    }
}