use std::sync::Arc;
use dag_executor::context::Context;
use dag_executor::prelude::*;
/// Example entry point: wires up a three-task pipeline
/// (`extract` -> `transform` -> `load`), runs it, and prints a one-line summary.
///
/// # Errors
///
/// Propagates any error returned while registering a task with the DAG or
/// while running the DAG through the executor.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut dag = Dag::new();

    // `extract` declares no dependencies and reports a fixed row count.
    let extract = BasicTask::new("extract", |_context: Arc<Context>| async {
        println!("extracting...");
        Ok(serde_json::json!({ "rows": 100 }))
    });
    dag.add_task(Arc::new(extract))?;

    // `transform` depends on `extract` and doubles the row count it reads
    // from the context.
    let transform = BasicTask::new("transform", |ctx: Arc<Context>| async move {
        let rows = upstream_rows(&ctx, "extract");
        println!("transforming {rows} rows...");
        Ok(serde_json::json!({ "rows": rows * 2 }))
    })
    .with_deps(["extract"]);
    dag.add_task(Arc::new(transform))?;

    // `load` depends on `transform`; it only reports the count and yields null.
    let load = BasicTask::new("load", |ctx: Arc<Context>| async move {
        let rows = upstream_rows(&ctx, "transform");
        println!("loading {rows} rows...");
        Ok(serde_json::Value::Null)
    })
    .with_deps(["transform"]);
    dag.add_task(Arc::new(load))?;

    let executor = DagExecutor::builder().persist(false).build();
    let report = executor.run(dag).await?;

    // NOTE(review): the success flag is only printed; `main` still returns
    // `Ok(())` when it is false — confirm that is the intended exit behavior.
    let succeeded = report.is_success();
    let completed = &report.metrics.tasks_completed;
    println!("done: success={} completed={}", succeeded, completed);

    Ok(())
}

/// Looks up `task` in `ctx` and returns the `"rows"` field of the stored value
/// as a `u64`.
///
/// Falls back to `0` when the lookup yields nothing, the value has no `"rows"`
/// field, or that field is not representable as a `u64`.
/// Presumably the stored value is the JSON output of the task with that name —
/// verify against the `Context` API.
fn upstream_rows(ctx: &Context, task: &str) -> u64 {
    ctx.get(task)
        .and_then(|value| value.get("rows")?.as_u64())
        .unwrap_or(0)
}