// async-pipes 0.3.1
//
// A library for building concurrent data processing pipelines.
// (See the crate documentation.)
use std::sync::atomic::Ordering::{Acquire, Release, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;

use async_pipes::{branch, Pipeline, WorkerOptions};

#[tokio::test]
async fn test_stage_producer() {
    let written = Arc::new(AtomicBool::new(false));
    let task_written = written.clone();

    Pipeline::builder()
        .with_producer("pipe", move || {
            let written = task_written.clone();
            async move {
                if !written.load(Acquire) {
                    written.store(true, Release);
                    Some("hello!".to_string())
                } else {
                    None
                }
            }
        })
        .with_consumer(
            "pipe",
            WorkerOptions::default_single_task(),
            |value: String| async move {
                assert_eq!(value, "hello!".to_string());
            },
        )
        .build()
        .unwrap()
        .wait()
        .await;

    assert!(written.load(Acquire), "value was not handled by worker!");
}

/// Checks that a branching producer feeds both of its output pipes.
///
/// The producer emits the pair `(c, c * 2)` for `c` in `0..10` onto the
/// `"Value"` and `"Doubled"` pipes and returns `None` afterwards. Each pipe
/// has a consumer that raises a flag and accumulates what it receives, so the
/// expected totals are `0 + 1 + ... + 9 = 45` and twice that, `90`.
#[tokio::test]
async fn test_stage_branching_producer() {
    // Number of pairs the producer has emitted so far.
    let emitted = Arc::new(AtomicUsize::new(0));

    // Observed by the assertions at the end of the test.
    let value_written = Arc::new(AtomicBool::new(false));
    let value_sum = Arc::new(AtomicUsize::new(0));
    let double_written = Arc::new(AtomicBool::new(false));
    let double_sum = Arc::new(AtomicUsize::new(0));

    // Handles moved into the consumer closures.
    let value_flag = Arc::clone(&value_written);
    let value_total = Arc::clone(&value_sum);
    let double_flag = Arc::clone(&double_written);
    let double_total = Arc::clone(&double_sum);

    Pipeline::builder()
        .with_branching_producer(vec!["Value", "Doubled"], move || {
            let emitted = Arc::clone(&emitted);
            async move {
                let current = emitted.load(Acquire);
                if current >= 10 {
                    None
                } else {
                    emitted.fetch_add(1, SeqCst);
                    Some(branch![current, current * 2])
                }
            }
        })
        .with_consumer(
            "Value",
            WorkerOptions::default_single_task(),
            move |received: usize| {
                let flag = Arc::clone(&value_flag);
                let total = Arc::clone(&value_total);
                async move {
                    flag.store(true, Release);
                    total.fetch_add(received, SeqCst);
                }
            },
        )
        .with_consumer(
            "Doubled",
            WorkerOptions::default_single_task(),
            move |received: usize| {
                let flag = Arc::clone(&double_flag);
                let total = Arc::clone(&double_total);
                async move {
                    flag.store(true, Release);
                    total.fetch_add(received, SeqCst);
                }
            },
        )
        .build()
        .unwrap()
        .wait()
        .await;

    assert!(
        value_written.load(Acquire),
        "value was not handled by worker!"
    );
    assert!(
        double_written.load(Acquire),
        "value was not handled by worker!"
    );

    assert_eq!(value_sum.load(Acquire), 45);
    assert_eq!(double_sum.load(Acquire), 90);
}

/// Checks that a stage forwards its single output to the next pipe.
///
/// The input `1` is placed on `"first"`, a stage maps it to `value + 1` onto
/// `"second"`, and the consumer on `"second"` asserts that it receives `2`
/// before raising the flag inspected at the end of the test.
#[tokio::test]
async fn test_stage_single_output() {
    let handled = Arc::new(AtomicBool::new(false));
    let worker_handled = Arc::clone(&handled);

    // Stage body: pass the incremented input downstream.
    let increment = |input: usize| async move { Some(input + 1) };

    // Consumer body: verify the incremented value, then record that it arrived.
    let verify_and_mark = move |received: usize| {
        let flag = Arc::clone(&worker_handled);
        async move {
            assert_eq!(received, 2);
            flag.store(true, Release);
        }
    };

    Pipeline::builder()
        .with_inputs("first", vec![1usize])
        .with_stage(
            "first",
            "second",
            WorkerOptions::default_single_task(),
            increment,
        )
        .with_consumer(
            "second",
            WorkerOptions::default_single_task(),
            verify_and_mark,
        )
        .build()
        .unwrap()
        .wait()
        .await;

    assert!(handled.load(Acquire), "value was not handled by worker!");
}

/// Checks that a flattener splits a collection into individual items.
///
/// A single `Vec` holding `1`, `2` and `3` is placed on `"first"`; the
/// flattener moves its elements onto `"second"`, where the consumer adds each
/// one to a shared total. The total must therefore end up as `6`.
#[tokio::test]
async fn test_stage_flattener() {
    let total = Arc::new(AtomicUsize::new(0));
    let worker_total = Arc::clone(&total);

    // Consumer body: accumulate every flattened element.
    let accumulate = move |item: usize| {
        let total = Arc::clone(&worker_total);
        async move {
            total.fetch_add(item, SeqCst);
        }
    };

    Pipeline::builder()
        .with_inputs("first", vec![vec![1usize, 2, 3]])
        .with_flattener::<Vec<usize>>("first", "second")
        .with_consumer(
            "second",
            WorkerOptions::default_single_task(),
            accumulate,
        )
        .build()
        .unwrap()
        .wait()
        .await;

    assert_eq!(total.load(Acquire), 6);
}

/// Checks that a panic raised inside a consumer task surfaces to the caller
/// awaiting the pipeline.
///
/// A single unit value is fed to a consumer that always panics. The test is
/// marked `#[should_panic]` without an `expected` message, so any panic
/// raised while building or waiting on the pipeline satisfies it.
#[tokio::test]
#[should_panic]
async fn test_stage_propagates_task_panic() {
    let inputs = vec![()];
    let options = WorkerOptions::default_single_task();

    Pipeline::builder()
        .with_inputs("pipe", inputs)
        .with_consumer("pipe", options, |_unit: ()| async move {
            panic!("AHH!")
        })
        .build()
        .unwrap()
        .wait()
        .await;
}