from __future__ import annotations
import json
import random
import shutil
import statistics
import time
from pathlib import Path
N_SAMPLES: int = 100 N_BOOTSTRAP: int = 1_000 CONFIDENCE_LEVEL: float = 0.95
_RNG_SEED: int = 0x6372_6974_6572_696F_6E
def measure(
fn,
warmup_s: float,
measure_s: float,
n_samples: int = N_SAMPLES,
) -> tuple[float, list[float], list[float]]:
warmup_count: int = 0
t0 = time.perf_counter()
warmup_end = t0 + warmup_s
while time.perf_counter() < warmup_end:
fn()
warmup_count += 1
actual_warmup_s = time.perf_counter() - t0
warmup_rate = warmup_count / actual_warmup_s sum_factors = n_samples * (n_samples + 1) / 2.0 base_iters = max(1, int(warmup_rate * measure_s / sum_factors))
iters: list[float] = []
times_ns: list[float] = []
for i in range(1, n_samples + 1):
n = i * base_iters
t_start = time.perf_counter()
for _ in range(n):
fn()
times_ns.append((time.perf_counter() - t_start) * 1e9)
iters.append(float(n))
total_iters = sum(int(x) for x in iters)
avg_us = sum(times_ns) / total_iters / 1_000.0
return avg_us, iters, times_ns
def save_criterion_files(
output_dir: Path,
group_id: str,
function_id: str,
value_str: str,
iters: list[float],
times_ns: list[float],
n_bootstrap: int = N_BOOTSTRAP,
confidence_level: float = CONFIDENCE_LEVEL,
) -> None:
full_id = f"{group_id}/{function_id}/{value_str}"
bench_root = output_dir / full_id
new_dir = bench_root / "new"
base_dir = bench_root / "base"
if new_dir.exists():
if base_dir.exists():
shutil.rmtree(base_dir)
new_dir.rename(base_dir)
new_dir.mkdir(parents=True, exist_ok=True)
(new_dir / "benchmark.json").write_text(
json.dumps({
"group_id": group_id,
"function_id": function_id,
"value_str": value_str,
"throughput": None,
"full_id": full_id,
"directory_name": full_id,
"title": full_id,
}),
encoding="utf-8",
)
(new_dir / "sample.json").write_text(
json.dumps({"sampling_mode": "Linear", "iters": iters, "times": times_ns}),
encoding="utf-8",
)
(new_dir / "estimates.json").write_text(
json.dumps(_compute_estimates(iters, times_ns, n_bootstrap, confidence_level)),
encoding="utf-8",
)
per_iter = [t / n for t, n in zip(times_ns, iters)]
(new_dir / "tukey.json").write_text(
json.dumps(_tukey_fences(per_iter)), encoding="utf-8"
)
def _percentile(sorted_data: list[float], p: float) -> float:
n = len(sorted_data)
if n == 1:
return sorted_data[0]
h = (n - 1) * p / 100.0
lo = int(h)
hi = min(lo + 1, n - 1)
return sorted_data[lo] + (h - lo) * (sorted_data[hi] - sorted_data[lo])
def _mad_normalized(data: list[float]) -> float:
med = statistics.median(data)
return statistics.median([abs(x - med) for x in data]) * 1.4826
def _slope_through_origin(iters: list[float], times_ns: list[float]) -> float:
num = sum(n * t for n, t in zip(iters, times_ns))
den = sum(n * n for n in iters)
return num / den if den else 0.0
def _bootstrap_ci(
data: list[float],
stat_fn,
n_resamples: int,
confidence_level: float,
rng: random.Random,
) -> tuple[float, float, float, float]:
n = len(data)
point = stat_fn(data)
boots = sorted(
stat_fn([data[rng.randrange(n)] for _ in range(n)])
for _ in range(n_resamples)
)
alpha = (1.0 - confidence_level) / 2.0
lo = boots[max(0, int(alpha * n_resamples))]
hi = boots[min(int((1.0 - alpha) * n_resamples), n_resamples - 1)]
return point, lo, hi, statistics.stdev(boots)
def _bootstrap_ci_slope(
iters: list[float],
times_ns: list[float],
n_resamples: int,
confidence_level: float,
rng: random.Random,
) -> tuple[float, float, float, float]:
n = len(iters)
point = _slope_through_origin(iters, times_ns)
boots = []
for _ in range(n_resamples):
idx = [rng.randrange(n) for _ in range(n)]
boots.append(_slope_through_origin(
[iters[i] for i in idx],
[times_ns[i] for i in idx],
))
boots.sort()
alpha = (1.0 - confidence_level) / 2.0
lo = boots[max(0, int(alpha * n_resamples))]
hi = boots[min(int((1.0 - alpha) * n_resamples), n_resamples - 1)]
return point, lo, hi, statistics.stdev(boots)
def _estimate_entry(
point: float,
lo: float,
hi: float,
stderr: float,
confidence_level: float,
) -> dict:
return {
"confidence_interval": {
"confidence_level": confidence_level,
"lower_bound": lo,
"upper_bound": hi,
},
"point_estimate": point,
"standard_error": stderr,
}
def _compute_estimates(
iters: list[float],
times_ns: list[float],
n_bootstrap: int,
confidence_level: float,
) -> dict:
per_iter = [t / n for t, n in zip(times_ns, iters)]
rng = random.Random(_RNG_SEED)
def _ci(stat_fn):
return _bootstrap_ci(per_iter, stat_fn, n_bootstrap, confidence_level, rng)
mean_pt, mean_lo, mean_hi, mean_se = _ci(statistics.mean)
med_pt, med_lo, med_hi, med_se = _ci(statistics.median)
mad_pt, mad_lo, mad_hi, mad_se = _ci(_mad_normalized)
std_pt, std_lo, std_hi, std_se = _ci(statistics.stdev)
sl_pt, sl_lo, sl_hi, sl_se = _bootstrap_ci_slope(
iters, times_ns, n_bootstrap, confidence_level, rng
)
e = _estimate_entry
cl = confidence_level
return {
"mean": e(mean_pt, mean_lo, mean_hi, mean_se, cl),
"median": e(med_pt, med_lo, med_hi, med_se, cl),
"median_abs_dev": e(mad_pt, mad_lo, mad_hi, mad_se, cl),
"slope": e(sl_pt, sl_lo, sl_hi, sl_se, cl),
"std_dev": e(std_pt, std_lo, std_hi, std_se, cl),
}
def _tukey_fences(per_iter_ns: list[float]) -> list[float]:
s = sorted(per_iter_ns)
q1 = _percentile(s, 25.0)
q3 = _percentile(s, 75.0)
iqr = q3 - q1
return [q1 - 1.5 * iqr, q1, q3, q3 + 1.5 * iqr]