use std::sync::Arc;
use dag_executor::context::Context;
use dag_executor::prelude::*;
use dag_executor::storage::{FileStorage, Storage};
use dag_executor::tasks::{StatefulTask, StepResult};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dir = tempfile::tempdir()?;
let storage: Arc<dyn Storage> = Arc::new(FileStorage::open(dir.path())?);
let mut dag = Dag::new();
dag.add_task(Arc::new(StatefulTask::new(
"accumulate",
storage.clone(),
|_ctx: Arc<Context>, prev| async move {
let count = prev.get("count").and_then(|v| v.as_u64()).unwrap_or(0) + 1;
let state = serde_json::json!({ "count": count });
println!("checkpoint -> count = {count}");
Ok(StepResult::new(state.clone(), state))
},
)))?;
let executor = DagExecutor::builder()
.storage(storage.clone())
.persist(true)
.build();
for _ in 0..3 {
let mut dag2 = Dag::new();
dag2.add_task(Arc::new(StatefulTask::new(
"accumulate",
storage.clone(),
|_ctx: Arc<Context>, prev| async move {
let count = prev.get("count").and_then(|v| v.as_u64()).unwrap_or(0) + 1;
let state = serde_json::json!({ "count": count });
Ok(StepResult::new(state.clone(), state))
},
)))?;
executor.run(dag2).await?;
}
let _ = dag;
let final_state = storage.load("state:accumulate").await?;
println!("final persisted state: {final_state:?}");
Ok(())
}