Engine

Struct `Engine`

Source
pub struct Engine { /* private fields */ }
Expand description

Thread-safe engine that processes messages through workflows using non-blocking async IO.

§Architecture

The engine is optimized for both IO-bound and CPU-bound workloads, featuring:

  • Vertical Scalability: Automatically utilizes all available CPU cores
  • Thread-Safe Design: All components are Send + Sync for concurrent access
  • Unified Concurrency: Single parameter controls both DataLogic pool size and max concurrent messages

§Concurrency Model

Each message receives exclusive access to a DataLogic instance for its entire workflow execution, eliminating lock contention between tasks while maintaining thread-safety across messages.

§Performance

The engine achieves linear scalability with CPU cores, capable of processing millions of messages per second with appropriate concurrency settings.

Implementations§

Source§

impl Engine

Source

pub fn new() -> Self

Creates a new Engine instance with built-in function handlers pre-registered.

§Example
use dataflow_rs::Engine;

let engine = Engine::new();
Examples found in repository:
examples/complete_workflow.rs (line 7)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {e:?}");
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165}
Source

pub fn with_concurrency(concurrency: usize) -> Self

Creates a new engine with a specific concurrency level. This sets both the DataLogic pool size and the maximum number of concurrently processed messages.

§Arguments
  • concurrency - Maximum number of messages that can be processed concurrently
Examples found in repository:
examples/benchmark.rs (line 117)
14async fn main() -> Result<(), Box<dyn std::error::Error>> {
15    println!("========================================");
16    println!("DATAFLOW ENGINE BENCHMARK");
17    println!("========================================\n");
18
19    // Define a workflow that:
20    // 1. Uses pre-loaded data instead of HTTP fetch
21    // 2. Enriches the message with transformed data
22    // 3. Demonstrates proper async workflow execution
23    let workflow_json = r#"
24    {
25        "id": "benchmark_workflow",
26        "name": "Benchmark Workflow Example",
27        "description": "Demonstrates async workflow execution with data transformation",
28        "priority": 1,
29        "condition": { "==": [true, true] },
30        "tasks": [
31            {
32                "id": "transform_data",
33                "name": "Transform Data",
34                "description": "Map API response to our data model",
35                "function": {
36                    "name": "map",
37                    "input": {
38                        "mappings": [
39                            {
40                                "path": "data.user.id", 
41                                "logic": { "var": "temp_data.body.id" }
42                            },
43                            {
44                                "path": "data.user.name", 
45                                "logic": { "var": "temp_data.body.name" }
46                            },
47                            {
48                                "path": "data.user.email", 
49                                "logic": { "var": "temp_data.body.email" }
50                            },
51                            {
52                                "path": "data.user.address", 
53                                "logic": {
54                                    "cat": [
55                                        { "var": "temp_data.body.address.street" },
56                                        ", ",
57                                        { "var": "temp_data.body.address.city" }
58                                    ]
59                                }
60                            },
61                            {
62                                "path": "data.user.company", 
63                                "logic": { "var": "temp_data.body.company.name" }
64                            },
65                            {
66                                "path": "data.processed_at", 
67                                "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
68                            }
69                        ]
70                    }
71                }
72            }
73        ]
74    }
75    "#;
76
77    // Parse the workflow
78    let workflow = Workflow::from_json(workflow_json)?;
79
80    // Create sample user data (similar to what the HTTP endpoint would return)
81    let sample_user_data = json!({
82        "body": {
83            "id": 1,
84            "name": "Leanne Graham",
85            "username": "Bret",
86            "email": "Sincere@april.biz",
87            "address": {
88                "street": "Kulas Light",
89                "suite": "Apt. 556",
90                "city": "Gwenborough",
91                "zipcode": "92998-3874",
92                "geo": {
93                    "lat": "-37.3159",
94                    "lng": "81.1496"
95                }
96            },
97            "phone": "1-770-736-8031 x56442",
98            "website": "hildegard.org",
99            "company": {
100                "name": "Romaguera-Crona",
101                "catchPhrase": "Multi-layered client-server neural-net",
102                "bs": "harness real-time e-markets"
103            }
104        }
105    });
106
107    let iterations = 100000;
108
109    println!("Testing with {} iterations\n", iterations);
110
111    // Test sequential performance with concurrency 1
112    println!("--- Sequential Performance (Baseline) ---");
113    println!("Concurrency | Avg Time per Message | Total Time | Messages/sec");
114    println!("------------|---------------------|------------|-------------");
115
116    // Sequential baseline
117    let engine = Arc::new(Engine::with_concurrency(1));
118    engine.add_workflow(&workflow);
119
120    let seq_results = run_sequential_benchmark(&*engine, &sample_user_data, iterations).await?;
121    let throughput = (iterations as f64) / seq_results.total_time.as_secs_f64();
122
123    println!(
124        "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0}",
125        1,
126        seq_results.avg_time.as_secs_f64() * 1_000_000.0,
127        seq_results.total_time.as_secs_f64() * 1000.0,
128        throughput
129    );
130
131    log_benchmark_results(
132        iterations,
133        seq_results.min_time,
134        seq_results.max_time,
135        seq_results.avg_time,
136        seq_results.p95,
137        seq_results.p99,
138        seq_results.total_time,
139        format!("seq_x1"),
140    )?;
141
142    // Test concurrent performance
143    println!("\n--- Concurrent Performance ---");
144    println!("Concurrency | Avg Time per Message | Total Time | Messages/sec | Speedup");
145    println!("------------|---------------------|------------|--------------|--------");
146
147    // Test configurations with different concurrency levels
148    let test_configs = vec![
149        1, 16,  // 16 concurrent messages with 16 DataLogic instances
150        32,  // 32 concurrent messages with 32 DataLogic instances
151        64,  // 64 concurrent messages with 64 DataLogic instances
152        128, // 128 concurrent messages with 128 DataLogic instances
153    ];
154
155    let baseline_throughput = throughput;
156
157    for concurrency in test_configs {
158        let engine = Arc::new(Engine::with_concurrency(concurrency));
159        engine.add_workflow(&workflow);
160
161        let con_results = run_concurrent_benchmark(
162            engine.clone(),
163            &sample_user_data,
164            iterations,
165            concurrency, // Use same value for concurrent tasks
166        )
167        .await?;
168
169        let throughput = (iterations as f64) / con_results.total_time.as_secs_f64();
170        let speedup = throughput / baseline_throughput;
171
172        println!(
173            "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0} | {:>7.2}x",
174            concurrency,
175            con_results.avg_time.as_secs_f64() * 1_000_000.0,
176            con_results.total_time.as_secs_f64() * 1000.0,
177            throughput,
178            speedup
179        );
180
181        log_benchmark_results(
182            iterations,
183            con_results.min_time,
184            con_results.max_time,
185            con_results.avg_time,
186            con_results.p95,
187            con_results.p99,
188            con_results.total_time,
189            format!("con_x{}", concurrency),
190        )?;
191    }
192
193    println!("\n========================================");
194    println!("Benchmark results saved to '{}'", BENCHMARK_LOG_FILE);
195    println!("========================================");
196
197    Ok(())
198}
Source

