use std::sync::Arc;
use dag_executor::context::Context;
use dag_executor::prelude::*;
/// Example workflow: a conditional gate, a polling loop, and a
/// fan-out/fan-in stage assembled into one DAG and run by a `DagExecutor`.
///
/// The task closures print each poll attempt and the aggregated total; after
/// the run, the success flag of the returned report is printed. Errors from
/// registering tasks or from the executor run are propagated with `?`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut workflow = Dag::new();

    // Gate task whose predicate always resolves to `true`.
    // NOTE(review): "process" / "skip" are presumably the branch targets of
    // the conditional -- confirm against `ConditionalTask::new`.
    let gate = ConditionalTask::new(
        "should_process",
        "process",
        "skip",
        |_ctx| async { Ok(true) },
    );
    workflow.add_task(Arc::new(gate))?;

    // Polling task declared with a dependency on the gate. Each invocation
    // reports `ready` once the index reaches 2, and the break predicate stops
    // on the first output whose `ready` field is the boolean `true`.
    // NOTE(review): the `5` is presumably the iteration cap -- confirm.
    let poller = LoopTask::new("poll", 5, |_ctx: Arc<Context>, i| async move {
        println!("poll attempt {i}");
        Ok(serde_json::json!({ "ready": i >= 2 }))
    })
    .with_break(|output| {
        // A missing or non-boolean `ready` field counts as "not ready".
        matches!(
            output.get("ready").and_then(|flag| flag.as_bool()),
            Some(true)
        )
    })
    .with_deps(["should_process"]);
    workflow.add_task(Arc::new(poller))?;

    // Fan-out/fan-in stage: each worker closure yields `i * 10`, and the
    // aggregator sums every result that is representable as a `u64`.
    // NOTE(review): `6` is presumably the worker count and `Some("poll")` the
    // upstream dependency -- confirm against `patterns::fan_out_in`.
    let compute_stage = patterns::fan_out_in(
        "compute",
        6,
        Some("poll"),
        |_ctx, i| async move { Ok(serde_json::json!(i * 10)) },
        |_ctx, partials| async move {
            let total = partials
                .iter()
                .filter_map(|value| value.as_u64())
                .sum::<u64>();
            println!("aggregated total = {total}");
            Ok(serde_json::json!(total))
        },
    );
    for node in compute_stage {
        workflow.add_task(node)?;
    }

    // Build the executor with persistence switched off and run the workflow.
    let runner = DagExecutor::builder().persist(false).build();
    let outcome = runner.run(workflow).await?;
    println!("workflow success: {}", outcome.is_success());

    Ok(())
}