adaptive_pipeline/application/use_cases/benchmark_system.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Benchmark System Use Case
9//!
10//! This module implements comprehensive pipeline performance benchmarking.
11//! It tests various chunk sizes, worker counts, and file sizes to identify
12//! optimal configurations.
13//!
14//! ## Overview
15//!
16//! The Benchmark System use case provides:
17//!
18//! - **Performance Testing**: Measure throughput and processing time
19//! - **Configuration Optimization**: Test different chunk/worker combinations
20//! - **Adaptive Validation**: Compare adaptive settings against alternatives
21//! - **Report Generation**: Create detailed markdown reports
22//! - **Multiple File Sizes**: Test scalability across different file sizes
23//!
24//! ## Test Matrix
25//!
26//! - **File Sizes**: 1MB, 5MB, 10MB, 50MB, 100MB, 500MB, 1GB, 2GB
27//! - **Chunk Sizes**: 1MB, 2MB, 4MB, 8MB, 16MB, 32MB, 64MB, 128MB
28//! - **Worker Counts**: 1 to (2 Ɨ CPU cores), max 16
29//! - **Iterations**: Configurable (default: 3)
30
31use anyhow::Result;
32use std::path::PathBuf;
33use std::sync::Arc;
34use std::time::Instant;
35use tracing::{info, warn};
36
37use crate::infrastructure::metrics::MetricsService;
38use adaptive_pipeline_domain::value_objects::chunk_size::ChunkSize;
39use adaptive_pipeline_domain::value_objects::worker_count::WorkerCount;
40
/// Benchmark result for a single configuration.
///
/// One instance is recorded per (file size, chunk size, worker count)
/// combination tested; the report generator groups these by `file_size_mb`.
#[derive(Debug, Clone)]
struct BenchmarkResult {
    /// Size of the input file tested, in megabytes.
    file_size_mb: usize,
    /// Chunk size used for this run, in megabytes.
    chunk_size_mb: usize,
    /// Number of worker tasks used for this run.
    worker_count: usize,
    /// Mean throughput across all iterations, in MB/s.
    avg_throughput_mbps: f64,
    /// Mean wall-clock duration across all iterations, in seconds.
    avg_duration_secs: f64,
    /// Test family that produced this result: "Adaptive", "Chunk Variation",
    /// or "Worker Variation". The report generator matches on the exact
    /// string "Adaptive", so these values must stay in sync with `execute`.
    config_type: String,
}
51
/// Single test iteration result.
///
/// Aggregated averages returned by `run_benchmark_test` for one
/// configuration (already averaged over all iterations).
#[derive(Debug)]
struct TestResult {
    /// Mean throughput over the iterations, in MB/s.
    avg_throughput_mbps: f64,
    /// Mean wall-clock duration over the iterations, in seconds.
    avg_duration_secs: f64,
}
58
/// Use case for benchmarking pipeline performance.
///
/// This use case performs comprehensive performance testing across multiple
/// configurations to identify optimal settings for different file sizes.
///
/// Stateless unit struct: all behavior lives in associated functions, so
/// construction is free and the type is trivially `Send + Sync`.
pub struct BenchmarkSystemUseCase;
64
65impl BenchmarkSystemUseCase {
66    /// Creates a new Benchmark System use case.
67    pub fn new() -> Self {
68        Self
69    }
70
71    /// Executes the benchmark system use case.
72    ///
73    /// Runs comprehensive benchmarks to test pipeline performance across
74    /// various configurations, comparing adaptive settings against
75    /// alternatives.
76    ///
77    /// ## Parameters
78    ///
79    /// * `file` - Optional existing file to use (otherwise generates test
80    ///   files)
81    /// * `size_mb` - Specific file size to test (0 = test all default sizes)
82    /// * `iterations` - Number of iterations per configuration (default: 3)
83    ///
84    /// ## Test Configurations
85    ///
86    /// For each file size, tests:
87    /// 1. **Adaptive Configuration**: Recommended chunk/worker settings
88    /// 2. **Chunk Variations**: Different chunk sizes with adaptive workers
89    /// 3. **Worker Variations**: Different worker counts with adaptive chunk
90    ///    size
91    ///
92    /// ## Output
93    ///
94    /// Generates `pipeline_optimization_report.md` containing:
95    /// - Performance comparison tables
96    /// - Adaptive vs best configuration analysis
97    /// - Detailed results for all tested configurations
98    /// - Summary recommendations for each file size
99    ///
100    /// ## Returns
101    ///
102    /// - `Ok(())` - Benchmark completed successfully
103    /// - `Err(anyhow::Error)` - Benchmark failed
104    pub async fn execute(&self, file: Option<PathBuf>, size_mb: usize, iterations: usize) -> Result<()> {
105        info!("Running comprehensive pipeline optimization benchmark");
106        info!("Test size: {}MB", size_mb);
107        info!("Iterations: {}", iterations);
108
109        // Create metrics service for benchmarking
110        let metrics_service = Arc::new(MetricsService::new()?);
111
112        // Test file sizes in MB
113        let test_sizes = if size_mb > 0 {
114            vec![size_mb]
115        } else {
116            vec![1, 5, 10, 50, 100, 500, 1000, 2048] // Default sizes up to 2GB
117        };
118
119        // Chunk sizes to test (in MB)
120        let chunk_sizes = vec![1, 2, 4, 8, 16, 32, 64, 128];
121
122        // Worker counts to test
123        let available_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
124        let max_workers = (available_cores * 2).min(16);
125        let worker_counts: Vec<usize> = (1..=max_workers).collect();
126
127        println!(
128            "\n========================================================================================================================"
129        );
130        println!(
131            "========================================== PIPELINE OPTIMIZATION BENCHMARK \
132             ==========================================="
133        );
134        println!(
135            "========================================================================================================================"
136        );
137        println!("System Info:        {} CPU cores available", available_cores);
138        println!("Test Iterations:    {}", iterations);
139        println!("File Sizes:         {:?} MB", test_sizes);
140        println!("Chunk Sizes:        {:?} MB", chunk_sizes);
141        println!("Worker Counts:      {:?}", worker_counts);
142        println!(
143            "========================================================================================================================"
144        );
145
146        let mut results = Vec::new();
147
148        for &test_size_mb in &test_sizes {
149            println!("\nšŸ” Testing file size: {} MB", test_size_mb);
150
151            // Create or use test file
152            let test_file = if let Some(ref provided_file) = file {
153                provided_file.clone()
154            } else {
155                let test_file = PathBuf::from(format!("benchmark_test_{}mb.txt", test_size_mb));
156                Self::generate_test_file(&test_file, test_size_mb).await?;
157                test_file
158            };
159
160            // Get adaptive recommendations
161            let file_size_bytes = (test_size_mb * 1024 * 1024) as u64;
162            let adaptive_chunk = ChunkSize::optimal_for_file_size(file_size_bytes);
163            let adaptive_workers = WorkerCount::optimal_for_file_size(file_size_bytes);
164
165            println!(
166                "   Adaptive recommendations: {} chunk, {} workers",
167                adaptive_chunk.megabytes(),
168                adaptive_workers.count()
169            );
170
171            // Test adaptive configuration first
172            println!("   Testing adaptive configuration...");
173            let adaptive_chunk_mb = ((adaptive_chunk.bytes() as f64) / (1024.0 * 1024.0)).max(1.0) as usize;
174            let adaptive_result = Self::run_benchmark_test(
175                &test_file,
176                test_size_mb,
177                Some(adaptive_chunk_mb),
178                Some(adaptive_workers.count()),
179                iterations,
180                &metrics_service,
181            )
182            .await?;
183
184            results.push(BenchmarkResult {
185                file_size_mb: test_size_mb,
186                chunk_size_mb: adaptive_chunk_mb,
187                worker_count: adaptive_workers.count(),
188                avg_throughput_mbps: adaptive_result.avg_throughput_mbps,
189                avg_duration_secs: adaptive_result.avg_duration_secs,
190                config_type: "Adaptive".to_string(),
191            });
192
193            // Test variations around adaptive values
194            println!("   Testing variations around adaptive values...");
195
196            // Test different chunk sizes with adaptive worker count
197            for &chunk_mb in &chunk_sizes {
198                if chunk_mb == (adaptive_chunk.megabytes() as usize) {
199                    continue; // Skip adaptive (already tested)
200                }
201
202                let result = Self::run_benchmark_test(
203                    &test_file,
204                    test_size_mb,
205                    Some(chunk_mb),
206                    Some(adaptive_workers.count()),
207                    iterations,
208                    &metrics_service,
209                )
210                .await?;
211
212                results.push(BenchmarkResult {
213                    file_size_mb: test_size_mb,
214                    chunk_size_mb: chunk_mb,
215                    worker_count: adaptive_workers.count(),
216                    avg_throughput_mbps: result.avg_throughput_mbps,
217                    avg_duration_secs: result.avg_duration_secs,
218                    config_type: "Chunk Variation".to_string(),
219                });
220            }
221
222            // Test different worker counts with adaptive chunk size
223            for &workers in &worker_counts {
224                if workers == adaptive_workers.count() {
225                    continue; // Skip adaptive (already tested)
226                }
227
228                let result = Self::run_benchmark_test(
229                    &test_file,
230                    test_size_mb,
231                    Some(adaptive_chunk_mb),
232                    Some(workers),
233                    iterations,
234                    &metrics_service,
235                )
236                .await?;
237
238                results.push(BenchmarkResult {
239                    file_size_mb: test_size_mb,
240                    chunk_size_mb: adaptive_chunk_mb,
241                    worker_count: workers,
242                    avg_throughput_mbps: result.avg_throughput_mbps,
243                    avg_duration_secs: result.avg_duration_secs,
244                    config_type: "Worker Variation".to_string(),
245                });
246            }
247
248            // Clean up generated test file
249            if file.is_none() && test_file.exists() {
250                std::fs::remove_file(&test_file)?;
251            }
252        }
253
254        // Generate comprehensive report
255        Self::generate_optimization_report(&results).await?;
256
257        println!("\nāœ… Benchmark completed successfully!");
258        println!("šŸ“Š Check the generated optimization report for detailed results.");
259
260        Ok(())
261    }
262
263    /// Simulates pipeline processing for benchmarking.
264    async fn simulate_pipeline_processing(
265        input_file: &PathBuf,
266        output_file: &PathBuf,
267        chunk_size_mb: usize,
268        worker_count: usize,
269    ) -> Result<()> {
270        use std::io::{Read, Write};
271        use tokio::task;
272
273        let chunk_size_bytes = chunk_size_mb * 1024 * 1024;
274        let mut input = std::fs::File::open(input_file)?;
275        let mut output = std::fs::File::create(output_file)?;
276
277        // Read file in chunks
278        let mut buffer = vec![0u8; chunk_size_bytes];
279        let mut chunks = Vec::new();
280
281        loop {
282            let bytes_read = input.read(&mut buffer)?;
283            if bytes_read == 0 {
284                break;
285            }
286            chunks.push(buffer[..bytes_read].to_vec());
287        }
288
289        // Process chunks with simulated concurrency
290        let chunk_count = chunks.len();
291        let chunks_per_worker = chunk_count.div_ceil(worker_count);
292
293        let mut handles = Vec::new();
294        for worker_id in 0..worker_count {
295            let start_idx = worker_id * chunks_per_worker;
296            let end_idx = ((worker_id + 1) * chunks_per_worker).min(chunk_count);
297
298            if start_idx < chunk_count {
299                let worker_chunks = chunks[start_idx..end_idx].to_vec();
300                let handle = task::spawn(async move {
301                    // Simulate processing work
302                    for chunk in &worker_chunks {
303                        // Simple processing simulation: XOR each byte
304                        let _processed: Vec<u8> = chunk.iter().map(|&b| b ^ 0x42).collect();
305                        // Small delay to simulate work
306                        tokio::time::sleep(std::time::Duration::from_micros(1)).await;
307                    }
308                    worker_chunks
309                });
310                handles.push(handle);
311            }
312        }
313
314        // Collect results and write to output
315        for handle in handles {
316            let processed_chunks = handle.await.map_err(|e| anyhow::anyhow!("Worker task failed: {}", e))?;
317            for chunk in processed_chunks {
318                output.write_all(&chunk)?;
319            }
320        }
321
322        output.flush()?;
323        Ok(())
324    }
325
326    /// Generates a test file of specified size.
327    async fn generate_test_file(path: &PathBuf, size_mb: usize) -> Result<()> {
328        use std::io::Write;
329
330        let mut file = std::fs::File::create(path)?;
331        let chunk_size = 1024 * 1024; // 1MB chunks
332        let data = vec![b'A'; chunk_size];
333
334        for _ in 0..size_mb {
335            file.write_all(&data)?;
336        }
337
338        file.flush()?;
339        Ok(())
340    }
341
342    /// Runs a single benchmark test configuration.
343    async fn run_benchmark_test(
344        test_file: &PathBuf,
345        _file_size_mb: usize,
346        chunk_size_mb: Option<usize>,
347        worker_count: Option<usize>,
348        iterations: usize,
349        _metrics_service: &Arc<MetricsService>,
350    ) -> Result<TestResult> {
351        let mut durations = Vec::new();
352        let mut throughputs = Vec::new();
353
354        for i in 0..iterations {
355            let output_file = PathBuf::from(format!("benchmark_output_{}_{}.adapipe", std::process::id(), i));
356
357            let start_time = Instant::now();
358
359            let result = Self::simulate_pipeline_processing(
360                test_file,
361                &output_file,
362                chunk_size_mb.unwrap_or(1),
363                worker_count.unwrap_or(1),
364            )
365            .await;
366
367            let duration = start_time.elapsed();
368
369            // Clean up output file
370            if output_file.exists() {
371                std::fs::remove_file(&output_file)?;
372            }
373
374            match result {
375                Ok(_) => {
376                    let duration_secs = duration.as_secs_f64();
377                    let file_size_bytes = std::fs::metadata(test_file)?.len();
378                    let throughput_mbps = (file_size_bytes as f64) / (1024.0 * 1024.0) / duration_secs;
379
380                    durations.push(duration_secs);
381                    throughputs.push(throughput_mbps);
382                }
383                Err(e) => {
384                    warn!("Benchmark iteration {} failed: {}", i, e);
385                    durations.push(999.0);
386                    throughputs.push(0.0);
387                }
388            }
389        }
390
391        let avg_duration = durations.iter().sum::<f64>() / (durations.len() as f64);
392        let avg_throughput = throughputs.iter().sum::<f64>() / (throughputs.len() as f64);
393
394        Ok(TestResult {
395            avg_throughput_mbps: avg_throughput,
396            avg_duration_secs: avg_duration,
397        })
398    }
399
400    /// Generates comprehensive optimization report in markdown format.
401    async fn generate_optimization_report(results: &[BenchmarkResult]) -> Result<()> {
402        let report_file = PathBuf::from("pipeline_optimization_report.md");
403        let mut report = String::new();
404
405        report.push_str("# Pipeline Optimization Benchmark Report\n\n");
406        report.push_str(&format!(
407            "Generated: {}\n\n",
408            chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC")
409        ));
410
411        // Group results by file size
412        let mut file_sizes: Vec<usize> = results.iter().map(|r| r.file_size_mb).collect();
413        file_sizes.sort_unstable();
414        file_sizes.dedup();
415
416        for file_size in &file_sizes {
417            report.push_str(&format!("## File Size: {} MB\n\n", file_size));
418
419            let size_results: Vec<_> = results.iter().filter(|r| r.file_size_mb == *file_size).collect();
420
421            // Find best configuration
422            let best_result = size_results
423                .iter()
424                .max_by(|a, b| {
425                    a.avg_throughput_mbps
426                        .partial_cmp(&b.avg_throughput_mbps)
427                        .unwrap_or(std::cmp::Ordering::Equal)
428                })
429                .ok_or_else(|| anyhow::anyhow!("No benchmark results found"))?;
430
431            let adaptive_result = size_results
432                .iter()
433                .find(|r| r.config_type == "Adaptive")
434                .ok_or_else(|| anyhow::anyhow!("No adaptive results found"))?;
435
436            report.push_str("**Adaptive Configuration:**\n");
437            report.push_str(&format!("- Chunk Size: {} MB\n", adaptive_result.chunk_size_mb));
438            report.push_str(&format!("- Worker Count: {}\n", adaptive_result.worker_count));
439            report.push_str(&format!(
440                "- Throughput: {:.2} MB/s\n",
441                adaptive_result.avg_throughput_mbps
442            ));
443            report.push_str(&format!(
444                "- Duration: {:.2} seconds\n\n",
445                adaptive_result.avg_duration_secs
446            ));
447
448            report.push_str("**Best Configuration:**\n");
449            report.push_str(&format!("- Chunk Size: {} MB\n", best_result.chunk_size_mb));
450            report.push_str(&format!("- Worker Count: {}\n", best_result.worker_count));
451            report.push_str(&format!("- Throughput: {:.2} MB/s\n", best_result.avg_throughput_mbps));
452            report.push_str(&format!("- Duration: {:.2} seconds\n", best_result.avg_duration_secs));
453            report.push_str(&format!("- Configuration Type: {}\n\n", best_result.config_type));
454
455            let improvement = ((best_result.avg_throughput_mbps - adaptive_result.avg_throughput_mbps)
456                / adaptive_result.avg_throughput_mbps)
457                * 100.0;
458
459            if improvement > 0.0 {
460                report.push_str(&format!(
461                    "**Performance Improvement:** {:.1}% faster than adaptive\n\n",
462                    improvement
463                ));
464            } else {
465                report.push_str("**Performance:** Adaptive configuration is optimal\n\n");
466            }
467
468            // Detailed results table
469            report.push_str("### Detailed Results\n\n");
470            report.push_str("| Chunk Size (MB) | Workers | Throughput (MB/s) | Duration (s) | Config Type |\n");
471            report.push_str("|-----------------|---------|-------------------|--------------|-------------|\n");
472
473            let mut sorted_results = size_results.clone();
474            sorted_results.sort_by(|a, b| {
475                b.avg_throughput_mbps
476                    .partial_cmp(&a.avg_throughput_mbps)
477                    .unwrap_or(std::cmp::Ordering::Equal)
478            });
479
480            for result in sorted_results {
481                report.push_str(&format!(
482                    "| {} | {} | {:.2} | {:.2} | {} |\n",
483                    result.chunk_size_mb,
484                    result.worker_count,
485                    result.avg_throughput_mbps,
486                    result.avg_duration_secs,
487                    result.config_type
488                ));
489            }
490
491            report.push('\n');
492        }
493
494        // Summary recommendations
495        report.push_str("## Summary Recommendations\n\n");
496
497        for file_size in &file_sizes {
498            let size_results: Vec<_> = results.iter().filter(|r| r.file_size_mb == *file_size).collect();
499
500            let best_result = size_results
501                .iter()
502                .max_by(|a, b| {
503                    a.avg_throughput_mbps
504                        .partial_cmp(&b.avg_throughput_mbps)
505                        .unwrap_or(std::cmp::Ordering::Equal)
506                })
507                .ok_or_else(|| anyhow::anyhow!("No benchmark results found"))?;
508
509            report.push_str(&format!(
510                "- **{} MB files**: {} MB chunks, {} workers ({:.2} MB/s)\n",
511                file_size, best_result.chunk_size_mb, best_result.worker_count, best_result.avg_throughput_mbps
512            ));
513        }
514
515        // Write report to file
516        std::fs::write(&report_file, report)?;
517
518        println!("\nšŸ“Š Optimization report generated: {}", report_file.display());
519
520        Ok(())
521    }
522}
523
524impl Default for BenchmarkSystemUseCase {
525    fn default() -> Self {
526        Self::new()
527    }
528}
529
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: benchmarking a generated 1 MB file for a single iteration
    /// should complete without error. Marked `#[ignore]` because it touches
    /// the filesystem and takes noticeable wall-clock time; run explicitly
    /// with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore] // Expensive benchmark test
    async fn test_benchmark_small_file() {
        let use_case = BenchmarkSystemUseCase::new();
        let result = use_case.execute(None, 1, 1).await; // 1MB, 1 iteration
        assert!(result.is_ok());
    }
}
541}