pub fn with_pool_size(pool_size: usize) -> Self

👎Deprecated since 1.0.0: Use with_concurrency instead

Creates a new engine with a specific pool size (deprecated; use `with_concurrency` instead).

Source

pub fn new_empty() -> Self

Creates a new engine instance without any pre-registered functions.

Examples found in repository:
examples/custom_function.rs (line 322)
318async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
319    println!("=== Custom Function Example ===\n");
320
321    // Create engine without built-in functions to demonstrate custom ones
322    let mut engine = Engine::new_empty();
323
324    // Register our custom functions
325    engine.register_task_function(
326        "statistics".to_string(),
327        Box::new(StatisticsFunction::new()),
328    );
329
330    engine.register_task_function(
331        "enrich_data".to_string(),
332        Box::new(DataEnrichmentFunction::new()),
333    );
334
335    // Also register built-in map function for data preparation
336    engine.register_task_function(
337        "map".to_string(),
338        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
339    );
340
341    // Define a workflow that uses our custom functions
342    let workflow_json = r#"
343    {
344        "id": "custom_function_demo",
345        "name": "Custom Function Demo",
346        "description": "Demonstrates custom async functions in workflow",
347        "condition": { "==": [true, true] },
348        "tasks": [
349            {
350                "id": "prepare_data",
351                "name": "Prepare Data",
352                "description": "Extract and prepare data for analysis",
353                "function": {
354                    "name": "map",
355                    "input": {
356                        "mappings": [
357                            {
358                                "path": "data.numbers",
359                                "logic": { "var": "temp_data.measurements" }
360                            },
361                            {
362                                "path": "data.user_id",
363                                "logic": { "var": "temp_data.user_id" }
364                            }
365                        ]
366                    }
367                }
368            },
369            {
370                "id": "calculate_stats",
371                "name": "Calculate Statistics",
372                "description": "Calculate statistical measures from numeric data",
373                "function": {
374                    "name": "statistics",
375                    "input": {
376                        "data_path": "data.numbers",
377                        "output_path": "data.stats"
378                    }
379                }
380            },
381            {
382                "id": "enrich_user_data",
383                "name": "Enrich User Data",
384                "description": "Add additional user information",
385                "function": {
386                    "name": "enrich_data",
387                    "input": {
388                        "lookup_field": "user_id",
389                        "lookup_value": "user_123",
390                        "output_path": "data.user_info"
391                    }
392                }
393            }
394        ]
395    }
396    "#;
397
398    // Parse and add the workflow
399    let workflow = Workflow::from_json(workflow_json)?;
400    engine.add_workflow(&workflow);
401
402    // Create sample data
403    let sample_data = json!({
404        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
405        "user_id": "user_123",
406        "timestamp": "2024-01-15T10:30:00Z"
407    });
408
409    // Create and process message
410    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
411    message.temp_data = sample_data;
412    message.data = json!({});
413
414    println!("Processing message with custom functions...\n");
415
416    // Process the message through our custom workflow
417    match engine.process_message(&mut message).await {
418        Ok(_) => {
419            println!("✅ Message processed successfully!\n");
420
421            println!("📊 Final Results:");
422            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
423
424            println!("📋 Audit Trail:");
425            for (i, audit) in message.audit_trail.iter().enumerate() {
426                println!(
427                    "{}. Task: {} (Status: {})",
428                    i + 1,
429                    audit.task_id,
430                    audit.status_code
431                );
432                println!("   Timestamp: {}", audit.timestamp);
433                println!("   Changes: {} field(s) modified", audit.changes.len());
434            }
435
436            if message.has_errors() {
437                println!("\n⚠️  Errors encountered:");
438                for error in &message.errors {
439                    println!(
440                        "   - {}: {:?}",
441                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
442                        error.error_message
443                    );
444                }
445            }
446        }
447        Err(e) => {
448            println!("❌ Error processing message: {e:?}");
449        }
450    }
451
452    // Demonstrate another example with different data
453    let separator = "=".repeat(50);
454    println!("\n{separator}");
455    println!("=== Second Example with Different User ===\n");
456
457    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
458    message2.temp_data = json!({
459        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
460        "user_id": "user_456",
461        "timestamp": "2024-01-15T11:00:00Z"
462    });
463    message2.data = json!({});
464
465    // Create a workflow for the second user
466    let workflow2_json = r#"
467    {
468        "id": "custom_function_demo_2",
469        "name": "Custom Function Demo 2",
470        "description": "Second demo with different user",
471        "condition": { "==": [true, true] },
472        "tasks": [
473            {
474                "id": "prepare_data",
475                "name": "Prepare Data",
476                "function": {
477                    "name": "map",
478                    "input": {
479                        "mappings": [
480                            {
481                                "path": "data.numbers",
482                                "logic": { "var": "temp_data.measurements" }
483                            },
484                            {
485                                "path": "data.user_id",
486                                "logic": { "var": "temp_data.user_id" }
487                            }
488                        ]
489                    }
490                }
491            },
492            {
493                "id": "calculate_stats",
494                "name": "Calculate Statistics",
495                "function": {
496                    "name": "statistics",
497                    "input": {
498                        "data_path": "data.numbers",
499                        "output_path": "data.analysis"
500                    }
501                }
502            },
503            {
504                "id": "enrich_user_data",
505                "name": "Enrich User Data",
506                "function": {
507                    "name": "enrich_data",
508                    "input": {
509                        "lookup_field": "user_id",
510                        "lookup_value": "user_456",
511                        "output_path": "data.employee_details"
512                    }
513                }
514            }
515        ]
516    }
517    "#;
518
519    let workflow2 = Workflow::from_json(workflow2_json)?;
520    engine.add_workflow(&workflow2);
521
522    match engine.process_message(&mut message2).await {
523        Ok(_) => {
524            println!("✅ Second message processed successfully!\n");
525            println!("📊 Results for user_456:");
526            println!("{}", serde_json::to_string_pretty(&message2.data)?);
527        }
528        Err(e) => {
529            println!("❌ Error processing second message: {e:?}");
530        }
531    }
532
533    println!("\n🎉 Custom function examples completed!");
534
535    Ok(())
536}
Source

