futures-dagtask 0.2.0

DAG-based Task Queue
Documentation
use std::sync::{ Arc, Mutex };
use futures::prelude::*;
use futures::future;
use futures_dagtask::TaskGraph;

macro_rules! unwrap_async {
    ( $e:expr ) => {
        match $e {
            Async::Ready(e) => e,
            Async::NotReady => panic!()
        }
    }
}


#[test]
fn test_simple_dependent() {
    let mut graph = TaskGraph::new();
    let zero = graph.add_task(&[], future::ok::<u32, u32>(0)).unwrap();
    let one = graph.add_task(&[zero], future::ok(1)).unwrap();
    let (add, exec) = graph.execute();
    let _two = unwrap_async!(add.add_task(&[one], future::ok(2))).unwrap();

    let output = exec.take(3)
        .wait()
        .filter_map(Result::ok)
        .map(|(_, n)| n)
        .collect::<Vec<_>>();
    assert_eq!(output, vec![0, 1, 2]);
}

#[test]
fn test_concurrent_dependent() {
    let mut graph = TaskGraph::new();
    let zero0 = graph.add_task(&[], future::ok::<(u32, u32), ()>((0, 0))).unwrap();
    let zero1 = graph.add_task(&[], future::ok((0, 1))).unwrap();
    let one0 = graph.add_task(&[zero0], future::ok((1, 0))).unwrap();
    let _one1 = graph.add_task(&[zero0], future::ok((1, 1))).unwrap();
    let (add, exec) = graph.execute();
    let _two0 = unwrap_async!(add.add_task(
        &[zero1, one0],
        future::ok((2, 0))
    )).unwrap();

    let output = exec.take(5)
        .wait()
        .filter_map(Result::ok)
        .map(|(_, n)| n)
        .collect::<Vec<_>>();

    let zero0 = find(&output, (0, 0));
    let zero1 = find(&output, (0, 1));
    let one0 = find(&output, (1, 0));
    let one1 = find(&output, (1, 1));
    let two0 = find(&output, (2, 0));

    assert!(zero0 < one0 && zero1 < two0 && one0 < two0);
    assert!(zero0 < one1);
}

#[test]
fn test_concurrent_dependent2() {
    let mut graph = TaskGraph::new();
    let zero0 = graph.add_task(&[], Box::new(future::ok::<(u32, u32), ()>((0, 0))) as Box<Future<Item=_, Error=_>>).unwrap();
    let (add, exec) = graph.execute();
    let add = Arc::new(add);
    let add2 = add.clone();
    let _zero1 = unwrap_async!(add.add_task(&[], Box::new(future::lazy(move || {
        let _one0 = unwrap_async!(add2.add_task(&[zero0], Box::new(future::ok((1u32, 0u32))) as Box<_>)).unwrap();
        future::ok((0u32, 1u32))
    })) as Box<_>)).unwrap();

    let output = exec
        .take(3)
        .wait()
        .filter_map(Result::ok)
        .map(|(_, n)| n)
        .collect::<Vec<_>>();

    let zero0 = find(&output, (0, 0));
    let zero1 = find(&output, (0, 1));
    let one0 = find(&output, (1, 0));

    assert!(zero0 < one0 && zero1 < one0);
}

#[test]
fn test_cancel() {
    let exec = {
        let mut graph = TaskGraph::new();
        let zero = graph.add_task(&[], future::ok::<u32, u32>(0)).unwrap();
        let one = graph.add_task(&[zero], future::ok(1)).unwrap();
        let (add, exec) = graph.execute();
        let _two = unwrap_async!(add.add_task(&[one], future::ok(2))).unwrap();
        exec
    };

    let output = exec
        .wait()
        .filter_map(Result::ok)
        .map(|(_, n)| n)
        .collect::<Vec<_>>();
    assert_eq!(output, vec![0, 1, 2]);
}

#[test]
fn test_abort() {
    let mut graph = TaskGraph::new();
    let zero = graph.add_task(&[], Box::new(future::ok::<u32, ()>(0)) as Box<Future<Item=_, Error=_>>).unwrap();
    let (add, exec) = graph.execute();
    let add = Arc::new(Mutex::new(Some(add)));
    let add2 = add.clone();

    {
        unwrap_async!(add.lock().unwrap().as_ref().unwrap().add_task(&[zero], Box::new(future::lazy(move || {
            add2.lock().unwrap().take().unwrap().abort();
            future::ok(1u32)
        })) as Box<_>)).unwrap();
    }

    let output = exec.wait()
        .filter_map(Result::ok)
        .map(|(_, n)| n)
        .collect::<Vec<_>>();

    assert_eq!(output, vec![0u32, 1]);
}

#[test]
fn test_custom_index() {
    let mut graph = TaskGraph::<future::FutureResult<u32, ()>, u64>::default();
    let zero = graph.add_task(&[], future::ok(0)).unwrap();
    let one = graph.add_task(&[zero], future::ok(1)).unwrap();
    let (add, exec) = graph.execute();
    let _two = unwrap_async!(add.add_task(&[one], future::ok(2))).unwrap();

    let output = exec.take(3)
        .wait()
        .filter_map(Result::ok)
        .map(|(_, n)| n)
        .collect::<Vec<_>>();
    assert_eq!(output, vec![0, 1, 2]);
}

fn find(output: &[(u32, u32)], target: (u32, u32)) -> usize {
    output.iter()
        .enumerate()
        .find_map(|(i, &n)| if n == target {
            Some(i)
        } else {
            None
        })
        .unwrap()
}