1use dataflow_rs::{Engine, Workflow, engine::message::Message};
2use serde_json::json;
3use std::time::Instant;
4
5const ITERATIONS: usize = 1_000_000;
6
7fn main() -> Result<(), Box<dyn std::error::Error>> {
8 println!("========================================");
9 println!("DATAFLOW ENGINE BENCHMARK");
10 println!("========================================\n");
11 println!(
12 "Running {} iterations on single-threaded engine\n",
13 ITERATIONS
14 );
15
16 let workflow_json = r#"
18 {
19 "id": "benchmark_workflow",
20 "name": "Benchmark Workflow",
21 "description": "Simple workflow for performance testing",
22 "priority": 1,
23 "tasks": [
24 {
25 "id": "transform_data",
26 "name": "Transform Data",
27 "description": "Map data fields",
28 "function": {
29 "name": "map",
30 "input": {
31 "mappings": [
32 {
33 "path": "data.user.id",
34 "logic": { "var": "temp_data.id" }
35 },
36 {
37 "path": "data.user.name",
38 "logic": { "var": "temp_data.name" }
39 },
40 {
41 "path": "data.user.email",
42 "logic": { "var": "temp_data.email" }
43 },
44 {
45 "path": "data.user.age",
46 "logic": { "+": [{ "var": "temp_data.age" }, 1] }
47 },
48 {
49 "path": "data.user.status",
50 "logic": {
51 "if": [
52 { ">": [{ "var": "temp_data.age" }, 18] },
53 "adult",
54 "minor"
55 ]
56 }
57 }
58 ]
59 }
60 }
61 },
62 {
63 "id": "validate_data",
64 "name": "Validate Data",
65 "description": "Validate transformed data",
66 "function": {
67 "name": "validate",
68 "input": {
69 "rules": [
70 {
71 "path": "data",
72 "logic": { "!!": { "var": "data.user.id" } },
73 "message": "User ID is required"
74 },
75 {
76 "path": "data",
77 "logic": { "!!": { "var": "data.user.email" } },
78 "message": "User email is required"
79 }
80 ]
81 }
82 }
83 }
84 ]
85 }
86 "#;
87
88 let workflow = Workflow::from_json(workflow_json)?;
90
91 let engine = Engine::new(vec![workflow], None, None);
93
94 let sample_data = json!({
96 "id": 12345,
97 "name": "John Doe",
98 "email": "john.doe@example.com",
99 "age": 25,
100 "department": "Engineering"
101 });
102
103 println!("Warming up...");
105 for _ in 0..1000 {
106 let mut message = Message::new(&json!({}));
107 message.temp_data = sample_data.clone();
108 let _ = engine.process_message(&mut message);
109 }
110
111 println!("Starting benchmark...\n");
113
114 let mut all_durations = Vec::with_capacity(ITERATIONS);
115 let mut success_count = 0;
116 let mut error_count = 0;
117
118 let benchmark_start = Instant::now();
119
120 for i in 0..ITERATIONS {
121 let mut message = Message::new(&json!({}));
122 message.temp_data = sample_data.clone();
123 message.metadata = json!({
124 "iteration": i,
125 "timestamp": chrono::Utc::now().to_rfc3339()
126 });
127
128 let iteration_start = Instant::now();
129 match engine.process_message(&mut message) {
130 Ok(_) => {
131 success_count += 1;
132 if message.has_errors() {
133 error_count += 1;
134 }
135 }
136 Err(_) => {
137 error_count += 1;
138 }
139 }
140 let iteration_duration = iteration_start.elapsed();
141 all_durations.push(iteration_duration);
142
143 if (i + 1) % 10000 == 0 {
145 print!(".");
146 use std::io::Write;
147 std::io::stdout().flush()?;
148 }
149 }
150
151 let total_time = benchmark_start.elapsed();
152 println!("\n\nBenchmark Complete!");
153 println!("==========================================\n");
154
155 all_durations.sort_unstable();
157 let p50 = all_durations[ITERATIONS * 50 / 100];
158 let p90 = all_durations[ITERATIONS * 90 / 100];
159 let p95 = all_durations[ITERATIONS * 95 / 100];
160 let p99 = all_durations[ITERATIONS * 99 / 100];
161 let throughput = ITERATIONS as f64 / total_time.as_secs_f64();
162
163 println!("ð PERFORMANCE METRICS");
165 println!("ââââââââââââââââââââââââââââââââââââââââââ");
166 println!("Total iterations: {:>10}", ITERATIONS);
167 println!("Successful: {:>10}", success_count);
168 println!("Errors: {:>10}", error_count);
169 println!(
170 "Total time: {:>10.3} seconds",
171 total_time.as_secs_f64()
172 );
173 println!();
174
175 println!("Messages/second: {:>10.0}", throughput);
176 println!();
177
178 println!("ð LATENCY PERCENTILES");
179 println!("ââââââââââââââââââââââââââââââââââââââââââ");
180 println!("P50: {:>10.3} Ξs", p50.as_micros() as f64);
181 println!("P90: {:>10.3} Ξs", p90.as_micros() as f64);
182 println!("P95: {:>10.3} Ξs", p95.as_micros() as f64);
183 println!("P99: {:>10.3} Ξs", p99.as_micros() as f64);
184 println!();
185
186 Ok(())
187}