adaptive_pipeline/application/use_cases/
benchmark_system.rs1use anyhow::Result;
32use std::path::PathBuf;
33use std::sync::Arc;
34use std::time::Instant;
35use tracing::{info, warn};
36
37use crate::infrastructure::metrics::MetricsService;
38use adaptive_pipeline_domain::value_objects::chunk_size::ChunkSize;
39use adaptive_pipeline_domain::value_objects::worker_count::WorkerCount;
40
/// Aggregated outcome of one benchmark configuration: a single
/// (file size, chunk size, worker count) combination averaged over all
/// iterations. Collected in `execute` and rendered by
/// `generate_optimization_report`.
#[derive(Debug, Clone)]
struct BenchmarkResult {
    /// Size of the input file under test, in megabytes.
    file_size_mb: usize,
    /// Chunk size used for this run, in megabytes.
    chunk_size_mb: usize,
    /// Number of parallel worker tasks used.
    worker_count: usize,
    /// Mean throughput across iterations, in MB/s.
    avg_throughput_mbps: f64,
    /// Mean wall-clock duration across iterations, in seconds.
    avg_duration_secs: f64,
    /// Which sweep produced this entry: "Adaptive", "Chunk Variation",
    /// or "Worker Variation" (matched by string in the report generator).
    config_type: String,
}
51
/// Averages returned by a single `run_benchmark_test` call
/// (one configuration, `iterations` runs).
#[derive(Debug)]
struct TestResult {
    /// Mean throughput across iterations, in MB/s (0.0 for failed runs).
    avg_throughput_mbps: f64,
    /// Mean duration across iterations, in seconds (999.0 sentinel for failed runs).
    avg_duration_secs: f64,
}
58
59pub struct BenchmarkSystemUseCase;
64
65impl BenchmarkSystemUseCase {
66 pub fn new() -> Self {
68 Self
69 }
70
    /// Runs the full benchmark sweep and writes an optimization report.
    ///
    /// # Arguments
    /// * `file` - optional existing input file; when `None`, a synthetic
    ///   test file is generated for each size and deleted afterwards.
    /// * `size_mb` - single test size in MB; `0` selects the built-in sweep
    ///   of sizes from 1 MB up to 2048 MB.
    /// * `iterations` - how many times each configuration is measured.
    ///
    /// For each file size, the adaptive configuration is measured first,
    /// then chunk sizes are varied (workers held at the adaptive value),
    /// then worker counts are varied (chunk held at the adaptive value).
    ///
    /// # Errors
    /// Propagates metrics-service construction, test-file generation and
    /// removal, benchmark-run, and report-generation failures.
    pub async fn execute(&self, file: Option<PathBuf>, size_mb: usize, iterations: usize) -> Result<()> {
        info!("Running comprehensive pipeline optimization benchmark");
        info!("Test size: {}MB", size_mb);
        info!("Iterations: {}", iterations);

        // Passed through to run_benchmark_test but currently unused there.
        let metrics_service = Arc::new(MetricsService::new()?);

        // size_mb == 0 means "sweep the full range of sizes".
        let test_sizes = if size_mb > 0 {
            vec![size_mb]
        } else {
            vec![1, 5, 10, 50, 100, 500, 1000, 2048] };

        // Chunk-size candidates (MB) tested around the adaptive value.
        let chunk_sizes = vec![1, 2, 4, 8, 16, 32, 64, 128];

        // Worker counts 1..=min(2 * cores, 16); default to 4 cores if the
        // parallelism query fails.
        let available_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
        let max_workers = (available_cores * 2).min(16);
        let worker_counts: Vec<usize> = (1..=max_workers).collect();

        println!(
            "\n========================================================================================================================"
        );
        println!(
            "========================================== PIPELINE OPTIMIZATION BENCHMARK \
             ==========================================="
        );
        println!(
            "========================================================================================================================"
        );
        println!("System Info: {} CPU cores available", available_cores);
        println!("Test Iterations: {}", iterations);
        println!("File Sizes: {:?} MB", test_sizes);
        println!("Chunk Sizes: {:?} MB", chunk_sizes);
        println!("Worker Counts: {:?}", worker_counts);
        println!(
            "========================================================================================================================"
        );

        let mut results = Vec::new();

        for &test_size_mb in &test_sizes {
            // NOTE(review): "š" appears to be a mojibake'd emoji from a lossy
            // encoding pass — confirm against the original source.
            println!("\nš Testing file size: {} MB", test_size_mb);

            // NOTE(review): a user-provided file is reused for every entry in
            // test_sizes, so test_size_mb may not match its real size — the
            // throughput itself is computed from fs::metadata downstream, but
            // the reported file_size_mb label may be wrong. Confirm intended.
            let test_file = if let Some(ref provided_file) = file {
                provided_file.clone()
            } else {
                let test_file = PathBuf::from(format!("benchmark_test_{}mb.txt", test_size_mb));
                Self::generate_test_file(&test_file, test_size_mb).await?;
                test_file
            };

            // Ask the domain layer for its adaptive recommendations for this size.
            let file_size_bytes = (test_size_mb * 1024 * 1024) as u64;
            let adaptive_chunk = ChunkSize::optimal_for_file_size(file_size_bytes);
            let adaptive_workers = WorkerCount::optimal_for_file_size(file_size_bytes);

            println!(
                " Adaptive recommendations: {} chunk, {} workers",
                adaptive_chunk.megabytes(),
                adaptive_workers.count()
            );

            println!(" Testing adaptive configuration...");
            // Convert the adaptive chunk to whole MB (floor, minimum 1 MB).
            let adaptive_chunk_mb = ((adaptive_chunk.bytes() as f64) / (1024.0 * 1024.0)).max(1.0) as usize;
            let adaptive_result = Self::run_benchmark_test(
                &test_file,
                test_size_mb,
                Some(adaptive_chunk_mb),
                Some(adaptive_workers.count()),
                iterations,
                &metrics_service,
            )
            .await?;

            results.push(BenchmarkResult {
                file_size_mb: test_size_mb,
                chunk_size_mb: adaptive_chunk_mb,
                worker_count: adaptive_workers.count(),
                avg_throughput_mbps: adaptive_result.avg_throughput_mbps,
                avg_duration_secs: adaptive_result.avg_duration_secs,
                config_type: "Adaptive".to_string(),
            });

            println!(" Testing variations around adaptive values...");

            // Vary chunk size while holding workers at the adaptive count.
            for &chunk_mb in &chunk_sizes {
                // NOTE(review): this skip compares against megabytes() while
                // the adaptive run above used adaptive_chunk_mb derived from
                // bytes(); if the two round differently the adaptive config
                // may be measured twice — confirm ChunkSize's rounding.
                if chunk_mb == (adaptive_chunk.megabytes() as usize) {
                    continue; }

                let result = Self::run_benchmark_test(
                    &test_file,
                    test_size_mb,
                    Some(chunk_mb),
                    Some(adaptive_workers.count()),
                    iterations,
                    &metrics_service,
                )
                .await?;

                results.push(BenchmarkResult {
                    file_size_mb: test_size_mb,
                    chunk_size_mb: chunk_mb,
                    worker_count: adaptive_workers.count(),
                    avg_throughput_mbps: result.avg_throughput_mbps,
                    avg_duration_secs: result.avg_duration_secs,
                    config_type: "Chunk Variation".to_string(),
                });
            }

            // Vary worker count while holding chunk size at the adaptive value.
            for &workers in &worker_counts {
                if workers == adaptive_workers.count() {
                    continue; }

                let result = Self::run_benchmark_test(
                    &test_file,
                    test_size_mb,
                    Some(adaptive_chunk_mb),
                    Some(workers),
                    iterations,
                    &metrics_service,
                )
                .await?;

                results.push(BenchmarkResult {
                    file_size_mb: test_size_mb,
                    chunk_size_mb: adaptive_chunk_mb,
                    worker_count: workers,
                    avg_throughput_mbps: result.avg_throughput_mbps,
                    avg_duration_secs: result.avg_duration_secs,
                    config_type: "Worker Variation".to_string(),
                });
            }

            // Only delete files we generated ourselves, never a user-provided one.
            if file.is_none() && test_file.exists() {
                std::fs::remove_file(&test_file)?;
            }
        }

        Self::generate_optimization_report(&results).await?;

        // NOTE(review): the literal below spans two lines — "ā" plus the
        // embedded newline look like a mojibake'd emoji; kept byte-identical.
        println!("\nā
Benchmark completed successfully!");
        println!("š Check the generated optimization report for detailed results.");

        Ok(())
    }
262
263 async fn simulate_pipeline_processing(
265 input_file: &PathBuf,
266 output_file: &PathBuf,
267 chunk_size_mb: usize,
268 worker_count: usize,
269 ) -> Result<()> {
270 use std::io::{Read, Write};
271 use tokio::task;
272
273 let chunk_size_bytes = chunk_size_mb * 1024 * 1024;
274 let mut input = std::fs::File::open(input_file)?;
275 let mut output = std::fs::File::create(output_file)?;
276
277 let mut buffer = vec![0u8; chunk_size_bytes];
279 let mut chunks = Vec::new();
280
281 loop {
282 let bytes_read = input.read(&mut buffer)?;
283 if bytes_read == 0 {
284 break;
285 }
286 chunks.push(buffer[..bytes_read].to_vec());
287 }
288
289 let chunk_count = chunks.len();
291 let chunks_per_worker = chunk_count.div_ceil(worker_count);
292
293 let mut handles = Vec::new();
294 for worker_id in 0..worker_count {
295 let start_idx = worker_id * chunks_per_worker;
296 let end_idx = ((worker_id + 1) * chunks_per_worker).min(chunk_count);
297
298 if start_idx < chunk_count {
299 let worker_chunks = chunks[start_idx..end_idx].to_vec();
300 let handle = task::spawn(async move {
301 for chunk in &worker_chunks {
303 let _processed: Vec<u8> = chunk.iter().map(|&b| b ^ 0x42).collect();
305 tokio::time::sleep(std::time::Duration::from_micros(1)).await;
307 }
308 worker_chunks
309 });
310 handles.push(handle);
311 }
312 }
313
314 for handle in handles {
316 let processed_chunks = handle.await.map_err(|e| anyhow::anyhow!("Worker task failed: {}", e))?;
317 for chunk in processed_chunks {
318 output.write_all(&chunk)?;
319 }
320 }
321
322 output.flush()?;
323 Ok(())
324 }
325
326 async fn generate_test_file(path: &PathBuf, size_mb: usize) -> Result<()> {
328 use std::io::Write;
329
330 let mut file = std::fs::File::create(path)?;
331 let chunk_size = 1024 * 1024; let data = vec![b'A'; chunk_size];
333
334 for _ in 0..size_mb {
335 file.write_all(&data)?;
336 }
337
338 file.flush()?;
339 Ok(())
340 }
341
342 async fn run_benchmark_test(
344 test_file: &PathBuf,
345 _file_size_mb: usize,
346 chunk_size_mb: Option<usize>,
347 worker_count: Option<usize>,
348 iterations: usize,
349 _metrics_service: &Arc<MetricsService>,
350 ) -> Result<TestResult> {
351 let mut durations = Vec::new();
352 let mut throughputs = Vec::new();
353
354 for i in 0..iterations {
355 let output_file = PathBuf::from(format!("benchmark_output_{}_{}.adapipe", std::process::id(), i));
356
357 let start_time = Instant::now();
358
359 let result = Self::simulate_pipeline_processing(
360 test_file,
361 &output_file,
362 chunk_size_mb.unwrap_or(1),
363 worker_count.unwrap_or(1),
364 )
365 .await;
366
367 let duration = start_time.elapsed();
368
369 if output_file.exists() {
371 std::fs::remove_file(&output_file)?;
372 }
373
374 match result {
375 Ok(_) => {
376 let duration_secs = duration.as_secs_f64();
377 let file_size_bytes = std::fs::metadata(test_file)?.len();
378 let throughput_mbps = (file_size_bytes as f64) / (1024.0 * 1024.0) / duration_secs;
379
380 durations.push(duration_secs);
381 throughputs.push(throughput_mbps);
382 }
383 Err(e) => {
384 warn!("Benchmark iteration {} failed: {}", i, e);
385 durations.push(999.0);
386 throughputs.push(0.0);
387 }
388 }
389 }
390
391 let avg_duration = durations.iter().sum::<f64>() / (durations.len() as f64);
392 let avg_throughput = throughputs.iter().sum::<f64>() / (throughputs.len() as f64);
393
394 Ok(TestResult {
395 avg_throughput_mbps: avg_throughput,
396 avg_duration_secs: avg_duration,
397 })
398 }
399
400 async fn generate_optimization_report(results: &[BenchmarkResult]) -> Result<()> {
402 let report_file = PathBuf::from("pipeline_optimization_report.md");
403 let mut report = String::new();
404
405 report.push_str("# Pipeline Optimization Benchmark Report\n\n");
406 report.push_str(&format!(
407 "Generated: {}\n\n",
408 chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC")
409 ));
410
411 let mut file_sizes: Vec<usize> = results.iter().map(|r| r.file_size_mb).collect();
413 file_sizes.sort_unstable();
414 file_sizes.dedup();
415
416 for file_size in &file_sizes {
417 report.push_str(&format!("## File Size: {} MB\n\n", file_size));
418
419 let size_results: Vec<_> = results.iter().filter(|r| r.file_size_mb == *file_size).collect();
420
421 let best_result = size_results
423 .iter()
424 .max_by(|a, b| {
425 a.avg_throughput_mbps
426 .partial_cmp(&b.avg_throughput_mbps)
427 .unwrap_or(std::cmp::Ordering::Equal)
428 })
429 .ok_or_else(|| anyhow::anyhow!("No benchmark results found"))?;
430
431 let adaptive_result = size_results
432 .iter()
433 .find(|r| r.config_type == "Adaptive")
434 .ok_or_else(|| anyhow::anyhow!("No adaptive results found"))?;
435
436 report.push_str("**Adaptive Configuration:**\n");
437 report.push_str(&format!("- Chunk Size: {} MB\n", adaptive_result.chunk_size_mb));
438 report.push_str(&format!("- Worker Count: {}\n", adaptive_result.worker_count));
439 report.push_str(&format!(
440 "- Throughput: {:.2} MB/s\n",
441 adaptive_result.avg_throughput_mbps
442 ));
443 report.push_str(&format!(
444 "- Duration: {:.2} seconds\n\n",
445 adaptive_result.avg_duration_secs
446 ));
447
448 report.push_str("**Best Configuration:**\n");
449 report.push_str(&format!("- Chunk Size: {} MB\n", best_result.chunk_size_mb));
450 report.push_str(&format!("- Worker Count: {}\n", best_result.worker_count));
451 report.push_str(&format!("- Throughput: {:.2} MB/s\n", best_result.avg_throughput_mbps));
452 report.push_str(&format!("- Duration: {:.2} seconds\n", best_result.avg_duration_secs));
453 report.push_str(&format!("- Configuration Type: {}\n\n", best_result.config_type));
454
455 let improvement = ((best_result.avg_throughput_mbps - adaptive_result.avg_throughput_mbps)
456 / adaptive_result.avg_throughput_mbps)
457 * 100.0;
458
459 if improvement > 0.0 {
460 report.push_str(&format!(
461 "**Performance Improvement:** {:.1}% faster than adaptive\n\n",
462 improvement
463 ));
464 } else {
465 report.push_str("**Performance:** Adaptive configuration is optimal\n\n");
466 }
467
468 report.push_str("### Detailed Results\n\n");
470 report.push_str("| Chunk Size (MB) | Workers | Throughput (MB/s) | Duration (s) | Config Type |\n");
471 report.push_str("|-----------------|---------|-------------------|--------------|-------------|\n");
472
473 let mut sorted_results = size_results.clone();
474 sorted_results.sort_by(|a, b| {
475 b.avg_throughput_mbps
476 .partial_cmp(&a.avg_throughput_mbps)
477 .unwrap_or(std::cmp::Ordering::Equal)
478 });
479
480 for result in sorted_results {
481 report.push_str(&format!(
482 "| {} | {} | {:.2} | {:.2} | {} |\n",
483 result.chunk_size_mb,
484 result.worker_count,
485 result.avg_throughput_mbps,
486 result.avg_duration_secs,
487 result.config_type
488 ));
489 }
490
491 report.push('\n');
492 }
493
494 report.push_str("## Summary Recommendations\n\n");
496
497 for file_size in &file_sizes {
498 let size_results: Vec<_> = results.iter().filter(|r| r.file_size_mb == *file_size).collect();
499
500 let best_result = size_results
501 .iter()
502 .max_by(|a, b| {
503 a.avg_throughput_mbps
504 .partial_cmp(&b.avg_throughput_mbps)
505 .unwrap_or(std::cmp::Ordering::Equal)
506 })
507 .ok_or_else(|| anyhow::anyhow!("No benchmark results found"))?;
508
509 report.push_str(&format!(
510 "- **{} MB files**: {} MB chunks, {} workers ({:.2} MB/s)\n",
511 file_size, best_result.chunk_size_mb, best_result.worker_count, best_result.avg_throughput_mbps
512 ));
513 }
514
515 std::fs::write(&report_file, report)?;
517
518 println!("\nš Optimization report generated: {}", report_file.display());
519
520 Ok(())
521 }
522}
523
524impl Default for BenchmarkSystemUseCase {
525 fn default() -> Self {
526 Self::new()
527 }
528}
529
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: run a 1 MB benchmark for a single iteration.
    /// Ignored by default — it touches the filesystem and is slow.
    #[tokio::test]
    #[ignore]
    async fn test_benchmark_small_file() {
        let result = BenchmarkSystemUseCase::new().execute(None, 1, 1).await;
        assert!(result.is_ok());
    }
}