basic_usage/
basic_usage.rs

1use std::time::Duration;
2use taskflow_rs::{TaskDefinition, TaskFlow, framework::TaskFlowConfig};
3use tracing_subscriber::fmt::init;
4
5#[tokio::main]
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7    init();
8
9    let config = TaskFlowConfig::with_in_memory();
10    let taskflow = TaskFlow::new(config).await?;
11
12    println!("TaskFlow framework started!");
13
14    let task_id = taskflow
15        .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16        .await?;
17
18    println!("Submitted HTTP task: {}", task_id);
19
20    let shell_task_id = taskflow
21        .submit_shell_task("list_files", "ls", vec!["-la"])
22        .await?;
23
24    println!("Submitted shell task: {}", shell_task_id);
25
26    let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27        .with_payload("command", serde_json::Value::String("echo".to_string()))
28        .with_payload(
29            "args",
30            serde_json::Value::Array(vec![serde_json::Value::String(
31                "This task depends on the shell task".to_string(),
32            )]),
33        )
34        .with_dependencies(vec![shell_task_id.clone()]);
35
36    let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37    println!("Submitted dependent task: {}", dependent_task_id);
38
39    let taskflow_clone = std::sync::Arc::new(taskflow);
40    let taskflow_for_execution = taskflow_clone.clone();
41
42    let execution_handle = tokio::spawn(async move {
43        if let Err(e) = taskflow_for_execution.start().await {
44            eprintln!("TaskFlow execution failed: {}", e);
45        }
46    });
47
48    tokio::time::sleep(Duration::from_secs(2)).await;
49
50    loop {
51        let metrics = taskflow_clone.get_task_metrics().await?;
52        println!(
53            "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54            metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55        );
56
57        if metrics.pending == 0 && metrics.running == 0 {
58            break;
59        }
60
61        tokio::time::sleep(Duration::from_secs(1)).await;
62    }
63
64    println!("All tasks completed!");
65
66    let tasks = taskflow_clone.list_tasks(None).await?;
67    for task in tasks {
68        println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69        if let Some(result) = &task.result {
70            if result.success {
71                println!("  Output: {:?}", result.output);
72            } else {
73                println!("  Error: {:?}", result.error);
74            }
75        }
76    }
77
78    taskflow_clone.shutdown().await?;
79    execution_handle.abort();
80
81    Ok(())
82}