benchmark/
benchmark.rs

1use dataflow_rs::{Engine, Workflow, engine::message::Message};
2use serde_json::json;
3use std::time::Instant;
4
/// Number of messages processed (and individually timed) in the measured
/// benchmark loop. The separate warm-up loop is not included in this count.
const ITERATIONS: usize = 1_000_000;
6
7fn main() -> Result<(), Box<dyn std::error::Error>> {
8    println!("========================================");
9    println!("DATAFLOW ENGINE BENCHMARK");
10    println!("========================================\n");
11    println!(
12        "Running {} iterations on single-threaded engine\n",
13        ITERATIONS
14    );
15
16    // Define a simple workflow with data transformation
17    let workflow_json = r#"
18    {
19        "id": "benchmark_workflow",
20        "name": "Benchmark Workflow",
21        "description": "Simple workflow for performance testing",
22        "priority": 1,
23        "tasks": [
24            {
25                "id": "transform_data",
26                "name": "Transform Data",
27                "description": "Map data fields",
28                "function": {
29                    "name": "map",
30                    "input": {
31                        "mappings": [
32                            {
33                                "path": "data.user.id", 
34                                "logic": { "var": "temp_data.id" }
35                            },
36                            {
37                                "path": "data.user.name", 
38                                "logic": { "var": "temp_data.name" }
39                            },
40                            {
41                                "path": "data.user.email", 
42                                "logic": { "var": "temp_data.email" }
43                            },
44                            {
45                                "path": "data.user.age",
46                                "logic": { "+": [{ "var": "temp_data.age" }, 1] }
47                            },
48                            {
49                                "path": "data.user.status",
50                                "logic": { 
51                                    "if": [
52                                        { ">": [{ "var": "temp_data.age" }, 18] },
53                                        "adult",
54                                        "minor"
55                                    ]
56                                }
57                            }
58                        ]
59                    }
60                }
61            },
62            {
63                "id": "validate_data",
64                "name": "Validate Data",
65                "description": "Validate transformed data",
66                "function": {
67                    "name": "validate",
68                    "input": {
69                        "rules": [
70                            {
71                                "path": "data",
72                                "logic": { "!!": { "var": "data.user.id" } },
73                                "message": "User ID is required"
74                            },
75                            {
76                                "path": "data",
77                                "logic": { "!!": { "var": "data.user.email" } },
78                                "message": "User email is required"
79                            }
80                        ]
81                    }
82                }
83            }
84        ]
85    }
86    "#;
87
88    // Parse the workflow
89    let workflow = Workflow::from_json(workflow_json)?;
90
91    // Create the engine with built-in functions
92    let engine = Engine::new(vec![workflow], None, None);
93
94    // Sample data for benchmarking
95    let sample_data = json!({
96        "id": 12345,
97        "name": "John Doe",
98        "email": "john.doe@example.com",
99        "age": 25,
100        "department": "Engineering"
101    });
102
103    // Warm-up run
104    println!("Warming up...");
105    for _ in 0..1000 {
106        let mut message = Message::new(&json!({}));
107        message.temp_data = sample_data.clone();
108        let _ = engine.process_message(&mut message);
109    }
110
111    // Benchmark run
112    println!("Starting benchmark...\n");
113
114    let mut all_durations = Vec::with_capacity(ITERATIONS);
115    let mut success_count = 0;
116    let mut error_count = 0;
117
118    let benchmark_start = Instant::now();
119
120    for i in 0..ITERATIONS {
121        let mut message = Message::new(&json!({}));
122        message.temp_data = sample_data.clone();
123        message.metadata = json!({
124            "iteration": i,
125            "timestamp": chrono::Utc::now().to_rfc3339()
126        });
127
128        let iteration_start = Instant::now();
129        match engine.process_message(&mut message) {
130            Ok(_) => {
131                success_count += 1;
132                if message.has_errors() {
133                    error_count += 1;
134                }
135            }
136            Err(_) => {
137                error_count += 1;
138            }
139        }
140        let iteration_duration = iteration_start.elapsed();
141        all_durations.push(iteration_duration);
142
143        // Progress indicator every 10k iterations
144        if (i + 1) % 10000 == 0 {
145            print!(".");
146            use std::io::Write;
147            std::io::stdout().flush()?;
148        }
149    }
150
151    let total_time = benchmark_start.elapsed();
152    println!("\n\nBenchmark Complete!");
153    println!("==========================================\n");
154
155    // Calculate statistics
156    all_durations.sort_unstable();
157    let p50 = all_durations[ITERATIONS * 50 / 100];
158    let p90 = all_durations[ITERATIONS * 90 / 100];
159    let p95 = all_durations[ITERATIONS * 95 / 100];
160    let p99 = all_durations[ITERATIONS * 99 / 100];
161    let throughput = ITERATIONS as f64 / total_time.as_secs_f64();
162
163    // Display results
164    println!("📊 PERFORMANCE METRICS");
165    println!("──────────────────────────────────────────");
166    println!("Total iterations:    {:>10}", ITERATIONS);
167    println!("Successful:          {:>10}", success_count);
168    println!("Errors:              {:>10}", error_count);
169    println!(
170        "Total time:          {:>10.3} seconds",
171        total_time.as_secs_f64()
172    );
173    println!();
174
175    println!("Messages/second:     {:>10.0}", throughput);
176    println!();
177
178    println!("📉 LATENCY PERCENTILES");
179    println!("──────────────────────────────────────────");
180    println!("P50:                 {:>10.3} Ξs", p50.as_micros() as f64);
181    println!("P90:                 {:>10.3} Ξs", p90.as_micros() as f64);
182    println!("P95:                 {:>10.3} Ξs", p95.as_micros() as f64);
183    println!("P99:                 {:>10.3} Ξs", p99.as_micros() as f64);
184    println!();
185
186    Ok(())
187}