// basic_usage/basic_usage.rs
use std::sync::Arc;
use std::time::{Duration, Instant};

use taskflow_rs::{TaskDefinition, TaskFlow, framework::TaskFlowConfig};
use tracing_subscriber::fmt::init;

5#[tokio::main]
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}