Struct Engine

Source
pub struct Engine { /* private fields */ }
Expand description

Engine that processes messages through workflows using non-blocking async IO.

This engine is optimized for IO-bound workloads like HTTP requests, database access, and file operations. It uses Tokio for efficient async task execution.

Workflows are processed sequentially to ensure that later workflows can depend on the results of earlier workflows.

Implementations§

Source§

impl Engine

Source

pub fn new() -> Self

Creates a new Engine instance with built-in function handlers pre-registered.

§Example
use dataflow_rs::Engine;

let engine = Engine::new();
Examples found in repository?
examples/benchmark.rs (line 14)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // Create the workflow engine (built-in functions are auto-registered)
14    let mut engine = Engine::new();
15
16    // Define a workflow that:
17    // 1. Uses pre-loaded data instead of HTTP fetch
18    // 2. Enriches the message with transformed data
19    // 3. Demonstrates proper async workflow execution
20    let workflow_json = r#"
21    {
22        "id": "benchmark_workflow",
23        "name": "Benchmark Workflow Example",
24        "description": "Demonstrates async workflow execution with data transformation",
25        "condition": { "==": [true, true] },
26        "tasks": [
27            {
28                "id": "transform_data",
29                "name": "Transform Data",
30                "description": "Map API response to our data model",
31                "function": {
32                    "name": "map",
33                    "input": {
34                        "mappings": [
35                            {
36                                "path": "data.user.id", 
37                                "logic": { "var": "temp_data.body.id" }
38                            },
39                            {
40                                "path": "data.user.name", 
41                                "logic": { "var": "temp_data.body.name" }
42                            },
43                            {
44                                "path": "data.user.email", 
45                                "logic": { "var": "temp_data.body.email" }
46                            },
47                            {
48                                "path": "data.user.address", 
49                                "logic": {
50                                    "cat": [
51                                        { "var": "temp_data.body.address.street" },
52                                        ", ",
53                                        { "var": "temp_data.body.address.city" }
54                                    ]
55                                }
56                            },
57                            {
58                                "path": "data.user.company", 
59                                "logic": { "var": "temp_data.body.company.name" }
60                            },
61                            {
62                                "path": "data.processed_at", 
63                                "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
64                            }
65                        ]
66                    }
67                }
68            }
69        ]
70    }
71    "#;
72
73    // Parse and add the workflow to the engine
74    let workflow = Workflow::from_json(workflow_json)?;
75    engine.add_workflow(&workflow);
76
77    // Create sample user data (similar to what the HTTP endpoint would return)
78    let sample_user_data = json!({
79        "body": {
80            "id": 1,
81            "name": "Leanne Graham",
82            "username": "Bret",
83            "email": "Sincere@april.biz",
84            "address": {
85                "street": "Kulas Light",
86                "suite": "Apt. 556",
87                "city": "Gwenborough",
88                "zipcode": "92998-3874",
89                "geo": {
90                    "lat": "-37.3159",
91                    "lng": "81.1496"
92                }
93            },
94            "phone": "1-770-736-8031 x56442",
95            "website": "hildegard.org",
96            "company": {
97                "name": "Romaguera-Crona",
98                "catchPhrase": "Multi-layered client-server neural-net",
99                "bs": "harness real-time e-markets"
100            }
101        }
102    });
103
104    // Run async benchmark
105    println!("=== ASYNC BENCHMARK ===");
106    let async_results = run_async_benchmark(&engine, &sample_user_data, 1000).await?;
107
108    // Run sync benchmark for comparison (using blocking approach)
109    println!("\n=== SYNC BENCHMARK (for comparison) ===");
110    let sync_results = run_sync_benchmark(&engine, &sample_user_data, 1000).await?;
111
112    // Compare results
113    println!("\n=== COMPARISON ===");
114    println!("Async avg: {:?}", async_results.avg_time);
115    println!("Sync avg:  {:?}", sync_results.avg_time);
116    println!(
117        "Async is {:.2}x {} than sync",
118        if async_results.avg_time < sync_results.avg_time {
119            sync_results.avg_time.as_nanos() as f64 / async_results.avg_time.as_nanos() as f64
120        } else {
121            async_results.avg_time.as_nanos() as f64 / sync_results.avg_time.as_nanos() as f64
122        },
123        if async_results.avg_time < sync_results.avg_time {
124            "faster"
125        } else {
126            "slower"
127        }
128    );
129
130    // Log results to file
131    log_benchmark_results(
132        async_results.iterations,
133        async_results.min_time,
134        async_results.max_time,
135        async_results.avg_time,
136        async_results.p95,
137        async_results.p99,
138        async_results.total_time,
139        "async".to_string(),
140    )?;
141
142    log_benchmark_results(
143        sync_results.iterations,
144        sync_results.min_time,
145        sync_results.max_time,
146        sync_results.avg_time,
147        sync_results.p95,
148        sync_results.p99,
149        sync_results.total_time,
150        "sync".to_string(),
151    )?;
152
153    println!("\nBenchmark results saved to '{}'", BENCHMARK_LOG_FILE);
154
155    Ok(())
156}
More examples
Hide additional examples
examples/complete_workflow.rs (line 7)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let mut engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {:?}", e);
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165}
Source

pub fn new_empty() -> Self

Creates a new Engine instance without any pre-registered function handlers.

Examples found in repository?
examples/custom_function.rs (line 311)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308    println!("=== Custom Function Example ===\n");
309
310    // Create engine without built-in functions to demonstrate custom ones
311    let mut engine = Engine::new_empty();
312
313    // Register our custom functions
314    engine.register_task_function(
315        "statistics".to_string(),
316        Box::new(StatisticsFunction::new()),
317    );
318
319    engine.register_task_function(
320        "enrich_data".to_string(),
321        Box::new(DataEnrichmentFunction::new()),
322    );
323
324    // Also register built-in map function for data preparation
325    engine.register_task_function(
326        "map".to_string(),
327        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328    );
329
330    // Define a workflow that uses our custom functions
331    let workflow_json = r#"
332    {
333        "id": "custom_function_demo",
334        "name": "Custom Function Demo",
335        "description": "Demonstrates custom async functions in workflow",
336        "condition": { "==": [true, true] },
337        "tasks": [
338            {
339                "id": "prepare_data",
340                "name": "Prepare Data",
341                "description": "Extract and prepare data for analysis",
342                "function": {
343                    "name": "map",
344                    "input": {
345                        "mappings": [
346                            {
347                                "path": "data.numbers",
348                                "logic": { "var": "temp_data.measurements" }
349                            },
350                            {
351                                "path": "data.user_id",
352                                "logic": { "var": "temp_data.user_id" }
353                            }
354                        ]
355                    }
356                }
357            },
358            {
359                "id": "calculate_stats",
360                "name": "Calculate Statistics",
361                "description": "Calculate statistical measures from numeric data",
362                "function": {
363                    "name": "statistics",
364                    "input": {
365                        "data_path": "data.numbers",
366                        "output_path": "data.stats"
367                    }
368                }
369            },
370            {
371                "id": "enrich_user_data",
372                "name": "Enrich User Data",
373                "description": "Add additional user information",
374                "function": {
375                    "name": "enrich_data",
376                    "input": {
377                        "lookup_field": "user_id",
378                        "lookup_value": "user_123",
379                        "output_path": "data.user_info"
380                    }
381                }
382            }
383        ]
384    }
385    "#;
386
387    // Parse and add the workflow
388    let workflow = Workflow::from_json(workflow_json)?;
389    engine.add_workflow(&workflow);
390
391    // Create sample data
392    let sample_data = json!({
393        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394        "user_id": "user_123",
395        "timestamp": "2024-01-15T10:30:00Z"
396    });
397
398    // Create and process message
399    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400    message.temp_data = sample_data;
401    message.data = json!({});
402
403    println!("Processing message with custom functions...\n");
404
405    // Process the message through our custom workflow
406    match engine.process_message(&mut message).await {
407        Ok(_) => {
408            println!("✅ Message processed successfully!\n");
409
410            println!("📊 Final Results:");
411            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413            println!("📋 Audit Trail:");
414            for (i, audit) in message.audit_trail.iter().enumerate() {
415                println!(
416                    "{}. Task: {} (Status: {})",
417                    i + 1,
418                    audit.task_id,
419                    audit.status_code
420                );
421                println!("   Timestamp: {}", audit.timestamp);
422                println!("   Changes: {} field(s) modified", audit.changes.len());
423            }
424
425            if message.has_errors() {
426                println!("\n⚠️  Errors encountered:");
427                for error in &message.errors {
428                    println!(
429                        "   - {}: {:?}",
430                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431                        error.error_message
432                    );
433                }
434            }
435        }
436        Err(e) => {
437            println!("❌ Error processing message: {:?}", e);
438        }
439    }
440
441    // Demonstrate another example with different data
442    let separator = "=".repeat(50);
443    println!("\n{}", separator);
444    println!("=== Second Example with Different User ===\n");
445
446    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447    message2.temp_data = json!({
448        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449        "user_id": "user_456",
450        "timestamp": "2024-01-15T11:00:00Z"
451    });
452    message2.data = json!({});
453
454    // Create a workflow for the second user
455    let workflow2_json = r#"
456    {
457        "id": "custom_function_demo_2",
458        "name": "Custom Function Demo 2",
459        "description": "Second demo with different user",
460        "condition": { "==": [true, true] },
461        "tasks": [
462            {
463                "id": "prepare_data",
464                "name": "Prepare Data",
465                "function": {
466                    "name": "map",
467                    "input": {
468                        "mappings": [
469                            {
470                                "path": "data.numbers",
471                                "logic": { "var": "temp_data.measurements" }
472                            },
473                            {
474                                "path": "data.user_id",
475                                "logic": { "var": "temp_data.user_id" }
476                            }
477                        ]
478                    }
479                }
480            },
481            {
482                "id": "calculate_stats",
483                "name": "Calculate Statistics",
484                "function": {
485                    "name": "statistics",
486                    "input": {
487                        "data_path": "data.numbers",
488                        "output_path": "data.analysis"
489                    }
490                }
491            },
492            {
493                "id": "enrich_user_data",
494                "name": "Enrich User Data",
495                "function": {
496                    "name": "enrich_data",
497                    "input": {
498                        "lookup_field": "user_id",
499                        "lookup_value": "user_456",
500                        "output_path": "data.employee_details"
501                    }
502                }
503            }
504        ]
505    }
506    "#;
507
508    let workflow2 = Workflow::from_json(workflow2_json)?;
509    engine.add_workflow(&workflow2);
510
511    match engine.process_message(&mut message2).await {
512        Ok(_) => {
513            println!("✅ Second message processed successfully!\n");
514            println!("📊 Results for user_456:");
515            println!("{}", serde_json::to_string_pretty(&message2.data)?);
516        }
517        Err(e) => {
518            println!("❌ Error processing second message: {:?}", e);
519        }
520    }
521
522    println!("\n🎉 Custom function examples completed!");
523
524    Ok(())
525}
Source

pub fn with_retry_config(self, config: RetryConfig) -> Self

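Configures the engine's retry behavior with the given RetryConfig, consuming the engine and returning it so the call can be chained.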

Source

pub fn add_workflow(&mut self, workflow: &Workflow)

Adds a workflow to the engine.

§Arguments
  • workflow - The workflow to add
Examples found in repository?
examples/benchmark.rs (line 75)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // Create the workflow engine (built-in functions are auto-registered)
14    let mut engine = Engine::new();
15
16    // Define a workflow that:
17    // 1. Uses pre-loaded data instead of HTTP fetch
18    // 2. Enriches the message with transformed data
19    // 3. Demonstrates proper async workflow execution
20    let workflow_json = r#"
21    {
22        "id": "benchmark_workflow",
23        "name": "Benchmark Workflow Example",
24        "description": "Demonstrates async workflow execution with data transformation",
25        "condition": { "==": [true, true] },
26        "tasks": [
27            {
28                "id": "transform_data",
29                "name": "Transform Data",
30                "description": "Map API response to our data model",
31                "function": {
32                    "name": "map",
33                    "input": {
34                        "mappings": [
35                            {
36                                "path": "data.user.id", 
37                                "logic": { "var": "temp_data.body.id" }
38                            },
39                            {
40                                "path": "data.user.name", 
41                                "logic": { "var": "temp_data.body.name" }
42                            },
43                            {
44                                "path": "data.user.email", 
45                                "logic": { "var": "temp_data.body.email" }
46                            },
47                            {
48                                "path": "data.user.address", 
49                                "logic": {
50                                    "cat": [
51                                        { "var": "temp_data.body.address.street" },
52                                        ", ",
53                                        { "var": "temp_data.body.address.city" }
54                                    ]
55                                }
56                            },
57                            {
58                                "path": "data.user.company", 
59                                "logic": { "var": "temp_data.body.company.name" }
60                            },
61                            {
62                                "path": "data.processed_at", 
63                                "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
64                            }
65                        ]
66                    }
67                }
68            }
69        ]
70    }
71    "#;
72
73    // Parse and add the workflow to the engine
74    let workflow = Workflow::from_json(workflow_json)?;
75    engine.add_workflow(&workflow);
76
77    // Create sample user data (similar to what the HTTP endpoint would return)
78    let sample_user_data = json!({
79        "body": {
80            "id": 1,
81            "name": "Leanne Graham",
82            "username": "Bret",
83            "email": "Sincere@april.biz",
84            "address": {
85                "street": "Kulas Light",
86                "suite": "Apt. 556",
87                "city": "Gwenborough",
88                "zipcode": "92998-3874",
89                "geo": {
90                    "lat": "-37.3159",
91                    "lng": "81.1496"
92                }
93            },
94            "phone": "1-770-736-8031 x56442",
95            "website": "hildegard.org",
96            "company": {
97                "name": "Romaguera-Crona",
98                "catchPhrase": "Multi-layered client-server neural-net",
99                "bs": "harness real-time e-markets"
100            }
101        }
102    });
103
104    // Run async benchmark
105    println!("=== ASYNC BENCHMARK ===");
106    let async_results = run_async_benchmark(&engine, &sample_user_data, 1000).await?;
107
108    // Run sync benchmark for comparison (using blocking approach)
109    println!("\n=== SYNC BENCHMARK (for comparison) ===");
110    let sync_results = run_sync_benchmark(&engine, &sample_user_data, 1000).await?;
111
112    // Compare results
113    println!("\n=== COMPARISON ===");
114    println!("Async avg: {:?}", async_results.avg_time);
115    println!("Sync avg:  {:?}", sync_results.avg_time);
116    println!(
117        "Async is {:.2}x {} than sync",
118        if async_results.avg_time < sync_results.avg_time {
119            sync_results.avg_time.as_nanos() as f64 / async_results.avg_time.as_nanos() as f64
120        } else {
121            async_results.avg_time.as_nanos() as f64 / sync_results.avg_time.as_nanos() as f64
122        },
123        if async_results.avg_time < sync_results.avg_time {
124            "faster"
125        } else {
126            "slower"
127        }
128    );
129
130    // Log results to file
131    log_benchmark_results(
132        async_results.iterations,
133        async_results.min_time,
134        async_results.max_time,
135        async_results.avg_time,
136        async_results.p95,
137        async_results.p99,
138        async_results.total_time,
139        "async".to_string(),
140    )?;
141
142    log_benchmark_results(
143        sync_results.iterations,
144        sync_results.min_time,
145        sync_results.max_time,
146        sync_results.avg_time,
147        sync_results.p95,
148        sync_results.p99,
149        sync_results.total_time,
150        "sync".to_string(),
151    )?;
152
153    println!("\nBenchmark results saved to '{}'", BENCHMARK_LOG_FILE);
154
155    Ok(())
156}
More examples
Hide additional examples
examples/complete_workflow.rs (line 133)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let mut engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {:?}", e);
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165}
examples/custom_function.rs (line 389)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308    println!("=== Custom Function Example ===\n");
309
310    // Create engine without built-in functions to demonstrate custom ones
311    let mut engine = Engine::new_empty();
312
313    // Register our custom functions
314    engine.register_task_function(
315        "statistics".to_string(),
316        Box::new(StatisticsFunction::new()),
317    );
318
319    engine.register_task_function(
320        "enrich_data".to_string(),
321        Box::new(DataEnrichmentFunction::new()),
322    );
323
324    // Also register built-in map function for data preparation
325    engine.register_task_function(
326        "map".to_string(),
327        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328    );
329
330    // Define a workflow that uses our custom functions
331    let workflow_json = r#"
332    {
333        "id": "custom_function_demo",
334        "name": "Custom Function Demo",
335        "description": "Demonstrates custom async functions in workflow",
336        "condition": { "==": [true, true] },
337        "tasks": [
338            {
339                "id": "prepare_data",
340                "name": "Prepare Data",
341                "description": "Extract and prepare data for analysis",
342                "function": {
343                    "name": "map",
344                    "input": {
345                        "mappings": [
346                            {
347                                "path": "data.numbers",
348                                "logic": { "var": "temp_data.measurements" }
349                            },
350                            {
351                                "path": "data.user_id",
352                                "logic": { "var": "temp_data.user_id" }
353                            }
354                        ]
355                    }
356                }
357            },
358            {
359                "id": "calculate_stats",
360                "name": "Calculate Statistics",
361                "description": "Calculate statistical measures from numeric data",
362                "function": {
363                    "name": "statistics",
364                    "input": {
365                        "data_path": "data.numbers",
366                        "output_path": "data.stats"
367                    }
368                }
369            },
370            {
371                "id": "enrich_user_data",
372                "name": "Enrich User Data",
373                "description": "Add additional user information",
374                "function": {
375                    "name": "enrich_data",
376                    "input": {
377                        "lookup_field": "user_id",
378                        "lookup_value": "user_123",
379                        "output_path": "data.user_info"
380                    }
381                }
382            }
383        ]
384    }
385    "#;
386
387    // Parse and add the workflow
388    let workflow = Workflow::from_json(workflow_json)?;
389    engine.add_workflow(&workflow);
390
391    // Create sample data
392    let sample_data = json!({
393        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394        "user_id": "user_123",
395        "timestamp": "2024-01-15T10:30:00Z"
396    });
397
398    // Create and process message
399    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400    message.temp_data = sample_data;
401    message.data = json!({});
402
403    println!("Processing message with custom functions...\n");
404
405    // Process the message through our custom workflow
406    match engine.process_message(&mut message).await {
407        Ok(_) => {
408            println!("✅ Message processed successfully!\n");
409
410            println!("📊 Final Results:");
411            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413            println!("📋 Audit Trail:");
414            for (i, audit) in message.audit_trail.iter().enumerate() {
415                println!(
416                    "{}. Task: {} (Status: {})",
417                    i + 1,
418                    audit.task_id,
419                    audit.status_code
420                );
421                println!("   Timestamp: {}", audit.timestamp);
422                println!("   Changes: {} field(s) modified", audit.changes.len());
423            }
424
425            if message.has_errors() {
426                println!("\n⚠️  Errors encountered:");
427                for error in &message.errors {
428                    println!(
429                        "   - {}: {:?}",
430                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431                        error.error_message
432                    );
433                }
434            }
435        }
436        Err(e) => {
437            println!("❌ Error processing message: {:?}", e);
438        }
439    }
440
441    // Demonstrate another example with different data
442    let separator = "=".repeat(50);
443    println!("\n{}", separator);
444    println!("=== Second Example with Different User ===\n");
445
446    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447    message2.temp_data = json!({
448        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449        "user_id": "user_456",
450        "timestamp": "2024-01-15T11:00:00Z"
451    });
452    message2.data = json!({});
453
454    // Create a workflow for the second user
455    let workflow2_json = r#"
456    {
457        "id": "custom_function_demo_2",
458        "name": "Custom Function Demo 2",
459        "description": "Second demo with different user",
460        "condition": { "==": [true, true] },
461        "tasks": [
462            {
463                "id": "prepare_data",
464                "name": "Prepare Data",
465                "function": {
466                    "name": "map",
467                    "input": {
468                        "mappings": [
469                            {
470                                "path": "data.numbers",
471                                "logic": { "var": "temp_data.measurements" }
472                            },
473                            {
474                                "path": "data.user_id",
475                                "logic": { "var": "temp_data.user_id" }
476                            }
477                        ]
478                    }
479                }
480            },
481            {
482                "id": "calculate_stats",
483                "name": "Calculate Statistics",
484                "function": {
485                    "name": "statistics",
486                    "input": {
487                        "data_path": "data.numbers",
488                        "output_path": "data.analysis"
489                    }
490                }
491            },
492            {
493                "id": "enrich_user_data",
494                "name": "Enrich User Data",
495                "function": {
496                    "name": "enrich_data",
497                    "input": {
498                        "lookup_field": "user_id",
499                        "lookup_value": "user_456",
500                        "output_path": "data.employee_details"
501                    }
502                }
503            }
504        ]
505    }
506    "#;
507
508    let workflow2 = Workflow::from_json(workflow2_json)?;
509    engine.add_workflow(&workflow2);
510
511    match engine.process_message(&mut message2).await {
512        Ok(_) => {
513            println!("✅ Second message processed successfully!\n");
514            println!("📊 Results for user_456:");
515            println!("{}", serde_json::to_string_pretty(&message2.data)?);
516        }
517        Err(e) => {
518            println!("❌ Error processing second message: {:?}", e);
519        }
520    }
521
522    println!("\n🎉 Custom function examples completed!");
523
524    Ok(())
525}
Source

pub fn register_task_function( &mut self, name: String, handler: Box<dyn AsyncFunctionHandler + Send + Sync>, )

Registers a custom function handler with the engine.

§Arguments
  • name - The name of the function handler
  • handler - The function handler implementation
Examples found in repository?
examples/custom_function.rs (lines 314-317)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308    println!("=== Custom Function Example ===\n");
309
310    // Create engine without built-in functions to demonstrate custom ones
311    let mut engine = Engine::new_empty();
312
313    // Register our custom functions
314    engine.register_task_function(
315        "statistics".to_string(),
316        Box::new(StatisticsFunction::new()),
317    );
318
319    engine.register_task_function(
320        "enrich_data".to_string(),
321        Box::new(DataEnrichmentFunction::new()),
322    );
323
324    // Also register built-in map function for data preparation
325    engine.register_task_function(
326        "map".to_string(),
327        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328    );
329
330    // Define a workflow that uses our custom functions
331    let workflow_json = r#"
332    {
333        "id": "custom_function_demo",
334        "name": "Custom Function Demo",
335        "description": "Demonstrates custom async functions in workflow",
336        "condition": { "==": [true, true] },
337        "tasks": [
338            {
339                "id": "prepare_data",
340                "name": "Prepare Data",
341                "description": "Extract and prepare data for analysis",
342                "function": {
343                    "name": "map",
344                    "input": {
345                        "mappings": [
346                            {
347                                "path": "data.numbers",
348                                "logic": { "var": "temp_data.measurements" }
349                            },
350                            {
351                                "path": "data.user_id",
352                                "logic": { "var": "temp_data.user_id" }
353                            }
354                        ]
355                    }
356                }
357            },
358            {
359                "id": "calculate_stats",
360                "name": "Calculate Statistics",
361                "description": "Calculate statistical measures from numeric data",
362                "function": {
363                    "name": "statistics",
364                    "input": {
365                        "data_path": "data.numbers",
366                        "output_path": "data.stats"
367                    }
368                }
369            },
370            {
371                "id": "enrich_user_data",
372                "name": "Enrich User Data",
373                "description": "Add additional user information",
374                "function": {
375                    "name": "enrich_data",
376                    "input": {
377                        "lookup_field": "user_id",
378                        "lookup_value": "user_123",
379                        "output_path": "data.user_info"
380                    }
381                }
382            }
383        ]
384    }
385    "#;
386
387    // Parse and add the workflow
388    let workflow = Workflow::from_json(workflow_json)?;
389    engine.add_workflow(&workflow);
390
391    // Create sample data
392    let sample_data = json!({
393        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394        "user_id": "user_123",
395        "timestamp": "2024-01-15T10:30:00Z"
396    });
397
398    // Create and process message
399    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400    message.temp_data = sample_data;
401    message.data = json!({});
402
403    println!("Processing message with custom functions...\n");
404
405    // Process the message through our custom workflow
406    match engine.process_message(&mut message).await {
407        Ok(_) => {
408            println!("✅ Message processed successfully!\n");
409
410            println!("📊 Final Results:");
411            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413            println!("📋 Audit Trail:");
414            for (i, audit) in message.audit_trail.iter().enumerate() {
415                println!(
416                    "{}. Task: {} (Status: {})",
417                    i + 1,
418                    audit.task_id,
419                    audit.status_code
420                );
421                println!("   Timestamp: {}", audit.timestamp);
422                println!("   Changes: {} field(s) modified", audit.changes.len());
423            }
424
425            if message.has_errors() {
426                println!("\n⚠️  Errors encountered:");
427                for error in &message.errors {
428                    println!(
429                        "   - {}: {:?}",
430                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431                        error.error_message
432                    );
433                }
434            }
435        }
436        Err(e) => {
437            println!("❌ Error processing message: {:?}", e);
438        }
439    }
440
441    // Demonstrate another example with different data
442    let separator = "=".repeat(50);
443    println!("\n{}", separator);
444    println!("=== Second Example with Different User ===\n");
445
446    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447    message2.temp_data = json!({
448        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449        "user_id": "user_456",
450        "timestamp": "2024-01-15T11:00:00Z"
451    });
452    message2.data = json!({});
453
454    // Create a workflow for the second user
455    let workflow2_json = r#"
456    {
457        "id": "custom_function_demo_2",
458        "name": "Custom Function Demo 2",
459        "description": "Second demo with different user",
460        "condition": { "==": [true, true] },
461        "tasks": [
462            {
463                "id": "prepare_data",
464                "name": "Prepare Data",
465                "function": {
466                    "name": "map",
467                    "input": {
468                        "mappings": [
469                            {
470                                "path": "data.numbers",
471                                "logic": { "var": "temp_data.measurements" }
472                            },
473                            {
474                                "path": "data.user_id",
475                                "logic": { "var": "temp_data.user_id" }
476                            }
477                        ]
478                    }
479                }
480            },
481            {
482                "id": "calculate_stats",
483                "name": "Calculate Statistics",
484                "function": {
485                    "name": "statistics",
486                    "input": {
487                        "data_path": "data.numbers",
488                        "output_path": "data.analysis"
489                    }
490                }
491            },
492            {
493                "id": "enrich_user_data",
494                "name": "Enrich User Data",
495                "function": {
496                    "name": "enrich_data",
497                    "input": {
498                        "lookup_field": "user_id",
499                        "lookup_value": "user_456",
500                        "output_path": "data.employee_details"
501                    }
502                }
503            }
504        ]
505    }
506    "#;
507
508    let workflow2 = Workflow::from_json(workflow2_json)?;
509    engine.add_workflow(&workflow2);
510
511    match engine.process_message(&mut message2).await {
512        Ok(_) => {
513            println!("✅ Second message processed successfully!\n");
514            println!("📊 Results for user_456:");
515            println!("{}", serde_json::to_string_pretty(&message2.data)?);
516        }
517        Err(e) => {
518            println!("❌ Error processing second message: {:?}", e);
519        }
520    }
521
522    println!("\n🎉 Custom function examples completed!");
523
524    Ok(())
525}
Source

pub fn has_function(&self, name: &str) -> bool

Checks whether a function handler with the given name is registered with the engine.

Source

pub async fn process_message(&self, message: &mut Message) -> Result<()>

Processes a message through workflows that match their conditions.

This async method:

  1. Iterates through workflows sequentially in deterministic order (sorted by ID)
  2. Evaluates conditions for each workflow right before execution
  3. Executes matching workflows one after another (not concurrently)
  4. Updates the message with processing results and audit trail

Workflows are executed sequentially because later workflows may depend on the results of earlier workflows, and their conditions may change based on modifications made by previous workflows.

§Arguments
  • message - The message to process
§Returns
  • Result<()> - Success or an error if processing failed
Examples found in repository?
examples/benchmark.rs (line 195)
169async fn run_async_benchmark(
170    engine: &Engine,
171    sample_user_data: &Value,
172    num_iterations: usize,
173) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
174    let mut total_duration = Duration::new(0, 0);
175    let mut min_duration = Duration::new(u64::MAX, 0);
176    let mut max_duration = Duration::new(0, 0);
177    let mut all_durations = Vec::with_capacity(num_iterations);
178    let mut error_count = 0;
179
180    println!(
181        "Starting async benchmark with {} iterations...",
182        num_iterations
183    );
184
185    for i in 0..num_iterations {
186        let mut message = Message::new(&json!({}));
187        message.temp_data = sample_user_data.clone();
188        message.data = json!({});
189        message.metadata = json!({
190            "timestamp": chrono::Utc::now().to_rfc3339(),
191            "iteration": i
192        });
193
194        let start = Instant::now();
195        match engine.process_message(&mut message).await {
196            Ok(_) => {
197                let duration = start.elapsed();
198                all_durations.push(duration);
199                total_duration += duration;
200                min_duration = min_duration.min(duration);
201                max_duration = max_duration.max(duration);
202
203                // Check for processing errors
204                if message.has_errors() {
205                    error_count += 1;
206                    if error_count <= 5 {
207                        // Only print first 5 errors
208                        println!("Processing errors in iteration {}: {:?}", i, message.errors);
209                    }
210                }
211            }
212            Err(e) => {
213                error_count += 1;
214                if error_count <= 5 {
215                    println!("Error in iteration {}: {:?}", i, e);
216                }
217                // Still record the time even for errors
218                let duration = start.elapsed();
219                all_durations.push(duration);
220                total_duration += duration;
221                min_duration = min_duration.min(duration);
222                max_duration = max_duration.max(duration);
223            }
224        }
225
226        if (i + 1) % 1000 == 0 {
227            println!("Completed {} async iterations", i + 1);
228        }
229    }
230
231    if error_count > 0 {
232        println!("Total errors encountered: {}", error_count);
233    }
234
235    // Sort durations for percentile calculations
236    all_durations.sort();
237
238    let p95_idx = (num_iterations as f64 * 0.95) as usize;
239    let p99_idx = (num_iterations as f64 * 0.99) as usize;
240    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
241    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
242    let avg_duration = total_duration / num_iterations as u32;
243
244    println!("\nAsync Benchmark Results (v{}):", VERSION);
245    println!("  Iterations: {}", num_iterations);
246    println!("  Errors: {}", error_count);
247    println!("  Min time: {:?}", min_duration);
248    println!("  Max time: {:?}", max_duration);
249    println!("  Avg time: {:?}", avg_duration);
250    println!("  95th percentile: {:?}", p95);
251    println!("  99th percentile: {:?}", p99);
252    println!("  Total time: {:?}", total_duration);
253
254    Ok(BenchmarkResults {
255        iterations: num_iterations,
256        min_time: min_duration,
257        max_time: max_duration,
258        avg_time: avg_duration,
259        p95: *p95,
260        p99: *p99,
261        total_time: total_duration,
262    })
263}
264
265async fn run_sync_benchmark(
266    engine: &Engine,
267    sample_user_data: &Value,
268    num_iterations: usize,
269) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
270    let mut total_duration = Duration::new(0, 0);
271    let mut min_duration = Duration::new(u64::MAX, 0);
272    let mut max_duration = Duration::new(0, 0);
273    let mut all_durations = Vec::with_capacity(num_iterations);
274    let mut error_count = 0;
275
276    println!(
277        "Starting sync-style benchmark with {} iterations...",
278        num_iterations
279    );
280
281    for i in 0..num_iterations {
282        let mut message = Message::new(&json!({}));
283        message.temp_data = sample_user_data.clone();
284        message.data = json!({});
285        message.metadata = json!({
286            "timestamp": chrono::Utc::now().to_rfc3339(),
287            "iteration": i
288        });
289
290        let start = Instant::now();
291        // Use tokio::task::block_in_place to simulate sync behavior
292        let result = tokio::task::block_in_place(|| {
293            tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
294        });
295
296        match result {
297            Ok(_) => {
298                let duration = start.elapsed();
299                all_durations.push(duration);
300                total_duration += duration;
301                min_duration = min_duration.min(duration);
302                max_duration = max_duration.max(duration);
303
304                if message.has_errors() {
305                    error_count += 1;
306                }
307            }
308            Err(e) => {
309                error_count += 1;
310                if error_count <= 5 {
311                    println!("Sync error in iteration {}: {:?}", i, e);
312                }
313                let duration = start.elapsed();
314                all_durations.push(duration);
315                total_duration += duration;
316                min_duration = min_duration.min(duration);
317                max_duration = max_duration.max(duration);
318            }
319        }
320
321        if (i + 1) % 1000 == 0 {
322            println!("Completed {} sync iterations", i + 1);
323        }
324    }
325
326    if error_count > 0 {
327        println!("Total sync errors encountered: {}", error_count);
328    }
329
330    all_durations.sort();
331
332    let p95_idx = (num_iterations as f64 * 0.95) as usize;
333    let p99_idx = (num_iterations as f64 * 0.99) as usize;
334    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
335    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
336    let avg_duration = total_duration / num_iterations as u32;
337
338    println!("\nSync Benchmark Results (v{}):", VERSION);
339    println!("  Iterations: {}", num_iterations);
340    println!("  Errors: {}", error_count);
341    println!("  Min time: {:?}", min_duration);
342    println!("  Max time: {:?}", max_duration);
343    println!("  Avg time: {:?}", avg_duration);
344    println!("  95th percentile: {:?}", p95);
345    println!("  99th percentile: {:?}", p99);
346    println!("  Total time: {:?}", total_duration);
347
348    Ok(BenchmarkResults {
349        iterations: num_iterations,
350        min_time: min_duration,
351        max_time: max_duration,
352        avg_time: avg_duration,
353        p95: *p95,
354        p99: *p99,
355        total_time: total_duration,
356    })
357}
More examples
Hide additional examples
examples/complete_workflow.rs (line 141)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let mut engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {:?}", e);
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165}
examples/custom_function.rs (line 406)
307async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
308    println!("=== Custom Function Example ===\n");
309
310    // Create an engine without built-in functions to demonstrate custom ones
311    let mut engine = Engine::new_empty();
312
313    // Register our custom functions
314    engine.register_task_function(
315        "statistics".to_string(),
316        Box::new(StatisticsFunction::new()),
317    );
318
319    engine.register_task_function(
320        "enrich_data".to_string(),
321        Box::new(DataEnrichmentFunction::new()),
322    );
323
324    // Also register the built-in map function for data preparation
325    engine.register_task_function(
326        "map".to_string(),
327        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
328    );
329
330    // Define a workflow that uses our custom functions
331    let workflow_json = r#"
332    {
333        "id": "custom_function_demo",
334        "name": "Custom Function Demo",
335        "description": "Demonstrates custom async functions in workflow",
336        "condition": { "==": [true, true] },
337        "tasks": [
338            {
339                "id": "prepare_data",
340                "name": "Prepare Data",
341                "description": "Extract and prepare data for analysis",
342                "function": {
343                    "name": "map",
344                    "input": {
345                        "mappings": [
346                            {
347                                "path": "data.numbers",
348                                "logic": { "var": "temp_data.measurements" }
349                            },
350                            {
351                                "path": "data.user_id",
352                                "logic": { "var": "temp_data.user_id" }
353                            }
354                        ]
355                    }
356                }
357            },
358            {
359                "id": "calculate_stats",
360                "name": "Calculate Statistics",
361                "description": "Calculate statistical measures from numeric data",
362                "function": {
363                    "name": "statistics",
364                    "input": {
365                        "data_path": "data.numbers",
366                        "output_path": "data.stats"
367                    }
368                }
369            },
370            {
371                "id": "enrich_user_data",
372                "name": "Enrich User Data",
373                "description": "Add additional user information",
374                "function": {
375                    "name": "enrich_data",
376                    "input": {
377                        "lookup_field": "user_id",
378                        "lookup_value": "user_123",
379                        "output_path": "data.user_info"
380                    }
381                }
382            }
383        ]
384    }
385    "#;
386
387    // Parse and add the workflow
388    let workflow = Workflow::from_json(workflow_json)?;
389    engine.add_workflow(&workflow);
390
391    // Create sample data
392    let sample_data = json!({
393        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
394        "user_id": "user_123",
395        "timestamp": "2024-01-15T10:30:00Z"
396    });
397
398    // Create and process message
399    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
400    message.temp_data = sample_data;
401    message.data = json!({});
402
403    println!("Processing message with custom functions...\n");
404
405    // Process the message through our custom workflow
406    match engine.process_message(&mut message).await {
407        Ok(_) => {
408            println!("✅ Message processed successfully!\n");
409
410            println!("📊 Final Results:");
411            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
412
413            println!("📋 Audit Trail:");
414            for (i, audit) in message.audit_trail.iter().enumerate() {
415                println!(
416                    "{}. Task: {} (Status: {})",
417                    i + 1,
418                    audit.task_id,
419                    audit.status_code
420                );
421                println!("   Timestamp: {}", audit.timestamp);
422                println!("   Changes: {} field(s) modified", audit.changes.len());
423            }
424
425            if message.has_errors() {
426                println!("\n⚠️  Errors encountered:");
427                for error in &message.errors {
428                    println!(
429                        "   - {}: {:?}",
430                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
431                        error.error_message
432                    );
433                }
434            }
435        }
436        Err(e) => {
437            println!("❌ Error processing message: {:?}", e);
438        }
439    }
440
441    // Demonstrate another example with different data
442    let separator = "=".repeat(50);
443    println!("\n{}", separator);
444    println!("=== Second Example with Different User ===\n");
445
446    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
447    message2.temp_data = json!({
448        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
449        "user_id": "user_456",
450        "timestamp": "2024-01-15T11:00:00Z"
451    });
452    message2.data = json!({});
453
454    // Create a second workflow for the second user (added to the same engine as the first)
455    let workflow2_json = r#"
456    {
457        "id": "custom_function_demo_2",
458        "name": "Custom Function Demo 2",
459        "description": "Second demo with different user",
460        "condition": { "==": [true, true] },
461        "tasks": [
462            {
463                "id": "prepare_data",
464                "name": "Prepare Data",
465                "function": {
466                    "name": "map",
467                    "input": {
468                        "mappings": [
469                            {
470                                "path": "data.numbers",
471                                "logic": { "var": "temp_data.measurements" }
472                            },
473                            {
474                                "path": "data.user_id",
475                                "logic": { "var": "temp_data.user_id" }
476                            }
477                        ]
478                    }
479                }
480            },
481            {
482                "id": "calculate_stats",
483                "name": "Calculate Statistics",
484                "function": {
485                    "name": "statistics",
486                    "input": {
487                        "data_path": "data.numbers",
488                        "output_path": "data.analysis"
489                    }
490                }
491            },
492            {
493                "id": "enrich_user_data",
494                "name": "Enrich User Data",
495                "function": {
496                    "name": "enrich_data",
497                    "input": {
498                        "lookup_field": "user_id",
499                        "lookup_value": "user_456",
500                        "output_path": "data.employee_details"
501                    }
502                }
503            }
504        ]
505    }
506    "#;
507
508    let workflow2 = Workflow::from_json(workflow2_json)?;
509    engine.add_workflow(&workflow2);
510
511    match engine.process_message(&mut message2).await {
512        Ok(_) => {
513            println!("✅ Second message processed successfully!\n");
514            println!("📊 Results for user_456:");
515            println!("{}", serde_json::to_string_pretty(&message2.data)?);
516        }
517        Err(e) => {
518            println!("❌ Error processing second message: {:?}", e);
519        }
520    }
521
522    println!("\n🎉 Custom function examples completed!");
523
524    Ok(())
525}

Trait Implementations§

Source§

impl Default for Engine

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

§

impl Freeze for Engine

§

impl !RefUnwindSafe for Engine

§

impl Send for Engine

§

impl Sync for Engine

§

impl Unpin for Engine

§

impl !UnwindSafe for Engine

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,