matcher_rs 0.14.3

A high-performance matcher designed to solve LOGICAL and TEXT VARIATIONS problems in word matching, implemented in Rust.
Documentation
#!/usr/bin/env python3

from __future__ import annotations

import dataclasses
import json
import pathlib
import re
from statistics import median
from typing import Iterable

TREE_LINE_RE = re.compile(r"^(?P<prefix>(?:|   )*)(?P<branch>[├╰])(?P<rest>.*)$")
BENCH_ROW_RE = re.compile(
    r"""
    ^(?P<name>.+?)\s{2,}
    (?P<fastest>[0-9.]+\s+(?:ns|µs|us|ms|s))\s+\s+
    (?P<slowest>[0-9.]+\s+(?:ns|µs|us|ms|s))\s+\s+
    (?P<median>[0-9.]+\s+(?:ns|µs|us|ms|s))\s+\s+
    (?P<mean>[0-9.]+\s+(?:ns|µs|us|ms|s))\s+\s+
    (?P<samples>\d+)\s+\s+(?P<iters>\d+)
    $
    """,
    re.VERBOSE,
)
META_RE = re.compile(r"^(?P<key>[A-Za-z][A-Za-z ]+):\s*(?P<value>.+)$")

UNIT_TO_SECONDS = {
    "ns": 1e-9,
    "us": 1e-6,
    "µs": 1e-6,
    "ms": 1e-3,
    "s": 1.0,
}


@dataclasses.dataclass(frozen=True)
class BenchResult:
    path: str
    fastest_s: float
    slowest_s: float
    median_s: float
    mean_s: float
    samples: int
    iters: int

    def metric(self, name: str) -> float:
        return {
            "fastest": self.fastest_s,
            "slowest": self.slowest_s,
            "median": self.median_s,
            "mean": self.mean_s,
        }[name]


@dataclasses.dataclass(frozen=True)
class BenchFile:
    path: pathlib.Path
    metadata: dict[str, str]
    results: dict[str, BenchResult]


@dataclasses.dataclass(frozen=True)
class AggregateRow:
    path: str
    value_s: float
    runs: int
    q1_s: float
    q3_s: float
    spread_pct: float
    noisy: bool


@dataclasses.dataclass(frozen=True)
class AggregateFile:
    path: pathlib.Path
    metadata: dict[str, str]
    metric: str
    rows: dict[str, AggregateRow]


def parse_duration(value: str) -> float:
    amount_str, unit = value.split()
    return float(amount_str) * UNIT_TO_SECONDS[unit]


def format_duration(seconds: float) -> str:
    if seconds < 1e-6:
        return f"{seconds * 1e9:.3f} ns"
    if seconds < 1e-3:
        return f"{seconds * 1e6:.3f} µs"
    if seconds < 1:
        return f"{seconds * 1e3:.3f} ms"
    return f"{seconds:.3f} s"


def parse_bench_file(path: pathlib.Path) -> BenchFile:
    metadata: dict[str, str] = {}
    results: dict[str, BenchResult] = {}
    stack: list[str] = []

    for raw_line in path.read_text(encoding="utf-8").splitlines():
        meta_match = META_RE.match(raw_line)
        if meta_match:
            metadata[meta_match.group("key")] = meta_match.group("value")
            continue

        tree_match = TREE_LINE_RE.match(raw_line)
        if not tree_match:
            continue

        depth = len(tree_match.group("prefix")) // 3
        rest = tree_match.group("rest").rstrip()
        row_match = BENCH_ROW_RE.match(rest)

        if row_match:
            name = row_match.group("name").strip().strip('"')
            stack = stack[:depth]
            full_path = " / ".join(stack + [name])
            results[full_path] = BenchResult(
                path=full_path,
                fastest_s=parse_duration(row_match.group("fastest")),
                slowest_s=parse_duration(row_match.group("slowest")),
                median_s=parse_duration(row_match.group("median")),
                mean_s=parse_duration(row_match.group("mean")),
                samples=int(row_match.group("samples")),
                iters=int(row_match.group("iters")),
            )
            continue

        name = rest.split("", 1)[0].rstrip().strip('"')
        stack = stack[:depth]
        if len(stack) == depth:
            stack.append(name)
        else:
            stack[depth] = name

    return BenchFile(path=path, metadata=metadata, results=results)


def compare_result_maps(
    baseline: dict[str, float],
    candidate: dict[str, float],
    min_change_pct: float,
) -> tuple[list[dict[str, str | int | float]], list[dict[str, str | int | float]], set[str], set[str]]:
    regressions: list[dict[str, str | int | float]] = []
    improvements: list[dict[str, str | int | float]] = []

    shared_paths = sorted(set(baseline) & set(candidate))
    baseline_only = set(baseline) - set(candidate)
    candidate_only = set(candidate) - set(baseline)

    for path in shared_paths:
        base_value = baseline[path]
        cand_value = candidate[path]
        if base_value == 0:
            continue
        delta_pct = ((cand_value - base_value) / base_value) * 100.0
        if abs(delta_pct) < min_change_pct:
            continue

        row = {
            "path": path,
            "baseline": base_value,
            "candidate": cand_value,
            "delta_pct": delta_pct,
        }
        if delta_pct > 0:
            regressions.append(row)
        else:
            improvements.append(row)

    regressions.sort(key=lambda item: item["delta_pct"], reverse=True)
    improvements.sort(key=lambda item: item["delta_pct"])
    return regressions, improvements, baseline_only, candidate_only


