// benchmark/benchmark.rs

1use dataflow_rs::{engine::message::Message, Engine, Workflow};
2use serde_json::{json, Value};
3use std::fs::{File, OpenOptions};
4use std::io::{self, Read};
5use std::path::Path;
6use std::time::{Duration, Instant};
7
// JSON file that accumulates results across runs; entries are keyed by "{version}_{benchmark_type}".
const BENCHMARK_LOG_FILE: &str = "benchmark_results.json";
// Crate version baked in at compile time; used to label and key logged results.
const VERSION: &str = env!("CARGO_PKG_VERSION");
10
11#[tokio::main]
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // Create the workflow engine (built-in functions are auto-registered)
14    let mut engine = Engine::new();
15
16    // Define a workflow that:
17    // 1. Uses pre-loaded data instead of HTTP fetch
18    // 2. Enriches the message with transformed data
19    // 3. Demonstrates proper async workflow execution
20    let workflow_json = r#"
21    {
22        "id": "benchmark_workflow",
23        "name": "Benchmark Workflow Example",
24        "description": "Demonstrates async workflow execution with data transformation",
25        "condition": { "==": [true, true] },
26        "tasks": [
27            {
28                "id": "transform_data",
29                "name": "Transform Data",
30                "description": "Map API response to our data model",
31                "function": {
32                    "name": "map",
33                    "input": {
34                        "mappings": [
35                            {
36                                "path": "data.user.id", 
37                                "logic": { "var": "temp_data.body.id" }
38                            },
39                            {
40                                "path": "data.user.name", 
41                                "logic": { "var": "temp_data.body.name" }
42                            },
43                            {
44                                "path": "data.user.email", 
45                                "logic": { "var": "temp_data.body.email" }
46                            },
47                            {
48                                "path": "data.user.address", 
49                                "logic": {
50                                    "cat": [
51                                        { "var": "temp_data.body.address.street" },
52                                        ", ",
53                                        { "var": "temp_data.body.address.city" }
54                                    ]
55                                }
56                            },
57                            {
58                                "path": "data.user.company", 
59                                "logic": { "var": "temp_data.body.company.name" }
60                            },
61                            {
62                                "path": "data.processed_at", 
63                                "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
64                            }
65                        ]
66                    }
67                }
68            }
69        ]
70    }
71    "#;
72
73    // Parse and add the workflow to the engine
74    let workflow = Workflow::from_json(workflow_json)?;
75    engine.add_workflow(&workflow);
76
77    // Create sample user data (similar to what the HTTP endpoint would return)
78    let sample_user_data = json!({
79        "body": {
80            "id": 1,
81            "name": "Leanne Graham",
82            "username": "Bret",
83            "email": "Sincere@april.biz",
84            "address": {
85                "street": "Kulas Light",
86                "suite": "Apt. 556",
87                "city": "Gwenborough",
88                "zipcode": "92998-3874",
89                "geo": {
90                    "lat": "-37.3159",
91                    "lng": "81.1496"
92                }
93            },
94            "phone": "1-770-736-8031 x56442",
95            "website": "hildegard.org",
96            "company": {
97                "name": "Romaguera-Crona",
98                "catchPhrase": "Multi-layered client-server neural-net",
99                "bs": "harness real-time e-markets"
100            }
101        }
102    });
103
104    // Run async benchmark
105    println!("=== ASYNC BENCHMARK ===");
106    let async_results = run_async_benchmark(&engine, &sample_user_data, 1000).await?;
107
108    // Run sync benchmark for comparison (using blocking approach)
109    println!("\n=== SYNC BENCHMARK (for comparison) ===");
110    let sync_results = run_sync_benchmark(&engine, &sample_user_data, 1000).await?;
111
112    // Compare results
113    println!("\n=== COMPARISON ===");
114    println!("Async avg: {:?}", async_results.avg_time);
115    println!("Sync avg:  {:?}", sync_results.avg_time);
116    println!(
117        "Async is {:.2}x {} than sync",
118        if async_results.avg_time < sync_results.avg_time {
119            sync_results.avg_time.as_nanos() as f64 / async_results.avg_time.as_nanos() as f64
120        } else {
121            async_results.avg_time.as_nanos() as f64 / sync_results.avg_time.as_nanos() as f64
122        },
123        if async_results.avg_time < sync_results.avg_time {
124            "faster"
125        } else {
126            "slower"
127        }
128    );
129
130    // Log results to file
131    log_benchmark_results(
132        async_results.iterations,
133        async_results.min_time,
134        async_results.max_time,
135        async_results.avg_time,
136        async_results.p95,
137        async_results.p99,
138        async_results.total_time,
139        "async".to_string(),
140    )?;
141
142    log_benchmark_results(
143        sync_results.iterations,
144        sync_results.min_time,
145        sync_results.max_time,
146        sync_results.avg_time,
147        sync_results.p95,
148        sync_results.p99,
149        sync_results.total_time,
150        "sync".to_string(),
151    )?;
152
153    println!("\nBenchmark results saved to '{BENCHMARK_LOG_FILE}'");
154
155    Ok(())
156}
157
/// Aggregated timing statistics for a single benchmark run.
#[derive(Debug)]
struct BenchmarkResults {
    iterations: usize,    // number of workflow executions measured
    min_time: Duration,   // fastest single iteration
    max_time: Duration,   // slowest single iteration
    avg_time: Duration,   // total_time divided by iterations
    p95: Duration,        // 95th-percentile iteration time
    p99: Duration,        // 99th-percentile iteration time
    total_time: Duration, // sum of all per-iteration durations
}
168
169async fn run_async_benchmark(
170    engine: &Engine,
171    sample_user_data: &Value,
172    num_iterations: usize,
173) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
174    let mut total_duration = Duration::new(0, 0);
175    let mut min_duration = Duration::new(u64::MAX, 0);
176    let mut max_duration = Duration::new(0, 0);
177    let mut all_durations = Vec::with_capacity(num_iterations);
178    let mut error_count = 0;
179
180    println!("Starting async benchmark with {num_iterations} iterations...");
181
182    for i in 0..num_iterations {
183        let mut message = Message::new(&json!({}));
184        message.temp_data = sample_user_data.clone();
185        message.data = json!({});
186        message.metadata = json!({
187            "timestamp": chrono::Utc::now().to_rfc3339(),
188            "iteration": i
189        });
190
191        let start = Instant::now();
192        match engine.process_message(&mut message).await {
193            Ok(_) => {
194                let duration = start.elapsed();
195                all_durations.push(duration);
196                total_duration += duration;
197                min_duration = min_duration.min(duration);
198                max_duration = max_duration.max(duration);
199
200                // Check for processing errors
201                if message.has_errors() {
202                    error_count += 1;
203                    if error_count <= 5 {
204                        // Only print first 5 errors
205                        println!("Processing errors in iteration {}: {:?}", i, message.errors);
206                    }
207                }
208            }
209            Err(e) => {
210                error_count += 1;
211                if error_count <= 5 {
212                    println!("Error in iteration {i}: {e:?}");
213                }
214                // Still record the time even for errors
215                let duration = start.elapsed();
216                all_durations.push(duration);
217                total_duration += duration;
218                min_duration = min_duration.min(duration);
219                max_duration = max_duration.max(duration);
220            }
221        }
222
223        if (i + 1) % 1000 == 0 {
224            println!("Completed {} async iterations", i + 1);
225        }
226    }
227
228    if error_count > 0 {
229        println!("Total errors encountered: {error_count}");
230    }
231
232    // Sort durations for percentile calculations
233    all_durations.sort();
234
235    let p95_idx = (num_iterations as f64 * 0.95) as usize;
236    let p99_idx = (num_iterations as f64 * 0.99) as usize;
237    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
238    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
239    let avg_duration = total_duration / num_iterations as u32;
240
241    println!("\nAsync Benchmark Results (v{VERSION}):");
242    println!("  Iterations: {num_iterations}");
243    println!("  Errors: {error_count}");
244    println!("  Min time: {min_duration:?}");
245    println!("  Max time: {max_duration:?}");
246    println!("  Avg time: {avg_duration:?}");
247    println!("  95th percentile: {p95:?}");
248    println!("  99th percentile: {p99:?}");
249    println!("  Total time: {total_duration:?}");
250
251    Ok(BenchmarkResults {
252        iterations: num_iterations,
253        min_time: min_duration,
254        max_time: max_duration,
255        avg_time: avg_duration,
256        p95: *p95,
257        p99: *p99,
258        total_time: total_duration,
259    })
260}
261
262async fn run_sync_benchmark(
263    engine: &Engine,
264    sample_user_data: &Value,
265    num_iterations: usize,
266) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
267    let mut total_duration = Duration::new(0, 0);
268    let mut min_duration = Duration::new(u64::MAX, 0);
269    let mut max_duration = Duration::new(0, 0);
270    let mut all_durations = Vec::with_capacity(num_iterations);
271    let mut error_count = 0;
272
273    println!("Starting sync-style benchmark with {num_iterations} iterations...");
274
275    for i in 0..num_iterations {
276        let mut message = Message::new(&json!({}));
277        message.temp_data = sample_user_data.clone();
278        message.data = json!({});
279        message.metadata = json!({
280            "timestamp": chrono::Utc::now().to_rfc3339(),
281            "iteration": i
282        });
283
284        let start = Instant::now();
285        // Use tokio::task::block_in_place to simulate sync behavior
286        let result = tokio::task::block_in_place(|| {
287            tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
288        });
289
290        match result {
291            Ok(_) => {
292                let duration = start.elapsed();
293                all_durations.push(duration);
294                total_duration += duration;
295                min_duration = min_duration.min(duration);
296                max_duration = max_duration.max(duration);
297
298                if message.has_errors() {
299                    error_count += 1;
300                }
301            }
302            Err(e) => {
303                error_count += 1;
304                if error_count <= 5 {
305                    println!("Sync error in iteration {i}: {e:?}");
306                }
307                let duration = start.elapsed();
308                all_durations.push(duration);
309                total_duration += duration;
310                min_duration = min_duration.min(duration);
311                max_duration = max_duration.max(duration);
312            }
313        }
314
315        if (i + 1) % 1000 == 0 {
316            println!("Completed {} sync iterations", i + 1);
317        }
318    }
319
320    if error_count > 0 {
321        println!("Total sync errors encountered: {error_count}");
322    }
323
324    all_durations.sort();
325
326    let p95_idx = (num_iterations as f64 * 0.95) as usize;
327    let p99_idx = (num_iterations as f64 * 0.99) as usize;
328    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
329    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
330    let avg_duration = total_duration / num_iterations as u32;
331
332    println!("\nSync Benchmark Results (v{VERSION}):");
333    println!("  Iterations: {num_iterations}");
334    println!("  Errors: {error_count}");
335    println!("  Min time: {min_duration:?}");
336    println!("  Max time: {max_duration:?}");
337    println!("  Avg time: {avg_duration:?}");
338    println!("  95th percentile: {p95:?}");
339    println!("  99th percentile: {p99:?}");
340    println!("  Total time: {total_duration:?}");
341
342    Ok(BenchmarkResults {
343        iterations: num_iterations,
344        min_time: min_duration,
345        max_time: max_duration,
346        avg_time: avg_duration,
347        p95: *p95,
348        p99: *p99,
349        total_time: total_duration,
350    })
351}
352
353fn log_benchmark_results(
354    iterations: usize,
355    min_time: Duration,
356    max_time: Duration,
357    avg_time: Duration,
358    p95: Duration,
359    p99: Duration,
360    total_time: Duration,
361    benchmark_type: String,
362) -> io::Result<()> {
363    let mut benchmark_data = read_benchmark_file().unwrap_or_else(|_| json!({}));
364
365    let benchmark_entry = json!({
366        "iterations": iterations,
367        "min_time_ns": min_time.as_nanos(),
368        "max_time_ns": max_time.as_nanos(),
369        "avg_time_ns": avg_time.as_nanos(),
370        "p95_ns": p95.as_nanos(),
371        "p99_ns": p99.as_nanos(),
372        "total_time_ns": total_time.as_nanos(),
373        "timestamp": chrono::Utc::now().to_rfc3339(),
374        "benchmark_type": benchmark_type,
375    });
376
377    let version_key = format!("{VERSION}_{benchmark_type}");
378
379    if let Some(obj) = benchmark_data.as_object_mut() {
380        obj.insert(version_key, benchmark_entry);
381    }
382
383    let file = OpenOptions::new()
384        .write(true)
385        .create(true)
386        .truncate(true)
387        .open(BENCHMARK_LOG_FILE)?;
388
389    serde_json::to_writer_pretty(file, &benchmark_data)?;
390
391    Ok(())
392}
393
394fn read_benchmark_file() -> io::Result<Value> {
395    if !Path::new(BENCHMARK_LOG_FILE).exists() {
396        return Ok(json!({}));
397    }
398
399    let mut file = File::open(BENCHMARK_LOG_FILE)?;
400    let mut contents = String::new();
401    file.read_to_string(&mut contents)?;
402
403    match serde_json::from_str(&contents) {
404        Ok(value) => Ok(value),
405        Err(e) => {
406            eprintln!("Warning: Could not parse benchmark file: {e}");
407            Ok(json!({}))
408        }
409    }
410}