pub fn concurrency(&self) -> usize

Returns the configured concurrency level.

Source

pub fn with_retry_config(self, config: RetryConfig) -> Self

Configures retry behavior.

Source

pub fn add_workflow(&self, workflow: &Workflow)

Adds a workflow to the engine.

§Arguments
  • workflow - The workflow to add
Examples found in repository:
examples/complete_workflow.rs (line 133)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {e:?}");
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165}
More examples
Hide additional examples
examples/benchmark.rs (line 118)
14async fn main() -> Result<(), Box<dyn std::error::Error>> {
15    println!("========================================");
16    println!("DATAFLOW ENGINE BENCHMARK");
17    println!("========================================\n");
18
19    // Define a workflow that:
20    // 1. Uses pre-loaded data instead of HTTP fetch
21    // 2. Enriches the message with transformed data
22    // 3. Demonstrates proper async workflow execution
23    let workflow_json = r#"
24    {
25        "id": "benchmark_workflow",
26        "name": "Benchmark Workflow Example",
27        "description": "Demonstrates async workflow execution with data transformation",
28        "priority": 1,
29        "condition": { "==": [true, true] },
30        "tasks": [
31            {
32                "id": "transform_data",
33                "name": "Transform Data",
34                "description": "Map API response to our data model",
35                "function": {
36                    "name": "map",
37                    "input": {
38                        "mappings": [
39                            {
40                                "path": "data.user.id", 
41                                "logic": { "var": "temp_data.body.id" }
42                            },
43                            {
44                                "path": "data.user.name", 
45                                "logic": { "var": "temp_data.body.name" }
46                            },
47                            {
48                                "path": "data.user.email", 
49                                "logic": { "var": "temp_data.body.email" }
50                            },
51                            {
52                                "path": "data.user.address", 
53                                "logic": {
54                                    "cat": [
55                                        { "var": "temp_data.body.address.street" },
56                                        ", ",
57                                        { "var": "temp_data.body.address.city" }
58                                    ]
59                                }
60                            },
61                            {
62                                "path": "data.user.company", 
63                                "logic": { "var": "temp_data.body.company.name" }
64                            },
65                            {
66                                "path": "data.processed_at", 
67                                "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
68                            }
69                        ]
70                    }
71                }
72            }
73        ]
74    }
75    "#;
76
77    // Parse the workflow
78    let workflow = Workflow::from_json(workflow_json)?;
79
80    // Create sample user data (similar to what the HTTP endpoint would return)
81    let sample_user_data = json!({
82        "body": {
83            "id": 1,
84            "name": "Leanne Graham",
85            "username": "Bret",
86            "email": "Sincere@april.biz",
87            "address": {
88                "street": "Kulas Light",
89                "suite": "Apt. 556",
90                "city": "Gwenborough",
91                "zipcode": "92998-3874",
92                "geo": {
93                    "lat": "-37.3159",
94                    "lng": "81.1496"
95                }
96            },
97            "phone": "1-770-736-8031 x56442",
98            "website": "hildegard.org",
99            "company": {
100                "name": "Romaguera-Crona",
101                "catchPhrase": "Multi-layered client-server neural-net",
102                "bs": "harness real-time e-markets"
103            }
104        }
105    });
106
107    let iterations = 100000;
108
109    println!("Testing with {} iterations\n", iterations);
110
111    // Test sequential performance with concurrency 1
112    println!("--- Sequential Performance (Baseline) ---");
113    println!("Concurrency | Avg Time per Message | Total Time | Messages/sec");
114    println!("------------|---------------------|------------|-------------");
115
116    // Sequential baseline
117    let engine = Arc::new(Engine::with_concurrency(1));
118    engine.add_workflow(&workflow);
119
120    let seq_results = run_sequential_benchmark(&*engine, &sample_user_data, iterations).await?;
121    let throughput = (iterations as f64) / seq_results.total_time.as_secs_f64();
122
123    println!(
124        "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0}",
125        1,
126        seq_results.avg_time.as_secs_f64() * 1_000_000.0,
127        seq_results.total_time.as_secs_f64() * 1000.0,
128        throughput
129    );
130
131    log_benchmark_results(
132        iterations,
133        seq_results.min_time,
134        seq_results.max_time,
135        seq_results.avg_time,
136        seq_results.p95,
137        seq_results.p99,
138        seq_results.total_time,
139        format!("seq_x1"),
140    )?;
141
142    // Test concurrent performance
143    println!("\n--- Concurrent Performance ---");
144    println!("Concurrency | Avg Time per Message | Total Time | Messages/sec | Speedup");
145    println!("------------|---------------------|------------|--------------|--------");
146
147    // Test configurations with different concurrency levels
148    let test_configs = vec![
149        1, 16,  // 16 concurrent messages with 16 DataLogic instances
150        32,  // 32 concurrent messages with 32 DataLogic instances
151        64,  // 64 concurrent messages with 64 DataLogic instances
152        128, // 128 concurrent messages with 128 DataLogic instances
153    ];
154
155    let baseline_throughput = throughput;
156
157    for concurrency in test_configs {
158        let engine = Arc::new(Engine::with_concurrency(concurrency));
159        engine.add_workflow(&workflow);
160
161        let con_results = run_concurrent_benchmark(
162            engine.clone(),
163            &sample_user_data,
164            iterations,
165            concurrency, // Use same value for concurrent tasks
166        )
167        .await?;
168
169        let throughput = (iterations as f64) / con_results.total_time.as_secs_f64();
170        let speedup = throughput / baseline_throughput;
171
172        println!(
173            "{:^11} | {:>19.3}μs | {:>10.2}ms | {:>12.0} | {:>7.2}x",
174            concurrency,
175            con_results.avg_time.as_secs_f64() * 1_000_000.0,
176            con_results.total_time.as_secs_f64() * 1000.0,
177            throughput,
178            speedup
179        );
180
181        log_benchmark_results(
182            iterations,
183            con_results.min_time,
184            con_results.max_time,
185            con_results.avg_time,
186            con_results.p95,
187            con_results.p99,
188            con_results.total_time,
189            format!("con_x{}", concurrency),
190        )?;
191    }
192
193    println!("\n========================================");
194    println!("Benchmark results saved to '{}'", BENCHMARK_LOG_FILE);
195    println!("========================================");
196
197    Ok(())
198}
examples/custom_function.rs (line 400)
318async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
319    println!("=== Custom Function Example ===\n");
320
321    // Create engine without built-in functions to demonstrate custom ones
322    let mut engine = Engine::new_empty();
323
324    // Register our custom functions
325    engine.register_task_function(
326        "statistics".to_string(),
327        Box::new(StatisticsFunction::new()),
328    );
329
330    engine.register_task_function(
331        "enrich_data".to_string(),
332        Box::new(DataEnrichmentFunction::new()),
333    );
334
335    // Also register built-in map function for data preparation
336    engine.register_task_function(
337        "map".to_string(),
338        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
339    );
340
341    // Define a workflow that uses our custom functions
342    let workflow_json = r#"
343    {
344        "id": "custom_function_demo",
345        "name": "Custom Function Demo",
346        "description": "Demonstrates custom async functions in workflow",
347        "condition": { "==": [true, true] },
348        "tasks": [
349            {
350                "id": "prepare_data",
351                "name": "Prepare Data",
352                "description": "Extract and prepare data for analysis",
353                "function": {
354                    "name": "map",
355                    "input": {
356                        "mappings": [
357                            {
358                                "path": "data.numbers",
359                                "logic": { "var": "temp_data.measurements" }
360                            },
361                            {
362                                "path": "data.user_id",
363                                "logic": { "var": "temp_data.user_id" }
364                            }
365                        ]
366                    }
367                }
368            },
369            {
370                "id": "calculate_stats",
371                "name": "Calculate Statistics",
372                "description": "Calculate statistical measures from numeric data",
373                "function": {
374                    "name": "statistics",
375                    "input": {
376                        "data_path": "data.numbers",
377                        "output_path": "data.stats"
378                    }
379                }
380            },
381            {
382                "id": "enrich_user_data",
383                "name": "Enrich User Data",
384                "description": "Add additional user information",
385                "function": {
386                    "name": "enrich_data",
387                    "input": {
388                        "lookup_field": "user_id",
389                        "lookup_value": "user_123",
390                        "output_path": "data.user_info"
391                    }
392                }
393            }
394        ]
395    }
396    "#;
397
398    // Parse and add the workflow
399    let workflow = Workflow::from_json(workflow_json)?;
400    engine.add_workflow(&workflow);
401
402    // Create sample data
403    let sample_data = json!({
404        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
405        "user_id": "user_123",
406        "timestamp": "2024-01-15T10:30:00Z"
407    });
408
409    // Create and process message
410    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
411    message.temp_data = sample_data;
412    message.data = json!({});
413
414    println!("Processing message with custom functions...\n");
415
416    // Process the message through our custom workflow
417    match engine.process_message(&mut message).await {
418        Ok(_) => {
419            println!("✅ Message processed successfully!\n");
420
421            println!("📊 Final Results:");
422            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
423
424            println!("📋 Audit Trail:");
425            for (i, audit) in message.audit_trail.iter().enumerate() {
426                println!(
427                    "{}. Task: {} (Status: {})",
428                    i + 1,
429                    audit.task_id,
430                    audit.status_code
431                );
432                println!("   Timestamp: {}", audit.timestamp);
433                println!("   Changes: {} field(s) modified", audit.changes.len());
434            }
435
436            if message.has_errors() {
437                println!("\n⚠️  Errors encountered:");
438                for error in &message.errors {
439                    println!(
440                        "   - {}: {:?}",
441                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
442                        error.error_message
443                    );
444                }
445            }
446        }
447        Err(e) => {
448            println!("❌ Error processing message: {e:?}");
449        }
450    }
451
452    // Demonstrate another example with different data
453    let separator = "=".repeat(50);
454    println!("\n{separator}");
455    println!("=== Second Example with Different User ===\n");
456
457    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
458    message2.temp_data = json!({
459        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
460        "user_id": "user_456",
461        "timestamp": "2024-01-15T11:00:00Z"
462    });
463    message2.data = json!({});
464
465    // Create a workflow for the second user
466    let workflow2_json = r#"
467    {
468        "id": "custom_function_demo_2",
469        "name": "Custom Function Demo 2",
470        "description": "Second demo with different user",
471        "condition": { "==": [true, true] },
472        "tasks": [
473            {
474                "id": "prepare_data",
475                "name": "Prepare Data",
476                "function": {
477                    "name": "map",
478                    "input": {
479                        "mappings": [
480                            {
481                                "path": "data.numbers",
482                                "logic": { "var": "temp_data.measurements" }
483                            },
484                            {
485                                "path": "data.user_id",
486                                "logic": { "var": "temp_data.user_id" }
487                            }
488                        ]
489                    }
490                }
491            },
492            {
493                "id": "calculate_stats",
494                "name": "Calculate Statistics",
495                "function": {
496                    "name": "statistics",
497                    "input": {
498                        "data_path": "data.numbers",
499                        "output_path": "data.analysis"
500                    }
501                }
502            },
503            {
504                "id": "enrich_user_data",
505                "name": "Enrich User Data",
506                "function": {
507                    "name": "enrich_data",
508                    "input": {
509                        "lookup_field": "user_id",
510                        "lookup_value": "user_456",
511                        "output_path": "data.employee_details"
512                    }
513                }
514            }
515        ]
516    }
517    "#;
518
519    let workflow2 = Workflow::from_json(workflow2_json)?;
520    engine.add_workflow(&workflow2);
521
522    match engine.process_message(&mut message2).await {
523        Ok(_) => {
524            println!("✅ Second message processed successfully!\n");
525            println!("📊 Results for user_456:");
526            println!("{}", serde_json::to_string_pretty(&message2.data)?);
527        }
528        Err(e) => {
529            println!("❌ Error processing second message: {e:?}");
530        }
531    }
532
533    println!("\n🎉 Custom function examples completed!");
534
535    Ok(())
536}
Source

