Struct Engine

pub struct Engine { /* private fields */ }

Engine that processes messages through workflows using non-blocking async IO.

This engine is optimized for IO-bound workloads like HTTP requests, database access, and file operations. It uses Tokio for efficient async task execution.

Workflows are processed sequentially to ensure that later workflows can depend on the results of earlier workflows.
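
A condensed end-to-end sketch stitched together from the repository examples below. It assumes Workflow is re-exported at the crate root alongside Engine (as the examples suggest) and that the binary runs under a Tokio runtime via #[tokio::main].

use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Built-in function handlers (map, http, validate, ...) are auto-registered.
    let mut engine = Engine::new();

    // A single-task workflow that copies a value from temp_data into data.
    let workflow = Workflow::from_json(r#"
    {
        "id": "copy_name",
        "name": "Copy Name",
        "description": "Minimal map-only workflow",
        "condition": { "==": [true, true] },
        "tasks": [
            {
                "id": "map_name",
                "name": "Map Name",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            { "path": "data.name", "logic": { "var": "temp_data.name" } }
                        ]
                    }
                }
            }
        ]
    }
    "#)?;
    engine.add_workflow(&workflow);

    let mut message = Message::new(&json!({}));
    message.temp_data = json!({ "name": "Ada" });
    message.data = json!({});

    engine.process_message(&mut message).await?;
    println!("{}", serde_json::to_string_pretty(&message.data)?);
    Ok(())
}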

Implementations


impl Engine


pub fn new() -> Self

Creates a new Engine instance with built-in function handlers pre-registered.

§Example
use dataflow_rs::Engine;

let engine = Engine::new();
Examples found in repository
examples/benchmark.rs (line 14)
12 async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // Create the workflow engine (built-in functions are auto-registered)
14    let mut engine = Engine::new();
15
16    // Define a workflow that:
17    // 1. Uses pre-loaded data instead of HTTP fetch
18    // 2. Enriches the message with transformed data
19    // 3. Demonstrates proper async workflow execution
20    let workflow_json = r#"
21    {
22        "id": "benchmark_workflow",
23        "name": "Benchmark Workflow Example",
24        "description": "Demonstrates async workflow execution with data transformation",
25        "condition": { "==": [true, true] },
26        "tasks": [
27            {
28                "id": "transform_data",
29                "name": "Transform Data",
30                "description": "Map API response to our data model",
31                "function": {
32                    "name": "map",
33                    "input": {
34                        "mappings": [
35                            {
36                                "path": "data.user.id", 
37                                "logic": { "var": "temp_data.body.id" }
38                            },
39                            {
40                                "path": "data.user.name", 
41                                "logic": { "var": "temp_data.body.name" }
42                            },
43                            {
44                                "path": "data.user.email", 
45                                "logic": { "var": "temp_data.body.email" }
46                            },
47                            {
48                                "path": "data.user.address", 
49                                "logic": {
50                                    "cat": [
51                                        { "var": "temp_data.body.address.street" },
52                                        ", ",
53                                        { "var": "temp_data.body.address.city" }
54                                    ]
55                                }
56                            },
57                            {
58                                "path": "data.user.company", 
59                                "logic": { "var": "temp_data.body.company.name" }
60                            },
61                            {
62                                "path": "data.processed_at", 
63                                "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
64                            }
65                        ]
66                    }
67                }
68            }
69        ]
70    }
71    "#;
72
73    // Parse and add the workflow to the engine
74    let workflow = Workflow::from_json(workflow_json)?;
75    engine.add_workflow(&workflow);
76
77    // Create sample user data (similar to what the HTTP endpoint would return)
78    let sample_user_data = json!({
79        "body": {
80            "id": 1,
81            "name": "Leanne Graham",
82            "username": "Bret",
83            "email": "Sincere@april.biz",
84            "address": {
85                "street": "Kulas Light",
86                "suite": "Apt. 556",
87                "city": "Gwenborough",
88                "zipcode": "92998-3874",
89                "geo": {
90                    "lat": "-37.3159",
91                    "lng": "81.1496"
92                }
93            },
94            "phone": "1-770-736-8031 x56442",
95            "website": "hildegard.org",
96            "company": {
97                "name": "Romaguera-Crona",
98                "catchPhrase": "Multi-layered client-server neural-net",
99                "bs": "harness real-time e-markets"
100            }
101        }
102    });
103
104    // Run async benchmark
105    println!("=== ASYNC BENCHMARK ===");
106    let async_results = run_async_benchmark(&engine, &sample_user_data, 1000).await?;
107
108    // Run sync benchmark for comparison (using blocking approach)
109    println!("\n=== SYNC BENCHMARK (for comparison) ===");
110    let sync_results = run_sync_benchmark(&engine, &sample_user_data, 1000).await?;
111
112    // Compare results
113    println!("\n=== COMPARISON ===");
114    println!("Async avg: {:?}", async_results.avg_time);
115    println!("Sync avg:  {:?}", sync_results.avg_time);
116    println!(
117        "Async is {:.2}x {} than sync",
118        if async_results.avg_time < sync_results.avg_time {
119            sync_results.avg_time.as_nanos() as f64 / async_results.avg_time.as_nanos() as f64
120        } else {
121            async_results.avg_time.as_nanos() as f64 / sync_results.avg_time.as_nanos() as f64
122        },
123        if async_results.avg_time < sync_results.avg_time {
124            "faster"
125        } else {
126            "slower"
127        }
128    );
129
130    // Log results to file
131    log_benchmark_results(
132        async_results.iterations,
133        async_results.min_time,
134        async_results.max_time,
135        async_results.avg_time,
136        async_results.p95,
137        async_results.p99,
138        async_results.total_time,
139        "async".to_string(),
140    )?;
141
142    log_benchmark_results(
143        sync_results.iterations,
144        sync_results.min_time,
145        sync_results.max_time,
146        sync_results.avg_time,
147        sync_results.p95,
148        sync_results.p99,
149        sync_results.total_time,
150        "sync".to_string(),
151    )?;
152
153    println!("\nBenchmark results saved to '{BENCHMARK_LOG_FILE}'");
154
155    Ok(())
156 }
examples/complete_workflow.rs (line 7)
5 async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let mut engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {e:?}");
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165 }

pub fn new_empty() -> Self

Creates a new engine instance without any pre-registered functions.
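
A minimal sketch mirroring examples/custom_function.rs (referenced below): start from an empty engine and register only the handlers the pipeline actually needs, here the built-in MapFunction.

use dataflow_rs::Engine;
use dataflow_rs::engine::functions::MapFunction;

// An empty engine has no handlers at all, not even the built-ins.
let mut engine = Engine::new_empty();
assert!(!engine.has_function("map"));

// Register only what this pipeline uses.
engine.register_task_function("map".to_string(), Box::new(MapFunction::new()));
assert!(engine.has_function("map"));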

Examples found in repository
examples/custom_function.rs (line 311)
307 async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308    println!("=== Custom Function Example ===\n");
309
310    // Create engine without built-in functions to demonstrate custom ones
311    let mut engine = Engine::new_empty();
312
313    // Register our custom functions
314    engine.register_task_function(
315        "statistics".to_string(),
316        Box::new(StatisticsFunction::new()),
317    );
318
319    engine.register_task_function(
320        "enrich_data".to_string(),
321        Box::new(DataEnrichmentFunction::new()),
322    );
323
324    // Also register built-in map function for data preparation
325    engine.register_task_function(
326        "map".to_string(),
327        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328    );
329
330    // Define a workflow that uses our custom functions
331    let workflow_json = r#"
332    {
333        "id": "custom_function_demo",
334        "name": "Custom Function Demo",
335        "description": "Demonstrates custom async functions in workflow",
336        "condition": { "==": [true, true] },
337        "tasks": [
338            {
339                "id": "prepare_data",
340                "name": "Prepare Data",
341                "description": "Extract and prepare data for analysis",
342                "function": {
343                    "name": "map",
344                    "input": {
345                        "mappings": [
346                            {
347                                "path": "data.numbers",
348                                "logic": { "var": "temp_data.measurements" }
349                            },
350                            {
351                                "path": "data.user_id",
352                                "logic": { "var": "temp_data.user_id" }
353                            }
354                        ]
355                    }
356                }
357            },
358            {
359                "id": "calculate_stats",
360                "name": "Calculate Statistics",
361                "description": "Calculate statistical measures from numeric data",
362                "function": {
363                    "name": "statistics",
364                    "input": {
365                        "data_path": "data.numbers",
366                        "output_path": "data.stats"
367                    }
368                }
369            },
370            {
371                "id": "enrich_user_data",
372                "name": "Enrich User Data",
373                "description": "Add additional user information",
374                "function": {
375                    "name": "enrich_data",
376                    "input": {
377                        "lookup_field": "user_id",
378                        "lookup_value": "user_123",
379                        "output_path": "data.user_info"
380                    }
381                }
382            }
383        ]
384    }
385    "#;
386
387    // Parse and add the workflow
388    let workflow = Workflow::from_json(workflow_json)?;
389    engine.add_workflow(&workflow);
390
391    // Create sample data
392    let sample_data = json!({
393        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394        "user_id": "user_123",
395        "timestamp": "2024-01-15T10:30:00Z"
396    });
397
398    // Create and process message
399    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400    message.temp_data = sample_data;
401    message.data = json!({});
402
403    println!("Processing message with custom functions...\n");
404
405    // Process the message through our custom workflow
406    match engine.process_message(&mut message).await {
407        Ok(_) => {
408            println!("✅ Message processed successfully!\n");
409
410            println!("📊 Final Results:");
411            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413            println!("📋 Audit Trail:");
414            for (i, audit) in message.audit_trail.iter().enumerate() {
415                println!(
416                    "{}. Task: {} (Status: {})",
417                    i + 1,
418                    audit.task_id,
419                    audit.status_code
420                );
421                println!("   Timestamp: {}", audit.timestamp);
422                println!("   Changes: {} field(s) modified", audit.changes.len());
423            }
424
425            if message.has_errors() {
426                println!("\n⚠️  Errors encountered:");
427                for error in &message.errors {
428                    println!(
429                        "   - {}: {:?}",
430                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431                        error.error_message
432                    );
433                }
434            }
435        }
436        Err(e) => {
437            println!("❌ Error processing message: {e:?}");
438        }
439    }
440
441    // Demonstrate another example with different data
442    let separator = "=".repeat(50);
443    println!("\n{separator}");
444    println!("=== Second Example with Different User ===\n");
445
446    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447    message2.temp_data = json!({
448        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449        "user_id": "user_456",
450        "timestamp": "2024-01-15T11:00:00Z"
451    });
452    message2.data = json!({});
453
454    // Create a workflow for the second user
455    let workflow2_json = r#"
456    {
457        "id": "custom_function_demo_2",
458        "name": "Custom Function Demo 2",
459        "description": "Second demo with different user",
460        "condition": { "==": [true, true] },
461        "tasks": [
462            {
463                "id": "prepare_data",
464                "name": "Prepare Data",
465                "function": {
466                    "name": "map",
467                    "input": {
468                        "mappings": [
469                            {
470                                "path": "data.numbers",
471                                "logic": { "var": "temp_data.measurements" }
472                            },
473                            {
474                                "path": "data.user_id",
475                                "logic": { "var": "temp_data.user_id" }
476                            }
477                        ]
478                    }
479                }
480            },
481            {
482                "id": "calculate_stats",
483                "name": "Calculate Statistics",
484                "function": {
485                    "name": "statistics",
486                    "input": {
487                        "data_path": "data.numbers",
488                        "output_path": "data.analysis"
489                    }
490                }
491            },
492            {
493                "id": "enrich_user_data",
494                "name": "Enrich User Data",
495                "function": {
496                    "name": "enrich_data",
497                    "input": {
498                        "lookup_field": "user_id",
499                        "lookup_value": "user_456",
500                        "output_path": "data.employee_details"
501                    }
502                }
503            }
504        ]
505    }
506    "#;
507
508    let workflow2 = Workflow::from_json(workflow2_json)?;
509    engine.add_workflow(&workflow2);
510
511    match engine.process_message(&mut message2).await {
512        Ok(_) => {
513            println!("✅ Second message processed successfully!\n");
514            println!("📊 Results for user_456:");
515            println!("{}", serde_json::to_string_pretty(&message2.data)?);
516        }
517        Err(e) => {
518            println!("❌ Error processing second message: {e:?}");
519        }
520    }
521
522    println!("\n🎉 Custom function examples completed!");
523
524    Ok(())
525 }

pub fn with_retry_config(self, config: RetryConfig) -> Self

Configures retry behavior. The method consumes the engine and returns it, so the call can be chained directly after new() or new_empty().


pub fn add_workflow(&mut self, workflow: &Workflow)

Adds a workflow to the engine.

§Arguments
  • workflow - The workflow to add
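
A short sketch of the usual pattern (the repository examples referenced below do the same): parse the workflow definition from JSON with Workflow::from_json, then register it before processing any messages. The crate-root Workflow re-export is assumed, as in the examples.

use dataflow_rs::{Engine, Workflow};

let mut engine = Engine::new();

let workflow = Workflow::from_json(r#"
{
    "id": "copy_timestamp",
    "name": "Copy Timestamp",
    "description": "Single map task copying metadata.timestamp into data.ts",
    "condition": { "==": [true, true] },
    "tasks": [
        {
            "id": "copy_ts",
            "name": "Copy Timestamp",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        { "path": "data.ts", "logic": { "var": "metadata.timestamp" } }
                    ]
                }
            }
        }
    ]
}
"#).expect("workflow JSON should parse");
engine.add_workflow(&workflow);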
Examples found in repository
examples/benchmark.rs (line 75), examples/complete_workflow.rs (line 133), and examples/custom_function.rs (line 389); the full listings appear under new and new_empty above.

pub fn register_task_function(&mut self, name: String, handler: Box<dyn AsyncFunctionHandler + Send + Sync>)

Registers a custom function handler with the engine.

§Arguments
  • name - The name of the function handler
  • handler - The function handler implementation
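
A minimal sketch. The name passed here is the name tasks use in their "function" block; custom handlers implement AsyncFunctionHandler (see examples/custom_function.rs, referenced below), while the built-in MapFunction is used here to keep the snippet self-contained.

use dataflow_rs::Engine;
use dataflow_rs::engine::functions::MapFunction;

let mut engine = Engine::new_empty();

// Tasks with "function": { "name": "map", ... } will now resolve to this handler.
engine.register_task_function("map".to_string(), Box::new(MapFunction::new()));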
Examples found in repository
examples/custom_function.rs (lines 314-317); the full listing appears under new_empty above.

pub fn has_function(&self, name: &str) -> bool

Checks whether a function handler with the given name is registered.
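
A short sketch, assuming the built-ins registered by Engine::new() include the map handler used throughout the examples on this page.

use dataflow_rs::Engine;

let engine = Engine::new();
assert!(engine.has_function("map"));         // built-in, auto-registered by new()
assert!(!engine.has_function("statistics")); // custom handlers must be registered first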


pub async fn process_message(&self, message: &mut Message) -> Result<()>

Processes a message through every registered workflow whose condition matches.

This async method:

  1. Iterates through workflows sequentially in deterministic order (sorted by ID)
  2. Evaluates conditions for each workflow right before execution
  3. Executes matching workflows one after another (not concurrently)
  4. Updates the message with processing results and audit trail

Workflows are executed sequentially because later workflows may depend on the results of earlier workflows, and their conditions may change based on modifications made by previous workflows.

§Arguments
  • message - The message to process
§Returns
  • Result<()> - Success or an error if processing failed
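
A condensed sketch of the error-handling pattern from examples/complete_workflow.rs (shown under new above); it assumes an engine with workflows already added and a surrounding Tokio runtime.

use dataflow_rs::engine::message::Message;
use serde_json::json;

let mut message = Message::new(&json!({}));

match engine.process_message(&mut message).await {
    Ok(_) => println!("Workflow completed successfully!"),
    Err(e) => {
        eprintln!("Error executing workflow: {e:?}");
        // Task-level failures are also recorded on the message itself.
        for err in &message.errors {
            eprintln!(
                "- Workflow: {:?}, Task: {:?}, Error: {:?}",
                err.workflow_id, err.task_id, err.error_message
            );
        }
    }
}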
Examples found in repository
examples/benchmark.rs (line 192)
169 async fn run_async_benchmark(
170    engine: &Engine,
171    sample_user_data: &Value,
172    num_iterations: usize,
173) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
174    let mut total_duration = Duration::new(0, 0);
175    let mut min_duration = Duration::new(u64::MAX, 0);
176    let mut max_duration = Duration::new(0, 0);
177    let mut all_durations = Vec::with_capacity(num_iterations);
178    let mut error_count = 0;
179
180    println!("Starting async benchmark with {num_iterations} iterations...");
181
182    for i in 0..num_iterations {
183        let mut message = Message::new(&json!({}));
184        message.temp_data = sample_user_data.clone();
185        message.data = json!({});
186        message.metadata = json!({
187            "timestamp": chrono::Utc::now().to_rfc3339(),
188            "iteration": i
189        });
190
191        let start = Instant::now();
192        match engine.process_message(&mut message).await {
193            Ok(_) => {
194                let duration = start.elapsed();
195                all_durations.push(duration);
196                total_duration += duration;
197                min_duration = min_duration.min(duration);
198                max_duration = max_duration.max(duration);
199
200                // Check for processing errors
201                if message.has_errors() {
202                    error_count += 1;
203                    if error_count <= 5 {
204                        // Only print first 5 errors
205                        println!("Processing errors in iteration {}: {:?}", i, message.errors);
206                    }
207                }
208            }
209            Err(e) => {
210                error_count += 1;
211                if error_count <= 5 {
212                    println!("Error in iteration {i}: {e:?}");
213                }
214                // Still record the time even for errors
215                let duration = start.elapsed();
216                all_durations.push(duration);
217                total_duration += duration;
218                min_duration = min_duration.min(duration);
219                max_duration = max_duration.max(duration);
220            }
221        }
222
223        if (i + 1) % 1000 == 0 {
224            println!("Completed {} async iterations", i + 1);
225        }
226    }
227
228    if error_count > 0 {
229        println!("Total errors encountered: {error_count}");
230    }
231
232    // Sort durations for percentile calculations
233    all_durations.sort();
234
235    let p95_idx = (num_iterations as f64 * 0.95) as usize;
236    let p99_idx = (num_iterations as f64 * 0.99) as usize;
237    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
238    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
239    let avg_duration = total_duration / num_iterations as u32;
240
241    println!("\nAsync Benchmark Results (v{VERSION}):");
242    println!("  Iterations: {num_iterations}");
243    println!("  Errors: {error_count}");
244    println!("  Min time: {min_duration:?}");
245    println!("  Max time: {max_duration:?}");
246    println!("  Avg time: {avg_duration:?}");
247    println!("  95th percentile: {p95:?}");
248    println!("  99th percentile: {p99:?}");
249    println!("  Total time: {total_duration:?}");
250
251    Ok(BenchmarkResults {
252        iterations: num_iterations,
253        min_time: min_duration,
254        max_time: max_duration,
255        avg_time: avg_duration,
256        p95: *p95,
257        p99: *p99,
258        total_time: total_duration,
259    })
260 }
261
262 async fn run_sync_benchmark(
263    engine: &Engine,
264    sample_user_data: &Value,
265    num_iterations: usize,
266) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
267    let mut total_duration = Duration::new(0, 0);
268    let mut min_duration = Duration::new(u64::MAX, 0);
269    let mut max_duration = Duration::new(0, 0);
270    let mut all_durations = Vec::with_capacity(num_iterations);
271    let mut error_count = 0;
272
273    println!("Starting sync-style benchmark with {num_iterations} iterations...");
274
275    for i in 0..num_iterations {
276        let mut message = Message::new(&json!({}));
277        message.temp_data = sample_user_data.clone();
278        message.data = json!({});
279        message.metadata = json!({
280            "timestamp": chrono::Utc::now().to_rfc3339(),
281            "iteration": i
282        });
283
284        let start = Instant::now();
285        // Use tokio::task::block_in_place to simulate sync behavior
286        let result = tokio::task::block_in_place(|| {
287            tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
288        });
289
290        match result {
291            Ok(_) => {
292                let duration = start.elapsed();
293                all_durations.push(duration);
294                total_duration += duration;
295                min_duration = min_duration.min(duration);
296                max_duration = max_duration.max(duration);
297
298                if message.has_errors() {
299                    error_count += 1;
300                }
301            }
302            Err(e) => {
303                error_count += 1;
304                if error_count <= 5 {
305                    println!("Sync error in iteration {i}: {e:?}");
306                }
307                let duration = start.elapsed();
308                all_durations.push(duration);
309                total_duration += duration;
310                min_duration = min_duration.min(duration);
311                max_duration = max_duration.max(duration);
312            }
313        }
314
315        if (i + 1) % 1000 == 0 {
316            println!("Completed {} sync iterations", i + 1);
317        }
318    }
319
320    if error_count > 0 {
321        println!("Total sync errors encountered: {error_count}");
322    }
323
324    all_durations.sort();
325
326    let p95_idx = (num_iterations as f64 * 0.95) as usize;
327    let p99_idx = (num_iterations as f64 * 0.99) as usize;
328    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
329    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
330    let avg_duration = total_duration / num_iterations as u32;
331
332    println!("\nSync Benchmark Results (v{VERSION}):");
333    println!("  Iterations: {num_iterations}");
334    println!("  Errors: {error_count}");
335    println!("  Min time: {min_duration:?}");
336    println!("  Max time: {max_duration:?}");
337    println!("  Avg time: {avg_duration:?}");
338    println!("  95th percentile: {p95:?}");
339    println!("  99th percentile: {p99:?}");
340    println!("  Total time: {total_duration:?}");
341
342    Ok(BenchmarkResults {
343        iterations: num_iterations,
344        min_time: min_duration,
345        max_time: max_duration,
346        avg_time: avg_duration,
347        p95: *p95,
348        p99: *p99,
349        total_time: total_duration,
350    })
351}
More examples
examples/complete_workflow.rs (line 141)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let mut engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {e:?}");
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165}
examples/custom_function.rs (line 406)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308    println!("=== Custom Function Example ===\n");
309
310    // Create engine without built-in functions to demonstrate custom ones
311    let mut engine = Engine::new_empty();
312
313    // Register our custom functions
314    engine.register_task_function(
315        "statistics".to_string(),
316        Box::new(StatisticsFunction::new()),
317    );
318
319    engine.register_task_function(
320        "enrich_data".to_string(),
321        Box::new(DataEnrichmentFunction::new()),
322    );
323
324    // Also register built-in map function for data preparation
325    engine.register_task_function(
326        "map".to_string(),
327        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328    );
329
330    // Define a workflow that uses our custom functions
331    let workflow_json = r#"
332    {
333        "id": "custom_function_demo",
334        "name": "Custom Function Demo",
335        "description": "Demonstrates custom async functions in workflow",
336        "condition": { "==": [true, true] },
337        "tasks": [
338            {
339                "id": "prepare_data",
340                "name": "Prepare Data",
341                "description": "Extract and prepare data for analysis",
342                "function": {
343                    "name": "map",
344                    "input": {
345                        "mappings": [
346                            {
347                                "path": "data.numbers",
348                                "logic": { "var": "temp_data.measurements" }
349                            },
350                            {
351                                "path": "data.user_id",
352                                "logic": { "var": "temp_data.user_id" }
353                            }
354                        ]
355                    }
356                }
357            },
358            {
359                "id": "calculate_stats",
360                "name": "Calculate Statistics",
361                "description": "Calculate statistical measures from numeric data",
362                "function": {
363                    "name": "statistics",
364                    "input": {
365                        "data_path": "data.numbers",
366                        "output_path": "data.stats"
367                    }
368                }
369            },
370            {
371                "id": "enrich_user_data",
372                "name": "Enrich User Data",
373                "description": "Add additional user information",
374                "function": {
375                    "name": "enrich_data",
376                    "input": {
377                        "lookup_field": "user_id",
378                        "lookup_value": "user_123",
379                        "output_path": "data.user_info"
380                    }
381                }
382            }
383        ]
384    }
385    "#;
386
387    // Parse and add the workflow
388    let workflow = Workflow::from_json(workflow_json)?;
389    engine.add_workflow(&workflow);
390
391    // Create sample data
392    let sample_data = json!({
393        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394        "user_id": "user_123",
395        "timestamp": "2024-01-15T10:30:00Z"
396    });
397
398    // Create and process message
399    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400    message.temp_data = sample_data;
401    message.data = json!({});
402
403    println!("Processing message with custom functions...\n");
404
405    // Process the message through our custom workflow
406    match engine.process_message(&mut message).await {
407        Ok(_) => {
408            println!("✅ Message processed successfully!\n");
409
410            println!("📊 Final Results:");
411            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413            println!("📋 Audit Trail:");
414            for (i, audit) in message.audit_trail.iter().enumerate() {
415                println!(
416                    "{}. Task: {} (Status: {})",
417                    i + 1,
418                    audit.task_id,
419                    audit.status_code
420                );
421                println!("   Timestamp: {}", audit.timestamp);
422                println!("   Changes: {} field(s) modified", audit.changes.len());
423            }
424
425            if message.has_errors() {
426                println!("\n⚠️  Errors encountered:");
427                for error in &message.errors {
428                    println!(
429                        "   - {}: {:?}",
430                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431                        error.error_message
432                    );
433                }
434            }
435        }
436        Err(e) => {
437            println!("❌ Error processing message: {e:?}");
438        }
439    }
440
441    // Demonstrate another example with different data
442    let separator = "=".repeat(50);
443    println!("\n{separator}");
444    println!("=== Second Example with Different User ===\n");
445
446    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447    message2.temp_data = json!({
448        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449        "user_id": "user_456",
450        "timestamp": "2024-01-15T11:00:00Z"
451    });
452    message2.data = json!({});
453
454    // Create a workflow for the second user
455    let workflow2_json = r#"
456    {
457        "id": "custom_function_demo_2",
458        "name": "Custom Function Demo 2",
459        "description": "Second demo with different user",
460        "condition": { "==": [true, true] },
461        "tasks": [
462            {
463                "id": "prepare_data",
464                "name": "Prepare Data",
465                "function": {
466                    "name": "map",
467                    "input": {
468                        "mappings": [
469                            {
470                                "path": "data.numbers",
471                                "logic": { "var": "temp_data.measurements" }
472                            },
473                            {
474                                "path": "data.user_id",
475                                "logic": { "var": "temp_data.user_id" }
476                            }
477                        ]
478                    }
479                }
480            },
481            {
482                "id": "calculate_stats",
483                "name": "Calculate Statistics",
484                "function": {
485                    "name": "statistics",
486                    "input": {
487                        "data_path": "data.numbers",
488                        "output_path": "data.analysis"
489                    }
490                }
491            },
492            {
493                "id": "enrich_user_data",
494                "name": "Enrich User Data",
495                "function": {
496                    "name": "enrich_data",
497                    "input": {
498                        "lookup_field": "user_id",
499                        "lookup_value": "user_456",
500                        "output_path": "data.employee_details"
501                    }
502                }
503            }
504        ]
505    }
506    "#;
507
508    let workflow2 = Workflow::from_json(workflow2_json)?;
509    engine.add_workflow(&workflow2);
510
511    match engine.process_message(&mut message2).await {
512        Ok(_) => {
513            println!("✅ Second message processed successfully!\n");
514            println!("📊 Results for user_456:");
515            println!("{}", serde_json::to_string_pretty(&message2.data)?);
516        }
517        Err(e) => {
518            println!("❌ Error processing second message: {e:?}");
519        }
520    }
521
522    println!("\n🎉 Custom function examples completed!");
523
524    Ok(())
525}

Trait Implementations§

Source§

impl Default for Engine

Source§

fn default() -> Self

Returns the “default value” for a type.
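
Since Engine implements Default, an instance can also be obtained through Default::default(). For a type with a new() constructor this is conventionally the same engine that Engine::new() returns, though only the existence of the impl is documented here; treat the sketch below as illustrative.

use dataflow_rs::Engine;

// Construct the engine via Default, e.g. to satisfy a generic `T: Default` bound.
// Assumed (not guaranteed above) to behave like Engine::new().
let engine: Engine = Default::default();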

Auto Trait Implementations§

§

impl Freeze for Engine

§

impl !RefUnwindSafe for Engine

§

impl Send for Engine

§

impl Sync for Engine

§

impl Unpin for Engine

§

impl !UnwindSafe for Engine
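
Because the auto trait implementations above mark Engine as both Send and Sync, one configured engine can be shared across Tokio tasks behind an Arc. The sketch below is illustrative only: it assumes process_message takes &self (as the benchmark example above suggests), that the processing future is Send, and that any workflows are registered on a mutable Engine before it is wrapped in the Arc.

use std::sync::Arc;
use dataflow_rs::Engine;
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Register any workflows on a mutable Engine first; Arc only hands out shared access.
    let engine = Arc::new(Engine::new());

    let mut handles = Vec::new();
    for i in 0..4 {
        let engine = Arc::clone(&engine);
        handles.push(tokio::spawn(async move {
            // Each task owns its own Message; only the Engine itself is shared.
            let mut message = Message::new(&json!({ "task": i }));
            engine.process_message(&mut message).await
        }));
    }

    for handle in handles {
        if let Err(e) = handle.await? {
            eprintln!("processing failed: {e:?}");
        }
    }

    Ok(())
}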

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
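
This Instrument impl is the blanket implementation from the tracing crate. For an Engine it is most useful on the future returned by process_message, so that events emitted during processing are recorded inside a span. A minimal, hypothetical sketch (inside an async fn, assuming tracing is a direct dependency, a subscriber is installed elsewhere, and engine/message are set up as in the examples above):

use tracing::{info_span, Instrument};

// Attach a span to the processing future; the span is entered whenever the future is polled.
let span = info_span!("process_message", workflow = "complete_workflow");
engine.process_message(&mut message).instrument(span).await?;
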
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.
Source§

impl<T> ErasedDestructor for T
where T: 'static,