Engine

Struct Engine 

Source
pub struct Engine { /* private fields */ }
Expand description

High-performance workflow engine for message processing.

§Architecture

The engine features a modular design optimized for both IO-bound and CPU-bound workloads:

  • Separation of Concerns: Compiler handles pre-compilation, Executor handles runtime
  • Direct DataLogic: Single DataLogic instance per engine for zero contention
  • Immutable Workflows: All workflows compiled and cached at initialization
  • Pre-compiled Logic: JSONLogic expressions compiled once for optimal performance

§Performance Characteristics

  • Zero Runtime Compilation: All logic compiled during initialization
  • Cache-Friendly: Compiled logic stored in contiguous memory
  • Predictable Latency: No runtime allocations for logic evaluation
  • Thread-Safe Design: Applications can safely use multiple engine instances across threads

Implementations§

Source§

impl Engine

Source

pub fn new( workflows: Vec<Workflow>, custom_functions: Option<HashMap<String, Box<dyn FunctionHandler + Send + Sync>>>, retry_config: Option<RetryConfig>, ) -> Self

Creates a new Engine instance with configurable parameters.

§Arguments
  • workflows - The workflows to use for processing messages
  • custom_functions - Optional custom function handlers (None uses empty map)
  • include_builtins - Optional flag to include built-in functions (defaults to true if None)
  • retry_config - Optional retry configuration (uses default if None)
§Example
use dataflow_rs::{Engine, Workflow};