pub fn reload_workflows(&self, new_workflows: Vec<Workflow>) -> Result<()>

Reloads all workflows atomically.

Source

pub fn register_task_function( &mut self, name: String, handler: Box<dyn AsyncFunctionHandler + Send + Sync>, )

Registers a custom function handler with the engine.

§Arguments
  • name - The name of the function handler
  • handler - The function handler implementation
Examples found in repository?
examples/custom_function.rs (lines 325-328)
318async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
319    println!("=== Custom Function Example ===\n");
320
321    // Create engine without built-in functions to demonstrate custom ones
322    let mut engine = Engine::new_empty();
323
324    // Register our custom functions
325    engine.register_task_function(
326        "statistics".to_string(),
327        Box::new(StatisticsFunction::new()),
328    );
329
330    engine.register_task_function(
331        "enrich_data".to_string(),
332        Box::new(DataEnrichmentFunction::new()),
333    );
334
335    // Also register built-in map function for data preparation
336    engine.register_task_function(
337        "map".to_string(),
338        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
339    );
340
341    // Define a workflow that uses our custom functions
342    let workflow_json = r#"
343    {
344        "id": "custom_function_demo",
345        "name": "Custom Function Demo",
346        "description": "Demonstrates custom async functions in workflow",
347        "condition": { "==": [true, true] },
348        "tasks": [
349            {
350                "id": "prepare_data",
351                "name": "Prepare Data",
352                "description": "Extract and prepare data for analysis",
353                "function": {
354                    "name": "map",
355                    "input": {
356                        "mappings": [
357                            {
358                                "path": "data.numbers",
359                                "logic": { "var": "temp_data.measurements" }
360                            },
361                            {
362                                "path": "data.user_id",
363                                "logic": { "var": "temp_data.user_id" }
364                            }
365                        ]
366                    }
367                }
368            },
369            {
370                "id": "calculate_stats",
371                "name": "Calculate Statistics",
372                "description": "Calculate statistical measures from numeric data",
373                "function": {
374                    "name": "statistics",
375                    "input": {
376                        "data_path": "data.numbers",
377                        "output_path": "data.stats"
378                    }
379                }
380            },
381            {
382                "id": "enrich_user_data",
383                "name": "Enrich User Data",
384                "description": "Add additional user information",
385                "function": {
386                    "name": "enrich_data",
387                    "input": {
388                        "lookup_field": "user_id",
389                        "lookup_value": "user_123",
390                        "output_path": "data.user_info"
391                    }
392                }
393            }
394        ]
395    }
396    "#;
397
398    // Parse and add the workflow
399    let workflow = Workflow::from_json(workflow_json)?;
400    engine.add_workflow(&workflow);
401
402    // Create sample data
403    let sample_data = json!({
404        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
405        "user_id": "user_123",
406        "timestamp": "2024-01-15T10:30:00Z"
407    });
408
409    // Create and process message
410    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
411    message.temp_data = sample_data;
412    message.data = json!({});
413
414    println!("Processing message with custom functions...\n");
415
416    // Process the message through our custom workflow
417    match engine.process_message(&mut message).await {
418        Ok(_) => {
419            println!("✅ Message processed successfully!\n");
420
421            println!("📊 Final Results:");
422            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
423
424            println!("📋 Audit Trail:");
425            for (i, audit) in message.audit_trail.iter().enumerate() {
426                println!(
427                    "{}. Task: {} (Status: {})",
428                    i + 1,
429                    audit.task_id,
430                    audit.status_code
431                );
432                println!("   Timestamp: {}", audit.timestamp);
433                println!("   Changes: {} field(s) modified", audit.changes.len());
434            }
435
436            if message.has_errors() {
437                println!("\n⚠️  Errors encountered:");
438                for error in &message.errors {
439                    println!(
440                        "   - {}: {:?}",
441                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
442                        error.error_message
443                    );
444                }
445            }
446        }
447        Err(e) => {
448            println!("❌ Error processing message: {e:?}");
449        }
450    }
451
452    // Demonstrate another example with different data
453    let separator = "=".repeat(50);
454    println!("\n{separator}");
455    println!("=== Second Example with Different User ===\n");
456
457    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
458    message2.temp_data = json!({
459        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
460        "user_id": "user_456",
461        "timestamp": "2024-01-15T11:00:00Z"
462    });
463    message2.data = json!({});
464
465    // Create a workflow for the second user
466    let workflow2_json = r#"
467    {
468        "id": "custom_function_demo_2",
469        "name": "Custom Function Demo 2",
470        "description": "Second demo with different user",
471        "condition": { "==": [true, true] },
472        "tasks": [
473            {
474                "id": "prepare_data",
475                "name": "Prepare Data",
476                "function": {
477                    "name": "map",
478                    "input": {
479                        "mappings": [
480                            {
481                                "path": "data.numbers",
482                                "logic": { "var": "temp_data.measurements" }
483                            },
484                            {
485                                "path": "data.user_id",
486                                "logic": { "var": "temp_data.user_id" }
487                            }
488                        ]
489                    }
490                }
491            },
492            {
493                "id": "calculate_stats",
494                "name": "Calculate Statistics",
495                "function": {
496                    "name": "statistics",
497                    "input": {
498                        "data_path": "data.numbers",
499                        "output_path": "data.analysis"
500                    }
501                }
502            },
503            {
504                "id": "enrich_user_data",
505                "name": "Enrich User Data",
506                "function": {
507                    "name": "enrich_data",
508                    "input": {
509                        "lookup_field": "user_id",
510                        "lookup_value": "user_456",
511                        "output_path": "data.employee_details"
512                    }
513                }
514            }
515        ]
516    }
517    "#;
518
519    let workflow2 = Workflow::from_json(workflow2_json)?;
520    engine.add_workflow(&workflow2);
521
522    match engine.process_message(&mut message2).await {
523        Ok(_) => {
524            println!("✅ Second message processed successfully!\n");
525            println!("📊 Results for user_456:");
526            println!("{}", serde_json::to_string_pretty(&message2.data)?);
527        }
528        Err(e) => {
529            println!("❌ Error processing second message: {e:?}");
530        }
531    }
532
533    println!("\n🎉 Custom function examples completed!");
534
535    Ok(())
536}
Source

