dfir_rs 0.16.0

DFIR runtime for Rust, used by Hydro.
Documentation
use std::collections::HashSet;

use dfir_rs::dfir_syntax;
use dfir_rs::util::collect_ready_async;
use multiplatform_test::multiplatform_test;
use tokio::time::{Duration, sleep};

#[multiplatform_test(dfir, env_tracing)]
async fn single_batch_test() {
    let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u32>();

    let mut df = dfir_syntax! {
        source_iter(0..10)
        -> map(|x| async move {
            sleep(Duration::from_millis(100)).await;
            x
        })
        -> resolve_futures_ordered()
        -> for_each(|x| result_send.send(x).unwrap());
    };

    let handle = tokio::task::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        assert_eq!(
            Vec::from_iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
            collect_ready_async::<Vec<_>, _>(&mut result_recv).await
        );
    });

    tokio::time::timeout(Duration::from_secs(2), df.run())
        .await
        .expect_err("Expected time out");

    handle.await.unwrap();
}

#[multiplatform_test(dfir, env_tracing)]
async fn multi_batch_test() {
    let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u64>();

    let mut df = dfir_syntax! {
        source_iter([2, 3, 1, 9, 6, 5, 4, 7, 8])
        -> map(|x| async move {
            sleep(Duration::from_millis(10*x)).await;
            x
        })
        -> resolve_futures_ordered()
        -> for_each(|x| result_send.send(x).unwrap());
    };

    let handle = tokio::task::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        assert_eq!(
            Vec::from_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]),
            collect_ready_async::<Vec<_>, _>(&mut result_recv).await
        );
    });

    tokio::time::timeout(Duration::from_secs(2), df.run())
        .await
        .expect_err("Expected time out");

    handle.await.unwrap();
}

#[multiplatform_test(dfir, env_tracing)]
async fn pusherator_test() {
    let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u64>();

    let mut df = dfir_syntax! {
        ins = source_iter([2, 3, 1, 9, 6, 5, 4, 7, 8])
            -> tee();

        ins -> for_each(|_| {});
        ins -> map(|x| async move {
            sleep(Duration::from_millis(10*x)).await;
            x
        }) -> resolve_futures_ordered() -> for_each(|x| result_send.send(x).unwrap());
    };

    let handle = tokio::task::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        assert_eq!(
            HashSet::from_iter([2, 3, 1, 9, 6, 5, 4, 7, 8]),
            collect_ready_async::<HashSet<_>, _>(&mut result_recv).await
        );
    });

    tokio::time::timeout(Duration::from_secs(2), df.run())
        .await
        .expect_err("Expected time out");

    handle.await.unwrap();
}