/// A workflow definition: an identifier, a name, a priority, an optional
/// description, an optional condition, and a list of tasks.
///
/// Instances can be created with `Workflow::new()` or parsed from a JSON
/// string with `Workflow::from_json`; the type also implements serde's
/// `Deserialize`.
pub struct Workflow {
/// Identifier of the workflow (the repository examples use values such as
/// `"benchmark_workflow"` and `"complete_workflow"`).
pub id: String,
/// Name of the workflow (e.g. `"Benchmark Workflow Example"`).
pub name: String,
/// Numeric priority of the workflow.
///
/// NOTE(review): how this value is interpreted (e.g. whether lower or higher
/// runs first) is not visible from this declaration; confirm against the engine.
/// Some example JSON documents omit the `"priority"` key, so deserialization
/// presumably supplies a default; verify.
pub priority: u32,
/// Optional free-text description of the workflow.
pub description: Option<String>,
/// Optional condition stored as a JSON `Value`; the repository examples pass
/// an expression such as `{ "==": [true, true] }`.
///
/// NOTE(review): presumably evaluated to decide whether the workflow applies
/// to a message; confirm against the engine.
pub condition: Option<Value>,
/// Tasks that make up this workflow.
///
/// NOTE(review): presumably executed in list order; confirm against the engine.
pub tasks: Vec<Task>,
}
Fields§
§id: String
§name: String
§priority: u32
§description: Option<String>
§condition: Option<Value>
§tasks: Vec<Task>
Implementations§
Source§impl Workflow
impl Workflow
pub fn new() -> Self
Sourcepub fn from_json(json_str: &str) -> Result<Self>
pub fn from_json(json_str: &str) -> Result<Self>
Loads a workflow from a JSON string.
Examples found in repository?
examples/benchmark.rs (line 74)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13 // Create the workflow engine (built-in functions are auto-registered)
14 let mut engine = Engine::new();
15
16 // Define a workflow that:
17 // 1. Uses pre-loaded data instead of HTTP fetch
18 // 2. Enriches the message with transformed data
19 // 3. Demonstrates proper async workflow execution
20 let workflow_json = r#"
21 {
22 "id": "benchmark_workflow",
23 "name": "Benchmark Workflow Example",
24 "description": "Demonstrates async workflow execution with data transformation",
25 "condition": { "==": [true, true] },
26 "tasks": [
27 {
28 "id": "transform_data",
29 "name": "Transform Data",
30 "description": "Map API response to our data model",
31 "function": {
32 "name": "map",
33 "input": {
34 "mappings": [
35 {
36 "path": "data.user.id",
37 "logic": { "var": "temp_data.body.id" }
38 },
39 {
40 "path": "data.user.name",
41 "logic": { "var": "temp_data.body.name" }
42 },
43 {
44 "path": "data.user.email",
45 "logic": { "var": "temp_data.body.email" }
46 },
47 {
48 "path": "data.user.address",
49 "logic": {
50 "cat": [
51 { "var": "temp_data.body.address.street" },
52 ", ",
53 { "var": "temp_data.body.address.city" }
54 ]
55 }
56 },
57 {
58 "path": "data.user.company",
59 "logic": { "var": "temp_data.body.company.name" }
60 },
61 {
62 "path": "data.processed_at",
63 "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
64 }
65 ]
66 }
67 }
68 }
69 ]
70 }
71 "#;
72
73 // Parse and add the workflow to the engine
74 let workflow = Workflow::from_json(workflow_json)?;
75 engine.add_workflow(&workflow);
76
77 // Create sample user data (similar to what the HTTP endpoint would return)
78 let sample_user_data = json!({
79 "body": {
80 "id": 1,
81 "name": "Leanne Graham",
82 "username": "Bret",
83 "email": "Sincere@april.biz",
84 "address": {
85 "street": "Kulas Light",
86 "suite": "Apt. 556",
87 "city": "Gwenborough",
88 "zipcode": "92998-3874",
89 "geo": {
90 "lat": "-37.3159",
91 "lng": "81.1496"
92 }
93 },
94 "phone": "1-770-736-8031 x56442",
95 "website": "hildegard.org",
96 "company": {
97 "name": "Romaguera-Crona",
98 "catchPhrase": "Multi-layered client-server neural-net",
99 "bs": "harness real-time e-markets"
100 }
101 }
102 });
103
104 // Run async benchmark
105 println!("=== ASYNC BENCHMARK ===");
106 let async_results = run_async_benchmark(&engine, &sample_user_data, 1000).await?;
107
108 // Run sync benchmark for comparison (using blocking approach)
109 println!("\n=== SYNC BENCHMARK (for comparison) ===");
110 let sync_results = run_sync_benchmark(&engine, &sample_user_data, 1000).await?;
111
112 // Compare results
113 println!("\n=== COMPARISON ===");
114 println!("Async avg: {:?}", async_results.avg_time);
115 println!("Sync avg: {:?}", sync_results.avg_time);
116 println!(
117 "Async is {:.2}x {} than sync",
118 if async_results.avg_time < sync_results.avg_time {
119 sync_results.avg_time.as_nanos() as f64 / async_results.avg_time.as_nanos() as f64
120 } else {
121 async_results.avg_time.as_nanos() as f64 / sync_results.avg_time.as_nanos() as f64
122 },
123 if async_results.avg_time < sync_results.avg_time {
124 "faster"
125 } else {
126 "slower"
127 }
128 );
129
130 // Log results to file
131 log_benchmark_results(
132 async_results.iterations,
133 async_results.min_time,
134 async_results.max_time,
135 async_results.avg_time,
136 async_results.p95,
137 async_results.p99,
138 async_results.total_time,
139 "async".to_string(),
140 )?;
141
142 log_benchmark_results(
143 sync_results.iterations,
144 sync_results.min_time,
145 sync_results.max_time,
146 sync_results.avg_time,
147 sync_results.p95,
148 sync_results.p99,
149 sync_results.total_time,
150 "sync".to_string(),
151 )?;
152
153 println!("\nBenchmark results saved to '{BENCHMARK_LOG_FILE}'");
154
155 Ok(())
156}
More examples
examples/complete_workflow.rs (line 132)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6 // Create the workflow engine (built-in functions are auto-registered)
7 let mut engine = Engine::new();
8
9 // Define a workflow that:
10 // 1. Fetches data from a public API
11 // 2. Enriches the message with transformed data
12 // 3. Validates the enriched data
13 let workflow_json = r#"
14 {
15 "id": "complete_workflow",
16 "name": "Complete Workflow Example",
17 "priority": 0,
18 "description": "Demonstrates fetch -> enrich -> validate flow",
19 "condition": { "==": [true, true] },
20 "tasks": [
21 {
22 "id": "fetch_user_data",
23 "name": "Fetch User Data",
24 "description": "Get user data from a public API",
25 "function": {
26 "name": "http",
27 "input": {
28 "url": "https://jsonplaceholder.typicode.com/users/1",
29 "method": "GET",
30 "headers": {
31 "Accept": "application/json"
32 }
33 }
34 }
35 },
36 {
37 "id": "initialize_user",
38 "name": "Initialize User Structure",
39 "description": "Create empty user object in data",
40 "function": {
41 "name": "map",
42 "input": {
43 "mappings": [
44 {
45 "path": "data",
46 "logic": { "preserve": {"user": {}} }
47 }
48 ]
49 }
50 }
51 },
52 {
53 "id": "transform_data",
54 "name": "Transform Data",
55 "description": "Map API response to our data model",
56 "function": {
57 "name": "map",
58 "input": {
59 "mappings": [
60 {
61 "path": "data.user.id",
62 "logic": { "var": "temp_data.body.id" }
63 },
64 {
65 "path": "data.user.name",
66 "logic": { "var": "temp_data.body.name" }
67 },
68 {
69 "path": "data.user.email",
70 "logic": { "var": "temp_data.body.email" }
71 },
72 {
73 "path": "data.user.address",
74 "logic": {
75 "cat": [
76 { "var": "temp_data.body.address.street" },
77 ", ",
78 { "var": "temp_data.body.address.city" }
79 ]
80 }
81 },
82 {
83 "path": "data.user.company",
84 "logic": { "var": "temp_data.body.company.name" }
85 }
86 ]
87 }
88 }
89 },
90 {
91 "id": "validate_user_data",
92 "name": "Validate User Data",
93 "description": "Ensure the user data meets our requirements",
94 "function": {
95 "name": "validate",
96 "input": {
97 "rules": [
98 {
99 "path": "data",
100 "logic": { "!!": { "var": "data.user.id" } },
101 "message": "User ID is required"
102 },
103 {
104 "path": "data",
105 "logic": { "!!": { "var": "data.user.name" } },
106 "message": "User name is required"
107 },
108 {
109 "path": "data",
110 "logic": { "!!": { "var": "data.user.email" } },
111 "message": "User email is required"
112 },
113 {
114 "path": "data",
115 "logic": {
116 "in": [
117 "@",
118 { "var": "data.user.email" }
119 ]
120 },
121 "message": "Email must be valid format"
122 }
123 ]
124 }
125 }
126 }
127 ]
128 }
129 "#;
130
131 // Parse and add the workflow to the engine
132 let workflow = Workflow::from_json(workflow_json)?;
133 engine.add_workflow(&workflow);
134
135 // Create a message to process with properly initialized data structure
136 let mut message = Message::new(&json!({}));
137
138 // Process the message through the workflow asynchronously
139 println!("Processing message through workflow...");
140
141 match engine.process_message(&mut message).await {
142 Ok(_) => {
143 println!("Workflow completed successfully!");
144 }
145 Err(e) => {
146 eprintln!("Error executing workflow: {e:?}");
147 if !message.errors.is_empty() {
148 println!("\nErrors recorded in message:");
149 for err in &message.errors {
150 println!(
151 "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152 err.workflow_id, err.task_id, err.error_message
153 );
154 }
155 }
156 }
157 }
158
159 println!(
160 "\nFull message structure:\n{}",
161 serde_json::to_string_pretty(&message)?
162 );
163
164 Ok(())
165}
examples/custom_function.rs (line 388)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308 println!("=== Custom Function Example ===\n");
309
310 // Create engine without built-in functions to demonstrate custom ones
311 let mut engine = Engine::new_empty();
312
313 // Register our custom functions
314 engine.register_task_function(
315 "statistics".to_string(),
316 Box::new(StatisticsFunction::new()),
317 );
318
319 engine.register_task_function(
320 "enrich_data".to_string(),
321 Box::new(DataEnrichmentFunction::new()),
322 );
323
324 // Also register built-in map function for data preparation
325 engine.register_task_function(
326 "map".to_string(),
327 Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328 );
329
330 // Define a workflow that uses our custom functions
331 let workflow_json = r#"
332 {
333 "id": "custom_function_demo",
334 "name": "Custom Function Demo",
335 "description": "Demonstrates custom async functions in workflow",
336 "condition": { "==": [true, true] },
337 "tasks": [
338 {
339 "id": "prepare_data",
340 "name": "Prepare Data",
341 "description": "Extract and prepare data for analysis",
342 "function": {
343 "name": "map",
344 "input": {
345 "mappings": [
346 {
347 "path": "data.numbers",
348 "logic": { "var": "temp_data.measurements" }
349 },
350 {
351 "path": "data.user_id",
352 "logic": { "var": "temp_data.user_id" }
353 }
354 ]
355 }
356 }
357 },
358 {
359 "id": "calculate_stats",
360 "name": "Calculate Statistics",
361 "description": "Calculate statistical measures from numeric data",
362 "function": {
363 "name": "statistics",
364 "input": {
365 "data_path": "data.numbers",
366 "output_path": "data.stats"
367 }
368 }
369 },
370 {
371 "id": "enrich_user_data",
372 "name": "Enrich User Data",
373 "description": "Add additional user information",
374 "function": {
375 "name": "enrich_data",
376 "input": {
377 "lookup_field": "user_id",
378 "lookup_value": "user_123",
379 "output_path": "data.user_info"
380 }
381 }
382 }
383 ]
384 }
385 "#;
386
387 // Parse and add the workflow
388 let workflow = Workflow::from_json(workflow_json)?;
389 engine.add_workflow(&workflow);
390
391 // Create sample data
392 let sample_data = json!({
393 "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394 "user_id": "user_123",
395 "timestamp": "2024-01-15T10:30:00Z"
396 });
397
398 // Create and process message
399 let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400 message.temp_data = sample_data;
401 message.data = json!({});
402
403 println!("Processing message with custom functions...\n");
404
405 // Process the message through our custom workflow
406 match engine.process_message(&mut message).await {
407 Ok(_) => {
408 println!("✅ Message processed successfully!\n");
409
410 println!("📊 Final Results:");
411 println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413 println!("📋 Audit Trail:");
414 for (i, audit) in message.audit_trail.iter().enumerate() {
415 println!(
416 "{}. Task: {} (Status: {})",
417 i + 1,
418 audit.task_id,
419 audit.status_code
420 );
421 println!(" Timestamp: {}", audit.timestamp);
422 println!(" Changes: {} field(s) modified", audit.changes.len());
423 }
424
425 if message.has_errors() {
426 println!("\n⚠️ Errors encountered:");
427 for error in &message.errors {
428 println!(
429 " - {}: {:?}",
430 error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431 error.error_message
432 );
433 }
434 }
435 }
436 Err(e) => {
437 println!("❌ Error processing message: {e:?}");
438 }
439 }
440
441 // Demonstrate another example with different data
442 let separator = "=".repeat(50);
443 println!("\n{separator}");
444 println!("=== Second Example with Different User ===\n");
445
446 let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447 message2.temp_data = json!({
448 "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449 "user_id": "user_456",
450 "timestamp": "2024-01-15T11:00:00Z"
451 });
452 message2.data = json!({});
453
454 // Create a workflow for the second user
455 let workflow2_json = r#"
456 {
457 "id": "custom_function_demo_2",
458 "name": "Custom Function Demo 2",
459 "description": "Second demo with different user",
460 "condition": { "==": [true, true] },
461 "tasks": [
462 {
463 "id": "prepare_data",
464 "name": "Prepare Data",
465 "function": {
466 "name": "map",
467 "input": {
468 "mappings": [
469 {
470 "path": "data.numbers",
471 "logic": { "var": "temp_data.measurements" }
472 },
473 {
474 "path": "data.user_id",
475 "logic": { "var": "temp_data.user_id" }
476 }
477 ]
478 }
479 }
480 },
481 {
482 "id": "calculate_stats",
483 "name": "Calculate Statistics",
484 "function": {
485 "name": "statistics",
486 "input": {
487 "data_path": "data.numbers",
488 "output_path": "data.analysis"
489 }
490 }
491 },
492 {
493 "id": "enrich_user_data",
494 "name": "Enrich User Data",
495 "function": {
496 "name": "enrich_data",
497 "input": {
498 "lookup_field": "user_id",
499 "lookup_value": "user_456",
500 "output_path": "data.employee_details"
501 }
502 }
503 }
504 ]
505 }
506 "#;
507
508 let workflow2 = Workflow::from_json(workflow2_json)?;
509 engine.add_workflow(&workflow2);
510
511 match engine.process_message(&mut message2).await {
512 Ok(_) => {
513 println!("✅ Second message processed successfully!\n");
514 println!("📊 Results for user_456:");
515 println!("{}", serde_json::to_string_pretty(&message2.data)?);
516 }
517 Err(e) => {
518 println!("❌ Error processing second message: {e:?}");
519 }
520 }
521
522 println!("\n🎉 Custom function examples completed!");
523
524 Ok(())
525}
Trait Implementations§
Source§impl<'de> Deserialize<'de> for Workflow
impl<'de> Deserialize<'de> for Workflow
Source§fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where
    __D: Deserializer<'de>,
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where
    __D: Deserializer<'de>,
Deserialize this value from the given Serde deserializer. Read more
Auto Trait Implementations§
impl Freeze for Workflow
impl RefUnwindSafe for Workflow
impl Send for Workflow
impl Sync for Workflow
impl Unpin for Workflow
impl UnwindSafe for Workflow
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more