def print_change_section(title: str, rows: Iterable[dict[str, str | int | float]]) -> None:
    rows = list(rows)
    print(title)
    if not rows:
        print("  none")
        return

    for row in rows:
        delta_pct = float(row["delta_pct"])
        print(
            "  - {path}: {baseline} -> {candidate} ({delta:+.2f}%)".format(
                path=row["path"],
                baseline=format_duration(float(row["baseline"])),
                candidate=format_duration(float(row["candidate"])),
                delta=delta_pct,
            )
        )


def print_path_section(title: str, paths: Iterable[str]) -> None:
    paths = sorted(paths)
    print(title)
    if not paths:
        print("  none")
        return

    for path in paths:
        print(f"  - {path}")


def median_of_metric(files: Iterable[BenchFile], metric: str) -> dict[str, list[float]]:
    metric_by_path: dict[str, list[float]] = {}
    for bench_file in files:
        for path, result in bench_file.results.items():
            metric_by_path.setdefault(path, []).append(result.metric(metric))
    return metric_by_path


def quartiles(values: list[float]) -> tuple[float, float]:
    ordered = sorted(values)
    if len(ordered) == 1:
        return ordered[0], ordered[0]

    mid = len(ordered) // 2
    if len(ordered) % 2 == 0:
        lower = ordered[:mid]
        upper = ordered[mid:]
    else:
        lower = ordered[:mid]
        upper = ordered[mid + 1 :]
        if not lower:
            lower = ordered
        if not upper:
            upper = ordered

    return median(lower), median(upper)


def aggregate_bench_files(
    files: Iterable[BenchFile],
    metric: str = "median",
    noisy_threshold_pct: float = 5.0,
) -> dict[str, AggregateRow]:
    rows: dict[str, AggregateRow] = {}
    for path, values in median_of_metric(files, metric).items():
        value_s = median(values)
        q1_s, q3_s = quartiles(values)
        spread_pct = 0.0 if value_s == 0 else ((q3_s - q1_s) / value_s) * 100.0
        rows[path] = AggregateRow(
            path=path,
            value_s=value_s,
            runs=len(values),
            q1_s=q1_s,
            q3_s=q3_s,
            spread_pct=spread_pct,
            noisy=spread_pct > noisy_threshold_pct,
        )
    return rows


def write_aggregate_json(path: pathlib.Path, aggregate: AggregateFile) -> None:
    payload = {
        "metadata": aggregate.metadata,
        "metric": aggregate.metric,
        "rows": {
            row_path: {
                "value_s": row.value_s,
                "runs": row.runs,
                "q1_s": row.q1_s,
                "q3_s": row.q3_s,
                "spread_pct": row.spread_pct,
                "noisy": row.noisy,
            }
            for row_path, row in sorted(aggregate.rows.items())
        },
    }
    path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")


def read_aggregate_json(path: pathlib.Path) -> AggregateFile:
    payload = json.loads(path.read_text(encoding="utf-8"))
    rows = {
        row_path: AggregateRow(
            path=row_path,
            value_s=row_payload["value_s"],
            runs=row_payload["runs"],
            q1_s=row_payload["q1_s"],
            q3_s=row_payload["q3_s"],
            spread_pct=row_payload["spread_pct"],
            noisy=row_payload["noisy"],
        )
        for row_path, row_payload in payload["rows"].items()
    }
    return AggregateFile(
        path=path,
        metadata=payload.get("metadata", {}),
        metric=payload["metric"],
        rows=rows,
    )


def render_aggregate_summary(aggregate: AggregateFile) -> str:
    stable = sum(not row.noisy for row in aggregate.rows.values())
    noisy = sum(row.noisy for row in aggregate.rows.values())

    lines = [
        f"Metric: {aggregate.metric}",
        f"Benchmarks: {len(aggregate.rows)}",
        f"Stable rows: {stable}",
        f"Noisy rows (>5% spread): {noisy}",
    ]
    for key, value in aggregate.metadata.items():
        if key == "Metric":
            continue
        lines.append(f"{key}: {value}")

    lines.append("")
    lines.append("Rows")
    for row in sorted(aggregate.rows.values(), key=lambda item: item.path):
        flag = " noisy" if row.noisy else ""
        lines.append(
            f"  - {row.path}: {format_duration(row.value_s)} | spread={row.spread_pct:.2f}% | runs={row.runs}{flag}"
        )
    return "\n".join(lines) + "\n"


def load_aggregate_input(
    path: pathlib.Path,
    metric: str = "median",
    noisy_threshold_pct: float = 5.0,
) -> AggregateFile:
    if path.is_file():
        if path.suffix == ".json":
            return read_aggregate_json(path)
        bench_file = parse_bench_file(path)
        rows = aggregate_bench_files([bench_file], metric=metric, noisy_threshold_pct=noisy_threshold_pct)
        return AggregateFile(path=path, metadata=bench_file.metadata, metric=metric, rows=rows)

    aggregate_json = path / "aggregate.json"
    if aggregate_json.exists():
        return read_aggregate_json(aggregate_json)

    raw_dir = path / "raw"
    candidates = sorted(raw_dir.glob("*.txt")) if raw_dir.exists() else sorted(path.glob("*.txt"))
    files = [
        parse_bench_file(candidate)
        for candidate in candidates
        if candidate.name not in {"summary.txt"}
    ]
    metadata = {"Source": str(path)}
    manifest = path / "manifest.json"
    if manifest.exists():
        manifest_payload = json.loads(manifest.read_text(encoding="utf-8"))
        metadata.update({key.replace("_", " ").title(): str(value) for key, value in manifest_payload.items() if not isinstance(value, list)})
    rows = aggregate_bench_files(files, metric=metric, noisy_threshold_pct=noisy_threshold_pct)
    return AggregateFile(path=path, metadata=metadata, metric=metric, rows=rows)