use taskflow_rs::{AsyncExecutor, Taskflow};
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Program entry point.
///
/// Prints a banner and then awaits the four demo functions strictly one
/// after another, emitting an empty line between consecutive demos so their
/// output is visually separated.
#[tokio::main]
async fn main() {
    println!("=== Async Tasks Demo ===\n");

    // Demo 1: async-only task graph.
    demo_basic_async().await;
    println!();

    // Demo 2: synchronous and asynchronous tasks in one graph.
    demo_mixed_sync_async().await;
    println!();

    // Demo 3: independent simulated requests.
    demo_async_http_requests().await;
    println!();

    // Demo 4: linear multi-stage pipeline (no trailing blank line afterwards).
    demo_async_pipeline().await;
}
/// Demo 1: three asynchronous tasks arranged as a fan-out.
///
/// Each task prints a start message, sleeps on the tokio timer for a fixed
/// number of milliseconds (A: 100, B: 50, C: 75) and prints a completion
/// message. Task A is registered via `precede` ahead of both B and C, and the
/// resulting graph is awaited through `AsyncExecutor::run_async`.
async fn demo_basic_async() {
    println!("1. Basic Async Tasks");
    println!(" Running async tasks with tokio\n");

    let mut graph = Taskflow::new();

    let root = graph
        .emplace_async(|| async {
            println!(" [Task A] Starting async work...");
            let pause = Duration::from_millis(100);
            tokio::time::sleep(pause).await;
            println!(" [Task A] Completed!");
        })
        .name("task_a");

    let left = graph
        .emplace_async(|| async {
            println!(" [Task B] Starting async work...");
            let pause = Duration::from_millis(50);
            tokio::time::sleep(pause).await;
            println!(" [Task B] Completed!");
        })
        .name("task_b");

    let right = graph
        .emplace_async(|| async {
            println!(" [Task C] Starting async work...");
            let pause = Duration::from_millis(75);
            tokio::time::sleep(pause).await;
            println!(" [Task C] Completed!");
        })
        .name("task_c");

    // Edges: A -> B and A -> C.
    root.precede(&left);
    root.precede(&right);

    // The executor is built with the argument 4 (presumably the worker
    // count -- confirm against the taskflow_rs documentation).
    let runner = AsyncExecutor::new(4);
    runner.run_async(&graph).await;

    println!(" ✓ All async tasks completed");
}
/// Demo 2: a linear chain mixing synchronous and asynchronous tasks.
///
/// A shared integer behind `Arc<Mutex<_>>` starts at 0. The synchronous task
/// adds 1, the asynchronous task sleeps 50 ms and adds 10, and a final
/// synchronous task prints the value. The chain is wired with `precede` as
/// sync -> async -> final and awaited through `AsyncExecutor::run_async`.
async fn demo_mixed_sync_async() {
    println!("2. Mixed Sync and Async Tasks");
    println!(" Combining synchronous and asynchronous tasks\n");

    let total = Arc::new(Mutex::new(0));
    let mut graph = Taskflow::new();

    // Each task gets its own handle to the counter; the clone is scoped to
    // the block that registers the task so no numbered temporaries leak out.
    let first = {
        let total = Arc::clone(&total);
        graph
            .emplace(move || {
                println!(" [Sync] Processing synchronously");
                let mut value = total.lock().unwrap();
                *value += 1;
                println!(" [Sync] Counter = {}", *value);
            })
            .name("sync_task")
    };

    let middle = {
        let total = Arc::clone(&total);
        graph
            .emplace_async(move || async move {
                println!(" [Async] Starting async work");
                tokio::time::sleep(Duration::from_millis(50)).await;
                // The guard is taken only after the last await point, so the
                // std mutex is never held across a suspension.
                let mut value = total.lock().unwrap();
                *value += 10;
                println!(" [Async] Counter = {}", *value);
            })
            .name("async_task")
    };

    let last = {
        let total = Arc::clone(&total);
        graph
            .emplace(move || {
                println!(" [Final] Final counter = {}", total.lock().unwrap());
            })
            .name("final")
    };

    // Edges: sync -> async -> final.
    first.precede(&middle);
    middle.precede(&last);

    let runner = AsyncExecutor::new(4);
    runner.run_async(&graph).await;

    println!(" ✓ Mixed workflow completed");
}
/// Demo 3: five independent asynchronous tasks that simulate HTTP requests.
///
/// Request `i` (0 through 4) sleeps for `50 + i * 10` milliseconds, builds the
/// string `data_<i>` and appends it to a shared `Vec` behind `Arc<Mutex<_>>`.
/// No `precede` edges are declared between the requests. After the graph has
/// been awaited, the number of collected responses and the responses
/// themselves are printed.
async fn demo_async_http_requests() {
    println!("3. Simulated Async HTTP Requests");
    println!(" Parallel async I/O operations\n");

    let mut graph = Taskflow::new();
    let responses = Arc::new(Mutex::new(Vec::new()));

    for request_id in 0..5 {
        let sink = Arc::clone(&responses);
        let label = format!("request_{}", request_id);

        graph
            .emplace_async(move || async move {
                println!(" [Request {}] Fetching data...", request_id);

                let latency = Duration::from_millis(50 + request_id * 10);
                tokio::time::sleep(latency).await;

                let payload = format!("data_{}", request_id);
                // Store a copy first, then report using the local string.
                sink.lock().unwrap().push(payload.clone());
                println!(" [Request {}] Received: {}", request_id, payload);
            })
            .name(&label);
    }

    let runner = AsyncExecutor::new(4);
    runner.run_async(&graph).await;

    let collected = responses.lock().unwrap();
    println!(
        " ✓ Received {} responses: {:?}",
        collected.len(),
        *collected
    );
}
/// Demo 4: a four-stage asynchronous pipeline over one shared buffer.
///
/// The stages are chained with `precede` as fetch -> transform -> filter ->
/// save and all operate on the same `Arc<Mutex<Vec<i32>>>`:
///
/// * fetch: sleeps 100 ms, appends `1..=5`.
/// * transform: sleeps 50 ms, doubles every element in place.
/// * filter: sleeps 30 ms, keeps only elements greater than 5.
/// * save: sleeps 40 ms, prints the final contents.
///
/// In every stage the mutex is locked only after the stage's single await
/// point, so no guard is held across a suspension.
async fn demo_async_pipeline() {
    println!("4. Async Data Pipeline");
    println!(" Multi-stage async processing\n");

    let buffer = Arc::new(Mutex::new(Vec::<i32>::new()));
    let mut graph = Taskflow::new();

    let fetch = {
        let buffer = Arc::clone(&buffer);
        graph
            .emplace_async(move || async move {
                println!(" [Fetch] Fetching raw data...");
                tokio::time::sleep(Duration::from_millis(100)).await;
                let mut items = buffer.lock().unwrap();
                items.extend_from_slice(&[1, 2, 3, 4, 5]);
                println!(" [Fetch] Fetched {} items", items.len());
            })
            .name("fetch")
    };

    let transform = {
        let buffer = Arc::clone(&buffer);
        graph
            .emplace_async(move || async move {
                println!(" [Transform] Transforming data...");
                tokio::time::sleep(Duration::from_millis(50)).await;
                let mut items = buffer.lock().unwrap();
                items.iter_mut().for_each(|value| *value *= 2);
                println!(" [Transform] Transformed {} items", items.len());
            })
            .name("transform")
    };

    let filter = {
        let buffer = Arc::clone(&buffer);
        graph
            .emplace_async(move || async move {
                println!(" [Filter] Filtering data...");
                tokio::time::sleep(Duration::from_millis(30)).await;
                let mut items = buffer.lock().unwrap();
                items.retain(|&value| value > 5);
                println!(" [Filter] Retained {} items", items.len());
            })
            .name("filter")
    };

    let save = {
        let buffer = Arc::clone(&buffer);
        graph
            .emplace_async(move || async move {
                println!(" [Save] Saving results...");
                tokio::time::sleep(Duration::from_millis(40)).await;
                let items = buffer.lock().unwrap();
                println!(" [Save] Final data: {:?}", *items);
            })
            .name("save")
    };

    // Edges: fetch -> transform -> filter -> save.
    fetch.precede(&transform);
    transform.precede(&filter);
    filter.precede(&save);

    let runner = AsyncExecutor::new(4);
    runner.run_async(&graph).await;

    println!(" ✓ Pipeline completed");
}