benchmark/
benchmark.rs

use dataflow_rs::{engine::message::Message, Engine, Workflow};
use serde_json::{json, Value};
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Read, Write};
use std::path::Path;
use std::time::{Duration, Instant};
7
// File (relative to the working directory) where results accumulate across runs.
const BENCHMARK_LOG_FILE: &str = "benchmark_results.json";
// Crate version baked in at compile time; used to key entries in the log file.
const VERSION: &str = env!("CARGO_PKG_VERSION");
10
11#[tokio::main]
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // Create the workflow engine (built-in functions are auto-registered)
14    let mut engine = Engine::new();
15
16    // Define a workflow that:
17    // 1. Uses pre-loaded data instead of HTTP fetch
18    // 2. Enriches the message with transformed data
19    // 3. Demonstrates proper async workflow execution
20    let workflow_json = r#"
21    {
22        "id": "benchmark_workflow",
23        "name": "Benchmark Workflow Example",
24        "description": "Demonstrates async workflow execution with data transformation",
25        "condition": { "==": [true, true] },
26        "tasks": [
27            {
28                "id": "transform_data",
29                "name": "Transform Data",
30                "description": "Map API response to our data model",
31                "function": {
32                    "name": "map",
33                    "input": {
34                        "mappings": [
35                            {
36                                "path": "data.user.id", 
37                                "logic": { "var": "temp_data.body.id" }
38                            },
39                            {
40                                "path": "data.user.name", 
41                                "logic": { "var": "temp_data.body.name" }
42                            },
43                            {
44                                "path": "data.user.email", 
45                                "logic": { "var": "temp_data.body.email" }
46                            },
47                            {
48                                "path": "data.user.address", 
49                                "logic": {
50                                    "cat": [
51                                        { "var": "temp_data.body.address.street" },
52                                        ", ",
53                                        { "var": "temp_data.body.address.city" }
54                                    ]
55                                }
56                            },
57                            {
58                                "path": "data.user.company", 
59                                "logic": { "var": "temp_data.body.company.name" }
60                            },
61                            {
62                                "path": "data.processed_at", 
63                                "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
64                            }
65                        ]
66                    }
67                }
68            }
69        ]
70    }
71    "#;
72
73    // Parse and add the workflow to the engine
74    let workflow = Workflow::from_json(workflow_json)?;
75    engine.add_workflow(&workflow);
76
77    // Create sample user data (similar to what the HTTP endpoint would return)
78    let sample_user_data = json!({
79        "body": {
80            "id": 1,
81            "name": "Leanne Graham",
82            "username": "Bret",
83            "email": "Sincere@april.biz",
84            "address": {
85                "street": "Kulas Light",
86                "suite": "Apt. 556",
87                "city": "Gwenborough",
88                "zipcode": "92998-3874",
89                "geo": {
90                    "lat": "-37.3159",
91                    "lng": "81.1496"
92                }
93            },
94            "phone": "1-770-736-8031 x56442",
95            "website": "hildegard.org",
96            "company": {
97                "name": "Romaguera-Crona",
98                "catchPhrase": "Multi-layered client-server neural-net",
99                "bs": "harness real-time e-markets"
100            }
101        }
102    });
103
104    // Run async benchmark
105    println!("=== ASYNC BENCHMARK ===");
106    let async_results = run_async_benchmark(&engine, &sample_user_data, 1000).await?;
107
108    // Run sync benchmark for comparison (using blocking approach)
109    println!("\n=== SYNC BENCHMARK (for comparison) ===");
110    let sync_results = run_sync_benchmark(&engine, &sample_user_data, 1000).await?;
111
112    // Compare results
113    println!("\n=== COMPARISON ===");
114    println!("Async avg: {:?}", async_results.avg_time);
115    println!("Sync avg:  {:?}", sync_results.avg_time);
116    println!(
117        "Async is {:.2}x {} than sync",
118        if async_results.avg_time < sync_results.avg_time {
119            sync_results.avg_time.as_nanos() as f64 / async_results.avg_time.as_nanos() as f64
120        } else {
121            async_results.avg_time.as_nanos() as f64 / sync_results.avg_time.as_nanos() as f64
122        },
123        if async_results.avg_time < sync_results.avg_time {
124            "faster"
125        } else {
126            "slower"
127        }
128    );
129
130    // Log results to file
131    log_benchmark_results(
132        async_results.iterations,
133        async_results.min_time,
134        async_results.max_time,
135        async_results.avg_time,
136        async_results.p95,
137        async_results.p99,
138        async_results.total_time,
139        "async".to_string(),
140    )?;
141
142    log_benchmark_results(
143        sync_results.iterations,
144        sync_results.min_time,
145        sync_results.max_time,
146        sync_results.avg_time,
147        sync_results.p95,
148        sync_results.p99,
149        sync_results.total_time,
150        "sync".to_string(),
151    )?;
152
153    println!("\nBenchmark results saved to '{}'", BENCHMARK_LOG_FILE);
154
155    Ok(())
156}
157
/// Aggregated timing statistics for one benchmark run.
///
/// All fields are plain `Copy` data, so the struct itself derives
/// `Clone` + `Copy` and can be passed around freely.
#[derive(Debug, Clone, Copy)]
struct BenchmarkResults {
    // Number of iterations executed (including iterations that errored).
    iterations: usize,
    // Fastest single iteration.
    min_time: Duration,
    // Slowest single iteration.
    max_time: Duration,
    // Mean iteration time.
    avg_time: Duration,
    // 95th-percentile iteration time (nearest-rank).
    p95: Duration,
    // 99th-percentile iteration time (nearest-rank).
    p99: Duration,
    // Sum of all per-iteration durations.
    total_time: Duration,
}
168
169async fn run_async_benchmark(
170    engine: &Engine,
171    sample_user_data: &Value,
172    num_iterations: usize,
173) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
174    let mut total_duration = Duration::new(0, 0);
175    let mut min_duration = Duration::new(u64::MAX, 0);
176    let mut max_duration = Duration::new(0, 0);
177    let mut all_durations = Vec::with_capacity(num_iterations);
178    let mut error_count = 0;
179
180    println!(
181        "Starting async benchmark with {} iterations...",
182        num_iterations
183    );
184
185    for i in 0..num_iterations {
186        let mut message = Message::new(&json!({}));
187        message.temp_data = sample_user_data.clone();
188        message.data = json!({});
189        message.metadata = json!({
190            "timestamp": chrono::Utc::now().to_rfc3339(),
191            "iteration": i
192        });
193
194        let start = Instant::now();
195        match engine.process_message(&mut message).await {
196            Ok(_) => {
197                let duration = start.elapsed();
198                all_durations.push(duration);
199                total_duration += duration;
200                min_duration = min_duration.min(duration);
201                max_duration = max_duration.max(duration);
202
203                // Check for processing errors
204                if message.has_errors() {
205                    error_count += 1;
206                    if error_count <= 5 {
207                        // Only print first 5 errors
208                        println!("Processing errors in iteration {}: {:?}", i, message.errors);
209                    }
210                }
211            }
212            Err(e) => {
213                error_count += 1;
214                if error_count <= 5 {
215                    println!("Error in iteration {}: {:?}", i, e);
216                }
217                // Still record the time even for errors
218                let duration = start.elapsed();
219                all_durations.push(duration);
220                total_duration += duration;
221                min_duration = min_duration.min(duration);
222                max_duration = max_duration.max(duration);
223            }
224        }
225
226        if (i + 1) % 1000 == 0 {
227            println!("Completed {} async iterations", i + 1);
228        }
229    }
230
231    if error_count > 0 {
232        println!("Total errors encountered: {}", error_count);
233    }
234
235    // Sort durations for percentile calculations
236    all_durations.sort();
237
238    let p95_idx = (num_iterations as f64 * 0.95) as usize;
239    let p99_idx = (num_iterations as f64 * 0.99) as usize;
240    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
241    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
242    let avg_duration = total_duration / num_iterations as u32;
243
244    println!("\nAsync Benchmark Results (v{}):", VERSION);
245    println!("  Iterations: {}", num_iterations);
246    println!("  Errors: {}", error_count);
247    println!("  Min time: {:?}", min_duration);
248    println!("  Max time: {:?}", max_duration);
249    println!("  Avg time: {:?}", avg_duration);
250    println!("  95th percentile: {:?}", p95);
251    println!("  99th percentile: {:?}", p99);
252    println!("  Total time: {:?}", total_duration);
253
254    Ok(BenchmarkResults {
255        iterations: num_iterations,
256        min_time: min_duration,
257        max_time: max_duration,
258        avg_time: avg_duration,
259        p95: *p95,
260        p99: *p99,
261        total_time: total_duration,
262    })
263}
264
265async fn run_sync_benchmark(
266    engine: &Engine,
267    sample_user_data: &Value,
268    num_iterations: usize,
269) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
270    let mut total_duration = Duration::new(0, 0);
271    let mut min_duration = Duration::new(u64::MAX, 0);
272    let mut max_duration = Duration::new(0, 0);
273    let mut all_durations = Vec::with_capacity(num_iterations);
274    let mut error_count = 0;
275
276    println!(
277        "Starting sync-style benchmark with {} iterations...",
278        num_iterations
279    );
280
281    for i in 0..num_iterations {
282        let mut message = Message::new(&json!({}));
283        message.temp_data = sample_user_data.clone();
284        message.data = json!({});
285        message.metadata = json!({
286            "timestamp": chrono::Utc::now().to_rfc3339(),
287            "iteration": i
288        });
289
290        let start = Instant::now();
291        // Use tokio::task::block_in_place to simulate sync behavior
292        let result = tokio::task::block_in_place(|| {
293            tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
294        });
295
296        match result {
297            Ok(_) => {
298                let duration = start.elapsed();
299                all_durations.push(duration);
300                total_duration += duration;
301                min_duration = min_duration.min(duration);
302                max_duration = max_duration.max(duration);
303
304                if message.has_errors() {
305                    error_count += 1;
306                }
307            }
308            Err(e) => {
309                error_count += 1;
310                if error_count <= 5 {
311                    println!("Sync error in iteration {}: {:?}", i, e);
312                }
313                let duration = start.elapsed();
314                all_durations.push(duration);
315                total_duration += duration;
316                min_duration = min_duration.min(duration);
317                max_duration = max_duration.max(duration);
318            }
319        }
320
321        if (i + 1) % 1000 == 0 {
322            println!("Completed {} sync iterations", i + 1);
323        }
324    }
325
326    if error_count > 0 {
327        println!("Total sync errors encountered: {}", error_count);
328    }
329
330    all_durations.sort();
331
332    let p95_idx = (num_iterations as f64 * 0.95) as usize;
333    let p99_idx = (num_iterations as f64 * 0.99) as usize;
334    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
335    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
336    let avg_duration = total_duration / num_iterations as u32;
337
338    println!("\nSync Benchmark Results (v{}):", VERSION);
339    println!("  Iterations: {}", num_iterations);
340    println!("  Errors: {}", error_count);
341    println!("  Min time: {:?}", min_duration);
342    println!("  Max time: {:?}", max_duration);
343    println!("  Avg time: {:?}", avg_duration);
344    println!("  95th percentile: {:?}", p95);
345    println!("  99th percentile: {:?}", p99);
346    println!("  Total time: {:?}", total_duration);
347
348    Ok(BenchmarkResults {
349        iterations: num_iterations,
350        min_time: min_duration,
351        max_time: max_duration,
352        avg_time: avg_duration,
353        p95: *p95,
354        p99: *p99,
355        total_time: total_duration,
356    })
357}
358
359fn log_benchmark_results(
360    iterations: usize,
361    min_time: Duration,
362    max_time: Duration,
363    avg_time: Duration,
364    p95: Duration,
365    p99: Duration,
366    total_time: Duration,
367    benchmark_type: String,
368) -> io::Result<()> {
369    let mut benchmark_data = read_benchmark_file().unwrap_or_else(|_| json!({}));
370
371    let benchmark_entry = json!({
372        "iterations": iterations,
373        "min_time_ns": min_time.as_nanos(),
374        "max_time_ns": max_time.as_nanos(),
375        "avg_time_ns": avg_time.as_nanos(),
376        "p95_ns": p95.as_nanos(),
377        "p99_ns": p99.as_nanos(),
378        "total_time_ns": total_time.as_nanos(),
379        "timestamp": chrono::Utc::now().to_rfc3339(),
380        "benchmark_type": benchmark_type,
381    });
382
383    let version_key = format!("{}_{}", VERSION, benchmark_type);
384
385    if let Some(obj) = benchmark_data.as_object_mut() {
386        obj.insert(version_key, benchmark_entry);
387    }
388
389    let file = OpenOptions::new()
390        .write(true)
391        .create(true)
392        .truncate(true)
393        .open(BENCHMARK_LOG_FILE)?;
394
395    serde_json::to_writer_pretty(file, &benchmark_data)?;
396
397    Ok(())
398}
399
400fn read_benchmark_file() -> io::Result<Value> {
401    if !Path::new(BENCHMARK_LOG_FILE).exists() {
402        return Ok(json!({}));
403    }
404
405    let mut file = File::open(BENCHMARK_LOG_FILE)?;
406    let mut contents = String::new();
407    file.read_to_string(&mut contents)?;
408
409    match serde_json::from_str(&contents) {
410        Ok(value) => Ok(value),
411        Err(e) => {
412            eprintln!("Warning: Could not parse benchmark file: {}", e);
413            Ok(json!({}))
414        }
415    }
416}