let workflows = vec![Workflow::from_json(r#"{"id": "test", "name": "Test", "priority": 0, "tasks": []}"#).unwrap()];

// Simple usage with defaults
let mut engine = Engine::new(workflows.clone(), None, None);
Examples found in repository?
examples/complete_workflow.rs (line 115)
4fn main() -> Result<(), Box<dyn std::error::Error>> {
5    // Define a workflow that:
6    // 1. Prepares sample user data
7    // 2. Enriches the message with transformed data
8    // 3. Validates the enriched data
9    let workflow_json = r#"
10    {
11        "id": "complete_workflow",
12        "name": "Complete Workflow Example",
13        "priority": 0,
14        "description": "Demonstrates enrich -> validate flow",
15        "tasks": [
16            {
17                "id": "initialize_user",
18                "name": "Initialize User Structure",
19                "description": "Create empty user object in data",
20                "function": {
21                    "name": "map",
22                    "input": {
23                        "mappings": [
24                            {
25                                "path": "data.user",
26                                "logic": {}
27                            }
28                        ]
29                    }
30                }
31            },
32            {
33                "id": "transform_data",
34                "name": "Transform Data",
35                "description": "Map API response to our data model",
36                "function": {
37                    "name": "map",
38                    "input": {
39                        "mappings": [
40                            {
41                                "path": "data.user.id", 
42                                "logic": { "var": "temp_data.body.id" }
43                            },
44                            {
45                                "path": "data.user.name", 
46                                "logic": { "var": "temp_data.body.name" }
47                            },
48                            {
49                                "path": "data.user.email", 
50                                "logic": { "var": "temp_data.body.email" }
51                            },
52                            {
53                                "path": "data.user.address", 
54                                "logic": {
55                                    "cat": [
56                                        { "var": "temp_data.body.address.street" },
57                                        ", ",
58                                        { "var": "temp_data.body.address.city" }
59                                    ]
60                                }
61                            },
62                            {
63                                "path": "data.user.company", 
64                                "logic": { "var": "temp_data.body.company.name" }
65                            }
66                        ]
67                    }
68                }
69            },
70            {
71                "id": "validate_user_data",
72                "name": "Validate User Data",
73                "description": "Ensure the user data meets our requirements",
74                "function": {
75                    "name": "validate",
76                    "input": {
77                        "rules": [
78                            {
79                                "path": "data",
80                                "logic": { "!!": { "var": "data.user.id" } },
81                                "message": "User ID is required"
82                            },
83                            {
84                                "path": "data",
85                                "logic": { "!!": { "var": "data.user.name" } },
86                                "message": "User name is required"
87                            },
88                            {
89                                "path": "data",
90                                "logic": { "!!": { "var": "data.user.email" } },
91                                "message": "User email is required"
92                            },
93                            {
94                                "path": "data",
95                                "logic": {
96                                    "in": [
97                                        "@",
98                                        { "var": "data.user.email" }
99                                    ]
100                                },
101                                "message": "Email must be valid format"
102                            }
103                        ]
104                    }
105                }
106            }
107        ]
108    }
109    "#;
110
111    // Parse the workflow
112    let workflow = Workflow::from_json(workflow_json)?;
113
114    // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
115    let mut engine = Engine::new(vec![workflow], None, None);
116
117    // Create a message to process with sample user data
118    let mut message = Message::new(&json!({}));
119
120    // Add sample user data to temp_data (simulating what would come from an API)
121    message.temp_data = json!({
122        "body": {
123            "id": 1,
124            "name": "John Doe",
125            "email": "john.doe@example.com",
126            "address": {
127                "street": "123 Main St",
128                "city": "New York"
129            },
130            "company": {
131                "name": "Acme Corp"
132            }
133        }
134    });
135
136    // Process the message through the workflow
137    println!("Processing message through workflow...");
138
139    match engine.process_message(&mut message) {
140        Ok(_) => {
141            println!("Workflow completed successfully!");
142        }
143        Err(e) => {
144            eprintln!("Error executing workflow: {e:?}");
145            if !message.errors.is_empty() {
146                println!("\nErrors recorded in message:");
147                for err in &message.errors {
148                    println!(
149                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
150                        err.workflow_id, err.task_id, err.error_message
151                    );
152                }
153            }
154        }
155    }
156
157    println!(
158        "\nFull message structure:\n{}",
159        serde_json::to_string_pretty(&message)?
160    );
161
162    Ok(())
163}
More examples
Hide additional examples
examples/benchmark.rs (line 92)
7fn main() -> Result<(), Box<dyn std::error::Error>> {
8    println!("========================================");
9    println!("DATAFLOW ENGINE BENCHMARK");
10    println!("========================================\n");
11    println!(
12        "Running {} iterations on single-threaded engine\n",
13        ITERATIONS
14    );
15
16    // Define a simple workflow with data transformation
17    let workflow_json = r#"
18    {
19        "id": "benchmark_workflow",
20        "name": "Benchmark Workflow",
21        "description": "Simple workflow for performance testing",
22        "priority": 1,
23        "tasks": [
24            {
25                "id": "transform_data",
26                "name": "Transform Data",
27                "description": "Map data fields",
28                "function": {
29                    "name": "map",
30                    "input": {
31                        "mappings": [
32                            {
33                                "path": "data.user.id", 
34                                "logic": { "var": "temp_data.id" }
35                            },
36                            {
37                                "path": "data.user.name", 
38                                "logic": { "var": "temp_data.name" }
39                            },
40                            {
41                                "path": "data.user.email", 
42                                "logic": { "var": "temp_data.email" }
43                            },
44                            {
45                                "path": "data.user.age",
46                                "logic": { "+": [{ "var": "temp_data.age" }, 1] }
47                            },
48                            {
49                                "path": "data.user.status",
50                                "logic": { 
51                                    "if": [
52                                        { ">": [{ "var": "temp_data.age" }, 18] },
53                                        "adult",
54                                        "minor"
55                                    ]
56                                }
57                            }
58                        ]
59                    }
60                }
61            },
62            {
63                "id": "validate_data",
64                "name": "Validate Data",
65                "description": "Validate transformed data",
66                "function": {
67                    "name": "validate",
68                    "input": {
69                        "rules": [
70                            {
71                                "path": "data",
72                                "logic": { "!!": { "var": "data.user.id" } },
73                                "message": "User ID is required"
74                            },
75                            {
76                                "path": "data",
77                                "logic": { "!!": { "var": "data.user.email" } },
78                                "message": "User email is required"
79                            }
80                        ]
81                    }
82                }
83            }
84        ]
85    }
86    "#;
87
88    // Parse the workflow
89    let workflow = Workflow::from_json(workflow_json)?;
90
91    // Create the engine with built-in functions
92    let mut engine = Engine::new(vec![workflow], None, None);
93
94    // Sample data for benchmarking
95    let sample_data = json!({
96        "id": 12345,
97        "name": "John Doe",
98        "email": "john.doe@example.com",
99        "age": 25,
100        "department": "Engineering"
101    });
102
103    // Warm-up run
104    println!("Warming up...");
105    for _ in 0..1000 {
106        let mut message = Message::new(&json!({}));
107        message.temp_data = sample_data.clone();
108        let _ = engine.process_message(&mut message);
109    }
110
111    // Benchmark run
112    println!("Starting benchmark...\n");
113
114    let mut all_durations = Vec::with_capacity(ITERATIONS);
115    let mut success_count = 0;
116    let mut error_count = 0;
117
118    let benchmark_start = Instant::now();
119
120    for i in 0..ITERATIONS {
121        let mut message = Message::new(&json!({}));
122        message.temp_data = sample_data.clone();
123        message.metadata = json!({
124            "iteration": i,
125            "timestamp": chrono::Utc::now().to_rfc3339()
126        });
127
128        let iteration_start = Instant::now();
129        match engine.process_message(&mut message) {
130            Ok(_) => {
131                success_count += 1;
132                if message.has_errors() {
133                    error_count += 1;
134                }
135            }
136            Err(_) => {
137                error_count += 1;
138            }
139        }
140        let iteration_duration = iteration_start.elapsed();
141        all_durations.push(iteration_duration);
142
143        // Progress indicator every 10k iterations
144        if (i + 1) % 10000 == 0 {
145            print!(".");
146            use std::io::Write;
147            std::io::stdout().flush()?;
148        }
149    }
150
151    let total_time = benchmark_start.elapsed();
152    println!("\n\nBenchmark Complete!");
153    println!("==========================================\n");
154
155    // Calculate statistics
156    all_durations.sort_unstable();
157    let p50 = all_durations[ITERATIONS * 50 / 100];
158    let p90 = all_durations[ITERATIONS * 90 / 100];
159    let p95 = all_durations[ITERATIONS * 95 / 100];
160    let p99 = all_durations[ITERATIONS * 99 / 100];
161    let throughput = ITERATIONS as f64 / total_time.as_secs_f64();
162
163    // Display results
164    println!("📊 PERFORMANCE METRICS");
165    println!("──────────────────────────────────────────");
166    println!("Total iterations:    {:>10}", ITERATIONS);
167    println!("Successful:          {:>10}", success_count);
168    println!("Errors:              {:>10}", error_count);
169    println!(
170        "Total time:          {:>10.3} seconds",
171        total_time.as_secs_f64()
172    );
173    println!();
174
175    println!("Messages/second:     {:>10.0}", throughput);
176    println!();
177
178    println!("📉 LATENCY PERCENTILES");
179    println!("──────────────────────────────────────────");
180    println!("P50:                 {:>10.3} μs", p50.as_micros() as f64);
181    println!("P90:                 {:>10.3} μs", p90.as_micros() as f64);
182    println!("P95:                 {:>10.3} μs", p95.as_micros() as f64);
183    println!("P99:                 {:>10.3} μs", p99.as_micros() as f64);
184    println!();
185
186    Ok(())
187}
examples/custom_function.rs (lines 477-481)
334fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
335    println!("=== Custom Function Example ===\n");
336
337    // Define a workflow that uses our custom functions
338    let workflow_json = r#"
339    {
340        "id": "custom_function_demo",
341        "name": "Custom Function Demo",
342        "description": "Demonstrates custom functions in workflow",
343        "tasks": [
344            {
345                "id": "prepare_data",
346                "name": "Prepare Data",
347                "description": "Extract and prepare data for analysis",
348                "function": {
349                    "name": "map",
350                    "input": {
351                        "mappings": [
352                            {
353                                "path": "data.numbers",
354                                "logic": { "var": "temp_data.measurements" }
355                            },
356                            {
357                                "path": "data.user_id",
358                                "logic": { "var": "temp_data.user_id" }
359                            }
360                        ]
361                    }
362                }
363            },
364            {
365                "id": "calculate_stats",
366                "name": "Calculate Statistics",
367                "description": "Calculate statistical measures from numeric data",
368                "function": {
369                    "name": "statistics",
370                    "input": {
371                        "data_path": "data.numbers",
372                        "output_path": "data.stats"
373                    }
374                }
375            },
376            {
377                "id": "enrich_user_data",
378                "name": "Enrich User Data",
379                "description": "Add additional user information",
380                "function": {
381                    "name": "enrich_data",
382                    "input": {
383                        "lookup_field": "user_id",
384                        "lookup_value": "user_123",
385                        "output_path": "data.user_info"
386                    }
387                }
388            }
389        ]
390    }
391    "#;
392
393    // Parse the first workflow
394    let workflow = Workflow::from_json(workflow_json)?;
395
396    // Demonstrate another example with different data
397    let separator = "=".repeat(50);
398    println!("\n{separator}");
399    println!("=== Second Example with Different User ===\n");
400
401    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
402    message2.temp_data = json!({
403        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
404        "user_id": "user_456",
405        "timestamp": "2024-01-15T11:00:00Z"
406    });
407    message2.data = json!({});
408
409    // Create a workflow for the second user
410    let workflow2_json = r#"
411    {
412        "id": "custom_function_demo_2",
413        "name": "Custom Function Demo 2",
414        "description": "Second demo with different user",
415        "tasks": [
416            {
417                "id": "prepare_data",
418                "name": "Prepare Data",
419                "function": {
420                    "name": "map",
421                    "input": {
422                        "mappings": [
423                            {
424                                "path": "data.numbers",
425                                "logic": { "var": "temp_data.measurements" }
426                            },
427                            {
428                                "path": "data.user_id",
429                                "logic": { "var": "temp_data.user_id" }
430                            }
431                        ]
432                    }
433                }
434            },
435            {
436                "id": "calculate_stats",
437                "name": "Calculate Statistics",
438                "function": {
439                    "name": "statistics",
440                    "input": {
441                        "data_path": "data.numbers",
442                        "output_path": "data.analysis"
443                    }
444                }
445            },
446            {
447                "id": "enrich_user_data",
448                "name": "Enrich User Data",
449                "function": {
450                    "name": "enrich_data",
451                    "input": {
452                        "lookup_field": "user_id",
453                        "lookup_value": "user_456",
454                        "output_path": "data.employee_details"
455                    }
456                }
457            }
458        ]
459    }
460    "#;
461
462    let workflow2 = Workflow::from_json(workflow2_json)?;
463
464    // Prepare custom functions
465    let mut custom_functions = HashMap::new();
466    custom_functions.insert(
467        "statistics".to_string(),
468        Box::new(StatisticsFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
469    );
470    custom_functions.insert(
471        "enrich_data".to_string(),
472        Box::new(DataEnrichmentFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
473    );
474    // Note: map and validate are now built-in to the Engine and will be used automatically
475
476    // Create engine with custom functions and built-ins (map/validate are always included internally)
477    let mut engine = Engine::new(
478        vec![workflow, workflow2],
479        Some(custom_functions),
480        None, // Use default (includes built-ins)
481    );
482
483    // Create sample data for first message
484    let sample_data = json!({
485        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
486        "user_id": "user_123",
487        "timestamp": "2024-01-15T10:30:00Z"
488    });
489
490    // Create and process first message
491    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
492    message.temp_data = sample_data;
493    message.data = json!({});
494
495    println!("Processing message with custom functions...\n");
496
497    // Process the message through our custom workflow
498    match engine.process_message(&mut message) {
499        Ok(_) => {
500            println!("✅ Message processed successfully!\n");
501
502            println!("📊 Final Results:");
503            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
504
505            println!("📋 Audit Trail:");
506            for (i, audit) in message.audit_trail.iter().enumerate() {
507                println!(
508                    "{}. Task: {} (Status: {})",
509                    i + 1,
510                    audit.task_id,
511                    audit.status_code
512                );
513                println!("   Timestamp: {}", audit.timestamp);
514                println!("   Changes: {} field(s) modified", audit.changes.len());
515            }
516
517            if message.has_errors() {
518                println!("\n⚠️  Errors encountered:");
519                for error in &message.errors {
520                    println!(
521                        "   - {}: {:?}",
522                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
523                        error.error_message
524                    );
525                }
526            }
527        }
528        Err(e) => {
529            println!("❌ Error processing message: {e:?}");
530        }
531    }
532
533    // Process second message
534    match engine.process_message(&mut message2) {
535        Ok(_) => {
536            println!("✅ Second message processed successfully!\n");
537            println!("📊 Results for user_456:");
538            println!("{}", serde_json::to_string_pretty(&message2.data)?);
539        }
540        Err(e) => {
541            println!("❌ Error processing second message: {e:?}");
542        }
543    }
544
545    println!("\n🎉 Custom function examples completed!");
546
547    Ok(())
548}
examples/memory_leak_analysis.rs (line 193)
56fn main() -> Result<(), Box<dyn std::error::Error>> {
57    println!("========================================");
58    println!("MEMORY LEAK ANALYSIS");
59    println!("========================================\n");
60    println!("Warmup iterations: {}", WARMUP_ITERATIONS);
61    println!("Test iterations: {}", TEST_ITERATIONS);
62    println!("Sample interval: every {} iterations\n", SAMPLE_INTERVAL);
63
64    // Define a workflow with memory-intensive operations
65    let workflow_json = r#"
66    {
67        "id": "memory_test_workflow",
68        "name": "Memory Test Workflow",
69        "description": "Workflow for memory leak detection",
70        "priority": 1,
71        "tasks": [
72            {
73                "id": "transform_data",
74                "name": "Transform Data",
75                "description": "Map and transform data fields",
76                "function": {
77                    "name": "map",
78                    "input": {
79                        "mappings": [
80                            {
81                                "path": "data.processed.id", 
82                                "logic": { "var": "temp_data.id" }
83                            },
84                            {
85                                "path": "data.processed.name", 
86                                "logic": { "var": "temp_data.name" }
87                            },
88                            {
89                                "path": "data.processed.email", 
90                                "logic": { "var": "temp_data.email" }
91                            },
92                            {
93                                "path": "data.processed.large_text",
94                                "logic": { "var": "temp_data.large_text_field" }
95                            },
96                            {
97                                "path": "data.processed.description",
98                                "logic": { "var": "temp_data.description" }
99                            },
100                            {
101                                "path": "data.processed.computed",
102                                "logic": { 
103                                    "cat": [
104                                        { "var": "temp_data.name" },
105                                        " - ",
106                                        { "var": "temp_data.department" },
107                                        " - ",
108                                        { "var": "temp_data.id" },
109                                        " - ",
110                                        { "var": "temp_data.description" }
111                                    ]
112                                }
113                            },
114                            {
115                                "path": "data.processed.tags",
116                                "logic": {
117                                    "map": [
118                                        { "var": "temp_data.tags" },
119                                        {
120                                            "cat": [
121                                                "processed_tag_",
122                                                { "var": "" },
123                                                "_with_suffix"
124                                            ]
125                                        }
126                                    ]
127                                }
128                            },
129                            {
130                                "path": "data.processed.nested_objects",
131                                "logic": { "var": "temp_data.nested_objects" }
132                            },
133                            {
134                                "path": "data.processed.binary_data",
135                                "logic": { "var": "temp_data.binary_data" }
136                            },
137                            {
138                                "path": "data.processed.floats",
139                                "logic": { "var": "temp_data.floats" }
140                            },
141                            {
142                                "path": "data.processed.additional_fields",
143                                "logic": { "var": "temp_data.additional_fields" }
144                            },
145                            {
146                                "path": "data.processed.metadata",
147                                "logic": { "var": "temp_data.metadata" }
148                            }
149                        ]
150                    }
151                }
152            },
153            {
154                "id": "validate_data",
155                "name": "Validate Data",
156                "description": "Validate transformed data",
157                "function": {
158                    "name": "validate",
159                    "input": {
160                        "rules": [
161                            {
162                                "path": "data",
163                                "logic": { "!!" : { "var": "data.processed.id" } },
164                                "message": "ID is required"
165                            },
166                            {
167                                "path": "data",
168                                "logic": { "!!" : { "var": "data.processed.email" } },
169                                "message": "Email is required"
170                            },
171                            {
172                                "path": "data",
173                                "logic": { "!!" : { "var": "data.processed.tags" } },
174                                "message": "Tags are required"
175                            },
176                            {
177                                "path": "data",
178                                "logic": { "!!" : { "var": "data.processed.nested_objects" } },
179                                "message": "Nested objects are required"
180                            }
181                        ]
182                    }
183                }
184            }
185        ]
186    }
187    "#;
188
189    // Parse the workflow
190    let workflow = Workflow::from_json(workflow_json)?;
191
192    // Create the engine
193    let mut engine = Engine::new(vec![workflow], None, None);
194
195    // Create large sample data to make memory leaks more visible
196    let large_text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. ".repeat(100);
197    let large_array: Vec<String> = (0..1000)
198        .map(|i| format!("item_{}_with_some_longer_text_content", i))
199        .collect();
200    let nested_objects: Vec<serde_json::Value> = (0..100).map(|i| {
201        json!({
202            "id": i,
203            "name": format!("Object {}", i),
204            "description": format!("This is a description for object {} with some additional text", i),
205            "attributes": {
206                "color": format!("color_{}", i),
207                "size": format!("size_{}", i),
208                "weight": i * 100,
209                "metadata": {
210                    "created": "2024-01-01T00:00:00Z",
211                    "modified": "2024-01-01T00:00:00Z",
212                    "version": i
213                }
214            },
215            "tags": vec![format!("tag_{}", i), format!("category_{}", i % 10), format!("type_{}", i % 5)]
216        })
217    }).collect();
218
219    let sample_data = json!({
220        "id": 12345,
221        "name": "John Doe",
222        "email": "john.doe@example.com",
223        "age": 25,
224        "department": "Engineering",
225        "description": large_text.clone(),
226        "large_text_field": large_text,
227        "tags": large_array,
228        "nested_objects": nested_objects,
229        "metadata": {
230            "created": "2024-01-01T00:00:00Z",
231            "updated": "2024-01-01T00:00:00Z",
232            "complex_data": {
233                "level1": {
234                    "level2": {
235                        "level3": {
236                            "data": (0..100).map(|i| format!("nested_value_{}", i)).collect::<Vec<_>>()
237                        }
238                    }
239                }
240            }
241        },
242        "binary_data": (0..10000).map(|i| i % 256).collect::<Vec<u32>>(),
243        "floats": (0..1000).map(|i| i as f64 * 3.14159).collect::<Vec<f64>>(),
244        "additional_fields": (0..50).map(|i| {
245            (format!("field_{}", i), format!("value_{}_with_additional_content", i))
246        }).collect::<std::collections::HashMap<_, _>>()
247    });
248
249    // Memory tracking
250    let initial_memory = get_current_memory();
251    println!("Initial memory usage: {}", format_bytes(initial_memory));
252
253    // Warmup phase
254    println!("\n📊 WARMUP PHASE");
255    println!("──────────────────────────────────────────");
256    let warmup_start = Instant::now();
257    let warmup_initial_memory = get_current_memory();
258
259    for i in 0..WARMUP_ITERATIONS {
260        let mut message = Message::new(&json!({}));
261        message.temp_data = sample_data.clone();
262        message.metadata = json!({
263            "iteration": i,
264            "phase": "warmup"
265        });
266
267        let _ = engine.process_message(&mut message);
268
269        if (i + 1) % 1000 == 0 {
270            print!(".");
271            use std::io::Write;
272            std::io::stdout().flush()?;
273        }
274    }
275
276    let warmup_duration = warmup_start.elapsed();
277    let warmup_final_memory = get_current_memory();
278    let warmup_memory_growth = warmup_final_memory - warmup_initial_memory;
279
280    println!("\n");
281    println!(
282        "Warmup complete in: {:.2} seconds",
283        warmup_duration.as_secs_f64()
284    );
285    println!("Memory after warmup: {}", format_bytes(warmup_final_memory));
286    println!(
287        "Memory growth during warmup: {}",
288        format_bytes(warmup_memory_growth)
289    );
290
291    // Give time for any deferred deallocations
292    std::thread::sleep(std::time::Duration::from_millis(100));
293
294    // Test phase - looking for memory leaks
295    println!("\n📊 MEMORY LEAK TEST PHASE");
296    println!("──────────────────────────────────────────");
297    let test_start = Instant::now();
298    let test_initial_memory = get_current_memory();
299    let mut memory_samples: Vec<(usize, isize)> = Vec::new();
300    let mut max_memory = test_initial_memory;
301    let mut min_memory = test_initial_memory;
302
303    println!("Starting memory: {}", format_bytes(test_initial_memory));
304    println!("\nRunning iterations...");
305
306    for i in 0..TEST_ITERATIONS {
307        let mut message = Message::new(&json!({}));
308        message.temp_data = sample_data.clone();
309        message.metadata = json!({
310            "iteration": i,
311            "phase": "test",
312            "timestamp": chrono::Utc::now().to_rfc3339()
313        });
314
315        match engine.process_message(&mut message) {
316            Ok(_) => {}
317            Err(e) => {
318                eprintln!("Error at iteration {}: {:?}", i, e);
319            }
320        }
321
322        // Sample memory usage periodically
323        if (i + 1) % SAMPLE_INTERVAL == 0 {
324            let current_memory = get_current_memory();
325            memory_samples.push((i + 1, current_memory));
326            max_memory = max_memory.max(current_memory);
327            min_memory = min_memory.min(current_memory);
328
329            let memory_diff = current_memory - test_initial_memory;
330            println!(
331                "Iteration {:6}: Memory = {} (Δ = {})",
332                i + 1,
333                format_bytes(current_memory),
334                format_bytes(memory_diff)
335            );
336        }
337    }
338
339    let test_duration = test_start.elapsed();
340    let test_final_memory = get_current_memory();
341    let test_memory_growth = test_final_memory - test_initial_memory;
342
343    println!("\n========================================");
344    println!("📈 ANALYSIS RESULTS");
345    println!("========================================\n");
346
347    println!("Test Duration: {:.2} seconds", test_duration.as_secs_f64());
348    println!(
349        "Iterations/second: {:.0}",
350        TEST_ITERATIONS as f64 / test_duration.as_secs_f64()
351    );
352    println!();
353
354    println!("MEMORY STATISTICS:");
355    println!("──────────────────────────────────────────");
356    println!(
357        "Initial memory (after warmup): {}",
358        format_bytes(test_initial_memory)
359    );
360    println!("Final memory: {}", format_bytes(test_final_memory));
361    println!("Memory growth: {}", format_bytes(test_memory_growth));
362    println!("Peak memory: {}", format_bytes(max_memory));
363    println!("Minimum memory: {}", format_bytes(min_memory));
364    println!("Memory range: {}", format_bytes(max_memory - min_memory));
365    println!();
366
367    // Calculate memory growth rate
368    let growth_per_iteration = test_memory_growth as f64 / TEST_ITERATIONS as f64;
369    println!(
370        "Average growth per iteration: {:.2} bytes",
371        growth_per_iteration
372    );
373
374    // Analyze trend
375    if memory_samples.len() >= 2 {
376        let first_half_avg = memory_samples[..memory_samples.len() / 2]
377            .iter()
378            .map(|(_, m)| *m)
379            .sum::<isize>() as f64
380            / (memory_samples.len() / 2) as f64;
381
382        let second_half_avg = memory_samples[memory_samples.len() / 2..]
383            .iter()
384            .map(|(_, m)| *m)
385            .sum::<isize>() as f64
386            / (memory_samples.len() - memory_samples.len() / 2) as f64;
387
388        let trend = second_half_avg - first_half_avg;
389
390        println!("\nMEMORY TREND ANALYSIS:");
391        println!("──────────────────────────────────────────");
392        println!(
393            "First half average: {}",
394            format_bytes(first_half_avg as isize)
395        );
396        println!(
397            "Second half average: {}",
398            format_bytes(second_half_avg as isize)
399        );
400        println!("Trend: {}", format_bytes(trend as isize));
401
402        // Verdict
403        println!("\n🔍 VERDICT:");
404        println!("──────────────────────────────────────────");
405
406        let threshold_bytes = 1024 * 100; // 100 KB threshold
407        if test_memory_growth.abs() < threshold_bytes {
408            println!("✅ PASS: Memory usage is stable (growth < 100 KB)");
409            println!("   No significant memory leak detected.");
410        } else if growth_per_iteration < 10.0 {
411            println!("⚠️  WARNING: Small memory growth detected");
412            println!(
413                "   Growth rate: {:.2} bytes/iteration",
414                growth_per_iteration
415            );
416            println!("   This may be acceptable for your use case.");
417        } else {
418            println!("❌ FAIL: Potential memory leak detected!");
419            println!(
420                "   Growth rate: {:.2} bytes/iteration",
421                growth_per_iteration
422            );
423            println!("   Total growth: {}", format_bytes(test_memory_growth));
424            println!("   This indicates a possible memory leak that should be investigated.");
425        }
426
427        if trend.abs() as isize > threshold_bytes {
428            println!("\n⚠️  Increasing memory trend detected between first and second half!");
429        }
430    }
431
432    println!("\n========================================");
433    println!("Analysis complete!");
434    println!("========================================");
435
436    Ok(())
437}
Source

pub fn retry_config(&self) -> &RetryConfig

Get the configured retry configuration

Source

pub fn workflows(&self) -> &HashMap<String, Workflow>

Get the configured workflows

Source

pub fn task_functions( &self, ) -> &HashMap<String, Box<dyn FunctionHandler + Send + Sync>>

Get the registered task functions

Source

pub fn has_function(&self, name: &str) -> bool

Check if a function with the given name is registered

Source

pub fn process_message(&mut self, message: &mut Message) -> Result<()>

Processes a message through workflows that match their conditions.

This method:

  1. Iterates through workflows sequentially in deterministic order (sorted by ID)
  2. Evaluates conditions for each workflow right before execution
  3. Executes matching workflows one after another (not concurrently)
  4. Updates the message with processing results and audit trail
  5. Clears the evaluation arena after processing to prevent memory leaks

Workflows are executed sequentially because later workflows may depend on the results of earlier workflows, and their conditions may change based on modifications made by previous workflows.

§Arguments
  • message - The message to process
§Returns
  • Result<()> - Success or an error if processing failed
Examples found in repository?
examples/complete_workflow.rs (line 139)
4fn main() -> Result<(), Box<dyn std::error::Error>> {
5    // Define a workflow that:
6    // 1. Prepares sample user data
7    // 2. Enriches the message with transformed data
8    // 3. Validates the enriched data
9    let workflow_json = r#"
10    {
11        "id": "complete_workflow",
12        "name": "Complete Workflow Example",
13        "priority": 0,
14        "description": "Demonstrates enrich -> validate flow",
15        "tasks": [
16            {
17                "id": "initialize_user",
18                "name": "Initialize User Structure",
19                "description": "Create empty user object in data",
20                "function": {
21                    "name": "map",
22                    "input": {
23                        "mappings": [
24                            {
25                                "path": "data.user",
26                                "logic": {}
27                            }
28                        ]
29                    }
30                }
31            },
32            {
33                "id": "transform_data",
34                "name": "Transform Data",
35                "description": "Map API response to our data model",
36                "function": {
37                    "name": "map",
38                    "input": {
39                        "mappings": [
40                            {
41                                "path": "data.user.id", 
42                                "logic": { "var": "temp_data.body.id" }
43                            },
44                            {
45                                "path": "data.user.name", 
46                                "logic": { "var": "temp_data.body.name" }
47                            },
48                            {
49                                "path": "data.user.email", 
50                                "logic": { "var": "temp_data.body.email" }
51                            },
52                            {
53                                "path": "data.user.address", 
54                                "logic": {
55                                    "cat": [
56                                        { "var": "temp_data.body.address.street" },
57                                        ", ",
58                                        { "var": "temp_data.body.address.city" }
59                                    ]
60                                }
61                            },
62                            {
63                                "path": "data.user.company", 
64                                "logic": { "var": "temp_data.body.company.name" }
65                            }
66                        ]
67                    }
68                }
69            },
70            {
71                "id": "validate_user_data",
72                "name": "Validate User Data",
73                "description": "Ensure the user data meets our requirements",
74                "function": {
75                    "name": "validate",
76                    "input": {
77                        "rules": [
78                            {
79                                "path": "data",
80                                "logic": { "!!": { "var": "data.user.id" } },
81                                "message": "User ID is required"
82                            },
83                            {
84                                "path": "data",
85                                "logic": { "!!": { "var": "data.user.name" } },
86                                "message": "User name is required"
87                            },
88                            {
89                                "path": "data",
90                                "logic": { "!!": { "var": "data.user.email" } },
91                                "message": "User email is required"
92                            },
93                            {
94                                "path": "data",
95                                "logic": {
96                                    "in": [
97                                        "@",
98                                        { "var": "data.user.email" }
99                                    ]
100                                },
101                                "message": "Email must be valid format"
102                            }
103                        ]
104                    }
105                }
106            }
107        ]
108    }
109    "#;
110
111    // Parse the workflow
112    let workflow = Workflow::from_json(workflow_json)?;
113
114    // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
115    let mut engine = Engine::new(vec![workflow], None, None);
116
117    // Create a message to process with sample user data
118    let mut message = Message::new(&json!({}));
119
120    // Add sample user data to temp_data (simulating what would come from an API)
121    message.temp_data = json!({
122        "body": {
123            "id": 1,
124            "name": "John Doe",
125            "email": "john.doe@example.com",
126            "address": {
127                "street": "123 Main St",
128                "city": "New York"
129            },
130            "company": {
131                "name": "Acme Corp"
132            }
133        }
134    });
135
136    // Process the message through the workflow
137    println!("Processing message through workflow...");
138
139    match engine.process_message(&mut message) {
140        Ok(_) => {
141            println!("Workflow completed successfully!");
142        }
143        Err(e) => {
144            eprintln!("Error executing workflow: {e:?}");
145            if !message.errors.is_empty() {
146                println!("\nErrors recorded in message:");
147                for err in &message.errors {
148                    println!(
149                        "- Workflow: {:?}, Task: {:?}, Error: {:?}",
150                        err.workflow_id, err.task_id, err.error_message
151                    );
152                }
153            }
154        }
155    }
156
157    println!(
158        "\nFull message structure:\n{}",
159        serde_json::to_string_pretty(&message)?
160    );
161
162    Ok(())
163}
More examples
Hide additional examples
examples/benchmark.rs (line 108)
7fn main() -> Result<(), Box<dyn std::error::Error>> {
8    println!("========================================");
9    println!("DATAFLOW ENGINE BENCHMARK");
10    println!("========================================\n");
11    println!(
12        "Running {} iterations on single-threaded engine\n",
13        ITERATIONS
14    );
15
16    // Define a simple workflow with data transformation
17    let workflow_json = r#"
18    {
19        "id": "benchmark_workflow",
20        "name": "Benchmark Workflow",
21        "description": "Simple workflow for performance testing",
22        "priority": 1,
23        "tasks": [
24            {
25                "id": "transform_data",
26                "name": "Transform Data",
27                "description": "Map data fields",
28                "function": {
29                    "name": "map",
30                    "input": {
31                        "mappings": [
32                            {
33                                "path": "data.user.id", 
34                                "logic": { "var": "temp_data.id" }
35                            },
36                            {
37                                "path": "data.user.name", 
38                                "logic": { "var": "temp_data.name" }
39                            },
40                            {
41                                "path": "data.user.email", 
42                                "logic": { "var": "temp_data.email" }
43                            },
44                            {
45                                "path": "data.user.age",
46                                "logic": { "+": [{ "var": "temp_data.age" }, 1] }
47                            },
48                            {
49                                "path": "data.user.status",
50                                "logic": { 
51                                    "if": [
52                                        { ">": [{ "var": "temp_data.age" }, 18] },
53                                        "adult",
54                                        "minor"
55                                    ]
56                                }
57                            }
58                        ]
59                    }
60                }
61            },
62            {
63                "id": "validate_data",
64                "name": "Validate Data",
65                "description": "Validate transformed data",
66                "function": {
67                    "name": "validate",
68                    "input": {
69                        "rules": [
70                            {
71                                "path": "data",
72                                "logic": { "!!": { "var": "data.user.id" } },
73                                "message": "User ID is required"
74                            },
75                            {
76                                "path": "data",
77                                "logic": { "!!": { "var": "data.user.email" } },
78                                "message": "User email is required"
79                            }
80                        ]
81                    }
82                }
83            }
84        ]
85    }
86    "#;
87
88    // Parse the workflow
89    let workflow = Workflow::from_json(workflow_json)?;
90
91    // Create the engine with built-in functions
92    let mut engine = Engine::new(vec![workflow], None, None);
93
94    // Sample data for benchmarking
95    let sample_data = json!({
96        "id": 12345,
97        "name": "John Doe",
98        "email": "john.doe@example.com",
99        "age": 25,
100        "department": "Engineering"
101    });
102
103    // Warm-up run
104    println!("Warming up...");
105    for _ in 0..1000 {
106        let mut message = Message::new(&json!({}));
107        message.temp_data = sample_data.clone();
108        let _ = engine.process_message(&mut message);
109    }
110
111    // Benchmark run
112    println!("Starting benchmark...\n");
113
114    let mut all_durations = Vec::with_capacity(ITERATIONS);
115    let mut success_count = 0;
116    let mut error_count = 0;
117
118    let benchmark_start = Instant::now();
119
120    for i in 0..ITERATIONS {
121        let mut message = Message::new(&json!({}));
122        message.temp_data = sample_data.clone();
123        message.metadata = json!({
124            "iteration": i,
125            "timestamp": chrono::Utc::now().to_rfc3339()
126        });
127
128        let iteration_start = Instant::now();
129        match engine.process_message(&mut message) {
130            Ok(_) => {
131                success_count += 1;
132                if message.has_errors() {
133                    error_count += 1;
134                }
135            }
136            Err(_) => {
137                error_count += 1;
138            }
139        }
140        let iteration_duration = iteration_start.elapsed();
141        all_durations.push(iteration_duration);
142
143        // Progress indicator every 10k iterations
144        if (i + 1) % 10000 == 0 {
145            print!(".");
146            use std::io::Write;
147            std::io::stdout().flush()?;
148        }
149    }
150
151    let total_time = benchmark_start.elapsed();
152    println!("\n\nBenchmark Complete!");
153    println!("==========================================\n");
154
155    // Calculate statistics
156    all_durations.sort_unstable();
157    let p50 = all_durations[ITERATIONS * 50 / 100];
158    let p90 = all_durations[ITERATIONS * 90 / 100];
159    let p95 = all_durations[ITERATIONS * 95 / 100];
160    let p99 = all_durations[ITERATIONS * 99 / 100];
161    let throughput = ITERATIONS as f64 / total_time.as_secs_f64();
162
163    // Display results
164    println!("📊 PERFORMANCE METRICS");
165    println!("──────────────────────────────────────────");
166    println!("Total iterations:    {:>10}", ITERATIONS);
167    println!("Successful:          {:>10}", success_count);
168    println!("Errors:              {:>10}", error_count);
169    println!(
170        "Total time:          {:>10.3} seconds",
171        total_time.as_secs_f64()
172    );
173    println!();
174
175    println!("Messages/second:     {:>10.0}", throughput);
176    println!();
177
178    println!("📉 LATENCY PERCENTILES");
179    println!("──────────────────────────────────────────");
180    println!("P50:                 {:>10.3} μs", p50.as_micros() as f64);
181    println!("P90:                 {:>10.3} μs", p90.as_micros() as f64);
182    println!("P95:                 {:>10.3} μs", p95.as_micros() as f64);
183    println!("P99:                 {:>10.3} μs", p99.as_micros() as f64);
184    println!();
185
186    Ok(())
187}
examples/custom_function.rs (line 498)
334fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
335    println!("=== Custom Function Example ===\n");
336
337    // Define a workflow that uses our custom functions
338    let workflow_json = r#"
339    {
340        "id": "custom_function_demo",
341        "name": "Custom Function Demo",
342        "description": "Demonstrates custom functions in workflow",
343        "tasks": [
344            {
345                "id": "prepare_data",
346                "name": "Prepare Data",
347                "description": "Extract and prepare data for analysis",
348                "function": {
349                    "name": "map",
350                    "input": {
351                        "mappings": [
352                            {
353                                "path": "data.numbers",
354                                "logic": { "var": "temp_data.measurements" }
355                            },
356                            {
357                                "path": "data.user_id",
358                                "logic": { "var": "temp_data.user_id" }
359                            }
360                        ]
361                    }
362                }
363            },
364            {
365                "id": "calculate_stats",
366                "name": "Calculate Statistics",
367                "description": "Calculate statistical measures from numeric data",
368                "function": {
369                    "name": "statistics",
370                    "input": {
371                        "data_path": "data.numbers",
372                        "output_path": "data.stats"
373                    }
374                }
375            },
376            {
377                "id": "enrich_user_data",
378                "name": "Enrich User Data",
379                "description": "Add additional user information",
380                "function": {
381                    "name": "enrich_data",
382                    "input": {
383                        "lookup_field": "user_id",
384                        "lookup_value": "user_123",
385                        "output_path": "data.user_info"
386                    }
387                }
388            }
389        ]
390    }
391    "#;
392
393    // Parse the first workflow
394    let workflow = Workflow::from_json(workflow_json)?;
395
396    // Demonstrate another example with different data
397    let separator = "=".repeat(50);
398    println!("\n{separator}");
399    println!("=== Second Example with Different User ===\n");
400
401    let mut message2 = dataflow_rs::engine::message::Message::new(&json!({}));
402    message2.temp_data = json!({
403        "measurements": [5.1, 7.3, 9.8, 6.2, 8.5],
404        "user_id": "user_456",
405        "timestamp": "2024-01-15T11:00:00Z"
406    });
407    message2.data = json!({});
408
409    // Create a workflow for the second user
410    let workflow2_json = r#"
411    {
412        "id": "custom_function_demo_2",
413        "name": "Custom Function Demo 2",
414        "description": "Second demo with different user",
415        "tasks": [
416            {
417                "id": "prepare_data",
418                "name": "Prepare Data",
419                "function": {
420                    "name": "map",
421                    "input": {
422                        "mappings": [
423                            {
424                                "path": "data.numbers",
425                                "logic": { "var": "temp_data.measurements" }
426                            },
427                            {
428                                "path": "data.user_id",
429                                "logic": { "var": "temp_data.user_id" }
430                            }
431                        ]
432                    }
433                }
434            },
435            {
436                "id": "calculate_stats",
437                "name": "Calculate Statistics",
438                "function": {
439                    "name": "statistics",
440                    "input": {
441                        "data_path": "data.numbers",
442                        "output_path": "data.analysis"
443                    }
444                }
445            },
446            {
447                "id": "enrich_user_data",
448                "name": "Enrich User Data",
449                "function": {
450                    "name": "enrich_data",
451                    "input": {
452                        "lookup_field": "user_id",
453                        "lookup_value": "user_456",
454                        "output_path": "data.employee_details"
455                    }
456                }
457            }
458        ]
459    }
460    "#;
461
462    let workflow2 = Workflow::from_json(workflow2_json)?;
463
464    // Prepare custom functions
465    let mut custom_functions = HashMap::new();
466    custom_functions.insert(
467        "statistics".to_string(),
468        Box::new(StatisticsFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
469    );
470    custom_functions.insert(
471        "enrich_data".to_string(),
472        Box::new(DataEnrichmentFunction::new()) as Box<dyn FunctionHandler + Send + Sync>,
473    );
474    // Note: map and validate are now built-in to the Engine and will be used automatically
475
476    // Create engine with custom functions and built-ins (map/validate are always included internally)
477    let mut engine = Engine::new(
478        vec![workflow, workflow2],
479        Some(custom_functions),
480        None, // Use default (includes built-ins)
481    );
482
483    // Create sample data for first message
484    let sample_data = json!({
485        "measurements": [10.5, 15.2, 8.7, 22.1, 18.9, 12.3, 25.6, 14.8, 19.4, 16.7],
486        "user_id": "user_123",
487        "timestamp": "2024-01-15T10:30:00Z"
488    });
489
490    // Create and process first message
491    let mut message = dataflow_rs::engine::message::Message::new(&json!({}));
492    message.temp_data = sample_data;
493    message.data = json!({});
494
495    println!("Processing message with custom functions...\n");
496
497    // Process the message through our custom workflow
498    match engine.process_message(&mut message) {
499        Ok(_) => {
500            println!("✅ Message processed successfully!\n");
501
502            println!("📊 Final Results:");
503            println!("{}\n", serde_json::to_string_pretty(&message.data)?);
504
505            println!("📋 Audit Trail:");
506            for (i, audit) in message.audit_trail.iter().enumerate() {
507                println!(
508                    "{}. Task: {} (Status: {})",
509                    i + 1,
510                    audit.task_id,
511                    audit.status_code
512                );
513                println!("   Timestamp: {}", audit.timestamp);
514                println!("   Changes: {} field(s) modified", audit.changes.len());
515            }
516
517            if message.has_errors() {
518                println!("\n⚠️  Errors encountered:");
519                for error in &message.errors {
520                    println!(
521                        "   - {}: {:?}",
522                        error.task_id.as_ref().unwrap_or(&"unknown".to_string()),
523                        error.error_message
524                    );
525                }
526            }
527        }
528        Err(e) => {
529            println!("❌ Error processing message: {e:?}");
530        }
531    }
532
533    // Process second message
534    match engine.process_message(&mut message2) {
535        Ok(_) => {
536            println!("✅ Second message processed successfully!\n");
537            println!("📊 Results for user_456:");
538            println!("{}", serde_json::to_string_pretty(&message2.data)?);
539        }
540        Err(e) => {
541            println!("❌ Error processing second message: {e:?}");
542        }
543    }
544
545    println!("\n🎉 Custom function examples completed!");
546
547    Ok(())
548}
examples/memory_leak_analysis.rs (line 267)
56fn main() -> Result<(), Box<dyn std::error::Error>> {
57    println!("========================================");
58    println!("MEMORY LEAK ANALYSIS");
59    println!("========================================\n");
60    println!("Warmup iterations: {}", WARMUP_ITERATIONS);
61    println!("Test iterations: {}", TEST_ITERATIONS);
62    println!("Sample interval: every {} iterations\n", SAMPLE_INTERVAL);
63
64    // Define a workflow with memory-intensive operations
65    let workflow_json = r#"
66    {
67        "id": "memory_test_workflow",
68        "name": "Memory Test Workflow",
69        "description": "Workflow for memory leak detection",
70        "priority": 1,
71        "tasks": [
72            {
73                "id": "transform_data",
74                "name": "Transform Data",
75                "description": "Map and transform data fields",
76                "function": {
77                    "name": "map",
78                    "input": {
79                        "mappings": [
80                            {
81                                "path": "data.processed.id", 
82                                "logic": { "var": "temp_data.id" }
83                            },
84                            {
85                                "path": "data.processed.name", 
86                                "logic": { "var": "temp_data.name" }
87                            },
88                            {
89                                "path": "data.processed.email", 
90                                "logic": { "var": "temp_data.email" }
91                            },
92                            {
93                                "path": "data.processed.large_text",
94                                "logic": { "var": "temp_data.large_text_field" }
95                            },
96                            {
97                                "path": "data.processed.description",
98                                "logic": { "var": "temp_data.description" }
99                            },
100                            {
101                                "path": "data.processed.computed",
102                                "logic": { 
103                                    "cat": [
104                                        { "var": "temp_data.name" },
105                                        " - ",
106                                        { "var": "temp_data.department" },
107                                        " - ",
108                                        { "var": "temp_data.id" },
109                                        " - ",
110                                        { "var": "temp_data.description" }
111                                    ]
112                                }
113                            },
114                            {
115                                "path": "data.processed.tags",
116                                "logic": {
117                                    "map": [
118                                        { "var": "temp_data.tags" },
119                                        {
120                                            "cat": [
121                                                "processed_tag_",
122                                                { "var": "" },
123                                                "_with_suffix"
124                                            ]
125                                        }
126                                    ]
127                                }
128                            },
129                            {
130                                "path": "data.processed.nested_objects",
131                                "logic": { "var": "temp_data.nested_objects" }
132                            },
133                            {
134                                "path": "data.processed.binary_data",
135                                "logic": { "var": "temp_data.binary_data" }
136                            },
137                            {
138                                "path": "data.processed.floats",
139                                "logic": { "var": "temp_data.floats" }
140                            },
141                            {
142                                "path": "data.processed.additional_fields",
143                                "logic": { "var": "temp_data.additional_fields" }
144                            },
145                            {
146                                "path": "data.processed.metadata",
147                                "logic": { "var": "temp_data.metadata" }
148                            }
149                        ]
150                    }
151                }
152            },
153            {
154                "id": "validate_data",
155                "name": "Validate Data",
156                "description": "Validate transformed data",
157                "function": {
158                    "name": "validate",
159                    "input": {
160                        "rules": [
161                            {
162                                "path": "data",
163                                "logic": { "!!" : { "var": "data.processed.id" } },
164                                "message": "ID is required"
165                            },
166                            {
167                                "path": "data",
168                                "logic": { "!!" : { "var": "data.processed.email" } },
169                                "message": "Email is required"
170                            },
171                            {
172                                "path": "data",
173                                "logic": { "!!" : { "var": "data.processed.tags" } },
174                                "message": "Tags are required"
175                            },
176                            {
177                                "path": "data",
178                                "logic": { "!!" : { "var": "data.processed.nested_objects" } },
179                                "message": "Nested objects are required"
180                            }
181                        ]
182                    }
183                }
184            }
185        ]
186    }
187    "#;
188
189    // Parse the workflow
190    let workflow = Workflow::from_json(workflow_json)?;
191
192    // Create the engine
193    let mut engine = Engine::new(vec![workflow], None, None);
194
195    // Create large sample data to make memory leaks more visible
196    let large_text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. ".repeat(100);
197    let large_array: Vec<String> = (0..1000)
198        .map(|i| format!("item_{}_with_some_longer_text_content", i))
199        .collect();
200    let nested_objects: Vec<serde_json::Value> = (0..100).map(|i| {
201        json!({
202            "id": i,
203            "name": format!("Object {}", i),
204            "description": format!("This is a description for object {} with some additional text", i),
205            "attributes": {
206                "color": format!("color_{}", i),
207                "size": format!("size_{}", i),
208                "weight": i * 100,
209                "metadata": {
210                    "created": "2024-01-01T00:00:00Z",
211                    "modified": "2024-01-01T00:00:00Z",
212                    "version": i
213                }
214            },
215            "tags": vec![format!("tag_{}", i), format!("category_{}", i % 10), format!("type_{}", i % 5)]
216        })
217    }).collect();
218
219    let sample_data = json!({
220        "id": 12345,
221        "name": "John Doe",
222        "email": "john.doe@example.com",
223        "age": 25,
224        "department": "Engineering",
225        "description": large_text.clone(),
226        "large_text_field": large_text,
227        "tags": large_array,
228        "nested_objects": nested_objects,
229        "metadata": {
230            "created": "2024-01-01T00:00:00Z",
231            "updated": "2024-01-01T00:00:00Z",
232            "complex_data": {
233                "level1": {
234                    "level2": {
235                        "level3": {
236                            "data": (0..100).map(|i| format!("nested_value_{}", i)).collect::<Vec<_>>()
237                        }
238                    }
239                }
240            }
241        },
242        "binary_data": (0..10000).map(|i| i % 256).collect::<Vec<u32>>(),
243        "floats": (0..1000).map(|i| i as f64 * 3.14159).collect::<Vec<f64>>(),
244        "additional_fields": (0..50).map(|i| {
245            (format!("field_{}", i), format!("value_{}_with_additional_content", i))
246        }).collect::<std::collections::HashMap<_, _>>()
247    });
248
249    // Memory tracking
250    let initial_memory = get_current_memory();
251    println!("Initial memory usage: {}", format_bytes(initial_memory));
252
253    // Warmup phase
254    println!("\n📊 WARMUP PHASE");
255    println!("──────────────────────────────────────────");
256    let warmup_start = Instant::now();
257    let warmup_initial_memory = get_current_memory();
258
259    for i in 0..WARMUP_ITERATIONS {
260        let mut message = Message::new(&json!({}));
261        message.temp_data = sample_data.clone();
262        message.metadata = json!({
263            "iteration": i,
264            "phase": "warmup"
265        });
266
267        let _ = engine.process_message(&mut message);
268
269        if (i + 1) % 1000 == 0 {
270            print!(".");
271            use std::io::Write;
272            std::io::stdout().flush()?;
273        }
274    }
275
276    let warmup_duration = warmup_start.elapsed();
277    let warmup_final_memory = get_current_memory();
278    let warmup_memory_growth = warmup_final_memory - warmup_initial_memory;
279
280    println!("\n");
281    println!(
282        "Warmup complete in: {:.2} seconds",
283        warmup_duration.as_secs_f64()
284    );
285    println!("Memory after warmup: {}", format_bytes(warmup_final_memory));
286    println!(
287        "Memory growth during warmup: {}",
288        format_bytes(warmup_memory_growth)
289    );
290
291    // Give time for any deferred deallocations
292    std::thread::sleep(std::time::Duration::from_millis(100));
293
294    // Test phase - looking for memory leaks
295    println!("\n📊 MEMORY LEAK TEST PHASE");
296    println!("──────────────────────────────────────────");
297    let test_start = Instant::now();
298    let test_initial_memory = get_current_memory();
299    let mut memory_samples: Vec<(usize, isize)> = Vec::new();
300    let mut max_memory = test_initial_memory;
301    let mut min_memory = test_initial_memory;
302
303    println!("Starting memory: {}", format_bytes(test_initial_memory));
304    println!("\nRunning iterations...");
305
306    for i in 0..TEST_ITERATIONS {
307        let mut message = Message::new(&json!({}));
308        message.temp_data = sample_data.clone();
309        message.metadata = json!({
310            "iteration": i,
311            "phase": "test",
312            "timestamp": chrono::Utc::now().to_rfc3339()
313        });
314
315        match engine.process_message(&mut message) {
316            Ok(_) => {}
317            Err(e) => {
318                eprintln!("Error at iteration {}: {:?}", i, e);
319            }
320        }
321
322        // Sample memory usage periodically
323        if (i + 1) % SAMPLE_INTERVAL == 0 {
324            let current_memory = get_current_memory();
325            memory_samples.push((i + 1, current_memory));
326            max_memory = max_memory.max(current_memory);
327            min_memory = min_memory.min(current_memory);
328
329            let memory_diff = current_memory - test_initial_memory;
330            println!(
331                "Iteration {:6}: Memory = {} (Δ = {})",
332                i + 1,
333                format_bytes(current_memory),
334                format_bytes(memory_diff)
335            );
336        }
337    }
338
339    let test_duration = test_start.elapsed();
340    let test_final_memory = get_current_memory();
341    let test_memory_growth = test_final_memory - test_initial_memory;
342
343    println!("\n========================================");
344    println!("📈 ANALYSIS RESULTS");
345    println!("========================================\n");
346
347    println!("Test Duration: {:.2} seconds", test_duration.as_secs_f64());
348    println!(
349        "Iterations/second: {:.0}",
350        TEST_ITERATIONS as f64 / test_duration.as_secs_f64()
351    );
352    println!();
353
354    println!("MEMORY STATISTICS:");
355    println!("──────────────────────────────────────────");
356    println!(
357        "Initial memory (after warmup): {}",
358        format_bytes(test_initial_memory)
359    );
360    println!("Final memory: {}", format_bytes(test_final_memory));
361    println!("Memory growth: {}", format_bytes(test_memory_growth));
362    println!("Peak memory: {}", format_bytes(max_memory));
363    println!("Minimum memory: {}", format_bytes(min_memory));
364    println!("Memory range: {}", format_bytes(max_memory - min_memory));
365    println!();
366
367    // Calculate memory growth rate
368    let growth_per_iteration = test_memory_growth as f64 / TEST_ITERATIONS as f64;
369    println!(
370        "Average growth per iteration: {:.2} bytes",
371        growth_per_iteration
372    );
373
374    // Analyze trend
375    if memory_samples.len() >= 2 {
376        let first_half_avg = memory_samples[..memory_samples.len() / 2]
377            .iter()
378            .map(|(_, m)| *m)
379            .sum::<isize>() as f64
380            / (memory_samples.len() / 2) as f64;
381
382        let second_half_avg = memory_samples[memory_samples.len() / 2..]
383            .iter()
384            .map(|(_, m)| *m)
385            .sum::<isize>() as f64
386            / (memory_samples.len() - memory_samples.len() / 2) as f64;
387
388        let trend = second_half_avg - first_half_avg;
389
390        println!("\nMEMORY TREND ANALYSIS:");
391        println!("──────────────────────────────────────────");
392        println!(
393            "First half average: {}",
394            format_bytes(first_half_avg as isize)
395        );
396        println!(
397            "Second half average: {}",
398            format_bytes(second_half_avg as isize)
399        );
400        println!("Trend: {}", format_bytes(trend as isize));
401
402        // Verdict
403        println!("\n🔍 VERDICT:");
404        println!("──────────────────────────────────────────");
405
406        let threshold_bytes = 1024 * 100; // 100 KB threshold
407        if test_memory_growth.abs() < threshold_bytes {
408            println!("✅ PASS: Memory usage is stable (growth < 100 KB)");
409            println!("   No significant memory leak detected.");
410        } else if growth_per_iteration < 10.0 {
411            println!("⚠️  WARNING: Small memory growth detected");
412            println!(
413                "   Growth rate: {:.2} bytes/iteration",
414                growth_per_iteration
415            );
416            println!("   This may be acceptable for your use case.");
417        } else {
418            println!("❌ FAIL: Potential memory leak detected!");
419            println!(
420                "   Growth rate: {:.2} bytes/iteration",
421                growth_per_iteration
422            );
423            println!("   Total growth: {}", format_bytes(test_memory_growth));
424            println!("   This indicates a possible memory leak that should be investigated.");
425        }
426
427        if trend.abs() as isize > threshold_bytes {
428            println!("\n⚠️  Increasing memory trend detected between first and second half!");
429        }
430    }
431
432    println!("\n========================================");
433    println!("Analysis complete!");
434    println!("========================================");
435
436    Ok(())
437}
Source

pub fn evaluate_logic( &self, logic_index: Option<usize>, logic: &Value, data: &Value, ) -> Result<Value>

Evaluate logic using compiled index or direct evaluation (public for testing)

Trait Implementations§

Source§

impl Default for Engine

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

§

impl !Freeze for Engine

§

impl !RefUnwindSafe for Engine

§

impl !Send for Engine

§

impl !Sync for Engine

§

impl Unpin for Engine

§

impl !UnwindSafe for Engine

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.