pub struct TaskDefinition {
pub id: String,
pub name: String,
pub task_type: String,
pub payload: HashMap<String, Value>,
pub priority: i32,
pub max_retries: u32,
pub timeout_seconds: u64,
pub dependencies: Vec<String>,
pub tags: Vec<String>,
pub created_at: DateTime<Utc>,
pub scheduled_at: Option<DateTime<Utc>>,
}
Fields§
§id: String
§name: String
§task_type: String
§payload: HashMap<String, Value>
§priority: i32
§max_retries: u32
§timeout_seconds: u64
§dependencies: Vec<String>
§tags: Vec<String>
§created_at: DateTime<Utc>
§scheduled_at: Option<DateTime<Utc>>
Implementations§
Source§impl TaskDefinition
impl TaskDefinition
Source pub fn new(name: &str, task_type: &str) -> Self
pub fn new(name: &str, task_type: &str) -> Self
Examples found in repository?
examples/basic_usage.rs (line 26)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
More examples
examples/simple_execution.rs (line 15)
8async fn main() -> Result<(), Box<dyn std::error::Error>> {
9 println!("Testing handler execution directly...\n");
10
11 // Test Python handler
12 println!("=== Testing Python Handler ===");
13 let python_handler = PythonTaskHandler::new();
14 let python_task = Task::new(
15 TaskDefinition::new("test-python", "python_script")
16 .with_payload(
17 "script",
18 json!("print('Hello from direct execution!'); x = 5 + 3; print(f'5 + 3 = {x}')"),
19 )
20 .with_payload("args", json!([])),
21 );
22
23 match python_handler.execute(&python_task).await {
24 Ok(result) => {
25 println!("Python execution successful: {}", result.success);
26 println!("Output: {:?}", result.output);
27 }
28 Err(e) => println!("Python execution failed: {}", e),
29 }
30
31 println!("\n=== Testing File Handler ===");
32 // Test File handler
33 let file_handler = FileTaskHandler::new();
34
35 // Create a test file
36 let create_task = Task::new(
37 TaskDefinition::new("test-create", "file_operation")
38 .with_payload("operation", json!("write"))
39 .with_payload("path", json!("/tmp/test_direct.txt"))
40 .with_payload(
41 "content",
42 json!("This was created by direct handler execution\nTest successful!"),
43 ),
44 );
45
46 match file_handler.execute(&create_task).await {
47 Ok(result) => {
48 println!("File creation successful: {}", result.success);
49 println!("Output: {:?}", result.output);
50 }
51 Err(e) => println!("File creation failed: {}", e),
52 }
53
54 // Read the file back
55 println!("\n=== Testing File Read ===");
56 let read_task = Task::new(
57 TaskDefinition::new("test-read", "file_operation")
58 .with_payload("operation", json!("read"))
59 .with_payload("path", json!("/tmp/test_direct.txt")),
60 );
61
62 match file_handler.execute(&read_task).await {
63 Ok(result) => {
64 println!("File read successful: {}", result.success);
65 if let Some(content) = result.output {
66 println!("File content: {}", content);
67 }
68 }
69 Err(e) => println!("File read failed: {}", e),
70 }
71
72 // Cleanup
73 println!("\n=== Cleaning Up ===");
74 let delete_task = Task::new(
75 TaskDefinition::new("test-delete", "file_operation")
76 .with_payload("operation", json!("delete"))
77 .with_payload("path", json!("/tmp/test_direct.txt")),
78 );
79
80 match file_handler.execute(&delete_task).await {
81 Ok(result) => println!("File cleanup successful: {}", result.success),
82 Err(e) => println!("File cleanup failed: {}", e),
83 }
84
85 println!("\nDirect handler execution test completed!");
86 Ok(())
87}
examples/custom_handler.rs (line 182)
169async fn main() -> Result<(), Box<dyn std::error::Error>> {
170 init();
171
172 let config = TaskFlowConfig::with_in_memory();
173 let taskflow = TaskFlow::new(config).await?;
174
175 taskflow.register_handler(Arc::new(MathTaskHandler)).await;
176 taskflow
177 .register_handler(Arc::new(DataProcessingHandler))
178 .await;
179
180 println!("TaskFlow with custom handlers started!");
181
182 let add_task = TaskDefinition::new("addition", "math_operation")
183 .with_payload("operation", serde_json::Value::String("add".to_string()))
184 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(10)))
185 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(5)));
186
187 let add_task_id = taskflow.submit_task(add_task).await?;
188 println!("Submitted addition task: {}", add_task_id);
189
190 let multiply_task = TaskDefinition::new("multiplication", "math_operation")
191 .with_payload(
192 "operation",
193 serde_json::Value::String("multiply".to_string()),
194 )
195 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(7)))
196 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(3)));
197
198 let multiply_task_id = taskflow.submit_task(multiply_task).await?;
199 println!("Submitted multiplication task: {}", multiply_task_id);
200
201 let data_array = vec![
202 serde_json::Value::Number(serde_json::Number::from(1)),
203 serde_json::Value::Number(serde_json::Number::from(2)),
204 serde_json::Value::Number(serde_json::Number::from(3)),
205 serde_json::Value::Number(serde_json::Number::from(4)),
206 serde_json::Value::Number(serde_json::Number::from(5)),
207 ];
208
209 let sum_task = TaskDefinition::new("sum_data", "data_processing")
210 .with_payload("operation", serde_json::Value::String("sum".to_string()))
211 .with_payload("data", serde_json::Value::Array(data_array.clone()));
212
213 let sum_task_id = taskflow.submit_task(sum_task).await?;
214 println!("Submitted sum task: {}", sum_task_id);
215
216 let avg_task = TaskDefinition::new("average_data", "data_processing")
217 .with_payload(
218 "operation",
219 serde_json::Value::String("average".to_string()),
220 )
221 .with_payload("data", serde_json::Value::Array(data_array))
222 .with_dependencies(vec![sum_task_id.clone()]);
223
224 let avg_task_id = taskflow.submit_task(avg_task).await?;
225 println!("Submitted average task (depends on sum): {}", avg_task_id);
226
227 let taskflow_clone = std::sync::Arc::new(taskflow);
228 let taskflow_for_execution = taskflow_clone.clone();
229
230 let execution_handle = tokio::spawn(async move {
231 if let Err(e) = taskflow_for_execution.start().await {
232 eprintln!("TaskFlow execution failed: {}", e);
233 }
234 });
235
236 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
237
238 loop {
239 let metrics = taskflow_clone.get_task_metrics().await?;
240 println!(
241 "Task metrics: pending={}, running={}, completed={}, failed={}",
242 metrics.pending, metrics.running, metrics.completed, metrics.failed
243 );
244
245 if metrics.pending == 0 && metrics.running == 0 {
246 break;
247 }
248
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 }
251
252 println!("\nAll tasks completed! Results:");
253
254 let tasks = taskflow_clone.list_tasks(None).await?;
255 for task in tasks {
256 println!(
257 "\nTask: {} ({})",
258 task.definition.name, task.definition.task_type
259 );
260 println!(" Status: {:?}", task.status);
261 if let Some(result) = &task.result {
262 if result.success {
263 println!(
264 " Result: {}",
265 result.output.as_ref().unwrap_or(&"No output".to_string())
266 );
267 println!(" Execution time: {}ms", result.execution_time_ms);
268 if !result.metadata.is_empty() {
269 println!(" Metadata: {:?}", result.metadata);
270 }
271 } else {
272 println!(
273 " Error: {}",
274 result
275 .error
276 .as_ref()
277 .unwrap_or(&"Unknown error".to_string())
278 );
279 }
280 }
281 }
282
283 let final_metrics = taskflow_clone.get_task_metrics().await?;
284 println!("\nFinal metrics:");
285 println!(" Total tasks: {}", final_metrics.total);
286 println!(
287 " Success rate: {:.1}%",
288 final_metrics.success_rate() * 100.0
289 );
290
291 taskflow_clone.shutdown().await?;
292 execution_handle.abort();
293
294 Ok(())
295}
Source pub fn with_payload(self, key: &str, value: Value) -> Self
pub fn with_payload(self, key: &str, value: Value) -> Self
Examples found in repository?
examples/basic_usage.rs (line 27)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
More examples
examples/simple_execution.rs (lines 16-19)
8async fn main() -> Result<(), Box<dyn std::error::Error>> {
9 println!("Testing handler execution directly...\n");
10
11 // Test Python handler
12 println!("=== Testing Python Handler ===");
13 let python_handler = PythonTaskHandler::new();
14 let python_task = Task::new(
15 TaskDefinition::new("test-python", "python_script")
16 .with_payload(
17 "script",
18 json!("print('Hello from direct execution!'); x = 5 + 3; print(f'5 + 3 = {x}')"),
19 )
20 .with_payload("args", json!([])),
21 );
22
23 match python_handler.execute(&python_task).await {
24 Ok(result) => {
25 println!("Python execution successful: {}", result.success);
26 println!("Output: {:?}", result.output);
27 }
28 Err(e) => println!("Python execution failed: {}", e),
29 }
30
31 println!("\n=== Testing File Handler ===");
32 // Test File handler
33 let file_handler = FileTaskHandler::new();
34
35 // Create a test file
36 let create_task = Task::new(
37 TaskDefinition::new("test-create", "file_operation")
38 .with_payload("operation", json!("write"))
39 .with_payload("path", json!("/tmp/test_direct.txt"))
40 .with_payload(
41 "content",
42 json!("This was created by direct handler execution\nTest successful!"),
43 ),
44 );
45
46 match file_handler.execute(&create_task).await {
47 Ok(result) => {
48 println!("File creation successful: {}", result.success);
49 println!("Output: {:?}", result.output);
50 }
51 Err(e) => println!("File creation failed: {}", e),
52 }
53
54 // Read the file back
55 println!("\n=== Testing File Read ===");
56 let read_task = Task::new(
57 TaskDefinition::new("test-read", "file_operation")
58 .with_payload("operation", json!("read"))
59 .with_payload("path", json!("/tmp/test_direct.txt")),
60 );
61
62 match file_handler.execute(&read_task).await {
63 Ok(result) => {
64 println!("File read successful: {}", result.success);
65 if let Some(content) = result.output {
66 println!("File content: {}", content);
67 }
68 }
69 Err(e) => println!("File read failed: {}", e),
70 }
71
72 // Cleanup
73 println!("\n=== Cleaning Up ===");
74 let delete_task = Task::new(
75 TaskDefinition::new("test-delete", "file_operation")
76 .with_payload("operation", json!("delete"))
77 .with_payload("path", json!("/tmp/test_direct.txt")),
78 );
79
80 match file_handler.execute(&delete_task).await {
81 Ok(result) => println!("File cleanup successful: {}", result.success),
82 Err(e) => println!("File cleanup failed: {}", e),
83 }
84
85 println!("\nDirect handler execution test completed!");
86 Ok(())
87}
examples/custom_handler.rs (line 183)
169async fn main() -> Result<(), Box<dyn std::error::Error>> {
170 init();
171
172 let config = TaskFlowConfig::with_in_memory();
173 let taskflow = TaskFlow::new(config).await?;
174
175 taskflow.register_handler(Arc::new(MathTaskHandler)).await;
176 taskflow
177 .register_handler(Arc::new(DataProcessingHandler))
178 .await;
179
180 println!("TaskFlow with custom handlers started!");
181
182 let add_task = TaskDefinition::new("addition", "math_operation")
183 .with_payload("operation", serde_json::Value::String("add".to_string()))
184 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(10)))
185 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(5)));
186
187 let add_task_id = taskflow.submit_task(add_task).await?;
188 println!("Submitted addition task: {}", add_task_id);
189
190 let multiply_task = TaskDefinition::new("multiplication", "math_operation")
191 .with_payload(
192 "operation",
193 serde_json::Value::String("multiply".to_string()),
194 )
195 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(7)))
196 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(3)));
197
198 let multiply_task_id = taskflow.submit_task(multiply_task).await?;
199 println!("Submitted multiplication task: {}", multiply_task_id);
200
201 let data_array = vec![
202 serde_json::Value::Number(serde_json::Number::from(1)),
203 serde_json::Value::Number(serde_json::Number::from(2)),
204 serde_json::Value::Number(serde_json::Number::from(3)),
205 serde_json::Value::Number(serde_json::Number::from(4)),
206 serde_json::Value::Number(serde_json::Number::from(5)),
207 ];
208
209 let sum_task = TaskDefinition::new("sum_data", "data_processing")
210 .with_payload("operation", serde_json::Value::String("sum".to_string()))
211 .with_payload("data", serde_json::Value::Array(data_array.clone()));
212
213 let sum_task_id = taskflow.submit_task(sum_task).await?;
214 println!("Submitted sum task: {}", sum_task_id);
215
216 let avg_task = TaskDefinition::new("average_data", "data_processing")
217 .with_payload(
218 "operation",
219 serde_json::Value::String("average".to_string()),
220 )
221 .with_payload("data", serde_json::Value::Array(data_array))
222 .with_dependencies(vec![sum_task_id.clone()]);
223
224 let avg_task_id = taskflow.submit_task(avg_task).await?;
225 println!("Submitted average task (depends on sum): {}", avg_task_id);
226
227 let taskflow_clone = std::sync::Arc::new(taskflow);
228 let taskflow_for_execution = taskflow_clone.clone();
229
230 let execution_handle = tokio::spawn(async move {
231 if let Err(e) = taskflow_for_execution.start().await {
232 eprintln!("TaskFlow execution failed: {}", e);
233 }
234 });
235
236 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
237
238 loop {
239 let metrics = taskflow_clone.get_task_metrics().await?;
240 println!(
241 "Task metrics: pending={}, running={}, completed={}, failed={}",
242 metrics.pending, metrics.running, metrics.completed, metrics.failed
243 );
244
245 if metrics.pending == 0 && metrics.running == 0 {
246 break;
247 }
248
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 }
251
252 println!("\nAll tasks completed! Results:");
253
254 let tasks = taskflow_clone.list_tasks(None).await?;
255 for task in tasks {
256 println!(
257 "\nTask: {} ({})",
258 task.definition.name, task.definition.task_type
259 );
260 println!(" Status: {:?}", task.status);
261 if let Some(result) = &task.result {
262 if result.success {
263 println!(
264 " Result: {}",
265 result.output.as_ref().unwrap_or(&"No output".to_string())
266 );
267 println!(" Execution time: {}ms", result.execution_time_ms);
268 if !result.metadata.is_empty() {
269 println!(" Metadata: {:?}", result.metadata);
270 }
271 } else {
272 println!(
273 " Error: {}",
274 result
275 .error
276 .as_ref()
277 .unwrap_or(&"Unknown error".to_string())
278 );
279 }
280 }
281 }
282
283 let final_metrics = taskflow_clone.get_task_metrics().await?;
284 println!("\nFinal metrics:");
285 println!(" Total tasks: {}", final_metrics.total);
286 println!(
287 " Success rate: {:.1}%",
288 final_metrics.success_rate() * 100.0
289 );
290
291 taskflow_clone.shutdown().await?;
292 execution_handle.abort();
293
294 Ok(())
295}
pub fn with_priority(self, priority: i32) -> Self
pub fn with_timeout(self, timeout_seconds: u64) -> Self
Source pub fn with_dependencies(self, dependencies: Vec<String>) -> Self
pub fn with_dependencies(self, dependencies: Vec<String>) -> Self
Examples found in repository?
examples/basic_usage.rs (line 34)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
More examples
examples/custom_handler.rs (line 222)
169async fn main() -> Result<(), Box<dyn std::error::Error>> {
170 init();
171
172 let config = TaskFlowConfig::with_in_memory();
173 let taskflow = TaskFlow::new(config).await?;
174
175 taskflow.register_handler(Arc::new(MathTaskHandler)).await;
176 taskflow
177 .register_handler(Arc::new(DataProcessingHandler))
178 .await;
179
180 println!("TaskFlow with custom handlers started!");
181
182 let add_task = TaskDefinition::new("addition", "math_operation")
183 .with_payload("operation", serde_json::Value::String("add".to_string()))
184 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(10)))
185 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(5)));
186
187 let add_task_id = taskflow.submit_task(add_task).await?;
188 println!("Submitted addition task: {}", add_task_id);
189
190 let multiply_task = TaskDefinition::new("multiplication", "math_operation")
191 .with_payload(
192 "operation",
193 serde_json::Value::String("multiply".to_string()),
194 )
195 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(7)))
196 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(3)));
197
198 let multiply_task_id = taskflow.submit_task(multiply_task).await?;
199 println!("Submitted multiplication task: {}", multiply_task_id);
200
201 let data_array = vec![
202 serde_json::Value::Number(serde_json::Number::from(1)),
203 serde_json::Value::Number(serde_json::Number::from(2)),
204 serde_json::Value::Number(serde_json::Number::from(3)),
205 serde_json::Value::Number(serde_json::Number::from(4)),
206 serde_json::Value::Number(serde_json::Number::from(5)),
207 ];
208
209 let sum_task = TaskDefinition::new("sum_data", "data_processing")
210 .with_payload("operation", serde_json::Value::String("sum".to_string()))
211 .with_payload("data", serde_json::Value::Array(data_array.clone()));
212
213 let sum_task_id = taskflow.submit_task(sum_task).await?;
214 println!("Submitted sum task: {}", sum_task_id);
215
216 let avg_task = TaskDefinition::new("average_data", "data_processing")
217 .with_payload(
218 "operation",
219 serde_json::Value::String("average".to_string()),
220 )
221 .with_payload("data", serde_json::Value::Array(data_array))
222 .with_dependencies(vec![sum_task_id.clone()]);
223
224 let avg_task_id = taskflow.submit_task(avg_task).await?;
225 println!("Submitted average task (depends on sum): {}", avg_task_id);
226
227 let taskflow_clone = std::sync::Arc::new(taskflow);
228 let taskflow_for_execution = taskflow_clone.clone();
229
230 let execution_handle = tokio::spawn(async move {
231 if let Err(e) = taskflow_for_execution.start().await {
232 eprintln!("TaskFlow execution failed: {}", e);
233 }
234 });
235
236 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
237
238 loop {
239 let metrics = taskflow_clone.get_task_metrics().await?;
240 println!(
241 "Task metrics: pending={}, running={}, completed={}, failed={}",
242 metrics.pending, metrics.running, metrics.completed, metrics.failed
243 );
244
245 if metrics.pending == 0 && metrics.running == 0 {
246 break;
247 }
248
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 }
251
252 println!("\nAll tasks completed! Results:");
253
254 let tasks = taskflow_clone.list_tasks(None).await?;
255 for task in tasks {
256 println!(
257 "\nTask: {} ({})",
258 task.definition.name, task.definition.task_type
259 );
260 println!(" Status: {:?}", task.status);
261 if let Some(result) = &task.result {
262 if result.success {
263 println!(
264 " Result: {}",
265 result.output.as_ref().unwrap_or(&"No output".to_string())
266 );
267 println!(" Execution time: {}ms", result.execution_time_ms);
268 if !result.metadata.is_empty() {
269 println!(" Metadata: {:?}", result.metadata);
270 }
271 } else {
272 println!(
273 " Error: {}",
274 result
275 .error
276 .as_ref()
277 .unwrap_or(&"Unknown error".to_string())
278 );
279 }
280 }
281 }
282
283 let final_metrics = taskflow_clone.get_task_metrics().await?;
284 println!("\nFinal metrics:");
285 println!(" Total tasks: {}", final_metrics.total);
286 println!(
287 " Success rate: {:.1}%",
288 final_metrics.success_rate() * 100.0
289 );
290
291 taskflow_clone.shutdown().await?;
292 execution_handle.abort();
293
294 Ok(())
295}
pub fn schedule_at(self, scheduled_at: DateTime<Utc>) -> Self
Trait Implementations§
Source§impl Clone for TaskDefinition
impl Clone for TaskDefinition
Source§fn clone(&self) -> TaskDefinition
fn clone(&self) -> TaskDefinition
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read more
Source§impl Debug for TaskDefinition
impl Debug for TaskDefinition
Source§impl<'de> Deserialize<'de> for TaskDefinition
impl<'de> Deserialize<'de> for TaskDefinition
Source§fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where
__D: Deserializer<'de>,
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where
__D: Deserializer<'de>,
Deserialize this value from the given Serde deserializer. Read more
Auto Trait Implementations§
impl Freeze for TaskDefinition
impl RefUnwindSafe for TaskDefinition
impl Send for TaskDefinition
impl Sync for TaskDefinition
impl Unpin for TaskDefinition
impl UnwindSafe for TaskDefinition
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
T: ?Sized,
impl<T> BorrowMut<T> for T
where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more