"""Parse Criterion benchmark results and plot throughput comparisons.

Walks a Criterion output directory (as produced by `cargo bench`), extracts
timing estimates and element throughput for each benchmark, and renders bar
or line charts comparing channel implementations (see main() for the CLI flags).
"""

import json
import re
from pathlib import Path
from typing import List, Optional
import argparse
import sys
try:
    import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
except ImportError as e:
print(f"Error: Missing required dependency: {e}")
print("Please install required packages:")
print("pip install matplotlib numpy pandas")
sys.exit(1)
class BenchmarkData:
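    """A single Criterion result: timing point estimates plus element throughput."""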
def __init__(self, benchmark_name: str, channel_impl: str, parameter: str,
throughput_elements: int, mean_time_ns: float, median_time_ns: float):
self.benchmark_name = benchmark_name
self.channel_impl = channel_impl
self.parameter = parameter
self.throughput_elements = throughput_elements
self.mean_time_ns = mean_time_ns
self.median_time_ns = median_time_ns
@property
def throughput_per_second(self) -> float:
time_seconds = self.mean_time_ns / 1_000_000_000
return self.throughput_elements / time_seconds
@property
def median_throughput_per_second(self) -> float:
time_seconds = self.median_time_ns / 1_000_000_000
return self.throughput_elements / time_seconds
class CriterionParser:
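    """Walk a Criterion output directory and collect BenchmarkData entries.

    Expected layout: <benchmark_name>_<channel_impl>/<parameter>[/<nested_param>]/new/
    containing benchmark.json and estimates.json ('base/' is used as a fallback).
    """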
def __init__(self, criterion_dir: Path):
self.criterion_dir = Path(criterion_dir)
self.benchmarks: List[BenchmarkData] = []
def parse_all_benchmarks(self) -> List[BenchmarkData]:
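        """Parse every benchmark directory (except 'report') and return the results."""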
print(f"Parsing benchmarks from: {self.criterion_dir}")
benchmark_dirs = [d for d in self.criterion_dir.iterdir()
if d.is_dir() and not d.name == 'report']
print(f"Found {len(benchmark_dirs)} benchmark directories")
for benchmark_dir in benchmark_dirs:
self._parse_benchmark_directory(benchmark_dir)
print(f"Parsed {len(self.benchmarks)} benchmark results")
return self.benchmarks
def _parse_benchmark_directory(self, benchmark_dir: Path):
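        """Split '<benchmark_name>_<channel_impl>' and parse the directory's results."""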
dir_name = benchmark_dir.name
parts = dir_name.split('_')
if len(parts) < 2:
print(f"Warning: Skipping directory with unexpected name format: {dir_name}")
return
channel_impl = parts[-1]
benchmark_name = '_'.join(parts[:-1])
self._parse_benchmark_results(benchmark_dir, benchmark_name, channel_impl)
def _parse_benchmark_results(self, benchmark_dir: Path, benchmark_name: str, channel_impl: str):
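        """Walk the parameter subdirectories (with one optional nested level) of a benchmark."""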
subdirs = [d for d in benchmark_dir.iterdir() if d.is_dir() and d.name != 'report']
if not subdirs:
return
for subdir in subdirs:
parameter = subdir.name
nested_dirs = [d for d in subdir.iterdir() if d.is_dir() and d.name != 'report']
if nested_dirs:
for nested_dir in nested_dirs:
nested_param = nested_dir.name
full_parameter = f"{parameter}/{nested_param}"
self._parse_single_benchmark(nested_dir, benchmark_name, channel_impl, full_parameter)
else:
self._parse_single_benchmark(subdir, benchmark_name, channel_impl, parameter)
def _parse_single_benchmark(self, result_dir: Path, benchmark_name: str,
channel_impl: str, parameter: str):
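        """Load benchmark.json and estimates.json from 'new/' (falling back to 'base/').

        Results without an 'Elements' throughput entry are skipped; Criterion's
        mean/median point estimates are in nanoseconds.
        """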
new_dir = result_dir / 'new'
if not new_dir.exists():
new_dir = result_dir / 'base'
if not new_dir.exists():
return
benchmark_json = new_dir / 'benchmark.json'
estimates_json = new_dir / 'estimates.json'
if not (benchmark_json.exists() and estimates_json.exists()):
return
try:
with open(benchmark_json, 'r') as f:
benchmark_data = json.load(f)
with open(estimates_json, 'r') as f:
estimates_data = json.load(f)
            # 'throughput' may be JSON null when no throughput was configured
            throughput_info = benchmark_data.get('throughput') or {}
if 'Elements' not in throughput_info:
return
throughput_elements = throughput_info['Elements']
mean_time_ns = estimates_data['mean']['point_estimate']
median_time_ns = estimates_data['median']['point_estimate']
benchmark = BenchmarkData(
benchmark_name=benchmark_name,
channel_impl=channel_impl,
parameter=parameter,
throughput_elements=throughput_elements,
mean_time_ns=mean_time_ns,
median_time_ns=median_time_ns
)
self.benchmarks.append(benchmark)
except (json.JSONDecodeError, KeyError) as e:
print(f"Warning: Failed to parse {result_dir}: {e}")
def _get_output_path(output_filename: Optional[str], output_dir: Optional[Path]) -> Optional[str]:
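    """Join an output filename with the optional output directory."""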
if not output_filename:
return None
if output_dir:
return str(output_dir / output_filename)
else:
return output_filename
class BenchmarkVisualizer:
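    """Builds a pandas DataFrame from parsed results and renders comparison charts."""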
def __init__(self, benchmarks: List[BenchmarkData]):
self.benchmarks = benchmarks
self.df = self._create_dataframe()
def _create_dataframe(self) -> pd.DataFrame:
data = []
for benchmark in self.benchmarks:
data.append({
'benchmark_name': benchmark.benchmark_name,
'channel_impl': benchmark.channel_impl,
'parameter': benchmark.parameter,
'throughput_elements': benchmark.throughput_elements,
'mean_time_ns': benchmark.mean_time_ns,
'median_time_ns': benchmark.median_time_ns,
'throughput_per_second': benchmark.throughput_per_second,
'median_throughput_per_second': benchmark.median_throughput_per_second
})
return pd.DataFrame(data)
def get_available_benchmarks(self) -> List[str]:
return sorted(self.df['benchmark_name'].unique())
def get_available_channels(self) -> List[str]:
return sorted(self.df['channel_impl'].unique())
def plot_benchmark_comparison(self, benchmark_name: str, output_file: Optional[str] = None,
use_median: bool = False, chart_type: str = 'bar',
silent: bool = False):
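        """Plot throughput for one benchmark, grouped by channel implementation.

        Switches to a logarithmic y-axis when the throughput spread exceeds 100x;
        saves to output_file when given, otherwise shows the figure.
        """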
if chart_type not in ['bar', 'line']:
raise ValueError("chart_type must be 'bar' or 'line'")
benchmark_data = self.df[self.df['benchmark_name'] == benchmark_name]
if benchmark_data.empty:
print(f"No data found for benchmark: {benchmark_name}")
return
throughput_col = 'median_throughput_per_second' if use_median else 'throughput_per_second'
fig, ax = plt.subplots(figsize=(12, 8))
parameters = sorted(benchmark_data['parameter'].unique(), key=self._sort_parameter_key)
channels = sorted(benchmark_data['channel_impl'].unique())
colors = plt.cm.Set3(np.linspace(0, 1, len(channels)))
channel_colors = dict(zip(channels, colors))
if chart_type == 'bar':
self._plot_bar_chart(ax, benchmark_data, parameters, channels, channel_colors, throughput_col)
        else:
            self._plot_line_chart(ax, benchmark_data, parameters, channels, channel_colors, throughput_col)
ax.set_xlabel('Parameter')
ax.set_ylabel('Throughput (elements/second)')
chart_type_title = 'Bar Chart' if chart_type == 'bar' else 'Line Chart'
ax.set_title(f'Throughput Comparison: {benchmark_name.replace("_", " ").title()}\n'
f'({chart_type_title}, {"Median" if use_median else "Mean"} times)')
ax.tick_params(axis='x', rotation=45)
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
ax.grid(True, alpha=0.3)
max_throughput = benchmark_data[throughput_col].max()
min_throughput = benchmark_data[benchmark_data[throughput_col] > 0][throughput_col].min()
if max_throughput / min_throughput > 100:
ax.set_yscale('log')
plt.tight_layout()
        if output_file:
            plt.savefig(output_file, dpi=300, bbox_inches='tight')
            # Close the figure so batch runs (--all) do not accumulate open figures
            plt.close(fig)
            if not silent:
                print(f"Saved plot to: {output_file}")
        else:
            plt.show()
def _plot_bar_chart(self, ax, benchmark_data, parameters, channels, channel_colors, throughput_col):
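        """Draw grouped bars (one group per parameter, one bar per channel)."""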
x_positions = np.arange(len(parameters))
bar_width = 0.8 / len(channels)
for i, channel in enumerate(channels):
channel_data = benchmark_data[benchmark_data['channel_impl'] == channel]
throughputs = []
for param in parameters:
param_data = channel_data[channel_data['parameter'] == param]
if not param_data.empty:
throughputs.append(param_data[throughput_col].iloc[0])
else:
throughputs.append(0)
x_pos = x_positions + i * bar_width - (len(channels) - 1) * bar_width / 2
bars = ax.bar(x_pos, throughputs, bar_width,
label=channel, color=channel_colors[channel], alpha=0.8)
for bar, throughput in zip(bars, throughputs):
if throughput > 0:
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2., height,
f'{throughput:.0f}', ha='center', va='bottom',
fontsize=8, rotation=90)
def _plot_line_chart(self, ax, benchmark_data, parameters, channels, channel_colors, throughput_col):
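        """Draw one line per channel implementation across the parameter values."""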
x_positions = np.arange(len(parameters))
for channel in channels:
channel_data = benchmark_data[benchmark_data['channel_impl'] == channel]
throughputs = []
valid_x_positions = []
for i, param in enumerate(parameters):
param_data = channel_data[channel_data['parameter'] == param]
if not param_data.empty:
throughputs.append(param_data[throughput_col].iloc[0])
valid_x_positions.append(x_positions[i])
            if throughputs:
                ax.plot(valid_x_positions, throughputs,
                        marker='o', linewidth=2, markersize=6,
                        label=channel, color=channel_colors[channel])
                for x, throughput in zip(valid_x_positions, throughputs):
                    ax.annotate(f'{throughput:.0f}', (x, throughput),
                                textcoords="offset points", xytext=(0, 10),
                                ha='center', fontsize=8)
ax.set_xticks(x_positions)
ax.set_xticklabels(parameters)
def plot_all_benchmarks_summary(self, output_file: Optional[str] = None, use_median: bool = False):
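        """Plot average throughput per benchmark and channel implementation."""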
throughput_col = 'median_throughput_per_second' if use_median else 'throughput_per_second'
summary = self.df.groupby(['benchmark_name', 'channel_impl'])[throughput_col].mean().reset_index()
pivot_data = summary.pivot(index='benchmark_name', columns='channel_impl', values=throughput_col)
fig, ax = plt.subplots(figsize=(14, 8))
pivot_data.plot(kind='bar', ax=ax, width=0.8)
ax.set_xlabel('Benchmark')
ax.set_ylabel('Average Throughput (elements/second)')
ax.set_title(f'Average Throughput Comparison Across All Benchmarks\n'
f'({"Median" if use_median else "Mean"} times)')
ax.legend(title='Channel Implementation', bbox_to_anchor=(1.05, 1), loc='upper left')
ax.grid(True, alpha=0.3)
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
plt.tight_layout()
        if output_file:
            plt.savefig(output_file, dpi=300, bbox_inches='tight')
            plt.close(fig)
            print(f"Saved summary plot to: {output_file}")
        else:
            plt.show()
def generate_all_benchmark_charts(self, output_prefix: Optional[str] = None,
use_median: bool = False, chart_type: str = 'bar',
output_dir: Optional[Path] = None):
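        """Generate and save one chart per available benchmark."""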
benchmarks = self.get_available_benchmarks()
print(f"\nGenerating {chart_type} charts for {len(benchmarks)} benchmarks...")
generated_files = []
for i, benchmark in enumerate(benchmarks, 1):
print(f"[{i}/{len(benchmarks)}] Processing: {benchmark}")
if output_prefix:
if output_prefix.endswith('.png'):
base_name = output_prefix[:-4]
filename = f"{base_name}_{benchmark}_{chart_type}.png"
else:
filename = f"{output_prefix}_{benchmark}_{chart_type}.png"
else:
filename = f"{benchmark}_{chart_type}_chart.png"
if output_dir:
output_file = str(output_dir / filename)
else:
output_file = filename
try:
self.plot_benchmark_comparison(
benchmark_name=benchmark,
output_file=output_file,
use_median=use_median,
chart_type=chart_type,
silent=True
)
generated_files.append(output_file)
except Exception as e:
print(f" ⚠️ Error generating chart for {benchmark}: {e}")
continue
print(f"\n✅ Successfully generated {len(generated_files)} charts:")
for filename in generated_files:
print(f" - {filename}")
if len(generated_files) < len(benchmarks):
failed_count = len(benchmarks) - len(generated_files)
print(f"\n⚠️ {failed_count} charts failed to generate")
def _sort_parameter_key(self, param: str):
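        """Sort parameters by their first embedded integer, then lexically."""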
numbers = re.findall(r'\d+', param)
if numbers:
return (int(numbers[0]), param)
return (0, param)
def print_summary(self):
print("\n=== Benchmark Summary ===")
print(f"Total benchmark results: {len(self.benchmarks)}")
print(f"Unique benchmarks: {len(self.get_available_benchmarks())}")
print(f"Channel implementations: {', '.join(self.get_available_channels())}")
print("\nAvailable benchmarks:")
for benchmark in self.get_available_benchmarks():
count = len(self.df[self.df['benchmark_name'] == benchmark])
print(f" - {benchmark}: {count} results")
def main():
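    """Parse CLI arguments, load Criterion results, and dispatch to the requested plots."""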
parser = argparse.ArgumentParser(description='Analyze Criterion benchmark results')
parser.add_argument('criterion_dir', nargs='?',
default='target/criterion',
help='Path to criterion results directory (default: target/criterion)')
parser.add_argument('--benchmark', '-b',
help='Specific benchmark to plot (if not provided, shows summary)')
parser.add_argument('--output', '-o',
help='Output file for the plot (if not provided, displays plot)')
parser.add_argument('--list', '-l', action='store_true',
help='List available benchmarks and exit')
parser.add_argument('--median', action='store_true',
help='Use median times instead of mean times')
parser.add_argument('--summary', '-s', action='store_true',
help='Generate summary plot of all benchmarks')
parser.add_argument('--chart-type', '-t', choices=['bar', 'line'], default='bar',
help='Chart type: bar or line (default: bar)')
parser.add_argument('--all', '-a', action='store_true',
help='Generate charts for all available benchmarks')
parser.add_argument('--output-dir', '-d',
help='Directory to save charts (created if it doesn\'t exist)')
args = parser.parse_args()
criterion_dir = Path(args.criterion_dir)
if not criterion_dir.exists():
print(f"Error: Criterion directory not found: {criterion_dir}")
print("Make sure you've run benchmarks with: cargo bench")
sys.exit(1)
    criterion_parser = CriterionParser(criterion_dir)
    benchmarks = criterion_parser.parse_all_benchmarks()
if not benchmarks:
print("No benchmark data found!")
sys.exit(1)
visualizer = BenchmarkVisualizer(benchmarks)
visualizer.print_summary()
if args.list:
print("\nAvailable benchmarks:")
for benchmark in visualizer.get_available_benchmarks():
print(f" - {benchmark}")
return
output_dir = None
if args.output_dir:
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Output directory: {output_dir}")
if args.summary:
output_file = _get_output_path(args.output, output_dir)
visualizer.plot_all_benchmarks_summary(output_file, args.median)
elif args.all:
visualizer.generate_all_benchmark_charts(args.output, args.median, args.chart_type, output_dir)
elif args.benchmark:
if args.benchmark not in visualizer.get_available_benchmarks():
print(f"Error: Benchmark '{args.benchmark}' not found")
print("Available benchmarks:")
for benchmark in visualizer.get_available_benchmarks():
print(f" - {benchmark}")
sys.exit(1)
output_file = _get_output_path(args.output, output_dir)
visualizer.plot_benchmark_comparison(args.benchmark, output_file, args.median, args.chart_type)
else:
output_file = _get_output_path(args.output, output_dir)
visualizer.plot_all_benchmarks_summary(output_file, args.median)
if __name__ == '__main__':
main()