futures-dagtask 0.3.0

DAG-based Task Queue
Documentation
use std::sync::{ Arc, Mutex };
use futures::future;
use futures::prelude::*;
use futures::executor::LocalPool;
use futures_dagtask::TaskGraph;


#[test]
fn test_simple_dependent() {
    let fut = async {
        let mut graph = TaskGraph::new();
        let zero = graph.add_task(&[], future::ready::<u32>(0)).unwrap();
        let one = graph.add_task(&[zero], future::ready(1)).unwrap();
        let (add, exec) = graph.execute();
        let _two = add.add_task(&[one], future::ready(2)).await.unwrap();

        exec.take(3)
            .map(|(_, n)| n)
            .collect::<Vec<_>>().await
    };

    let output = LocalPool::new()
        .run_until(fut);

    assert_eq!(output, vec![0, 1, 2]);
}

#[test]
fn test_concurrent_dependent() {
    let fut = async {
        let mut graph = TaskGraph::new();
        let zero0 = graph.add_task(&[], future::ready::<(u32, u32)>((0, 0))).unwrap();
        let zero1 = graph.add_task(&[], future::ready((0, 1))).unwrap();
        let one0 = graph.add_task(&[zero0], future::ready((1, 0))).unwrap();
        let _one1 = graph.add_task(&[zero0], future::ready((1, 1))).unwrap();
        let (add, exec) = graph.execute();
        let _two0 = add.add_task(
            &[zero1, one0],
            future::ready((2, 0))
        ).await.unwrap();

        exec.take(5)
            .map(|(_, n)| n)
            .collect::<Vec<_>>().await
    };

    let output = LocalPool::new()
        .run_until(fut);

    let zero0 = find(&output, (0, 0));
    let zero1 = find(&output, (0, 1));
    let one0 = find(&output, (1, 0));
    let one1 = find(&output, (1, 1));
    let two0 = find(&output, (2, 0));

    assert!(zero0 < one0 && zero1 < two0 && one0 < two0);
    assert!(zero0 < one1);
}

#[test]
fn test_concurrent_dependent2() {
    let fut = async {
        let mut graph = TaskGraph::new();
        let zero0 = graph.add_task(
            &[],
            future::ready::<(u32, u32)>((0, 0)).boxed()
        ).unwrap();
        let (add, exec) = graph.execute();
        let add = Arc::new(add);
        let add2 = add.clone();

        let _zero1 = add.add_task(
            &[],
            (async move {
                let _one0 = add2.add_task(
                    &[zero0],
                    future::ready((1u32, 0u32)).boxed()
                ).await.unwrap();
                future::ready((0u32, 1u32)).await
            }).boxed()
        ).await.unwrap();

        exec.take(3)
            .map(|(_, n)| n)
            .collect::<Vec<_>>().await
    };

    let output = LocalPool::new()
        .run_until(fut);

    let zero0 = find(&output, (0, 0));
    let zero1 = find(&output, (0, 1));
    let one0 = find(&output, (1, 0));

    assert!(zero0 < one0 && zero1 < one0);
}

#[test]
fn test_cancel() {
    let fut = async {
        let mut graph = TaskGraph::new();
        let zero = graph.add_task(&[], future::ready::<u32>(0)).unwrap();
        let one = graph.add_task(&[zero], future::ready(1)).unwrap();
        let (add, exec) = graph.execute();
        let _two = add.add_task(&[one], future::ready(2)).await.unwrap();

        drop(add);
        exec.map(|(_, n)| n).collect::<Vec<_>>().await
    };

    let output = LocalPool::new()
        .run_until(fut);

    assert_eq!(output, vec![0, 1, 2]);
}

#[test]
fn test_abort() {
    let fut = async {
        let mut graph = TaskGraph::new();
        let zero = graph.add_task(
            &[],
            future::ready::<u32>(0).boxed()
        ).unwrap();
        let (add, exec) = graph.execute();
        let add = Arc::new(Mutex::new(Some(add)));
        let add2 = add.clone();

        add.lock().unwrap().as_ref().unwrap().add_task(
            &[zero],
            (async move {
                add2.lock().unwrap().take().unwrap().abort();
                future::ready(1u32).await
            }).boxed()
        ).await.unwrap();

        exec.map(|(_, n)| n).collect::<Vec<_>>().await
    };

    let output = LocalPool::new()
        .run_until(fut);

    assert_eq!(output, vec![0u32, 1]);
}

#[test]
fn test_custom_index() {
    let fut = async {
        let mut graph = TaskGraph::<future::Ready<u32>, u64>::default();
        let zero = graph.add_task(&[], future::ready(0)).unwrap();
        let one = graph.add_task(&[zero], future::ready(1)).unwrap();
        let (add, exec) = graph.execute();
        let _two = add.add_task(&[one], future::ready(2)).await.unwrap();

        drop(add);
        exec.map(|(_, n)| n).collect::<Vec<_>>().await
    };

    let output = LocalPool::new()
        .run_until(fut);
    assert_eq!(output, vec![0, 1, 2]);
}

fn find(output: &[(u32, u32)], target: (u32, u32)) -> usize {
    output.iter()
        .enumerate()
        .find_map(|(i, &n)| if n == target {
            Some(i)
        } else {
            None
        })
        .unwrap()
}