pub fn has_function(&self, name: &str) -> bool

Checks if a function with the given name is registered.

Source

pub async fn process_message(&self, message: &mut Message) -> Result<()>

Processes a message through workflows that match their conditions.

This async method:

  1. Iterates through workflows sequentially in deterministic order (sorted by ID)
  2. Evaluates conditions for each workflow right before execution
  3. Executes matching workflows one after another (not concurrently)
  4. Updates the message with processing results and audit trail

Workflows are executed sequentially because later workflows may depend on the results of earlier workflows, and their conditions may change based on modifications made by previous workflows.

§Arguments
  • message - The message to process
§Returns
  • Result<()> - Success or an error if processing failed
Examples found in repository?
examples/benchmark.rs (line 232)
210async fn run_sequential_benchmark(
211    engine: &Engine,
212    sample_user_data: &Value,
213    num_iterations: usize,
214) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
215    let mut total_duration = Duration::new(0, 0);
216    let mut min_duration = Duration::new(u64::MAX, 0);
217    let mut max_duration = Duration::new(0, 0);
218    let mut all_durations = Vec::with_capacity(num_iterations);
219    let mut error_count = 0;
220
221    // Sequential processing - one message at a time
222    for i in 0..num_iterations {
223        let mut message = Message::new(&json!({}));
224        message.temp_data = sample_user_data.clone();
225        message.data = json!({});
226        message.metadata = json!({
227            "timestamp": chrono::Utc::now().to_rfc3339(),
228            "iteration": i
229        });
230
231        let start = Instant::now();
232        match engine.process_message(&mut message).await {
233            Ok(_) => {
234                let duration = start.elapsed();
235                all_durations.push(duration);
236                total_duration += duration;
237                min_duration = min_duration.min(duration);
238                max_duration = max_duration.max(duration);
239
240                // Check for processing errors
241                if message.has_errors() {
242                    error_count += 1;
243                    if error_count <= 5 {
244                        // Only print first 5 errors
245                        println!("Processing errors in iteration {}: {:?}", i, message.errors);
246                    }
247                }
248            }
249            Err(e) => {
250                error_count += 1;
251                if error_count <= 5 {
252                    println!("Error in iteration {i}: {e:?}");
253                }
254                // Still record the time even for errors
255                let duration = start.elapsed();
256                all_durations.push(duration);
257                total_duration += duration;
258                min_duration = min_duration.min(duration);
259                max_duration = max_duration.max(duration);
260            }
261        }
262    }
263
264    if error_count > 0 {
265        println!("Total errors encountered: {error_count}");
266    }
267
268    // Sort durations for percentile calculations
269    all_durations.sort();
270
271    let p95_idx = (num_iterations as f64 * 0.95) as usize;
272    let p99_idx = (num_iterations as f64 * 0.99) as usize;
273    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
274    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
275    let avg_duration = total_duration / num_iterations as u32;
276
277    Ok(BenchmarkResults {
278        min_time: min_duration,
279        max_time: max_duration,
280        avg_time: avg_duration,
281        p95: *p95,
282        p99: *p99,
283        total_time: total_duration,
284    })
285}
286
287async fn run_concurrent_benchmark(
288    engine: Arc<Engine>,
289    sample_user_data: &Value,
290    num_iterations: usize,
291    concurrent_tasks: usize,
292) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
293    let start_time = Instant::now();
294    let mut all_durations = Vec::with_capacity(num_iterations);
295    let mut error_count = 0;
296
297    // Concurrent processing using JoinSet
298    let mut tasks = JoinSet::new();
299
300    for i in 0..num_iterations {
301        let engine_clone = engine.clone();
302        let data_clone = sample_user_data.clone();
303
304        // Spawn concurrent tasks
305        tasks.spawn(async move {
306            let mut message = Message::new(&json!({}));
307            message.temp_data = data_clone;
308            message.data = json!({});
309            message.metadata = json!({
310                "timestamp": chrono::Utc::now().to_rfc3339(),
311                "iteration": i
312            });
313
314            let msg_start = Instant::now();
315            let result = engine_clone.process_message(&mut message).await;
316            let duration = msg_start.elapsed();
317
318            (duration, result.is_ok(), message.has_errors())
319        });
320
321        // Limit concurrent tasks
322        while tasks.len() >= concurrent_tasks {
323            // Wait for at least one task to complete
324            if let Some(Ok((duration, ok, has_errors))) = tasks.join_next().await {
325                all_durations.push(duration);
326                if !ok || has_errors {
327                    error_count += 1;
328                }
329            }
330        }
331    }
332
333    // Wait for remaining tasks
334    while let Some(Ok((duration, ok, has_errors))) = tasks.join_next().await {
335        all_durations.push(duration);
336        if !ok || has_errors {
337            error_count += 1;
338        }
339    }
340
341    let total_time = start_time.elapsed();
342
343    if error_count > 0 {
344        println!("Total errors encountered: {error_count}");
345    }
346
347    // Calculate statistics
348    all_durations.sort();
349    let min_duration = *all_durations.first().unwrap_or(&Duration::ZERO);
350    let max_duration = *all_durations.last().unwrap_or(&Duration::ZERO);
351    let sum: Duration = all_durations.iter().sum();
352    let avg_duration = sum / all_durations.len() as u32;
353
354    let p95_idx = (all_durations.len() as f64 * 0.95) as usize;
355    let p99_idx = (all_durations.len() as f64 * 0.99) as usize;
356    let p95 = *all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
357    let p99 = *all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
358
359    Ok(BenchmarkResults {
360        min_time: min_duration,
361        max_time: max_duration,
362        avg_time: avg_duration,
363        p95,
364        p99,
365        total_time,
366    })
367}
More examples
Hide additional examples
examples/complete_workflow.rs (line 141)
5async fn main() -> Result<(), Box<dyn std::error::Error>> {
6    // Create the workflow engine (built-in functions are auto-registered)
7    let engine = Engine::new();
8
9    // Define a workflow that:
10    // 1. Fetches data from a public API
11    // 2. Enriches the message with transformed data
12    // 3. Validates the enriched data
13    let workflow_json = r#"
14    {
15        "id": "complete_workflow",
16        "name": "Complete Workflow Example",
17        "priority": 0,
18        "description": "Demonstrates fetch -> enrich -> validate flow",
19        "condition": { "==": [true, true] },
20        "tasks": [
21            {
22                "id": "fetch_user_data",
23                "name": "Fetch User Data",
24                "description": "Get user data from a public API",
25                "function": {
26                    "name": "http",
27                    "input": {
28                        "url": "https://jsonplaceholder.typicode.com/users/1",
29                        "method": "GET",
30                        "headers": {
31                            "Accept": "application/json"
32                        }
33                    }
34                }
35            },
36            {
37                "id": "initialize_user",
38                "name": "Initialize User Structure",
39                "description": "Create empty user object in data",
40                "function": {
41                    "name": "map",
42                    "input": {
43                        "mappings": [
44                            {
45                                "path": "data",
46                                "logic": { "preserve": {"user": {}} }
47                            }
48                        ]
49                    }
50                }
51            },
52            {
53                "id": "transform_data",
54                "name": "Transform Data",
55                "description": "Map API response to our data model",
56                "function": {
57                    "name": "map",
58                    "input": {
59                        "mappings": [
60                            {
61                                "path": "data.user.id", 
62                                "logic": { "var": "temp_data.body.id" }
63                            },
64                            {
65                                "path": "data.user.name", 
66                                "logic": { "var": "temp_data.body.name" }
67                            },
68                            {
69                                "path": "data.user.email", 
70                                "logic": { "var": "temp_data.body.email" }
71                            },
72                            {
73                                "path": "data.user.address", 
74                                "logic": {
75                                    "cat": [
76                                        { "var": "temp_data.body.address.street" },
77                                        ", ",
78                                        { "var": "temp_data.body.address.city" }
79                                    ]
80                                }
81                            },
82                            {
83                                "path": "data.user.company", 
84                                "logic": { "var": "temp_data.body.company.name" }
85                            }
86                        ]
87                    }
88                }
89            },
90            {
91                "id": "validate_user_data",
92                "name": "Validate User Data",
93                "description": "Ensure the user data meets our requirements",
94                "function": {
95                    "name": "validate",
96                    "input": {
97                        "rules": [
98                            {
99                                "path": "data",
100                                "logic": { "!!": { "var": "data.user.id" } },
101                                "message": "User ID is required"
102                            },
103                            {
104                                "path": "data",
105                                "logic": { "!!": { "var": "data.user.name" } },
106                                "message": "User name is required"
107                            },
108                            {
109                                "path": "data",
110                                "logic": { "!!": { "var": "data.user.email" } },
111                                "message": "User email is required"
112                            },
113                            {
114                                "path": "data",
115                                "logic": {
116                                    "in": [
117                                        "@",
118                                        { "var": "data.user.email" }
119                                    ]
120                                },
121                                "message": "Email must be valid format"
122                            }
123                        ]
124                    }
125                }
126            }
127        ]
128    }
129    "#;
130
131    // Parse and add the workflow to the engine
132    let workflow = Workflow::from_json(workflow_json)?;
133    engine.add_workflow(&workflow);
134
135    // Create a message to process with properly initialized data structure
136    let mut message = Message::new(&json!({}));
137
138    // Process the message through the workflow asynchronously
139    println!("Processing message through workflow...");
140
141    match engine.process_message(&mut message).await {
142        Ok(_) => {
143            println!("Workflow completed successfully!");
144        }
145        Err(e) => {
146            eprintln!("Error executing workflow: {e:?}");
147            if !message.errors.is_empty() {
148                println!("\nErrors recorded in message:");
149                for err in &message.errors {
150                    println!(
151                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
152                        err.workflow_id, err.task_id, err.error_message
153                    );
154                }
155            }
156        }
157    }
158
159    println!(
160        "\nFull message structure:\n{}",
161        serde_json::to_string_pretty(&message)?
162    );
163
164    Ok(())
165}
examples/custom_function.rs (line 417)
318async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
319    println!("=== Custom Function Example ===\n");
320
321    // Create engine without built-in functions to demonstrate custom ones
322    let mut engine = Engine::new_empty();
323
324    // Register our custom functions
325    engine.register_task_function(
326        "statistics".to_string(),
327        Box::new(StatisticsFunction::new()),
328    );
329
330    engine.register_task_function(
331        "enrich_data".to_string(),
332        Box::new(DataEnrichmentFunction::new()),
333    );
334
335    // Also register built-in map function for data preparation
336    engine.register_task_function(
337        "map".to_string(),
338        Box::new(dataflow_rs::engine::functions::MapFunction::new()),
339    );
340
341    // Define a workflow that uses our custom functions
342    let workflow_json = r#"
343    {
344        "id": "custom_function_demo",
345        "name": "Custom Function Demo",
346        "description": "Demonstrates custom async functions in workflow",
347        "condition": { "==": [true, true] },
348        "tasks": [
349            {
350                "id": "prepare_data",
351                "name": "Prepare Data",
352                "description": "Extract and prepare data for analysis",
353                "function": {
354                    "name": "map",
355                    "input": {
356                        "mappings": [
357                            {
358                                "path": "data.numbers",
359                                "logic": { "var": "temp_data.measurements" }
360                            },
361                            {
362                                "path": "data.user_id",
363                                "logic": { "var": "temp_data.user_id" }
364                            }
365                        ]
366                    }
367                }
368            },
369            {
370                "id": "calculate_stats",
371                "name": "Calculate Statistics",
372                "description": "Calculate statistical measures from numeric data",
373                "function": {
374                    "name": "statistics",
375                    "input": {
376                        "data_path": "data.numbers",
377                        "output_path": "data.stats"
378                    }
379                }
380            },
381            {
382                "id": "enrich_user_data",
383                "name": "Enrich User Data",
384                "description": "Add additional user information",
385                "function": {
386                    "name": "enrich_data",
387                    "input": {
388                        "lookup_field": "user_id",
389                        "lookup_value": "user_123",
390                        "output_path": "data.user_info"
391                    }
392                }
393            }
394        ]
395    }
396    "#;
397
398    // Parse and add the workflow
399    let workflow = Workflow::from_json(workflow_json)?;
400    engine.add_workflow(&workflow);
401
402    // Create sample data
403    let sample_data = json!({
404        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
405        "user_id": "user_123",
406        "timestamp": "2024-01-15T10:30:00Z"
407    });
408
409    // Create and process message
410    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
411    message.temp_data = sample_data;
412    message.data = json!({});
413
414    println!("Processing message with custom functions...\n");
415
416    // Process the message through our custom workflow
417    match engine.process_message(&mut message).await {
418        Ok(_) => {
419            println!("✅ Message processed successfully!\n");
420
421            println!("📊 Final Results:");
422            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
423
424            println!("📋 Audit Trail:");
425            for (i, audit) in message.audit_trail.iter().enumerate() {
426                println!(
427                    "{}. Task: {} (Status: {})",
428                    i + 1,
429                    audit.task_id,
430                    audit.status_code
431                );
432                println!("   Timestamp: {}", audit.timestamp);
433                println!("   Changes: {} field(s) modified", audit.changes.len());
434            }
435
436            if message.has_errors() {
437                println!("\n⚠️  Errors encountered:");
438                for error in &message.errors {
439                    println!(
440                        "   - {}: {:?}",
441                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
442                        error.error_message
443                    );
444                }
445            }
446        }
447        Err(e) => {
448            println!("❌ Error processing message: {e:?}");
449        }
450    }
451
452    // Demonstrate another example with different data
453    let separator = "=".repeat(50);
454    println!("\n{separator}");
455    println!("=== Second Example with Different User ===\n");
456
457    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
458    message2.temp_data = json!({
459        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
460        "user_id": "user_456",
461        "timestamp": "2024-01-15T11:00:00Z"
462    });
463    message2.data = json!({});
464
465    // Create a workflow for the second user
466    let workflow2_json = r#"
467    {
468        "id": "custom_function_demo_2",
469        "name": "Custom Function Demo 2",
470        "description": "Second demo with different user",
471        "condition": { "==": [true, true] },
472        "tasks": [
473            {
474                "id": "prepare_data",
475                "name": "Prepare Data",
476                "function": {
477                    "name": "map",
478                    "input": {
479                        "mappings": [
480                            {
481                                "path": "data.numbers",
482                                "logic": { "var": "temp_data.measurements" }
483                            },
484                            {
485                                "path": "data.user_id",
486                                "logic": { "var": "temp_data.user_id" }
487                            }
488                        ]
489                    }
490                }
491            },
492            {
493                "id": "calculate_stats",
494                "name": "Calculate Statistics",
495                "function": {
496                    "name": "statistics",
497                    "input": {
498                        "data_path": "data.numbers",
499                        "output_path": "data.analysis"
500                    }
501                }
502            },
503            {
504                "id": "enrich_user_data",
505                "name": "Enrich User Data",
506                "function": {
507                    "name": "enrich_data",
508                    "input": {
509                        "lookup_field": "user_id",
510                        "lookup_value": "user_456",
511                        "output_path": "data.employee_details"
512                    }
513                }
514            }
515        ]
516    }
517    "#;
518
519    let workflow2 = Workflow::from_json(workflow2_json)?;
520    engine.add_workflow(&workflow2);
521
522    match engine.process_message(&mut message2).await {
523        Ok(_) => {
524            println!("✅ Second message processed successfully!\n");
525            println!("📊 Results for user_456:");
526            println!("{}", serde_json::to_string_pretty(&message2.data)?);
527        }
528        Err(e) => {
529            println!("❌ Error processing second message: {e:?}");
530        }
531    }
532
533    println!("\n🎉 Custom function examples completed!");
534
535    Ok(())
536}
Source

pub async fn process_message_concurrent( &self, message: &mut Message, ) -> Result<()>

Process a message with automatic concurrency control. This method will wait if the maximum concurrency level is reached.

Use this when spawning concurrent tasks to ensure the engine’s concurrency limit is respected.

§Example
use tokio::task::JoinSet;
let mut tasks = JoinSet::new();

for mut msg in messages {
    let engine = engine.clone();
    tasks.spawn(async move {
        engine.process_message_concurrent(&mut msg).await
    });
}

Trait Implementations§

Source§

impl Default for Engine

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

§

impl Freeze for Engine

§

impl !RefUnwindSafe for Engine

§

impl Send for Engine

§

impl Sync for Engine

§

impl Unpin for Engine

§

impl !UnwindSafe for Engine

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,