synta 0.1.2

ASN.1 parser, decoder, and encoder library with DER/BER support and C FFI
Documentation
"""criterion_compat — Criterion-compatible benchmark sampling and JSON output.

Provides :func:`measure` for Criterion-compatible Linear-mode sampling and
:func:`save_criterion_files` to write the JSON artefacts expected by
cargo-criterion and criterion-compare.

Format summary
--------------
Criterion stores results under ``target/criterion/<group>/<function>/<value>/``:

  new/benchmark.json   — benchmark identity (group_id, function_id, value_str, full_id)
  new/sample.json      — raw samples: {"sampling_mode":"Linear","iters":[…],"times":[…]}
  new/estimates.json   — per-iteration statistics in **nanoseconds** with 95% CI:
                         mean, median, std_dev, median_abs_dev, slope
  new/tukey.json       — Tukey fences [Q1-1.5·IQR, Q1, Q3, Q3+1.5·IQR] in ns/iter

On each run an existing ``new/`` is rotated to ``base/`` before writing, so
criterion-compare can detect regressions across runs automatically.

Usage
-----
::

    from criterion_compat import measure, save_criterion_files
    from pathlib import Path

    avg_us, iters, times_ns = measure(fn, warmup_s=3.0, measure_s=5.0)
    print(f"avg: {avg_us:.2f} µs  ({sum(int(n) for n in iters)} iterations)")

    save_criterion_files(
        Path("target/criterion"),
        group_id="binding_comparison",
        function_id="synta",
        value_str="cert_00_GlobalSignRootCA",
        iters=iters,
        times_ns=times_ns,
    )

``value_str`` may contain forward slashes, which become extra path components
(e.g. ``"issuer/cert_00_name"`` → ``…/synta/issuer/cert_00_name/new/``).

Sampling details
----------------
:func:`measure` uses Criterion's **Linear** sampling mode: sample *i*
(1-indexed) runs ``i * base_iters`` iterations, giving 100 samples total.
``base_iters`` is calibrated from the warmup phase so that the total
measurement time is approximately ``measure_s`` seconds.

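Statistics are computed from the per-sample ratios ``time[i] / iters[i]``
(nanoseconds per iteration; 100 ratios with the default sample count) using
bootstrap resampling with 1000 resamples and a 95% confidence level by
default.  The one exception is ``slope``, which is fitted through the origin
on the paired ``(iters, times)`` samples rather than on the ratios.  The
sample count and confidence level match Criterion's defaults.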
"""

from __future__ import annotations

import json
import random
import shutil
import statistics
import time
from pathlib import Path

# ── Default configuration (matches Criterion defaults) ───────────────────────
# NOTE(review): the sample count and confidence level mirror Criterion's
# documented defaults, but Criterion's default ``nresamples`` is believed to
# be 100 000 — confirm that 1 000 is a deliberate speed trade-off here.

# Default for the ``n_samples`` parameter of ``measure``.
N_SAMPLES: int = 100           # samples per benchmark in Linear mode
# Default for the ``n_bootstrap`` parameter of ``save_criterion_files``.
N_BOOTSTRAP: int = 1_000       # bootstrap resamples for CI computation
# Default for the ``confidence_level`` parameter of ``save_criterion_files``.
CONFIDENCE_LEVEL: float = 0.95  # 95% CI

# Fixed seed so the same sample data always produces the same CI bounds.
# ``_compute_estimates`` builds a fresh ``random.Random`` from it on each call.
_RNG_SEED: int = 0x6372_6974_6572_696F_6E  # "criterion" in ASCII hex


# ── Public API ───────────────────────────────────────────────────────────────

def measure(
    fn,
    warmup_s: float,
    measure_s: float,
    n_samples: int = N_SAMPLES,
) -> tuple[float, list[float], list[float]]:
    """Criterion Linear-mode sampling benchmark.

    Run *fn* with a warmup phase, then collect *n_samples* timed samples whose
    iteration counts increase linearly (sample *i* runs ``i * base_iters``
    iterations).  ``base_iters`` is calibrated from the warmup rate so that
    total measurement wall-clock time ≈ *measure_s*; it is never less than 1,
    so every sample runs *fn* at least once even after a zero-length warmup.

    Returns ``(avg_us, iters, times_ns)`` where:

    - ``avg_us``   — mean per-iteration time in **microseconds** (for display)
    - ``iters``    — per-sample iteration counts as ``list[float]``
    - ``times_ns`` — per-sample wall-clock times in **nanoseconds** as ``list[float]``

    Pass *iters* and *times_ns* directly to :func:`save_criterion_files`.

    Raises :class:`ValueError` if *n_samples* is less than 1; this is checked
    before the warmup starts so no benchmark time is spent on a bad request.
    """
    if n_samples < 1:
        raise ValueError(f"n_samples must be at least 1, got {n_samples}")

    # Warmup: discard results, estimate iteration rate.
    warmup_count: int = 0
    t0 = time.perf_counter()
    warmup_end = t0 + warmup_s
    while time.perf_counter() < warmup_end:
        fn()
        warmup_count += 1
    actual_warmup_s = time.perf_counter() - t0

    # Compute base_iters for Linear mode.
    # In Linear mode sample i (1-indexed) runs i * base_iters iterations, so
    # total iterations = base_iters * n_samples * (n_samples + 1) / 2.
    # Setting total_time ≈ measure_s:
    #   base_iters = rate * measure_s / (n_samples * (n_samples + 1) / 2)
    # With warmup_s <= 0 the loop above never runs and, on a coarse clock, the
    # elapsed time can read as exactly zero; treat that as "rate unknown"
    # instead of dividing by zero.  base_iters then falls back to 1.
    if actual_warmup_s > 0:
        warmup_rate = warmup_count / actual_warmup_s  # iters / sec
    else:
        warmup_rate = 0.0
    sum_factors = n_samples * (n_samples + 1) / 2.0  # = 5050 for n_samples=100
    base_iters = max(1, int(warmup_rate * measure_s / sum_factors))

    # Collect samples.
    iters: list[float] = []
    times_ns: list[float] = []
    total_iters = 0
    for i in range(1, n_samples + 1):
        n = i * base_iters
        t_start = time.perf_counter()
        for _ in range(n):
            fn()
        times_ns.append((time.perf_counter() - t_start) * 1e9)
        iters.append(float(n))
        total_iters += n

    # total_iters >= 1 is guaranteed by the n_samples check and base_iters >= 1.
    avg_us = sum(times_ns) / total_iters / 1_000.0
    return avg_us, iters, times_ns


def save_criterion_files(
    output_dir: Path,
    group_id: str,
    function_id: str,
    value_str: str,
    iters: list[float],
    times_ns: list[float],
    n_bootstrap: int = N_BOOTSTRAP,
    confidence_level: float = CONFIDENCE_LEVEL,
) -> None:
    """Write Criterion-compatible JSON files for one benchmark measurement.

    Creates ``{output_dir}/{group_id}/{function_id}/{value_str}/new/`` and
    writes ``benchmark.json``, ``sample.json``, ``estimates.json``, and
    ``tukey.json``.

    If ``new/`` already exists it is rotated to ``base/`` first (replacing any
    prior ``base/``), mirroring Criterion's rotation so that criterion-compare
    can detect regressions across runs automatically.

    All four documents are computed and serialised **before** the directories
    are touched, so a failure while computing statistics (for example too few
    samples for a standard deviation) leaves the existing ``new/`` and
    ``base/`` results intact.

    Raises :class:`ValueError` if *iters* and *times_ns* differ in length,
    since the two lists are paired element by element.
    """
    if len(iters) != len(times_ns):
        raise ValueError(
            "iters and times_ns must have the same length "
            f"(got {len(iters)} and {len(times_ns)})"
        )

    full_id = f"{group_id}/{function_id}/{value_str}"
    bench_root = output_dir / full_id
    new_dir = bench_root / "new"
    base_dir = bench_root / "base"

    # Build every document up front; nothing below this block can fail for
    # reasons related to the sample data.
    per_iter = [t / n for t, n in zip(times_ns, iters)]
    documents = {
        # Benchmark identity.
        "benchmark.json": json.dumps({
            "group_id": group_id,
            "function_id": function_id,
            "value_str": value_str,
            "throughput": None,
            "full_id": full_id,
            "directory_name": full_id,
            "title": full_id,
        }),
        # Raw samples.
        "sample.json": json.dumps(
            {"sampling_mode": "Linear", "iters": iters, "times": times_ns}
        ),
        # All values in nanoseconds per iteration.
        "estimates.json": json.dumps(
            _compute_estimates(iters, times_ns, n_bootstrap, confidence_level)
        ),
        "tukey.json": json.dumps(_tukey_fences(per_iter)),
    }

    # Rotate new/ → base/ before writing fresh results.
    if new_dir.exists():
        if base_dir.exists():
            shutil.rmtree(base_dir)
        new_dir.rename(base_dir)
    new_dir.mkdir(parents=True, exist_ok=True)

    for file_name, text in documents.items():
        (new_dir / file_name).write_text(text, encoding="utf-8")


# ── Statistical helpers ──────────────────────────────────────────────────────

def _percentile(sorted_data: list[float], p: float) -> float:
    """Return the *p*-th percentile of *sorted_data* by linear interpolation.

    Uses the fractional rank ``(n - 1) * p / 100`` and interpolates between
    the two neighbouring order statistics (Hyndman-Fan method 7, the same
    rule as Excel's PERCENTILE).

    *p* is expressed in percent, in [0, 100].  *sorted_data* must already be
    in ascending order; it is not sorted or validated here.
    """
    count = len(sorted_data)
    if count == 1:
        # A lone value is every percentile of itself.
        return sorted_data[0]

    rank = (count - 1) * p / 100.0
    below = int(rank)
    # Clamp the upper neighbour so p == 100 does not index past the end.
    above = min(below + 1, count - 1)

    lower_value = sorted_data[below]
    fraction = rank - below
    return lower_value + fraction * (sorted_data[above] - lower_value)


def _mad_normalized(data: list[float]) -> float:
    """Return the median absolute deviation of *data*, scaled by 1.4826.

    The scaling constant turns the raw MAD into a consistent estimator of the
    standard deviation under a normal distribution; the result is what this
    module reports as the ``median_abs_dev`` statistic.
    """
    center = statistics.median(data)
    deviations = [abs(value - center) for value in data]
    raw_mad = statistics.median(deviations)
    return raw_mad * 1.4826


def _slope_through_origin(iters: list[float], times_ns: list[float]) -> float:
    """Least-squares slope of time against iterations, with no intercept.

    Fits ``t ≈ slope·n`` by minimising ``Σ(t_i - slope·n_i)²``; the closed
    form is ``slope = Σ(n_i·t_i) / Σ(n_i²)``, in ns per iteration.

    Returns ``0.0`` when ``Σ(n_i²)`` is zero (for example, empty input).
    """
    cross_sum = sum(count * elapsed for count, elapsed in zip(iters, times_ns))
    square_sum = sum(count * count for count in iters)
    if not square_sum:
        # Degenerate fit: nothing to divide by.
        return 0.0
    return cross_sum / square_sum


def _bootstrap_ci(
    data: list[float],
    stat_fn,
    n_resamples: int,
    confidence_level: float,
    rng: random.Random,
) -> tuple[float, float, float, float]:
    """Percentile-bootstrap confidence interval for ``stat_fn(data)``.

    Draws *n_resamples* resamples of ``len(data)`` values each (with
    replacement, using *rng*), evaluates *stat_fn* on every resample, and
    reads the interval bounds off the sorted replicate values.

    Returns ``(point_estimate, lower_bound, upper_bound, standard_error)``,
    all in the units of *data*.  The point estimate is *stat_fn* applied to
    the original data; the standard error is the sample standard deviation
    of the replicates.
    """
    size = len(data)
    estimate = stat_fn(data)

    replicates = []
    for _ in range(n_resamples):
        resample = [data[rng.randrange(size)] for _ in range(size)]
        replicates.append(stat_fn(resample))
    replicates.sort()

    # Each tail outside the interval holds (1 - confidence_level) / 2.
    tail = (1.0 - confidence_level) / 2.0
    lower_idx = max(0, int(tail * n_resamples))
    upper_idx = min(int((1.0 - tail) * n_resamples), n_resamples - 1)

    lower_bound = replicates[lower_idx]
    upper_bound = replicates[upper_idx]
    return estimate, lower_bound, upper_bound, statistics.stdev(replicates)


def _bootstrap_ci_slope(
    iters: list[float],
    times_ns: list[float],
    n_resamples: int,
    confidence_level: float,
    rng: random.Random,
) -> tuple[float, float, float, float]:
    """Percentile-bootstrap confidence interval for the through-origin slope.

    Resampling is *paired*: each draw picks an index and takes both the
    iteration count and the time at that index, so every resampled point is
    a pair that was actually measured together.

    Returns ``(point_estimate, lower_bound, upper_bound, standard_error)``
    in nanoseconds per iteration.
    """
    size = len(iters)
    estimate = _slope_through_origin(iters, times_ns)

    def _resampled_slope() -> float:
        # One set of indices drives both lists to keep the pairs intact.
        picks = [rng.randrange(size) for _ in range(size)]
        return _slope_through_origin(
            [iters[k] for k in picks],
            [times_ns[k] for k in picks],
        )

    replicates = sorted(_resampled_slope() for _ in range(n_resamples))

    # Each tail outside the interval holds (1 - confidence_level) / 2.
    tail = (1.0 - confidence_level) / 2.0
    lower_idx = max(0, int(tail * n_resamples))
    upper_idx = min(int((1.0 - tail) * n_resamples), n_resamples - 1)

    lower_bound = replicates[lower_idx]
    upper_bound = replicates[upper_idx]
    return estimate, lower_bound, upper_bound, statistics.stdev(replicates)


def _estimate_entry(
    point: float,
    lo: float,
    hi: float,
    stderr: float,
    confidence_level: float,
) -> dict:
    """Build the ``estimates.json`` record for a single statistic.

    The record nests the interval bounds and confidence level under
    ``confidence_interval`` and carries the point estimate and standard
    error alongside it.
    """
    interval = {
        "confidence_level": confidence_level,
        "lower_bound": lo,
        "upper_bound": hi,
    }
    # Keyword order fixes the key order of the serialised JSON object.
    return dict(
        confidence_interval=interval,
        point_estimate=point,
        standard_error=stderr,
    )


def _compute_estimates(
    iters: list[float],
    times_ns: list[float],
    n_bootstrap: int,
    confidence_level: float,
) -> dict:
    """Build the content of ``estimates.json`` from raw samples.

    Every statistic is expressed in **nanoseconds per iteration**.  ``mean``,
    ``median``, ``median_abs_dev`` and ``std_dev`` are bootstrapped over the
    per-sample ratios ``time / iters``; ``slope`` is bootstrapped over the
    paired samples.

    A single generator seeded with ``_RNG_SEED`` is shared by all five
    bootstraps, so identical input always yields identical bounds.
    """
    ns_per_iter = [elapsed / count for elapsed, count in zip(times_ns, iters)]
    rng = random.Random(_RNG_SEED)

    # The bootstraps draw from one shared generator, so this sequence (and
    # running slope last) determines which random numbers each one sees.
    scalar_stats = (
        ("mean", statistics.mean),
        ("median", statistics.median),
        ("median_abs_dev", _mad_normalized),
        ("std_dev", statistics.stdev),
    )
    entries: dict = {}
    for key, stat_fn in scalar_stats:
        result = _bootstrap_ci(
            ns_per_iter, stat_fn, n_bootstrap, confidence_level, rng
        )
        entries[key] = _estimate_entry(*result, confidence_level)

    slope_result = _bootstrap_ci_slope(
        iters, times_ns, n_bootstrap, confidence_level, rng
    )
    slope_entry = _estimate_entry(*slope_result, confidence_level)

    # Key order here is the key order of the serialised JSON object.
    return {
        "mean": entries["mean"],
        "median": entries["median"],
        "median_abs_dev": entries["median_abs_dev"],
        "slope": slope_entry,
        "std_dev": entries["std_dev"],
    }


def _tukey_fences(per_iter_ns: list[float]) -> list[float]:
    """Compute ``[Q1 - 1.5·IQR, Q1, Q3, Q3 + 1.5·IQR]`` in ns/iter.

    *per_iter_ns* need not be sorted; a sorted copy is made internally.

    NOTE(review): Criterion's own ``tukey.json`` is believed to store
    ``[Q1 - 3·IQR, Q1 - 1.5·IQR, Q3 + 1.5·IQR, Q3 + 3·IQR]`` (severe and mild
    fences) — confirm against the consuming tools before relying on this
    layout.
    """
    ordered = sorted(per_iter_ns)
    lower_quartile, upper_quartile = (
        _percentile(ordered, pct) for pct in (25.0, 75.0)
    )
    whisker = 1.5 * (upper_quartile - lower_quartile)
    return [
        lower_quartile - whisker,
        lower_quartile,
        upper_quartile,
        upper_quartile + whisker,
    ]