pub struct TaskFlow { /* private fields */ }
Implementations
impl TaskFlow
pub async fn new(config: TaskFlowConfig) -> Result<Self>
Examples found in repository?
examples/basic_usage.rs (line 10)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
More examples
examples/custom_handler.rs (line 173)
169async fn main() -> Result<(), Box<dyn std::error::Error>> {
170 init();
171
172 let config = TaskFlowConfig::with_in_memory();
173 let taskflow = TaskFlow::new(config).await?;
174
175 taskflow.register_handler(Arc::new(MathTaskHandler)).await;
176 taskflow
177 .register_handler(Arc::new(DataProcessingHandler))
178 .await;
179
180 println!("TaskFlow with custom handlers started!");
181
182 let add_task = TaskDefinition::new("addition", "math_operation")
183 .with_payload("operation", serde_json::Value::String("add".to_string()))
184 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(10)))
185 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(5)));
186
187 let add_task_id = taskflow.submit_task(add_task).await?;
188 println!("Submitted addition task: {}", add_task_id);
189
190 let multiply_task = TaskDefinition::new("multiplication", "math_operation")
191 .with_payload(
192 "operation",
193 serde_json::Value::String("multiply".to_string()),
194 )
195 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(7)))
196 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(3)));
197
198 let multiply_task_id = taskflow.submit_task(multiply_task).await?;
199 println!("Submitted multiplication task: {}", multiply_task_id);
200
201 let data_array = vec![
202 serde_json::Value::Number(serde_json::Number::from(1)),
203 serde_json::Value::Number(serde_json::Number::from(2)),
204 serde_json::Value::Number(serde_json::Number::from(3)),
205 serde_json::Value::Number(serde_json::Number::from(4)),
206 serde_json::Value::Number(serde_json::Number::from(5)),
207 ];
208
209 let sum_task = TaskDefinition::new("sum_data", "data_processing")
210 .with_payload("operation", serde_json::Value::String("sum".to_string()))
211 .with_payload("data", serde_json::Value::Array(data_array.clone()));
212
213 let sum_task_id = taskflow.submit_task(sum_task).await?;
214 println!("Submitted sum task: {}", sum_task_id);
215
216 let avg_task = TaskDefinition::new("average_data", "data_processing")
217 .with_payload(
218 "operation",
219 serde_json::Value::String("average".to_string()),
220 )
221 .with_payload("data", serde_json::Value::Array(data_array))
222 .with_dependencies(vec![sum_task_id.clone()]);
223
224 let avg_task_id = taskflow.submit_task(avg_task).await?;
225 println!("Submitted average task (depends on sum): {}", avg_task_id);
226
227 let taskflow_clone = std::sync::Arc::new(taskflow);
228 let taskflow_for_execution = taskflow_clone.clone();
229
230 let execution_handle = tokio::spawn(async move {
231 if let Err(e) = taskflow_for_execution.start().await {
232 eprintln!("TaskFlow execution failed: {}", e);
233 }
234 });
235
236 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
237
238 loop {
239 let metrics = taskflow_clone.get_task_metrics().await?;
240 println!(
241 "Task metrics: pending={}, running={}, completed={}, failed={}",
242 metrics.pending, metrics.running, metrics.completed, metrics.failed
243 );
244
245 if metrics.pending == 0 && metrics.running == 0 {
246 break;
247 }
248
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 }
251
252 println!("\nAll tasks completed! Results:");
253
254 let tasks = taskflow_clone.list_tasks(None).await?;
255 for task in tasks {
256 println!(
257 "\nTask: {} ({})",
258 task.definition.name, task.definition.task_type
259 );
260 println!(" Status: {:?}", task.status);
261 if let Some(result) = &task.result {
262 if result.success {
263 println!(
264 " Result: {}",
265 result.output.as_ref().unwrap_or(&"No output".to_string())
266 );
267 println!(" Execution time: {}ms", result.execution_time_ms);
268 if !result.metadata.is_empty() {
269 println!(" Metadata: {:?}", result.metadata);
270 }
271 } else {
272 println!(
273 " Error: {}",
274 result
275 .error
276 .as_ref()
277 .unwrap_or(&"Unknown error".to_string())
278 );
279 }
280 }
281 }
282
283 let final_metrics = taskflow_clone.get_task_metrics().await?;
284 println!("\nFinal metrics:");
285 println!(" Total tasks: {}", final_metrics.total);
286 println!(
287 " Success rate: {:.1}%",
288 final_metrics.success_rate() * 100.0
289 );
290
291 taskflow_clone.shutdown().await?;
292 execution_handle.abort();
293
294 Ok(())
295}
pub async fn from_yaml_file<P: AsRef<Path>>(path: P) -> Result<Self>
Examples found in repository?
examples/yaml_config_usage.rs (line 10)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 // Method 1: Load from YAML file
10 let taskflow = TaskFlow::from_yaml_file("examples/config.yaml").await?;
11 println!("TaskFlow framework started with YAML configuration!");
12
13 // Method 2: You can also load from YAML string directly
14 /*
15 let yaml_config = r#"
16scheduler:
17 poll_interval_seconds: 2
18 max_concurrent_tasks: 30
19 enable_dependency_resolution: true
20 cleanup_completed_tasks_after_hours: 24
21
22executor:
23 worker_id: "custom-worker-001"
24 max_concurrent_tasks: 15
25 task_timeout_seconds: 300
26 heartbeat_interval_seconds: 10
27
28storage_type: InMemory
29"#;
30 let taskflow = TaskFlow::from_yaml_str(yaml_config).await?;
31
32 */
33
34 let task_id = taskflow
35 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
36 .await?;
37
38 println!("Submitted HTTP task: {}", task_id);
39
40 let shell_task_id = taskflow
41 .submit_shell_task("list_files", "ls", vec!["-la"])
42 .await?;
43
44 println!("Submitted shell task: {}", shell_task_id);
45
46 let taskflow_clone = std::sync::Arc::new(taskflow);
47 let taskflow_for_execution = taskflow_clone.clone();
48
49 let execution_handle = tokio::spawn(async move {
50 if let Err(e) = taskflow_for_execution.start().await {
51 eprintln!("TaskFlow execution failed: {}", e);
52 }
53 });
54
55 tokio::time::sleep(Duration::from_secs(2)).await;
56
57 loop {
58 let metrics = taskflow_clone.get_task_metrics().await?;
59 println!(
60 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
61 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
62 );
63
64 if metrics.pending == 0 && metrics.running == 0 {
65 break;
66 }
67
68 tokio::time::sleep(Duration::from_secs(1)).await;
69 }
70
71 println!("All tasks completed!");
72
73 let tasks = taskflow_clone.list_tasks(None).await?;
74 for task in tasks {
75 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
76 if let Some(result) = &task.result {
77 if result.success {
78 println!(" Output: {:?}", result.output);
79 } else {
80 println!(" Error: {:?}", result.error);
81 }
82 }
83 }
84
85 taskflow_clone.shutdown().await?;
86 execution_handle.abort();
87
88 Ok(())
89}
pub async fn from_yaml_str(config_content: &str) -> Result<Self>
pub async fn register_handler(&self, handler: Arc<dyn TaskHandler>)
Examples found in repository?
examples/custom_handler.rs (line 175)
169async fn main() -> Result<(), Box<dyn std::error::Error>> {
170 init();
171
172 let config = TaskFlowConfig::with_in_memory();
173 let taskflow = TaskFlow::new(config).await?;
174
175 taskflow.register_handler(Arc::new(MathTaskHandler)).await;
176 taskflow
177 .register_handler(Arc::new(DataProcessingHandler))
178 .await;
179
180 println!("TaskFlow with custom handlers started!");
181
182 let add_task = TaskDefinition::new("addition", "math_operation")
183 .with_payload("operation", serde_json::Value::String("add".to_string()))
184 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(10)))
185 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(5)));
186
187 let add_task_id = taskflow.submit_task(add_task).await?;
188 println!("Submitted addition task: {}", add_task_id);
189
190 let multiply_task = TaskDefinition::new("multiplication", "math_operation")
191 .with_payload(
192 "operation",
193 serde_json::Value::String("multiply".to_string()),
194 )
195 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(7)))
196 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(3)));
197
198 let multiply_task_id = taskflow.submit_task(multiply_task).await?;
199 println!("Submitted multiplication task: {}", multiply_task_id);
200
201 let data_array = vec![
202 serde_json::Value::Number(serde_json::Number::from(1)),
203 serde_json::Value::Number(serde_json::Number::from(2)),
204 serde_json::Value::Number(serde_json::Number::from(3)),
205 serde_json::Value::Number(serde_json::Number::from(4)),
206 serde_json::Value::Number(serde_json::Number::from(5)),
207 ];
208
209 let sum_task = TaskDefinition::new("sum_data", "data_processing")
210 .with_payload("operation", serde_json::Value::String("sum".to_string()))
211 .with_payload("data", serde_json::Value::Array(data_array.clone()));
212
213 let sum_task_id = taskflow.submit_task(sum_task).await?;
214 println!("Submitted sum task: {}", sum_task_id);
215
216 let avg_task = TaskDefinition::new("average_data", "data_processing")
217 .with_payload(
218 "operation",
219 serde_json::Value::String("average".to_string()),
220 )
221 .with_payload("data", serde_json::Value::Array(data_array))
222 .with_dependencies(vec![sum_task_id.clone()]);
223
224 let avg_task_id = taskflow.submit_task(avg_task).await?;
225 println!("Submitted average task (depends on sum): {}", avg_task_id);
226
227 let taskflow_clone = std::sync::Arc::new(taskflow);
228 let taskflow_for_execution = taskflow_clone.clone();
229
230 let execution_handle = tokio::spawn(async move {
231 if let Err(e) = taskflow_for_execution.start().await {
232 eprintln!("TaskFlow execution failed: {}", e);
233 }
234 });
235
236 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
237
238 loop {
239 let metrics = taskflow_clone.get_task_metrics().await?;
240 println!(
241 "Task metrics: pending={}, running={}, completed={}, failed={}",
242 metrics.pending, metrics.running, metrics.completed, metrics.failed
243 );
244
245 if metrics.pending == 0 && metrics.running == 0 {
246 break;
247 }
248
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 }
251
252 println!("\nAll tasks completed! Results:");
253
254 let tasks = taskflow_clone.list_tasks(None).await?;
255 for task in tasks {
256 println!(
257 "\nTask: {} ({})",
258 task.definition.name, task.definition.task_type
259 );
260 println!(" Status: {:?}", task.status);
261 if let Some(result) = &task.result {
262 if result.success {
263 println!(
264 " Result: {}",
265 result.output.as_ref().unwrap_or(&"No output".to_string())
266 );
267 println!(" Execution time: {}ms", result.execution_time_ms);
268 if !result.metadata.is_empty() {
269 println!(" Metadata: {:?}", result.metadata);
270 }
271 } else {
272 println!(
273 " Error: {}",
274 result
275 .error
276 .as_ref()
277 .unwrap_or(&"Unknown error".to_string())
278 );
279 }
280 }
281 }
282
283 let final_metrics = taskflow_clone.get_task_metrics().await?;
284 println!("\nFinal metrics:");
285 println!(" Total tasks: {}", final_metrics.total);
286 println!(
287 " Success rate: {:.1}%",
288 final_metrics.success_rate() * 100.0
289 );
290
291 taskflow_clone.shutdown().await?;
292 execution_handle.abort();
293
294 Ok(())
295}
pub async fn submit_task(&self, definition: TaskDefinition) -> Result<String>
Examples found in repository?
examples/basic_usage.rs (line 36)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
More examples
examples/custom_handler.rs (line 187)
169async fn main() -> Result<(), Box<dyn std::error::Error>> {
170 init();
171
172 let config = TaskFlowConfig::with_in_memory();
173 let taskflow = TaskFlow::new(config).await?;
174
175 taskflow.register_handler(Arc::new(MathTaskHandler)).await;
176 taskflow
177 .register_handler(Arc::new(DataProcessingHandler))
178 .await;
179
180 println!("TaskFlow with custom handlers started!");
181
182 let add_task = TaskDefinition::new("addition", "math_operation")
183 .with_payload("operation", serde_json::Value::String("add".to_string()))
184 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(10)))
185 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(5)));
186
187 let add_task_id = taskflow.submit_task(add_task).await?;
188 println!("Submitted addition task: {}", add_task_id);
189
190 let multiply_task = TaskDefinition::new("multiplication", "math_operation")
191 .with_payload(
192 "operation",
193 serde_json::Value::String("multiply".to_string()),
194 )
195 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(7)))
196 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(3)));
197
198 let multiply_task_id = taskflow.submit_task(multiply_task).await?;
199 println!("Submitted multiplication task: {}", multiply_task_id);
200
201 let data_array = vec![
202 serde_json::Value::Number(serde_json::Number::from(1)),
203 serde_json::Value::Number(serde_json::Number::from(2)),
204 serde_json::Value::Number(serde_json::Number::from(3)),
205 serde_json::Value::Number(serde_json::Number::from(4)),
206 serde_json::Value::Number(serde_json::Number::from(5)),
207 ];
208
209 let sum_task = TaskDefinition::new("sum_data", "data_processing")
210 .with_payload("operation", serde_json::Value::String("sum".to_string()))
211 .with_payload("data", serde_json::Value::Array(data_array.clone()));
212
213 let sum_task_id = taskflow.submit_task(sum_task).await?;
214 println!("Submitted sum task: {}", sum_task_id);
215
216 let avg_task = TaskDefinition::new("average_data", "data_processing")
217 .with_payload(
218 "operation",
219 serde_json::Value::String("average".to_string()),
220 )
221 .with_payload("data", serde_json::Value::Array(data_array))
222 .with_dependencies(vec![sum_task_id.clone()]);
223
224 let avg_task_id = taskflow.submit_task(avg_task).await?;
225 println!("Submitted average task (depends on sum): {}", avg_task_id);
226
227 let taskflow_clone = std::sync::Arc::new(taskflow);
228 let taskflow_for_execution = taskflow_clone.clone();
229
230 let execution_handle = tokio::spawn(async move {
231 if let Err(e) = taskflow_for_execution.start().await {
232 eprintln!("TaskFlow execution failed: {}", e);
233 }
234 });
235
236 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
237
238 loop {
239 let metrics = taskflow_clone.get_task_metrics().await?;
240 println!(
241 "Task metrics: pending={}, running={}, completed={}, failed={}",
242 metrics.pending, metrics.running, metrics.completed, metrics.failed
243 );
244
245 if metrics.pending == 0 && metrics.running == 0 {
246 break;
247 }
248
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 }
251
252 println!("\nAll tasks completed! Results:");
253
254 let tasks = taskflow_clone.list_tasks(None).await?;
255 for task in tasks {
256 println!(
257 "\nTask: {} ({})",
258 task.definition.name, task.definition.task_type
259 );
260 println!(" Status: {:?}", task.status);
261 if let Some(result) = &task.result {
262 if result.success {
263 println!(
264 " Result: {}",
265 result.output.as_ref().unwrap_or(&"No output".to_string())
266 );
267 println!(" Execution time: {}ms", result.execution_time_ms);
268 if !result.metadata.is_empty() {
269 println!(" Metadata: {:?}", result.metadata);
270 }
271 } else {
272 println!(
273 " Error: {}",
274 result
275 .error
276 .as_ref()
277 .unwrap_or(&"Unknown error".to_string())
278 );
279 }
280 }
281 }
282
283 let final_metrics = taskflow_clone.get_task_metrics().await?;
284 println!("\nFinal metrics:");
285 println!(" Total tasks: {}", final_metrics.total);
286 println!(
287 " Success rate: {:.1}%",
288 final_metrics.success_rate() * 100.0
289 );
290
291 taskflow_clone.shutdown().await?;
292 execution_handle.abort();
293
294 Ok(())
295}
pub async fn submit_http_task(
    &self,
    name: &str,
    url: &str,
    method: Option<&str>,
) -> Result<String>
Examples found in repository?
examples/yaml_config_usage.rs (line 35)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 // Method 1: Load from YAML file
10 let taskflow = TaskFlow::from_yaml_file("examples/config.yaml").await?;
11 println!("TaskFlow framework started with YAML configuration!");
12
13 // Method 2: You can also load from YAML string directly
14 /*
15 let yaml_config = r#"
16scheduler:
17 poll_interval_seconds: 2
18 max_concurrent_tasks: 30
19 enable_dependency_resolution: true
20 cleanup_completed_tasks_after_hours: 24
21
22executor:
23 worker_id: "custom-worker-001"
24 max_concurrent_tasks: 15
25 task_timeout_seconds: 300
26 heartbeat_interval_seconds: 10
27
28storage_type: InMemory
29"#;
30 let taskflow = TaskFlow::from_yaml_str(yaml_config).await?;
31
32 */
33
34 let task_id = taskflow
35 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
36 .await?;
37
38 println!("Submitted HTTP task: {}", task_id);
39
40 let shell_task_id = taskflow
41 .submit_shell_task("list_files", "ls", vec!["-la"])
42 .await?;
43
44 println!("Submitted shell task: {}", shell_task_id);
45
46 let taskflow_clone = std::sync::Arc::new(taskflow);
47 let taskflow_for_execution = taskflow_clone.clone();
48
49 let execution_handle = tokio::spawn(async move {
50 if let Err(e) = taskflow_for_execution.start().await {
51 eprintln!("TaskFlow execution failed: {}", e);
52 }
53 });
54
55 tokio::time::sleep(Duration::from_secs(2)).await;
56
57 loop {
58 let metrics = taskflow_clone.get_task_metrics().await?;
59 println!(
60 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
61 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
62 );
63
64 if metrics.pending == 0 && metrics.running == 0 {
65 break;
66 }
67
68 tokio::time::sleep(Duration::from_secs(1)).await;
69 }
70
71 println!("All tasks completed!");
72
73 let tasks = taskflow_clone.list_tasks(None).await?;
74 for task in tasks {
75 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
76 if let Some(result) = &task.result {
77 if result.success {
78 println!(" Output: {:?}", result.output);
79 } else {
80 println!(" Error: {:?}", result.error);
81 }
82 }
83 }
84
85 taskflow_clone.shutdown().await?;
86 execution_handle.abort();
87
88 Ok(())
89}
More examples
examples/basic_usage.rs (line 15)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
pub async fn submit_shell_task(
    &self,
    name: &str,
    command: &str,
    args: Vec<&str>,
) -> Result<String>
Examples found in repository?
examples/yaml_config_usage.rs (line 41)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 // Method 1: Load from YAML file
10 let taskflow = TaskFlow::from_yaml_file("examples/config.yaml").await?;
11 println!("TaskFlow framework started with YAML configuration!");
12
13 // Method 2: You can also load from YAML string directly
14 /*
15 let yaml_config = r#"
16scheduler:
17 poll_interval_seconds: 2
18 max_concurrent_tasks: 30
19 enable_dependency_resolution: true
20 cleanup_completed_tasks_after_hours: 24
21
22executor:
23 worker_id: "custom-worker-001"
24 max_concurrent_tasks: 15
25 task_timeout_seconds: 300
26 heartbeat_interval_seconds: 10
27
28storage_type: InMemory
29"#;
30 let taskflow = TaskFlow::from_yaml_str(yaml_config).await?;
31
32 */
33
34 let task_id = taskflow
35 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
36 .await?;
37
38 println!("Submitted HTTP task: {}", task_id);
39
40 let shell_task_id = taskflow
41 .submit_shell_task("list_files", "ls", vec!["-la"])
42 .await?;
43
44 println!("Submitted shell task: {}", shell_task_id);
45
46 let taskflow_clone = std::sync::Arc::new(taskflow);
47 let taskflow_for_execution = taskflow_clone.clone();
48
49 let execution_handle = tokio::spawn(async move {
50 if let Err(e) = taskflow_for_execution.start().await {
51 eprintln!("TaskFlow execution failed: {}", e);
52 }
53 });
54
55 tokio::time::sleep(Duration::from_secs(2)).await;
56
57 loop {
58 let metrics = taskflow_clone.get_task_metrics().await?;
59 println!(
60 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
61 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
62 );
63
64 if metrics.pending == 0 && metrics.running == 0 {
65 break;
66 }
67
68 tokio::time::sleep(Duration::from_secs(1)).await;
69 }
70
71 println!("All tasks completed!");
72
73 let tasks = taskflow_clone.list_tasks(None).await?;
74 for task in tasks {
75 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
76 if let Some(result) = &task.result {
77 if result.success {
78 println!(" Output: {:?}", result.output);
79 } else {
80 println!(" Error: {:?}", result.error);
81 }
82 }
83 }
84
85 taskflow_clone.shutdown().await?;
86 execution_handle.abort();
87
88 Ok(())
89}
More examples
examples/basic_usage.rs (line 21)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
pub async fn get_task_status(&self, task_id: &str) -> Result<Option<TaskStatus>>
pub async fn cancel_task(&self, task_id: &str) -> Result<()>
pub async fn list_tasks(&self, status: Option<TaskStatus>) -> Result<Vec<Task>>
Examples found in repository?
examples/yaml_config_usage.rs (line 73)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 // Method 1: Load from YAML file
10 let taskflow = TaskFlow::from_yaml_file("examples/config.yaml").await?;
11 println!("TaskFlow framework started with YAML configuration!");
12
13 // Method 2: You can also load from YAML string directly
14 /*
15 let yaml_config = r#"
16scheduler:
17 poll_interval_seconds: 2
18 max_concurrent_tasks: 30
19 enable_dependency_resolution: true
20 cleanup_completed_tasks_after_hours: 24
21
22executor:
23 worker_id: "custom-worker-001"
24 max_concurrent_tasks: 15
25 task_timeout_seconds: 300
26 heartbeat_interval_seconds: 10
27
28storage_type: InMemory
29"#;
30 let taskflow = TaskFlow::from_yaml_str(yaml_config).await?;
31
32 */
33
34 let task_id = taskflow
35 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
36 .await?;
37
38 println!("Submitted HTTP task: {}", task_id);
39
40 let shell_task_id = taskflow
41 .submit_shell_task("list_files", "ls", vec!["-la"])
42 .await?;
43
44 println!("Submitted shell task: {}", shell_task_id);
45
46 let taskflow_clone = std::sync::Arc::new(taskflow);
47 let taskflow_for_execution = taskflow_clone.clone();
48
49 let execution_handle = tokio::spawn(async move {
50 if let Err(e) = taskflow_for_execution.start().await {
51 eprintln!("TaskFlow execution failed: {}", e);
52 }
53 });
54
55 tokio::time::sleep(Duration::from_secs(2)).await;
56
57 loop {
58 let metrics = taskflow_clone.get_task_metrics().await?;
59 println!(
60 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
61 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
62 );
63
64 if metrics.pending == 0 && metrics.running == 0 {
65 break;
66 }
67
68 tokio::time::sleep(Duration::from_secs(1)).await;
69 }
70
71 println!("All tasks completed!");
72
73 let tasks = taskflow_clone.list_tasks(None).await?;
74 for task in tasks {
75 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
76 if let Some(result) = &task.result {
77 if result.success {
78 println!(" Output: {:?}", result.output);
79 } else {
80 println!(" Error: {:?}", result.error);
81 }
82 }
83 }
84
85 taskflow_clone.shutdown().await?;
86 execution_handle.abort();
87
88 Ok(())
89}
More examples
examples/basic_usage.rs (line 66)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
examples/custom_handler.rs (line 254)
169async fn main() -> Result<(), Box<dyn std::error::Error>> {
170 init();
171
172 let config = TaskFlowConfig::with_in_memory();
173 let taskflow = TaskFlow::new(config).await?;
174
175 taskflow.register_handler(Arc::new(MathTaskHandler)).await;
176 taskflow
177 .register_handler(Arc::new(DataProcessingHandler))
178 .await;
179
180 println!("TaskFlow with custom handlers started!");
181
182 let add_task = TaskDefinition::new("addition", "math_operation")
183 .with_payload("operation", serde_json::Value::String("add".to_string()))
184 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(10)))
185 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(5)));
186
187 let add_task_id = taskflow.submit_task(add_task).await?;
188 println!("Submitted addition task: {}", add_task_id);
189
190 let multiply_task = TaskDefinition::new("multiplication", "math_operation")
191 .with_payload(
192 "operation",
193 serde_json::Value::String("multiply".to_string()),
194 )
195 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(7)))
196 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(3)));
197
198 let multiply_task_id = taskflow.submit_task(multiply_task).await?;
199 println!("Submitted multiplication task: {}", multiply_task_id);
200
201 let data_array = vec![
202 serde_json::Value::Number(serde_json::Number::from(1)),
203 serde_json::Value::Number(serde_json::Number::from(2)),
204 serde_json::Value::Number(serde_json::Number::from(3)),
205 serde_json::Value::Number(serde_json::Number::from(4)),
206 serde_json::Value::Number(serde_json::Number::from(5)),
207 ];
208
209 let sum_task = TaskDefinition::new("sum_data", "data_processing")
210 .with_payload("operation", serde_json::Value::String("sum".to_string()))
211 .with_payload("data", serde_json::Value::Array(data_array.clone()));
212
213 let sum_task_id = taskflow.submit_task(sum_task).await?;
214 println!("Submitted sum task: {}", sum_task_id);
215
216 let avg_task = TaskDefinition::new("average_data", "data_processing")
217 .with_payload(
218 "operation",
219 serde_json::Value::String("average".to_string()),
220 )
221 .with_payload("data", serde_json::Value::Array(data_array))
222 .with_dependencies(vec![sum_task_id.clone()]);
223
224 let avg_task_id = taskflow.submit_task(avg_task).await?;
225 println!("Submitted average task (depends on sum): {}", avg_task_id);
226
227 let taskflow_clone = std::sync::Arc::new(taskflow);
228 let taskflow_for_execution = taskflow_clone.clone();
229
230 let execution_handle = tokio::spawn(async move {
231 if let Err(e) = taskflow_for_execution.start().await {
232 eprintln!("TaskFlow execution failed: {}", e);
233 }
234 });
235
236 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
237
238 loop {
239 let metrics = taskflow_clone.get_task_metrics().await?;
240 println!(
241 "Task metrics: pending={}, running={}, completed={}, failed={}",
242 metrics.pending, metrics.running, metrics.completed, metrics.failed
243 );
244
245 if metrics.pending == 0 && metrics.running == 0 {
246 break;
247 }
248
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 }
251
252 println!("\nAll tasks completed! Results:");
253
254 let tasks = taskflow_clone.list_tasks(None).await?;
255 for task in tasks {
256 println!(
257 "\nTask: {} ({})",
258 task.definition.name, task.definition.task_type
259 );
260 println!(" Status: {:?}", task.status);
261 if let Some(result) = &task.result {
262 if result.success {
263 println!(
264 " Result: {}",
265 result.output.as_ref().unwrap_or(&"No output".to_string())
266 );
267 println!(" Execution time: {}ms", result.execution_time_ms);
268 if !result.metadata.is_empty() {
269 println!(" Metadata: {:?}", result.metadata);
270 }
271 } else {
272 println!(
273 " Error: {}",
274 result
275 .error
276 .as_ref()
277 .unwrap_or(&"Unknown error".to_string())
278 );
279 }
280 }
281 }
282
283 let final_metrics = taskflow_clone.get_task_metrics().await?;
284 println!("\nFinal metrics:");
285 println!(" Total tasks: {}", final_metrics.total);
286 println!(
287 " Success rate: {:.1}%",
288 final_metrics.success_rate() * 100.0
289 );
290
291 taskflow_clone.shutdown().await?;
292 execution_handle.abort();
293
294 Ok(())
295}
Source
pub async fn start(&self) -> Result<()>
Examples found in repository?
examples/yaml_config_usage.rs (line 50)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 // Method 1: Load from YAML file
10 let taskflow = TaskFlow::from_yaml_file("examples/config.yaml").await?;
11 println!("TaskFlow framework started with YAML configuration!");
12
13 // Method 2: You can also load from YAML string directly
14 /*
15 let yaml_config = r#"
16scheduler:
17 poll_interval_seconds: 2
18 max_concurrent_tasks: 30
19 enable_dependency_resolution: true
20 cleanup_completed_tasks_after_hours: 24
21
22executor:
23 worker_id: "custom-worker-001"
24 max_concurrent_tasks: 15
25 task_timeout_seconds: 300
26 heartbeat_interval_seconds: 10
27
28storage_type: InMemory
29"#;
30 let taskflow = TaskFlow::from_yaml_str(yaml_config).await?;
31 "#;
32 */
33
34 let task_id = taskflow
35 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
36 .await?;
37
38 println!("Submitted HTTP task: {}", task_id);
39
40 let shell_task_id = taskflow
41 .submit_shell_task("list_files", "ls", vec!["-la"])
42 .await?;
43
44 println!("Submitted shell task: {}", shell_task_id);
45
46 let taskflow_clone = std::sync::Arc::new(taskflow);
47 let taskflow_for_execution = taskflow_clone.clone();
48
49 let execution_handle = tokio::spawn(async move {
50 if let Err(e) = taskflow_for_execution.start().await {
51 eprintln!("TaskFlow execution failed: {}", e);
52 }
53 });
54
55 tokio::time::sleep(Duration::from_secs(2)).await;
56
57 loop {
58 let metrics = taskflow_clone.get_task_metrics().await?;
59 println!(
60 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
61 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
62 );
63
64 if metrics.pending == 0 && metrics.running == 0 {
65 break;
66 }
67
68 tokio::time::sleep(Duration::from_secs(1)).await;
69 }
70
71 println!("All tasks completed!");
72
73 let tasks = taskflow_clone.list_tasks(None).await?;
74 for task in tasks {
75 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
76 if let Some(result) = &task.result {
77 if result.success {
78 println!(" Output: {:?}", result.output);
79 } else {
80 println!(" Error: {:?}", result.error);
81 }
82 }
83 }
84
85 taskflow_clone.shutdown().await?;
86 execution_handle.abort();
87
88 Ok(())
89}
More examples
examples/basic_usage.rs (line 43)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
examples/custom_handler.rs (line 231)
169async fn main() -> Result<(), Box<dyn std::error::Error>> {
170 init();
171
172 let config = TaskFlowConfig::with_in_memory();
173 let taskflow = TaskFlow::new(config).await?;
174
175 taskflow.register_handler(Arc::new(MathTaskHandler)).await;
176 taskflow
177 .register_handler(Arc::new(DataProcessingHandler))
178 .await;
179
180 println!("TaskFlow with custom handlers started!");
181
182 let add_task = TaskDefinition::new("addition", "math_operation")
183 .with_payload("operation", serde_json::Value::String("add".to_string()))
184 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(10)))
185 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(5)));
186
187 let add_task_id = taskflow.submit_task(add_task).await?;
188 println!("Submitted addition task: {}", add_task_id);
189
190 let multiply_task = TaskDefinition::new("multiplication", "math_operation")
191 .with_payload(
192 "operation",
193 serde_json::Value::String("multiply".to_string()),
194 )
195 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(7)))
196 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(3)));
197
198 let multiply_task_id = taskflow.submit_task(multiply_task).await?;
199 println!("Submitted multiplication task: {}", multiply_task_id);
200
201 let data_array = vec![
202 serde_json::Value::Number(serde_json::Number::from(1)),
203 serde_json::Value::Number(serde_json::Number::from(2)),
204 serde_json::Value::Number(serde_json::Number::from(3)),
205 serde_json::Value::Number(serde_json::Number::from(4)),
206 serde_json::Value::Number(serde_json::Number::from(5)),
207 ];
208
209 let sum_task = TaskDefinition::new("sum_data", "data_processing")
210 .with_payload("operation", serde_json::Value::String("sum".to_string()))
211 .with_payload("data", serde_json::Value::Array(data_array.clone()));
212
213 let sum_task_id = taskflow.submit_task(sum_task).await?;
214 println!("Submitted sum task: {}", sum_task_id);
215
216 let avg_task = TaskDefinition::new("average_data", "data_processing")
217 .with_payload(
218 "operation",
219 serde_json::Value::String("average".to_string()),
220 )
221 .with_payload("data", serde_json::Value::Array(data_array))
222 .with_dependencies(vec![sum_task_id.clone()]);
223
224 let avg_task_id = taskflow.submit_task(avg_task).await?;
225 println!("Submitted average task (depends on sum): {}", avg_task_id);
226
227 let taskflow_clone = std::sync::Arc::new(taskflow);
228 let taskflow_for_execution = taskflow_clone.clone();
229
230 let execution_handle = tokio::spawn(async move {
231 if let Err(e) = taskflow_for_execution.start().await {
232 eprintln!("TaskFlow execution failed: {}", e);
233 }
234 });
235
236 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
237
238 loop {
239 let metrics = taskflow_clone.get_task_metrics().await?;
240 println!(
241 "Task metrics: pending={}, running={}, completed={}, failed={}",
242 metrics.pending, metrics.running, metrics.completed, metrics.failed
243 );
244
245 if metrics.pending == 0 && metrics.running == 0 {
246 break;
247 }
248
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 }
251
252 println!("\nAll tasks completed! Results:");
253
254 let tasks = taskflow_clone.list_tasks(None).await?;
255 for task in tasks {
256 println!(
257 "\nTask: {} ({})",
258 task.definition.name, task.definition.task_type
259 );
260 println!(" Status: {:?}", task.status);
261 if let Some(result) = &task.result {
262 if result.success {
263 println!(
264 " Result: {}",
265 result.output.as_ref().unwrap_or(&"No output".to_string())
266 );
267 println!(" Execution time: {}ms", result.execution_time_ms);
268 if !result.metadata.is_empty() {
269 println!(" Metadata: {:?}", result.metadata);
270 }
271 } else {
272 println!(
273 " Error: {}",
274 result
275 .error
276 .as_ref()
277 .unwrap_or(&"Unknown error".to_string())
278 );
279 }
280 }
281 }
282
283 let final_metrics = taskflow_clone.get_task_metrics().await?;
284 println!("\nFinal metrics:");
285 println!(" Total tasks: {}", final_metrics.total);
286 println!(
287 " Success rate: {:.1}%",
288 final_metrics.success_rate() * 100.0
289 );
290
291 taskflow_clone.shutdown().await?;
292 execution_handle.abort();
293
294 Ok(())
295}
Source
pub async fn shutdown(&self) -> Result<()>
Examples found in repository?
examples/yaml_config_usage.rs (line 85)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 // Method 1: Load from YAML file
10 let taskflow = TaskFlow::from_yaml_file("examples/config.yaml").await?;
11 println!("TaskFlow framework started with YAML configuration!");
12
13 // Method 2: You can also load from YAML string directly
14 /*
15 let yaml_config = r#"
16scheduler:
17 poll_interval_seconds: 2
18 max_concurrent_tasks: 30
19 enable_dependency_resolution: true
20 cleanup_completed_tasks_after_hours: 24
21
22executor:
23 worker_id: "custom-worker-001"
24 max_concurrent_tasks: 15
25 task_timeout_seconds: 300
26 heartbeat_interval_seconds: 10
27
28storage_type: InMemory
29"#;
30 let taskflow = TaskFlow::from_yaml_str(yaml_config).await?;
31 "#;
32 */
33
34 let task_id = taskflow
35 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
36 .await?;
37
38 println!("Submitted HTTP task: {}", task_id);
39
40 let shell_task_id = taskflow
41 .submit_shell_task("list_files", "ls", vec!["-la"])
42 .await?;
43
44 println!("Submitted shell task: {}", shell_task_id);
45
46 let taskflow_clone = std::sync::Arc::new(taskflow);
47 let taskflow_for_execution = taskflow_clone.clone();
48
49 let execution_handle = tokio::spawn(async move {
50 if let Err(e) = taskflow_for_execution.start().await {
51 eprintln!("TaskFlow execution failed: {}", e);
52 }
53 });
54
55 tokio::time::sleep(Duration::from_secs(2)).await;
56
57 loop {
58 let metrics = taskflow_clone.get_task_metrics().await?;
59 println!(
60 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
61 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
62 );
63
64 if metrics.pending == 0 && metrics.running == 0 {
65 break;
66 }
67
68 tokio::time::sleep(Duration::from_secs(1)).await;
69 }
70
71 println!("All tasks completed!");
72
73 let tasks = taskflow_clone.list_tasks(None).await?;
74 for task in tasks {
75 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
76 if let Some(result) = &task.result {
77 if result.success {
78 println!(" Output: {:?}", result.output);
79 } else {
80 println!(" Error: {:?}", result.error);
81 }
82 }
83 }
84
85 taskflow_clone.shutdown().await?;
86 execution_handle.abort();
87
88 Ok(())
89}
More examples
examples/basic_usage.rs (line 78)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
examples/custom_handler.rs (line 291)
169async fn main() -> Result<(), Box<dyn std::error::Error>> {
170 init();
171
172 let config = TaskFlowConfig::with_in_memory();
173 let taskflow = TaskFlow::new(config).await?;
174
175 taskflow.register_handler(Arc::new(MathTaskHandler)).await;
176 taskflow
177 .register_handler(Arc::new(DataProcessingHandler))
178 .await;
179
180 println!("TaskFlow with custom handlers started!");
181
182 let add_task = TaskDefinition::new("addition", "math_operation")
183 .with_payload("operation", serde_json::Value::String("add".to_string()))
184 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(10)))
185 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(5)));
186
187 let add_task_id = taskflow.submit_task(add_task).await?;
188 println!("Submitted addition task: {}", add_task_id);
189
190 let multiply_task = TaskDefinition::new("multiplication", "math_operation")
191 .with_payload(
192 "operation",
193 serde_json::Value::String("multiply".to_string()),
194 )
195 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(7)))
196 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(3)));
197
198 let multiply_task_id = taskflow.submit_task(multiply_task).await?;
199 println!("Submitted multiplication task: {}", multiply_task_id);
200
201 let data_array = vec![
202 serde_json::Value::Number(serde_json::Number::from(1)),
203 serde_json::Value::Number(serde_json::Number::from(2)),
204 serde_json::Value::Number(serde_json::Number::from(3)),
205 serde_json::Value::Number(serde_json::Number::from(4)),
206 serde_json::Value::Number(serde_json::Number::from(5)),
207 ];
208
209 let sum_task = TaskDefinition::new("sum_data", "data_processing")
210 .with_payload("operation", serde_json::Value::String("sum".to_string()))
211 .with_payload("data", serde_json::Value::Array(data_array.clone()));
212
213 let sum_task_id = taskflow.submit_task(sum_task).await?;
214 println!("Submitted sum task: {}", sum_task_id);
215
216 let avg_task = TaskDefinition::new("average_data", "data_processing")
217 .with_payload(
218 "operation",
219 serde_json::Value::String("average".to_string()),
220 )
221 .with_payload("data", serde_json::Value::Array(data_array))
222 .with_dependencies(vec![sum_task_id.clone()]);
223
224 let avg_task_id = taskflow.submit_task(avg_task).await?;
225 println!("Submitted average task (depends on sum): {}", avg_task_id);
226
227 let taskflow_clone = std::sync::Arc::new(taskflow);
228 let taskflow_for_execution = taskflow_clone.clone();
229
230 let execution_handle = tokio::spawn(async move {
231 if let Err(e) = taskflow_for_execution.start().await {
232 eprintln!("TaskFlow execution failed: {}", e);
233 }
234 });
235
236 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
237
238 loop {
239 let metrics = taskflow_clone.get_task_metrics().await?;
240 println!(
241 "Task metrics: pending={}, running={}, completed={}, failed={}",
242 metrics.pending, metrics.running, metrics.completed, metrics.failed
243 );
244
245 if metrics.pending == 0 && metrics.running == 0 {
246 break;
247 }
248
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 }
251
252 println!("\nAll tasks completed! Results:");
253
254 let tasks = taskflow_clone.list_tasks(None).await?;
255 for task in tasks {
256 println!(
257 "\nTask: {} ({})",
258 task.definition.name, task.definition.task_type
259 );
260 println!(" Status: {:?}", task.status);
261 if let Some(result) = &task.result {
262 if result.success {
263 println!(
264 " Result: {}",
265 result.output.as_ref().unwrap_or(&"No output".to_string())
266 );
267 println!(" Execution time: {}ms", result.execution_time_ms);
268 if !result.metadata.is_empty() {
269 println!(" Metadata: {:?}", result.metadata);
270 }
271 } else {
272 println!(
273 " Error: {}",
274 result
275 .error
276 .as_ref()
277 .unwrap_or(&"Unknown error".to_string())
278 );
279 }
280 }
281 }
282
283 let final_metrics = taskflow_clone.get_task_metrics().await?;
284 println!("\nFinal metrics:");
285 println!(" Total tasks: {}", final_metrics.total);
286 println!(
287 " Success rate: {:.1}%",
288 final_metrics.success_rate() * 100.0
289 );
290
291 taskflow_clone.shutdown().await?;
292 execution_handle.abort();
293
294 Ok(())
295}
pub async fn wait_for_completion(&self, task_id: &str, timeout_seconds: Option<u64>) -> Result<Task>
Source
pub async fn get_task_metrics(&self) -> Result<TaskMetrics>
Examples found in repository?
examples/yaml_config_usage.rs (line 58)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 // Method 1: Load from YAML file
10 let taskflow = TaskFlow::from_yaml_file("examples/config.yaml").await?;
11 println!("TaskFlow framework started with YAML configuration!");
12
13 // Method 2: You can also load from YAML string directly
14 /*
15 let yaml_config = r#"
16scheduler:
17 poll_interval_seconds: 2
18 max_concurrent_tasks: 30
19 enable_dependency_resolution: true
20 cleanup_completed_tasks_after_hours: 24
21
22executor:
23 worker_id: "custom-worker-001"
24 max_concurrent_tasks: 15
25 task_timeout_seconds: 300
26 heartbeat_interval_seconds: 10
27
28storage_type: InMemory
29"#;
30 let taskflow = TaskFlow::from_yaml_str(yaml_config).await?;
31 "#;
32 */
33
34 let task_id = taskflow
35 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
36 .await?;
37
38 println!("Submitted HTTP task: {}", task_id);
39
40 let shell_task_id = taskflow
41 .submit_shell_task("list_files", "ls", vec!["-la"])
42 .await?;
43
44 println!("Submitted shell task: {}", shell_task_id);
45
46 let taskflow_clone = std::sync::Arc::new(taskflow);
47 let taskflow_for_execution = taskflow_clone.clone();
48
49 let execution_handle = tokio::spawn(async move {
50 if let Err(e) = taskflow_for_execution.start().await {
51 eprintln!("TaskFlow execution failed: {}", e);
52 }
53 });
54
55 tokio::time::sleep(Duration::from_secs(2)).await;
56
57 loop {
58 let metrics = taskflow_clone.get_task_metrics().await?;
59 println!(
60 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
61 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
62 );
63
64 if metrics.pending == 0 && metrics.running == 0 {
65 break;
66 }
67
68 tokio::time::sleep(Duration::from_secs(1)).await;
69 }
70
71 println!("All tasks completed!");
72
73 let tasks = taskflow_clone.list_tasks(None).await?;
74 for task in tasks {
75 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
76 if let Some(result) = &task.result {
77 if result.success {
78 println!(" Output: {:?}", result.output);
79 } else {
80 println!(" Error: {:?}", result.error);
81 }
82 }
83 }
84
85 taskflow_clone.shutdown().await?;
86 execution_handle.abort();
87
88 Ok(())
89}
More examples
examples/basic_usage.rs (line 51)
6async fn main() -> Result<(), Box<dyn std::error::Error>> {
7 init();
8
9 let config = TaskFlowConfig::with_in_memory();
10 let taskflow = TaskFlow::new(config).await?;
11
12 println!("TaskFlow framework started!");
13
14 let task_id = taskflow
15 .submit_http_task("fetch_example", "https://httpbin.org/get", Some("GET"))
16 .await?;
17
18 println!("Submitted HTTP task: {}", task_id);
19
20 let shell_task_id = taskflow
21 .submit_shell_task("list_files", "ls", vec!["-la"])
22 .await?;
23
24 println!("Submitted shell task: {}", shell_task_id);
25
26 let dependent_task = TaskDefinition::new("dependent_task", "shell_command")
27 .with_payload("command", serde_json::Value::String("echo".to_string()))
28 .with_payload(
29 "args",
30 serde_json::Value::Array(vec![serde_json::Value::String(
31 "This task depends on the shell task".to_string(),
32 )]),
33 )
34 .with_dependencies(vec![shell_task_id.clone()]);
35
36 let dependent_task_id = taskflow.submit_task(dependent_task).await?;
37 println!("Submitted dependent task: {}", dependent_task_id);
38
39 let taskflow_clone = std::sync::Arc::new(taskflow);
40 let taskflow_for_execution = taskflow_clone.clone();
41
42 let execution_handle = tokio::spawn(async move {
43 if let Err(e) = taskflow_for_execution.start().await {
44 eprintln!("TaskFlow execution failed: {}", e);
45 }
46 });
47
48 tokio::time::sleep(Duration::from_secs(2)).await;
49
50 loop {
51 let metrics = taskflow_clone.get_task_metrics().await?;
52 println!(
53 "Task metrics: pending={}, running={}, completed={}, failed={}, total={}",
54 metrics.pending, metrics.running, metrics.completed, metrics.failed, metrics.total
55 );
56
57 if metrics.pending == 0 && metrics.running == 0 {
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63
64 println!("All tasks completed!");
65
66 let tasks = taskflow_clone.list_tasks(None).await?;
67 for task in tasks {
68 println!("Task: {} - Status: {:?}", task.definition.name, task.status);
69 if let Some(result) = &task.result {
70 if result.success {
71 println!(" Output: {:?}", result.output);
72 } else {
73 println!(" Error: {:?}", result.error);
74 }
75 }
76 }
77
78 taskflow_clone.shutdown().await?;
79 execution_handle.abort();
80
81 Ok(())
82}
examples/custom_handler.rs (line 239)
169async fn main() -> Result<(), Box<dyn std::error::Error>> {
170 init();
171
172 let config = TaskFlowConfig::with_in_memory();
173 let taskflow = TaskFlow::new(config).await?;
174
175 taskflow.register_handler(Arc::new(MathTaskHandler)).await;
176 taskflow
177 .register_handler(Arc::new(DataProcessingHandler))
178 .await;
179
180 println!("TaskFlow with custom handlers started!");
181
182 let add_task = TaskDefinition::new("addition", "math_operation")
183 .with_payload("operation", serde_json::Value::String("add".to_string()))
184 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(10)))
185 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(5)));
186
187 let add_task_id = taskflow.submit_task(add_task).await?;
188 println!("Submitted addition task: {}", add_task_id);
189
190 let multiply_task = TaskDefinition::new("multiplication", "math_operation")
191 .with_payload(
192 "operation",
193 serde_json::Value::String("multiply".to_string()),
194 )
195 .with_payload("a", serde_json::Value::Number(serde_json::Number::from(7)))
196 .with_payload("b", serde_json::Value::Number(serde_json::Number::from(3)));
197
198 let multiply_task_id = taskflow.submit_task(multiply_task).await?;
199 println!("Submitted multiplication task: {}", multiply_task_id);
200
201 let data_array = vec![
202 serde_json::Value::Number(serde_json::Number::from(1)),
203 serde_json::Value::Number(serde_json::Number::from(2)),
204 serde_json::Value::Number(serde_json::Number::from(3)),
205 serde_json::Value::Number(serde_json::Number::from(4)),
206 serde_json::Value::Number(serde_json::Number::from(5)),
207 ];
208
209 let sum_task = TaskDefinition::new("sum_data", "data_processing")
210 .with_payload("operation", serde_json::Value::String("sum".to_string()))
211 .with_payload("data", serde_json::Value::Array(data_array.clone()));
212
213 let sum_task_id = taskflow.submit_task(sum_task).await?;
214 println!("Submitted sum task: {}", sum_task_id);
215
216 let avg_task = TaskDefinition::new("average_data", "data_processing")
217 .with_payload(
218 "operation",
219 serde_json::Value::String("average".to_string()),
220 )
221 .with_payload("data", serde_json::Value::Array(data_array))
222 .with_dependencies(vec![sum_task_id.clone()]);
223
224 let avg_task_id = taskflow.submit_task(avg_task).await?;
225 println!("Submitted average task (depends on sum): {}", avg_task_id);
226
227 let taskflow_clone = std::sync::Arc::new(taskflow);
228 let taskflow_for_execution = taskflow_clone.clone();
229
230 let execution_handle = tokio::spawn(async move {
231 if let Err(e) = taskflow_for_execution.start().await {
232 eprintln!("TaskFlow execution failed: {}", e);
233 }
234 });
235
236 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
237
238 loop {
239 let metrics = taskflow_clone.get_task_metrics().await?;
240 println!(
241 "Task metrics: pending={}, running={}, completed={}, failed={}",
242 metrics.pending, metrics.running, metrics.completed, metrics.failed
243 );
244
245 if metrics.pending == 0 && metrics.running == 0 {
246 break;
247 }
248
249 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250 }
251
252 println!("\nAll tasks completed! Results:");
253
254 let tasks = taskflow_clone.list_tasks(None).await?;
255 for task in tasks {
256 println!(
257 "\nTask: {} ({})",
258 task.definition.name, task.definition.task_type
259 );
260 println!(" Status: {:?}", task.status);
261 if let Some(result) = &task.result {
262 if result.success {
263 println!(
264 " Result: {}",
265 result.output.as_ref().unwrap_or(&"No output".to_string())
266 );
267 println!(" Execution time: {}ms", result.execution_time_ms);
268 if !result.metadata.is_empty() {
269 println!(" Metadata: {:?}", result.metadata);
270 }
271 } else {
272 println!(
273 " Error: {}",
274 result
275 .error
276 .as_ref()
277 .unwrap_or(&"Unknown error".to_string())
278 );
279 }
280 }
281 }
282
283 let final_metrics = taskflow_clone.get_task_metrics().await?;
284 println!("\nFinal metrics:");
285 println!(" Total tasks: {}", final_metrics.total);
286 println!(
287 " Success rate: {:.1}%",
288 final_metrics.success_rate() * 100.0
289 );
290
291 taskflow_clone.shutdown().await?;
292 execution_handle.abort();
293
294 Ok(())
295}
Auto Trait Implementations§
impl Freeze for TaskFlow
impl !RefUnwindSafe for TaskFlow
impl Send for TaskFlow
impl Sync for TaskFlow
impl Unpin for TaskFlow
impl !UnwindSafe for TaskFlow
Blanket Implementations§
Source§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more