use dataflow_rs::{engine::message::Message, Engine, Workflow};
use serde_json::{json, Value};
use std::fs::{File, OpenOptions};
use std::io::{self, Read};
use std::path::Path;
use std::time::{Duration, Instant};

const BENCHMARK_LOG_FILE: &str = "benchmark_results.json";
const VERSION: &str = env!("CARGO_PKG_VERSION");

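// Benchmarks the dataflow_rs engine end to end: the same mapping workflow is
// executed repeatedly on the async path and on a block_on-based "sync" path,
// and the resulting latency statistics are appended to `benchmark_results.json`,
// keyed by crate version.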
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut engine = Engine::new();

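    // Workflow definition: a single `map` task that copies fields from the raw
    // payload in `temp_data` into `data.user.*` and stamps a `processed_at`
    // string from the message metadata.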
    let workflow_json = r#"
    {
        "id": "benchmark_workflow",
        "name": "Benchmark Workflow Example",
        "description": "Demonstrates async workflow execution with data transformation",
        "condition": { "==": [true, true] },
        "tasks": [
            {
                "id": "transform_data",
                "name": "Transform Data",
                "description": "Map API response to our data model",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.user.id",
                                "logic": { "var": "temp_data.body.id" }
                            },
                            {
                                "path": "data.user.name",
                                "logic": { "var": "temp_data.body.name" }
                            },
                            {
                                "path": "data.user.email",
                                "logic": { "var": "temp_data.body.email" }
                            },
                            {
                                "path": "data.user.address",
                                "logic": {
                                    "cat": [
                                        { "var": "temp_data.body.address.street" },
                                        ", ",
                                        { "var": "temp_data.body.address.city" }
                                    ]
                                }
                            },
                            {
                                "path": "data.user.company",
                                "logic": { "var": "temp_data.body.company.name" }
                            },
                            {
                                "path": "data.processed_at",
                                "logic": { "cat": ["Processed at ", { "var": "metadata.timestamp" }] }
                            }
                        ]
                    }
                }
            }
        ]
    }
    "#;

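    // Parse the JSON definition and register the workflow with the engine.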
    let workflow = Workflow::from_json(workflow_json)?;
    engine.add_workflow(&workflow);

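    // Static input payload (the JSONPlaceholder /users/1 record), so every
    // iteration transforms identical data.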
    let sample_user_data = json!({
        "body": {
            "id": 1,
            "name": "Leanne Graham",
            "username": "Bret",
            "email": "Sincere@april.biz",
            "address": {
                "street": "Kulas Light",
                "suite": "Apt. 556",
                "city": "Gwenborough",
                "zipcode": "92998-3874",
                "geo": {
                    "lat": "-37.3159",
                    "lng": "81.1496"
                }
            },
            "phone": "1-770-736-8031 x56442",
            "website": "hildegard.org",
            "company": {
                "name": "Romaguera-Crona",
                "catchPhrase": "Multi-layered client-server neural-net",
                "bs": "harness real-time e-markets"
            }
        }
    });

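    // Run both variants against the same engine and payload, 1,000 iterations each.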
    println!("=== ASYNC BENCHMARK ===");
    let async_results = run_async_benchmark(&engine, &sample_user_data, 1000).await?;

    println!("\n=== SYNC BENCHMARK (for comparison) ===");
    let sync_results = run_sync_benchmark(&engine, &sample_user_data, 1000).await?;

    println!("\n=== COMPARISON ===");
    println!("Async avg: {:?}", async_results.avg_time);
    println!("Sync avg: {:?}", sync_results.avg_time);
    println!(
        "Async is {:.2}x {} than sync",
        if async_results.avg_time < sync_results.avg_time {
            sync_results.avg_time.as_nanos() as f64 / async_results.avg_time.as_nanos() as f64
        } else {
            async_results.avg_time.as_nanos() as f64 / sync_results.avg_time.as_nanos() as f64
        },
        if async_results.avg_time < sync_results.avg_time {
            "faster"
        } else {
            "slower"
        }
    );

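    // Append both result sets to the JSON log, keyed by crate version and
    // benchmark type.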
    log_benchmark_results(
        async_results.iterations,
        async_results.min_time,
        async_results.max_time,
        async_results.avg_time,
        async_results.p95,
        async_results.p99,
        async_results.total_time,
        "async".to_string(),
    )?;

    log_benchmark_results(
        sync_results.iterations,
        sync_results.min_time,
        sync_results.max_time,
        sync_results.avg_time,
        sync_results.p95,
        sync_results.p99,
        sync_results.total_time,
        "sync".to_string(),
    )?;

    println!("\nBenchmark results saved to '{}'", BENCHMARK_LOG_FILE);

    Ok(())
}

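/// Aggregated timing statistics collected from a single benchmark run.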
#[derive(Debug)]
struct BenchmarkResults {
    iterations: usize,
    min_time: Duration,
    max_time: Duration,
    avg_time: Duration,
    p95: Duration,
    p99: Duration,
    total_time: Duration,
}

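/// Runs `num_iterations` messages through the engine on the async path and
/// returns min/max/avg and percentile timings. Failed iterations are still
/// timed and counted so the totals remain comparable.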
async fn run_async_benchmark(
    engine: &Engine,
    sample_user_data: &Value,
    num_iterations: usize,
) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
    let mut total_duration = Duration::new(0, 0);
    let mut min_duration = Duration::new(u64::MAX, 0);
    let mut max_duration = Duration::new(0, 0);
    let mut all_durations = Vec::with_capacity(num_iterations);
    let mut error_count = 0;

    println!(
        "Starting async benchmark with {} iterations...",
        num_iterations
    );

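    // Build a fresh message for every iteration; only `engine.process_message`
    // falls inside the timed region.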
    for i in 0..num_iterations {
        let mut message = Message::new(&json!({}));
        message.temp_data = sample_user_data.clone();
        message.data = json!({});
        message.metadata = json!({
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "iteration": i
        });

        let start = Instant::now();
        match engine.process_message(&mut message).await {
            Ok(_) => {
                let duration = start.elapsed();
                all_durations.push(duration);
                total_duration += duration;
                min_duration = min_duration.min(duration);
                max_duration = max_duration.max(duration);

                if message.has_errors() {
                    error_count += 1;
                    if error_count <= 5 {
                        println!("Processing errors in iteration {}: {:?}", i, message.errors);
                    }
                }
            }
            Err(e) => {
                error_count += 1;
                if error_count <= 5 {
                    println!("Error in iteration {}: {:?}", i, e);
                }
                let duration = start.elapsed();
                all_durations.push(duration);
                total_duration += duration;
                min_duration = min_duration.min(duration);
                max_duration = max_duration.max(duration);
            }
        }

        if (i + 1) % 1000 == 0 {
            println!("Completed {} async iterations", i + 1);
        }
    }

    if error_count > 0 {
        println!("Total errors encountered: {}", error_count);
    }

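    // Sort the samples, then read p95/p99 straight from the sorted vector;
    // `unwrap_or` falls back to zero if a computed index is ever out of range.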
    all_durations.sort();

    let p95_idx = (num_iterations as f64 * 0.95) as usize;
    let p99_idx = (num_iterations as f64 * 0.99) as usize;
    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
    let avg_duration = total_duration / num_iterations as u32;

    println!("\nAsync Benchmark Results (v{}):", VERSION);
    println!(" Iterations: {}", num_iterations);
    println!(" Errors: {}", error_count);
    println!(" Min time: {:?}", min_duration);
    println!(" Max time: {:?}", max_duration);
    println!(" Avg time: {:?}", avg_duration);
    println!(" 95th percentile: {:?}", p95);
    println!(" 99th percentile: {:?}", p99);
    println!(" Total time: {:?}", total_duration);

    Ok(BenchmarkResults {
        iterations: num_iterations,
        min_time: min_duration,
        max_time: max_duration,
        avg_time: avg_duration,
        p95: *p95,
        p99: *p99,
        total_time: total_duration,
    })
}

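/// Same workload as `run_async_benchmark`, but each call is driven to
/// completion with `block_in_place` + `Handle::block_on` to approximate a
/// synchronous caller. This requires Tokio's multi-threaded runtime, which
/// `#[tokio::main]` provides by default.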
async fn run_sync_benchmark(
    engine: &Engine,
    sample_user_data: &Value,
    num_iterations: usize,
) -> Result<BenchmarkResults, Box<dyn std::error::Error>> {
    let mut total_duration = Duration::new(0, 0);
    let mut min_duration = Duration::new(u64::MAX, 0);
    let mut max_duration = Duration::new(0, 0);
    let mut all_durations = Vec::with_capacity(num_iterations);
    let mut error_count = 0;

    println!(
        "Starting sync-style benchmark with {} iterations...",
        num_iterations
    );

    for i in 0..num_iterations {
        let mut message = Message::new(&json!({}));
        message.temp_data = sample_user_data.clone();
        message.data = json!({});
        message.metadata = json!({
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "iteration": i
        });

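        // Block the current worker thread on the async call so the timed
        // region behaves like a synchronous invocation.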
        let start = Instant::now();
        let result = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(engine.process_message(&mut message))
        });

        match result {
            Ok(_) => {
                let duration = start.elapsed();
                all_durations.push(duration);
                total_duration += duration;
                min_duration = min_duration.min(duration);
                max_duration = max_duration.max(duration);

                if message.has_errors() {
                    error_count += 1;
                }
            }
            Err(e) => {
                error_count += 1;
                if error_count <= 5 {
                    println!("Sync error in iteration {}: {:?}", i, e);
                }
                let duration = start.elapsed();
                all_durations.push(duration);
                total_duration += duration;
                min_duration = min_duration.min(duration);
                max_duration = max_duration.max(duration);
            }
        }

        if (i + 1) % 1000 == 0 {
            println!("Completed {} sync iterations", i + 1);
        }
    }

    if error_count > 0 {
        println!("Total sync errors encountered: {}", error_count);
    }

    all_durations.sort();

    let p95_idx = (num_iterations as f64 * 0.95) as usize;
    let p99_idx = (num_iterations as f64 * 0.99) as usize;
    let p95 = all_durations.get(p95_idx).unwrap_or(&Duration::ZERO);
    let p99 = all_durations.get(p99_idx).unwrap_or(&Duration::ZERO);
    let avg_duration = total_duration / num_iterations as u32;

    println!("\nSync Benchmark Results (v{}):", VERSION);
    println!(" Iterations: {}", num_iterations);
    println!(" Errors: {}", error_count);
    println!(" Min time: {:?}", min_duration);
    println!(" Max time: {:?}", max_duration);
    println!(" Avg time: {:?}", avg_duration);
    println!(" 95th percentile: {:?}", p95);
    println!(" 99th percentile: {:?}", p99);
    println!(" Total time: {:?}", total_duration);

    Ok(BenchmarkResults {
        iterations: num_iterations,
        min_time: min_duration,
        max_time: max_duration,
        avg_time: avg_duration,
        p95: *p95,
        p99: *p99,
        total_time: total_duration,
    })
}

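/// Merges one result set into `benchmark_results.json` under the key
/// `"<version>_<benchmark_type>"`, overwriting any previous entry for that key.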
fn log_benchmark_results(
    iterations: usize,
    min_time: Duration,
    max_time: Duration,
    avg_time: Duration,
    p95: Duration,
    p99: Duration,
    total_time: Duration,
    benchmark_type: String,
) -> io::Result<()> {
    let mut benchmark_data = read_benchmark_file().unwrap_or_else(|_| json!({}));

    let benchmark_entry = json!({
        "iterations": iterations,
        "min_time_ns": min_time.as_nanos(),
        "max_time_ns": max_time.as_nanos(),
        "avg_time_ns": avg_time.as_nanos(),
        "p95_ns": p95.as_nanos(),
        "p99_ns": p99.as_nanos(),
        "total_time_ns": total_time.as_nanos(),
        "timestamp": chrono::Utc::now().to_rfc3339(),
        "benchmark_type": benchmark_type,
    });

    let version_key = format!("{}_{}", VERSION, benchmark_type);

    if let Some(obj) = benchmark_data.as_object_mut() {
        obj.insert(version_key, benchmark_entry);
    }

    let file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(BENCHMARK_LOG_FILE)?;

    serde_json::to_writer_pretty(file, &benchmark_data)?;

    Ok(())
}

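/// Loads the existing benchmark log, falling back to an empty JSON object when
/// the file is missing or cannot be parsed.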
fn read_benchmark_file() -> io::Result<Value> {
    if !Path::new(BENCHMARK_LOG_FILE).exists() {
        return Ok(json!({}));
    }

    let mut file = File::open(BENCHMARK_LOG_FILE)?;
    let mut contents = String::new();
    file.read_to_string(&mut contents)?;

    match serde_json::from_str(&contents) {
        Ok(value) => Ok(value),
        Err(e) => {
            eprintln!("Warning: Could not parse benchmark file: {}", e);
            Ok(json!({}))
        }
    }
}