durable_dag/
durable_dag.rs1use dagx::{task, DagRunner, Task};
2use duraflow_rs::{Context, DurableDag, MemoryStore};
3use std::sync::atomic::AtomicUsize;
4use std::sync::Arc;
5
6struct LoadValue(i32);
8#[task]
9impl LoadValue {
10 async fn run(&self) -> i32 {
11 self.0
12 }
13}
14
15struct Add;
16#[task]
17impl Add {
18 async fn run(a: &i32, b: &i32) -> i32 {
19 a + b
20 }
21}
22
23struct Multiply;
24#[task]
25impl Multiply {
26 async fn run(a: &i32, b: &i32, c: &i32) -> i32 {
27 a * b * c
28 }
29}
30
31#[tokio::main]
32async fn main() {
33 let dag = DagRunner::new();
34 let db: Arc<dyn duraflow_rs::Storage + Send + Sync> = Arc::new(MemoryStore::new());
35 let ctx = Arc::new(Context {
36 db,
37 completed_count: Arc::new(AtomicUsize::new(0)),
38 });
39
40 let d_dag = DurableDag::new(&dag, ctx.clone()).with_progress(|id, completed| {
41 println!("progress: task={} completed_count={}", id, completed);
42 });
43
44 let val1 = d_dag.add("v1", LoadValue(10));
46 let val2 = d_dag.add("v2", LoadValue(20));
47 let val3 = d_dag.add("v3", LoadValue(2));
48
49 let sum_builder = d_dag.add("sum_1_2", Add);
50 let one = d_dag.add("v4", LoadValue(1));
51 let product = d_dag
52 .add("final_prod", Multiply)
53 .depends_on((&sum_builder, &val3, &one));
54 let _sum = sum_builder.depends_on((&val1, &val2));
55
56 dag.run(|fut| {
57 tokio::spawn(fut);
58 })
59 .await
60 .unwrap();
61
62 println!(
63 "Completed tasks: {}",
64 ctx.completed_count
65 .load(std::sync::atomic::Ordering::SeqCst)
66 );
67 println!("Final Result: {}", dag.get(product).unwrap());
68
69 println!("\nRestarting DAG...");
71 let dag2 = DagRunner::new();
72 let d_dag2 = DurableDag::new(&dag2, ctx.clone()).with_progress(|id, completed| {
73 println!(
74 "(resumed) progress: task={} completed_count={}",
75 id, completed
76 );
77 });
78
79 let v1 = d_dag2.add("v1", LoadValue(10));
80 let v2 = d_dag2.add("v2", LoadValue(20));
81 let s = d_dag2.add("sum_1_2", Add).depends_on((&v1, &v2));
82
83 dag2.run(|fut| {
84 tokio::spawn(fut);
85 })
86 .await
87 .unwrap();
88 println!("Retrieved cached sum: {}", dag2.get(s).unwrap());
89}