use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use dag_executor::advanced::RetryPolicy;
use dag_executor::context::Context;
use dag_executor::prelude::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut dag = Dag::new();
let attempts = Arc::new(AtomicUsize::new(0));
let a = attempts.clone();
dag.add_task(Arc::new(BasicTask::new(
"flaky",
move |_ctx: Arc<Context>| {
let a = a.clone();
async move {
let n = a.fetch_add(1, Ordering::SeqCst);
if n < 2 {
println!("flaky attempt {n}: failing");
Err(TaskError::execution("transient error"))
} else {
println!("flaky attempt {n}: success");
Ok(serde_json::json!("ok"))
}
}
},
)))?;
dag.add_task(Arc::new(BasicTask::new(
"always_fails",
|_ctx: Arc<Context>| async { Err(TaskError::execution("permanent error")) },
)))?;
dag.add_task(Arc::new(
BasicTask::new("downstream", |_ctx: Arc<Context>| async {
Ok(serde_json::Value::Null)
})
.with_deps(["always_fails"]),
))?;
let executor = DagExecutor::builder()
.persist(false)
.retry(RetryPolicy::exponential(4, Duration::from_millis(20)))
.build();
let report = executor.run(dag).await?;
println!("\n--- report ---");
for (id, rec) in &report.records {
println!("{id:>14}: {} (attempts: {})", rec.state, rec.attempts);
}
println!("retries: {}", report.metrics.retries);
println!("\ndead-letter queue:");
for entry in executor.dead_letter().list().await? {
println!(" {} -> {}", entry.task_id, entry.error);
}
Ok(())
}