use operese_dagx_test::task_fn;
use operese_dagx::*;
/// Running a `DagRunner` to which no tasks were added must complete and
/// produce a result that unwraps successfully.
#[tokio::test]
async fn test_empty_dag() {
    let runner = DagRunner::new();

    // The spawner hands each future to the Tokio runtime and joins it,
    // panicking if the spawned task itself failed to join.
    let _ = runner
        .run(|task_future| async move { tokio::spawn(task_future).await.unwrap() })
        .await
        .unwrap();
}
/// A DAG containing a single dependency-free task yields that task's
/// return value (`42`) from the run output.
#[tokio::test]
async fn test_single_task() {
    let mut runner = DagRunner::new();
    let answer = runner.add_task(task_fn::<(), _, _>(|_: ()| 42));

    let mut results = runner
        .run(|task_future| async move { tokio::spawn(task_future).await.unwrap() })
        .await
        .unwrap();

    assert_eq!(results.get(answer), 42);
}
/// Ten independent tasks (no dependencies between them) are added to one
/// DAG; task `n` returns `n * 2`, and every result must be retrievable
/// through its own handle after the run.
#[tokio::test]
async fn test_wide_parallel() {
    let mut runner = DagRunner::new();

    // Handles are collected in creation order so the index below matches `n`.
    let handles: Vec<_> = (0..10)
        .map(|n| runner.add_task(task_fn::<(), _, _>(move |_: ()| n * 2)))
        .collect();

    let mut results = runner
        .run(|task_future| async move { tokio::spawn(task_future).await.unwrap() })
        .await
        .unwrap();

    for (index, handle) in handles.into_iter().enumerate() {
        let expected = index * 2;
        assert_eq!(results.get(handle), expected);
    }
}
/// Diamond-shaped graph: one root feeds two middle tasks, whose outputs
/// are both consumed by a final joining task.
///
/// ```text
///        root (10)
///       /         \
///  plus_five    doubled
///    (15)        (20)
///       \         /
///        total (35)
/// ```
#[tokio::test]
async fn test_diamond_dependency() {
    let mut runner = DagRunner::new();

    let root = runner.add_task(task_fn::<(), _, _>(|_: ()| 10));

    // Both middle tasks read the same upstream value.
    let plus_five = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value + 5))
        .depends_on(&root);
    let doubled = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value * 2))
        .depends_on(&root);

    // The join receives both middle results as a tuple of references.
    let total = runner
        .add_task(task_fn::<(i32, i32), _, _>(|(lhs, rhs): (&i32, &i32)| lhs + rhs))
        .depends_on((&plus_five, &doubled));

    let mut results = runner
        .run(|task_future| async move { tokio::spawn(task_future).await.unwrap() })
        .await
        .unwrap();

    // (10 + 5) + (10 * 2) = 35
    assert_eq!(results.get(total), 35);
}
/// Small cloneable record used by the tests in this file as an example of
/// a user-defined task output type.
#[derive(Clone)]
struct User {
    /// Name of the user.
    name: String,
    /// Age of the user.
    age: u32,
}
/// Tasks in one DAG may return unrelated types: here a `String`, a vector
/// of integers, and a `User` struct. Each result is read back through its
/// own handle and checked.
#[tokio::test]
async fn test_different_output_types() {
    let mut runner = DagRunner::new();

    let greeting_task = runner.add_task(task_fn::<(), _, _>(|_: ()| String::from("hello")));
    let numbers_task = runner.add_task(task_fn::<(), _, _>(|_: ()| vec![1, 2, 3]));
    let user_task = runner.add_task(task_fn::<(), _, _>(|_: ()| User {
        name: String::from("Alice"),
        age: 30,
    }));

    let mut results = runner
        .run(|task_future| async move { tokio::spawn(task_future).await.unwrap() })
        .await
        .unwrap();

    let greeting = results.get(greeting_task);
    assert_eq!(&greeting, "hello");

    assert_eq!(results.get(numbers_task), vec![1, 2, 3]);

    // `User` has no `PartialEq`/`Debug`, so its fields are compared one by one.
    let user = results.get(user_task);
    assert_eq!(user.name, "Alice");
    assert_eq!(user.age, 30);
}
/// One source task feeds two independent two-step branches, each ending
/// in its own sink. Both sink outputs must be available after the run.
#[tokio::test]
async fn test_multiple_sinks() {
    let mut runner = DagRunner::new();

    let origin = runner.add_task(task_fn::<(), _, _>(|_: ()| 10));

    // First branch: 10 * 2 = 20, then 20 + 1 = 21.
    let doubled = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value * 2))
        .depends_on(&origin);
    let first_sink = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value + 1))
        .depends_on(&doubled);

    // Second branch: 10 + 5 = 15, then 15 * 3 = 45.
    let shifted = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value + 5))
        .depends_on(&origin);
    let second_sink = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value * 3))
        .depends_on(&shifted);

    let mut results = runner
        .run(|task_future| async move { tokio::spawn(task_future).await.unwrap() })
        .await
        .unwrap();

    assert_eq!(results.get(first_sink), 21);
    assert_eq!(results.get(second_sink), 45);
}
/// Strictly linear chain of three tasks, each depending only on the one
/// before it: 10, then `* 2`, then `+ 5`.
///
/// NOTE(review): the test name suggests this targets a runner code path
/// that executes a lone ready task inline; that path is not visible from
/// this file, so confirm against the runner implementation.
#[tokio::test]
async fn test_single_task_inline_path() {
    let mut runner = DagRunner::new();

    let first = runner.add_task(task_fn::<(), _, _>(|_: ()| 10));
    let second = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value * 2))
        .depends_on(&first);
    let third = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value + 5))
        .depends_on(&second);

    let mut results = runner
        .run(|task_future| async move { tokio::spawn(task_future).await.unwrap() })
        .await
        .unwrap();

    // (10 * 2) + 5 = 25
    assert_eq!(results.get(third), 25);
}
/// A single producer's output (`42`) is consumed by three separate
/// downstream tasks; each consumer must see the same input value and
/// produce its own independent result.
#[tokio::test]
async fn test_multi_consumer_fanout() {
    let mut runner = DagRunner::new();

    let source = runner.add_task(task_fn::<(), _, _>(|_: ()| 42));

    let doubled = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value * 2))
        .depends_on(&source);
    let plus_ten = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value + 10))
        .depends_on(&source);
    let minus_five = runner
        .add_task(task_fn::<i32, _, _>(|&value: &i32| value - 5))
        .depends_on(&source);

    let mut results = runner
        .run(|task_future| async move { tokio::spawn(task_future).await.unwrap() })
        .await
        .unwrap();

    assert_eq!(results.get(doubled), 84); // 42 * 2
    assert_eq!(results.get(plus_ten), 52); // 42 + 10
    assert_eq!(results.get(minus_five), 37); // 42 - 5
}