use operese_dagx::{task, DagResult, DagRunner};
use std::time::{Duration, Instant};
use tokio::time::sleep;
/// A unit of simulated work: waits for a fixed delay, then sums a range of integers.
struct ComputeSum {
    /// Inclusive lower bound of the range that gets summed.
    start: i32,
    /// Exclusive upper bound of the range that gets summed.
    end: i32,
    /// How long, in milliseconds, the task sleeps before summing.
    work_delay_ms: u64,
}
impl ComputeSum {
    /// Builds a task that will sum the half-open range `start..end` after
    /// sleeping for `work_delay_ms` milliseconds.
    ///
    /// The arguments are stored as given; no validation is performed.
    fn new(start: i32, end: i32, work_delay_ms: u64) -> Self {
        Self { start, end, work_delay_ms }
    }
}
// Task body for `ComputeSum`: sleep for the configured delay, then add up the
// half-open range `start..end`. An empty range (start >= end) sums to 0.
#[task]
impl ComputeSum {
    async fn run(&mut self) -> i32 {
        // Stand-in for real work; `tokio::time::sleep` is awaited, so the
        // delay does not block the executor thread.
        let simulated_work = Duration::from_millis(self.work_delay_ms);
        sleep(simulated_work).await;

        let range = self.start..self.end;
        range.sum::<i32>()
    }
}
/// Field-less task type whose `#[task]` implementation adds four `i32` inputs together.
struct Aggregate;
// Task body for `Aggregate`: takes four borrowed `i32` inputs and returns
// their sum. It reads no state from the `Aggregate` value itself.
#[task]
impl Aggregate {
    async fn run(a: &i32, b: &i32, c: &i32, d: &i32) -> i32 {
        // Accumulate left to right, the same order as `a + b + c + d`.
        let mut total = *a;
        total += *b;
        total += *c;
        total += *d;
        total
    }
}
/// Times the same four range sums twice and reports the wall-clock ratio.
///
/// The first run registers four `ComputeSum` tasks plus one `Aggregate` task on
/// a `DagRunner` and executes them with a spawner that forwards each future to
/// `tokio::spawn`. The second run awaits the same four computations one after
/// another. Every `ComputeSum` sleeps for `WORK_DELAY` milliseconds before
/// summing its quarter of `1..1001`, so the sleep dominates the elapsed time.
///
/// # Errors
///
/// Returns any error produced by `DagRunner::run`.
///
/// # Panics
///
/// Panics if awaiting a spawned task's join handle yields an error, or if
/// either run produces a total other than 500500.
#[tokio::main]
async fn main() -> DagResult<()> {
    println!("=== Proving True Parallelism ===\n");

    // Simulated per-task work, in milliseconds.
    const WORK_DELAY: u64 = 100;

    println!("Running PARALLEL computation (4 workers on separate threads)...");
    let (parallel_time, parallel_result) = {
        let start = Instant::now();
        let mut dag = DagRunner::new();

        // Four partial sums covering 1..251, 251..501, 501..751 and 751..1001.
        let sum1 = dag.add_task(ComputeSum::new(1, 251, WORK_DELAY));
        let sum2 = dag.add_task(ComputeSum::new(251, 501, WORK_DELAY));
        let sum3 = dag.add_task(ComputeSum::new(501, 751, WORK_DELAY));
        let sum4 = dag.add_task(ComputeSum::new(751, 1001, WORK_DELAY));

        // The aggregate is declared as depending on all four partial sums.
        let total = dag
            .add_task(Aggregate)
            .depends_on((&sum1, &sum2, &sum3, &sum4));

        // The spawner closure hands each future it receives to `tokio::spawn`
        // and waits for the spawned task to finish.
        let mut output = dag
            .run(|fut| async move {
                tokio::spawn(fut)
                    .await
                    .expect("spawned dagx task panicked or was cancelled")
            })
            .await?;
        let result = output.get(total);
        let elapsed = start.elapsed();
        println!(
            "Parallel result: {} (computed in {}ms)\n",
            result,
            elapsed.as_millis()
        );
        assert_eq!(result, 500500);
        (elapsed, result)
    };

    println!("Running SEQUENTIAL computation (same work, one at a time)...");
    let (sequential_time, sequential_result) = {
        let start = Instant::now();
        let mut task1 = ComputeSum::new(1, 251, WORK_DELAY);
        let mut task2 = ComputeSum::new(251, 501, WORK_DELAY);
        let mut task3 = ComputeSum::new(501, 751, WORK_DELAY);
        let mut task4 = ComputeSum::new(751, 1001, WORK_DELAY);

        // NOTE(review): `run_impl` is not defined in this file; presumably it is
        // generated by `#[task]` — confirm against the dagx macro docs.
        // Each call is awaited before the next starts, so the delays add up.
        let r1 = task1.run_impl().await;
        let r2 = task2.run_impl().await;
        let r3 = task3.run_impl().await;
        let r4 = task4.run_impl().await;
        let result = r1 + r2 + r3 + r4;
        let elapsed = start.elapsed();
        println!(
            "Sequential result: {} (computed in {}ms)\n",
            result,
            elapsed.as_millis()
        );
        assert_eq!(result, 500500);
        (elapsed, result)
    };

    assert_eq!(parallel_result, sequential_result);

    let speedup = sequential_time.as_secs_f64() / parallel_time.as_secs_f64();
    println!("Speedup: {:.1}x faster with parallel execution!", speedup);

    // Only claim the result as proof when the measured speedup supports it;
    // otherwise the claim would be printed right before the low-speedup warning.
    if speedup > 2.0 {
        println!("This proves dagx runs tasks on multiple CPU cores simultaneously.");
        println!("\n✓ Parallel speedup confirmed - tasks ran on multiple threads!");
    } else {
        println!("\n⚠ Warning: Low speedup. Try running with --release flag for accurate timing.");
    }
    Ok(())
}