from __future__ import annotations
import argparse
import csv
import json
import math
import os
import subprocess
import sys
import unicodedata
from collections import defaultdict
from math import comb
from pathlib import Path
from typing import Iterator
# Rank cutoff for the Success@K metric: a query counts as a success when its
# ground truth is found at a rank in 1..K_FOR_SUCCESS.
K_FOR_SUCCESS = 3
# Location of the pond executable: the POND_BIN environment variable when it is
# set and non-empty, otherwise target/release/pond under the directory two
# levels above the one containing this file. The fallback is only evaluated
# when POND_BIN is unset or empty.
BIN_PATH = Path(
    os.environ.get("POND_BIN") or Path(__file__).resolve().parents[2] / "target/release/pond"
)
def nfc(text: str) -> str:
    """Return *text* converted to Unicode Normalization Form C."""
    normalized = unicodedata.normalize("NFC", text)
    return normalized
def session_root(session_id: str) -> str:
    """Return the part of *session_id* that precedes its first ``/``.

    An id without any slash is returned unchanged.
    """
    head, _sep, _rest = session_id.partition("/")
    return head
def parse_ground_truth(spec: str) -> tuple[str, list[str]]:
    """Split a ground-truth spec into ``(scheme, tokens)``.

    Two schemes are recognised:

    * ``prefix:<a>,<b>,...`` -- comma-separated tokens, each stripped of
      surrounding whitespace; empty entries are dropped.
    * ``anchor:<text>`` -- a single token: the text stripped, lower-cased and
      NFC-normalised.

    Raises:
        ValueError: if *spec* starts with neither ``prefix:`` nor ``anchor:``.
    """
    scheme, colon, body = spec.partition(":")
    if colon and scheme == "prefix":
        tokens: list[str] = []
        for piece in body.split(","):
            piece = piece.strip()
            if piece:
                tokens.append(piece)
        return "prefix", tokens
    if colon and scheme == "anchor":
        return "anchor", [nfc(body.strip().lower())]
    raise ValueError(f"unknown ground-truth scheme: {spec!r}")
def find_match_rank(hits: list[dict], kind: str, tokens: list[str]) -> int:
    """Return the 1-based rank of the first hit matching the ground truth.

    Args:
        hits: Ranked result dicts (best first).
        kind: ``"prefix"`` compares the first 8 characters of each hit's
            ``session_id`` and ``message_id`` against *tokens*; any other value
            does a substring search of *tokens* in the hit's lower-cased,
            NFC-normalised ``text``.
        tokens: Ground-truth tokens. Empty tokens are ignored: an empty string
            is a substring of every text, so it would otherwise make the very
            first hit a spurious match.

    Returns:
        The rank of the first matching hit, or 0 when nothing matches
        (including when *tokens* holds no non-empty token).
    """
    wanted = [tok for tok in tokens if tok]
    if not wanted:
        return 0

    if kind == "prefix":
        for rank, hit in enumerate(hits, start=1):
            sid = (hit.get("session_id") or "")[:8]
            mid = (hit.get("message_id") or "")[:8]
            if sid in wanted or mid in wanted:
                return rank
        return 0

    for rank, hit in enumerate(hits, start=1):
        text = nfc((hit.get("text") or "").lower())
        if any(tok in text for tok in wanted):
            return rank
    return 0
def wilson_ci(successes: int, n: int, z: float = 1.96) -> tuple[float, float]:
    """Return the Wilson score interval ``(low, high)`` for a proportion.

    Args:
        successes: Number of successes observed.
        n: Number of trials; ``(0.0, 0.0)`` is returned when this is 0.
        z: Normal quantile for the desired confidence (default 1.96).

    The bounds are clamped to the range [0, 1].
    """
    if n == 0:
        return 0.0, 0.0
    z2 = z * z
    proportion = successes / n
    scale = 1.0 + z2 / n
    midpoint = (proportion + z2 / (2 * n)) / scale
    spread = (z * math.sqrt(proportion * (1 - proportion) / n + z2 / (4 * n * n))) / scale
    low = max(0.0, midpoint - spread)
    high = min(1.0, midpoint + spread)
    return low, high
def iter_queries(queries_path: Path) -> Iterator[dict]:
    """Yield one dict per data row of the tab-separated file at *queries_path*.

    Keys come from the file's header row. The file is opened with
    ``newline=""`` so the ``csv`` module handles line endings itself, and is
    decoded as UTF-8 (a leading byte-order mark is tolerated) rather than with
    the platform's locale encoding.
    """
    with queries_path.open(newline="", encoding="utf-8-sig") as f:
        yield from csv.DictReader(f, delimiter="\t")
def check_binary() -> None:
    """Exit the process with status 69 unless ``BIN_PATH`` is an existing file.

    Before exiting, two lines are written to stderr: the path that was looked
    up and a build hint.
    """
    if BIN_PATH.is_file():
        return
    messages = (
        f"pond binary not found: {BIN_PATH}",
        "build it with: cargo build --release",
    )
    for message in messages:
        print(message, file=sys.stderr)
    sys.exit(69)
def run_search(query: str, mode: str, limit: int, grouped: bool = False) -> dict:
    """Run ``pond search`` once and return its decoded JSON output.

    Args:
        query: Query text; passed after ``--`` so it is never read as an option.
        mode: Value for ``--mode``.
        limit: Value for ``--limit``.
        grouped: When true, ``--group-by-conversation`` is added.

    Returns:
        The parsed stdout on success. On a non-zero exit status or undecodable
        stdout, ``{"hits": [], "error": <message>}`` instead.
    """
    argv = [str(BIN_PATH), "search", "--mode", mode, "--limit", str(limit), "--format", "json"]
    if grouped:
        argv.append("--group-by-conversation")
    argv += ["--", query]

    proc = subprocess.run(argv, capture_output=True, text=True, check=False)
    if proc.returncode != 0:
        message = proc.stderr.strip() or f"exit {proc.returncode}"
        return {"hits": [], "error": message}

    try:
        payload = json.loads(proc.stdout)
    except json.JSONDecodeError as exc:
        return {"hits": [], "error": f"json: {exc}"}
    return payload
class KbMcpClient:
    """Client for a ``kb mcp`` child process speaking JSON-RPC over stdio.

    Messages are exchanged as one JSON document per line: requests are written
    to the child's stdin and replies are read from its stdout. The child's
    stderr is discarded.
    """

    def __init__(self) -> None:
        """Spawn ``kb mcp`` and perform the initialize handshake."""
        self.proc = subprocess.Popen(
            ["kb", "mcp"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            bufsize=1,
        )
        self._req_id = 0
        self._send({
            "jsonrpc": "2.0", "method": "initialize", "id": self._next_id(),
            "params": {"protocolVersion": "2024-11-05", "capabilities": {},
                       "clientInfo": {"name": "pond-bench", "version": "0.1"}},
        })
        # Consume the reply to "initialize" before announcing readiness; its
        # content is not inspected.
        self._recv()
        self._send({"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}})

    def _next_id(self) -> int:
        """Return the next request id (1, 2, 3, ...)."""
        self._req_id += 1
        return self._req_id

    def _send(self, obj: dict) -> None:
        """Write *obj* to the child's stdin as a single JSON line and flush."""
        assert self.proc.stdin is not None
        self.proc.stdin.write(json.dumps(obj) + "\n")
        self.proc.stdin.flush()

    def _recv(self) -> dict | None:
        """Read one line from the child's stdout and decode it as JSON.

        Returns ``None`` when the stream has reached end-of-file.
        """
        assert self.proc.stdout is not None
        line = self.proc.stdout.readline()
        return json.loads(line) if line else None

    def search(self, query: str, limit: int, min_score: float) -> dict:
        """Call the ``kb_search`` tool and return its structured result.

        Returns:
            The reply's ``result.structuredContent`` value when present and
            non-empty; otherwise ``{"hits": [], "error": <message>}`` describing
            why no result is available (stream closed, error reply, or missing
            structured content).
        """
        self._send({
            "jsonrpc": "2.0", "method": "tools/call", "id": self._next_id(),
            "params": {"name": "kb_search",
                       "arguments": {"query": query, "limit": limit, "min_score": min_score}},
        })
        resp = self._recv()
        if resp is None:
            return {"hits": [], "error": "kb mcp closed stdout"}
        if "error" in resp:
            return {"hits": [], "error": str(resp["error"])}
        result = (resp.get("result") or {}).get("structuredContent")
        if not result:
            return {"hits": [], "error": "no structuredContent"}
        return result

    def close(self) -> None:
        """Close the child's stdin, terminate it, and wait for it to exit.

        If the child has not exited within 5 seconds of being terminated it is
        killed.
        """
        try:
            if self.proc.stdin:
                self.proc.stdin.close()
        finally:
            # Terminate even when closing stdin raised, so the child is not
            # left running.
            self.proc.terminate()
            try:
                self.proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.proc.kill()
def normalize_hits(payload: dict) -> list[dict]:
    """Reduce a search payload to a flat list of hit dicts.

    Three payload shapes are handled, checked in this order:

    1. ``payload["hits"]`` is not ``None``: returned as-is.
    2. ``payload["groups"]`` is not ``None``: each group becomes a hit carrying
       its ``session_id`` and ``text`` with an empty ``message_id``.
    3. ``payload["result"]`` is a dict with a non-empty ``results`` list: each
       item's ``conversation_id`` / ``id`` / ``content`` are mapped to
       ``session_id`` / ``message_id`` / ``text``.

    Anything else yields an empty list.
    """
    hits = payload.get("hits")
    if hits is not None:
        return hits

    groups = payload.get("groups")
    if groups is not None:
        flattened = []
        for group in groups:
            flattened.append({
                "session_id": group.get("session_id", ""),
                "message_id": "",
                "text": group.get("text", ""),
            })
        return flattened

    result = payload.get("result")
    kb_results = result.get("results") if isinstance(result, dict) else None
    if not kb_results:
        return []
    converted = []
    for item in kb_results:
        converted.append({
            "session_id": item.get("conversation_id", ""),
            "message_id": item.get("id", ""),
            "text": item.get("content", ""),
        })
    return converted
def cmd_run(args: argparse.Namespace) -> int:
    """Run every query through the chosen backend and store one file per query.

    For each query a ``<id>.json`` envelope is written into ``args.out`` on
    success, or a ``<id>.stderr`` file holding the error text on failure.

    Returns:
        64 for an unknown backend; otherwise 0 when no query failed and 1 when
        at least one did.
    """
    backend = getattr(args, "backend", "pond")
    if backend not in ("pond", "kb-mcp"):
        print(f"unknown backend: {backend}", file=sys.stderr)
        return 64

    kb_client = None
    if backend == "pond":
        check_binary()
    else:
        kb_client = KbMcpClient()

    def search(text: str) -> dict:
        # Dispatch one query to whichever backend was selected above.
        if backend == "pond":
            return run_search(text, args.mode, args.limit, args.grouped)
        return kb_client.search(text, args.limit, args.kb_min_score)

    out_dir = Path(args.out)
    out_dir.mkdir(parents=True, exist_ok=True)

    count = 0
    errors = 0
    try:
        for row in iter_queries(Path(args.queries)):
            qid = row["id"]
            envelope = search(row["query"])
            if "error" in envelope:
                errors += 1
                (out_dir / f"{qid}.stderr").write_text(envelope["error"])
                print(f"FAIL {qid} (backend={backend}): {envelope['error']}", file=sys.stderr)
            else:
                (out_dir / f"{qid}.json").write_text(json.dumps(envelope))
                count += 1
    finally:
        # The kb child process must be shut down even if a query raised.
        if kb_client is not None:
            kb_client.close()

    suffix = " grouped" if args.grouped else ""
    if backend == "pond":
        mode_label = args.mode
    else:
        mode_label = f"kb-mcp(min_score={args.kb_min_score})"
    print(
        f"done: ran {count} queries in {mode_label} mode{suffix} "
        f"(limit={args.limit}); {errors} errors"
    )
    return 1 if errors else 0
def cmd_verify(args: argparse.Namespace) -> int:
    """Check that each query's ground truth shows up in the FTS or vector arm.

    Each query is searched in ``fts`` mode first; ``vector`` mode is only run
    when the FTS results do not contain the ground truth. Queries found in
    neither are listed on stderr.

    Returns:
        0 when every query is reachable, 1 otherwise.
    """
    check_binary()
    total = 0
    missing: list[tuple[str, str, str, str]] = []

    for row in iter_queries(Path(args.queries)):
        total += 1
        kind, tokens = parse_ground_truth(row["ground_truth"])
        # any() stops at the first arm that contains the target, so the vector
        # search is skipped whenever FTS already found it.
        reachable = any(
            find_match_rank(run_search(row["query"], arm, args.limit).get("hits") or [], kind, tokens) > 0
            for arm in ("fts", "vector")
        )
        if not reachable:
            missing.append((row["id"], kind, row["query"], row["ground_truth"]))

    if not missing:
        print(
            f"anchor verification OK: {total}/{total} queries reachable in "
            f"FTS or Vector top-{args.limit}"
        )
        return 0

    for qid, scheme, query, truth in missing:
        print(f'MISSING {qid} ({scheme}): "{query}" -> {truth}', file=sys.stderr)
    print(
        f"\nanchor verification FAILED: {len(missing)}/{total} queries "
        f"have unreachable ground truth",
        file=sys.stderr,
    )
    print(
        f"(target not in FTS top-{args.limit} AND not in Vector top-{args.limit})",
        file=sys.stderr,
    )
    return 1
def cmd_score(args: argparse.Namespace) -> int:
    """Rank every query's stored result, write a CSV, and print a summary table.

    A query whose ``<id>.json`` is absent from ``args.results`` is recorded with
    rank 0 and note ``missing``; one whose file is not valid JSON gets rank 0
    and a ``json:...`` note.

    Returns:
        66 when the results directory does not exist, otherwise 0.
    """
    queries_path = Path(args.queries)
    results_dir = Path(args.results)
    out_csv = Path(args.out)
    if not results_dir.is_dir():
        print(f"results dir not found: {results_dir}", file=sys.stderr)
        return 66

    def evaluate(row: dict) -> tuple[int, str]:
        # Return (rank, note) for one query row.
        result_file = results_dir / f"{row['id']}.json"
        if not result_file.exists():
            return 0, "missing"
        try:
            payload = json.loads(result_file.read_text())
        except json.JSONDecodeError as exc:
            return 0, f"json:{exc}"
        kind, tokens = parse_ground_truth(row["ground_truth"])
        return find_match_rank(normalize_hits(payload), kind, tokens), ""

    rows: list[dict] = []
    for row in iter_queries(queries_path):
        record = {"id": row["id"], "stratum": row["stratum"], "lang": row["lang"]}
        record["rank"], record["note"] = evaluate(row)
        rows.append(record)

    columns = ["id", "stratum", "lang", "mode", "rank", "note"]
    with out_csv.open("w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(dict(record, mode=args.label) for record in rows)

    _print_stratum_table(args.label, rows)
    return 0
def _print_stratum_table(label: str, rows: list[dict]) -> None:
    """Print a Markdown table of per-stratum metrics for *rows*.

    Rows are grouped by ``"<lang>/<stratum>"``. Each group gets one line with
    its size, Success@K and P@1 (each with a Wilson 95% interval) and MRR. A
    final ``ALL`` line pools all rows and is printed only when *rows* is
    non-empty.
    """
    grouped: dict[str, list[dict]] = defaultdict(list)
    for entry in rows:
        grouped[f"{entry['lang']}/{entry['stratum']}"].append(entry)

    print(f"# {label}\n")
    print(
        f"| stratum | n | S@{K_FOR_SUCCESS} | S@{K_FOR_SUCCESS} 95% CI | "
        f"P@1 | P@1 95% CI | MRR |"
    )
    print("|---------|---|----|----|-----|----|-----|")

    all_n = 0
    all_s3 = 0
    all_p1 = 0
    all_rr = 0.0
    for name in sorted(grouped):
        ranks = [member["rank"] for member in grouped[name]]
        size = len(ranks)
        hits_at_k = sum(1 for rank in ranks if 1 <= rank <= K_FOR_SUCCESS)
        hits_at_1 = sum(1 for rank in ranks if rank == 1)
        mean_rr = sum((1.0 / rank) if rank >= 1 else 0.0 for rank in ranks) / size
        k_low, k_high = wilson_ci(hits_at_k, size)
        one_low, one_high = wilson_ci(hits_at_1, size)
        print(
            f"| {name} | {size} | {hits_at_k}/{size} = {hits_at_k / size:.2f} | "
            f"[{k_low:.2f},{k_high:.2f}] | "
            f"{hits_at_1}/{size} = {hits_at_1 / size:.2f} | "
            f"[{one_low:.2f},{one_high:.2f}] | {mean_rr:.3f} |"
        )
        all_n += size
        all_s3 += hits_at_k
        all_p1 += hits_at_1
        # Re-weight the group mean by its size so the pooled MRR is per-query.
        all_rr += mean_rr * size

    if not all_n:
        return
    print(
        f"| ALL (unweighted sum) | {all_n} | "
        f"{all_s3}/{all_n} = {all_s3 / all_n:.2f} | -- | "
        f"{all_p1}/{all_n} = {all_p1 / all_n:.2f} | -- | "
        f"{all_rr / all_n:.3f} |"
    )
def cmd_pair(args: argparse.Namespace) -> int:
    """Print a per-stratum paired sign test comparing two rank CSVs.

    Only query ids present in both files are compared, and a query's stratum
    is taken from the first file. A query is a "win" for one side when that
    side reaches Success@K and the other does not; the two-sided p-value is the
    doubled binomial tail (p = 0.5) over the discordant pairs, capped at 1.

    Returns:
        Always 0.
    """

    def read_ranks(path: Path) -> dict[str, dict]:
        # Map query id -> {"stratum": "<lang>/<stratum>", "rank": int}.
        table: dict[str, dict] = {}
        with path.open() as handle:
            for rec in csv.DictReader(handle):
                table[rec["id"]] = {
                    "stratum": f"{rec['lang']}/{rec['stratum']}",
                    "rank": int(rec["rank"]),
                }
        return table

    def success(rank: int) -> bool:
        return 1 <= rank <= K_FOR_SUCCESS

    ranks_a = read_ranks(Path(args.csv_a))
    ranks_b = read_ranks(Path(args.csv_b))

    paired: dict[str, tuple[list[int], list[int]]] = defaultdict(lambda: ([], []))
    for qid in sorted(ranks_a.keys() & ranks_b.keys()):
        col_a, col_b = paired[ranks_a[qid]["stratum"]]
        col_a.append(ranks_a[qid]["rank"])
        col_b.append(ranks_b[qid]["rank"])

    print(
        f"# Paired sign test: {args.label_a} vs {args.label_b} "
        f"(Success@{K_FOR_SUCCESS})\n"
    )
    print(
        f"| stratum | n | {args.label_a}-only wins | "
        f"{args.label_b}-only wins | ties | n_nonzero | p (two-sided) |"
    )
    print("|---------|---|----|----|------|-----------|---------------|")

    for stratum in sorted(paired):
        col_a, col_b = paired[stratum]
        wins_a = wins_b = ties = 0
        for rank_a, rank_b in zip(col_a, col_b):
            ok_a, ok_b = success(rank_a), success(rank_b)
            if ok_a == ok_b:
                ties += 1
            elif ok_a:
                wins_a += 1
            else:
                wins_b += 1

        discordant = wins_a + wins_b
        if discordant:
            fewer = min(wins_a, wins_b)
            tail = sum(comb(discordant, k) for k in range(fewer + 1)) / (2**discordant)
            p_value = min(1.0, 2 * tail)
        else:
            p_value = 1.0
        print(
            f"| {stratum} | {len(col_a)} | {wins_a} | {wins_b} | "
            f"{ties} | {discordant} | {p_value:.3f} |"
        )
    return 0
def load_arm(path: Path) -> tuple[list[dict], float | None]:
    """Load one arm's stored hits and the score of its top hit.

    Args:
        path: JSON file expected to hold an object with a ``hits`` list.

    Returns:
        ``(hits, top_score)``. ``([], None)`` is returned when the file is
        missing, is not valid JSON, is not a JSON object, or has no hits.
        ``top_score`` is taken from the first hit's ``_score``, ``score`` or
        ``bm25`` field -- the first of those that is present and not ``None``
        -- and is ``None`` when the hit has none of them.
    """
    if not path.is_file():
        return [], None
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return [], None
    if not isinstance(payload, dict):
        return [], None
    hits = payload.get("hits") or []
    if not hits:
        return [], None

    first = hits[0]
    # Test against None rather than truthiness: a top score of exactly 0 is a
    # real score and must not be mistaken for a missing one.
    top = next(
        (first[key] for key in ("_score", "score", "bm25") if first.get(key) is not None),
        None,
    )
    return hits, (float(top) if top is not None else None)
def wrrf_merge(arms: list[tuple[list[dict], int, float]]) -> list[dict]:
    """Fuse ranked hit lists with weighted reciprocal-rank fusion.

    Args:
        arms: ``(hits, k, weight)`` triples. Within an arm only the first hit
            per session root counts; it contributes
            ``weight / (max(k, 1) + rank)``, where ``rank`` is its 1-based
            position among the arm's de-duplicated roots.

    Returns:
        One hit per session root (the first hit seen for that root), each with
        a ``_fused_score`` field, ordered by ``_sort_merged``.
    """
    fused: dict[str, dict] = {}
    for hits, k, weight in arms:
        offset = float(max(k, 1))
        roots_seen: set[str] = set()
        for hit in hits:
            root = session_root(hit.get("session_id") or "")
            if root in roots_seen:
                continue
            roots_seen.add(root)
            # The set's size after insertion is this root's de-duplicated rank.
            rank = len(roots_seen)
            slot = fused.setdefault(root, {"score": 0.0, "rep": hit})
            slot["score"] += weight / (offset + float(rank))
    return _sort_merged(fused)
def _first_score(hit: dict, keys: tuple[str, ...]) -> float:
    """Return the first value among *keys* that is present and not ``None``.

    Falls back to 0.0 when the hit has none of them. The test is against
    ``None`` rather than truthiness so that a score of exactly 0 is kept
    instead of being replaced by a different score field.
    """
    for key in keys:
        value = hit.get(key)
        if value is not None:
            return float(value)
    return 0.0


def normscore_merge(
    arms: list[tuple[list[dict], float, str]],
    norm: str = "minmax",
) -> list[dict]:
    """Fuse hit lists by summing weighted, normalised scores per session root.

    Args:
        arms: ``(hits, weight, score_key)`` triples. Each hit's raw score is
            read from ``score_key``, then ``_score``, then ``score``; 0.0 is
            used when none is set. Arms with no hits are skipped.
        norm: ``"minmax"`` rescales an arm's scores to [0, 1], ``"zscore"``
            standardises them; any other value uses the raw scores. When an
            arm's scores are all equal, the divisor is 1.0 for both schemes.

    Returns:
        One hit per session root (the first hit seen for that root), each with
        a ``_fused_score`` field, ordered by ``_sort_merged``.
    """
    merged: dict[str, dict] = {}
    for hits, weight, score_key in arms:
        if not hits:
            continue
        raw = [_first_score(h, (score_key, "_score", "score")) for h in hits]
        if norm == "minmax":
            lo, hi = min(raw), max(raw)
            rng = hi - lo if hi > lo else 1.0
            normed = [(r - lo) / rng for r in raw]
        elif norm == "zscore":
            mean = sum(raw) / len(raw)
            var = sum((r - mean) ** 2 for r in raw) / len(raw)
            sd = math.sqrt(var) if var > 0 else 1.0
            normed = [(r - mean) / sd for r in raw]
        else:
            normed = raw
        # Normalisation runs over every hit; de-duplication by session root
        # happens afterwards, so only a root's first hit contributes.
        seen: set[str] = set()
        for h, ns in zip(hits, normed):
            root = session_root(h.get("session_id") or "")
            if root in seen:
                continue
            seen.add(root)
            entry = merged.setdefault(root, {"score": 0.0, "rep": h})
            entry["score"] += weight * ns
    return _sort_merged(merged)
def _sort_merged(merged: dict[str, dict]) -> list[dict]:
    """Turn fused entries into a ranked list of hits.

    Each entry's representative hit is copied and given a ``_fused_score``
    field. The result is ordered by descending fused score, with ties broken
    by ``session_id`` and then ``message_id`` (missing ids sort as ``""``).
    """

    def order(hit: dict) -> tuple:
        return (-hit["_fused_score"], hit.get("session_id") or "", hit.get("message_id") or "")

    ranked = []
    for entry in merged.values():
        hit = dict(entry["rep"])
        hit["_fused_score"] = entry["score"]
        ranked.append(hit)
    return sorted(ranked, key=order)
def parse_variant(spec: str) -> dict:
    """Parse a fusion-variant spec string into a settings dict.

    Accepted forms::

        fts-only
        vector-only
        norm:<w_fts>,<w_vec>
        sym:<k>
        asym:<k_fts>,<k_vec>
        wrrf:<k_fts>,<k_vec>,<w_fts>,<w_vec>
        fts-gate:<threshold>:<k_fts>,<k_vec>

    The result always has ``kind`` and ``label`` keys plus the numeric
    parameters of the chosen form (``sym`` and ``asym`` both produce kind
    ``rrf``).

    Raises:
        ValueError: for an unrecognised or malformed spec.
    """
    if spec in ("fts-only", "vector-only"):
        return {"kind": spec, "label": spec}

    scheme, colon, body = spec.partition(":")
    if colon:
        if scheme == "norm":
            fields = body.split(",")
            if len(fields) != 2:
                raise ValueError("norm: expects w_fts,w_vec")
            w_fts, w_vec = float(fields[0]), float(fields[1])
            return {
                "kind": "norm",
                "w_fts": w_fts,
                "w_vec": w_vec,
                "label": f"norm:w=({w_fts},{w_vec})",
            }
        if scheme == "sym":
            k = int(body)
            return {"kind": "rrf", "k_fts": k, "k_vec": k, "label": f"sym:k={k}"}
        if scheme == "asym":
            kf, kv = body.split(",")
            return {"kind": "rrf", "k_fts": int(kf), "k_vec": int(kv), "label": f"asym:{kf},{kv}"}
        if scheme == "wrrf":
            fields = body.split(",")
            if len(fields) != 4:
                raise ValueError("wrrf: expects k_fts,k_vec,w_fts,w_vec")
            kf, kv, wf, wv = int(fields[0]), int(fields[1]), float(fields[2]), float(fields[3])
            return {
                "kind": "wrrf",
                "k_fts": kf,
                "k_vec": kv,
                "w_fts": wf,
                "w_vec": wv,
                "label": f"wrrf:k=({kf},{kv}),w=({wf},{wv})",
            }
        if scheme == "fts-gate":
            thresh_s, ks = body.split(":")
            kf, kv = ks.split(",")
            return {
                "kind": "fts-gate",
                "threshold": float(thresh_s),
                "k_fts": int(kf),
                "k_vec": int(kv),
                "label": f"fts-gate:t={thresh_s},k=({kf},{kv})",
            }
    raise ValueError(f"unknown variant: {spec!r}")
def apply_variant(
    variant: dict, fts_hits: list[dict], vec_hits: list[dict], fts_top: float | None
) -> list[dict]:
    """Produce the ranked list that *variant* yields for one query.

    Args:
        variant: Settings dict with a ``kind`` key and that kind's parameters.
        fts_hits: Hits from the FTS arm.
        vec_hits: Hits from the vector arm.
        fts_top: Score of the top FTS hit, or ``None`` if unknown; only the
            ``fts-gate`` kind reads it.

    Raises:
        ValueError: if ``variant["kind"]`` is not a handled kind.
    """
    kind = variant["kind"]

    def unweighted_rrf() -> list[dict]:
        # Reciprocal-rank fusion with both arms at weight 1.0.
        return wrrf_merge([(fts_hits, variant["k_fts"], 1.0), (vec_hits, variant["k_vec"], 1.0)])

    if kind == "fts-only":
        return fts_hits
    if kind == "vector-only":
        return vec_hits
    if kind == "norm":
        fts_arm = (fts_hits, variant["w_fts"], "base_score")
        vec_arm = (vec_hits, variant["w_vec"], "base_score")
        return normscore_merge([fts_arm, vec_arm])
    if kind == "rrf":
        return unweighted_rrf()
    if kind == "wrrf":
        fts_arm = (fts_hits, variant["k_fts"], variant["w_fts"])
        vec_arm = (vec_hits, variant["k_vec"], variant["w_vec"])
        return wrrf_merge([fts_arm, vec_arm])
    if kind == "fts-gate":
        # A known top FTS score below the threshold drops the FTS arm entirely.
        below_threshold = fts_top is not None and fts_top < variant["threshold"]
        return vec_hits if below_threshold else unweighted_rrf()
    raise ValueError(f"unimplemented variant kind: {kind}")
def score_variant(
    variant: dict, queries: list[dict], fts_dir: Path, vec_dir: Path
) -> dict:
    """Score one fusion variant over stored FTS and vector fixtures.

    For every query the two arms are loaded from ``<dir>/<id>.json``, fused
    according to *variant*, and the rank of the ground truth is recorded.

    Returns:
        A dict with the variant ``label``, the query count ``n``, the
        Success@K count ``s3``, the P@1 count ``p1``, the mean reciprocal rank
        ``mrr`` (0.0 for no queries), the per-query records in ``per_query``,
        and the same records grouped by ``"<lang>/<stratum>"`` in
        ``by_stratum``.
    """
    per_query: list[dict] = []
    by_stratum: dict[str, list[dict]] = {}

    for row in queries:
        qid = row["id"]
        kind, tokens = parse_ground_truth(row["ground_truth"])
        fts_hits, fts_top = load_arm(fts_dir / f"{qid}.json")
        vec_hits, _vec_top = load_arm(vec_dir / f"{qid}.json")
        fused = apply_variant(variant, fts_hits, vec_hits, fts_top)
        record = {
            "id": qid,
            "stratum": row["stratum"],
            "lang": row["lang"],
            "rank": find_match_rank(fused, kind, tokens),
        }
        per_query.append(record)
        by_stratum.setdefault(f"{record['lang']}/{record['stratum']}", []).append(record)

    ranks = [record["rank"] for record in per_query]
    total = len(ranks)
    rr_sum = sum((1.0 / rank) if rank >= 1 else 0.0 for rank in ranks)
    return {
        "label": variant["label"],
        "n": total,
        "s3": sum(1 for rank in ranks if 1 <= rank <= K_FOR_SUCCESS),
        "p1": sum(1 for rank in ranks if rank == 1),
        "mrr": (rr_sum / total) if total else 0.0,
        "by_stratum": by_stratum,
        "per_query": per_query,
    }
def default_grid() -> list[str]:
    """Return the variant specs that ``sweep`` scores when none are given."""
    leading = ["fts-only", "vector-only", "norm:0.135,1.0"]
    rank_fusion = ["sym:60", "asym:5,20"]
    norm_weights = [f"norm:{w_fts},1.0" for w_fts in ("0.05", "0.10", "0.20")]
    return leading + rank_fusion + norm_weights
def cmd_sweep(args: argparse.Namespace) -> int:
    """Score several fusion variants and print a Markdown comparison table.

    The variants come from ``args.variant``, or from ``default_grid()`` when
    none were given. A spec that fails to parse is reported on stderr and
    skipped. With ``args.by_stratum`` a second table breaks the success count
    down per stratum.

    Returns:
        Always 0.
    """
    queries = list(iter_queries(Path(args.queries)))
    fts_dir = Path(args.fts_fixtures)
    vec_dir = Path(args.vector_fixtures)
    specs = args.variant or default_grid()

    results = []
    for spec in specs:
        try:
            parsed = parse_variant(spec)
        except ValueError as exc:
            print(f"skip {spec!r}: {exc}", file=sys.stderr)
        else:
            results.append(score_variant(parsed, queries, fts_dir, vec_dir))

    print(f"# Fusion sweep ({len(queries)} queries)\n")
    print(f"| variant | n | S@{K_FOR_SUCCESS} | S@{K_FOR_SUCCESS} 95% CI | P@1 | MRR |")
    print("|---------|---|-----|----------|-----|-----|")
    for res in results:
        low, high = wilson_ci(res["s3"], res["n"])
        # max(n, 1) keeps the ratio defined when there are no queries.
        divisor = max(res["n"], 1)
        print(
            f"| {res['label']} | {res['n']} | "
            f"{res['s3']}/{res['n']} = {res['s3'] / divisor:.2f} | "
            f"[{low:.2f},{high:.2f}] | "
            f"{res['p1']}/{res['n']} = {res['p1'] / divisor:.2f} | "
            f"{res['mrr']:.3f} |"
        )

    if not args.by_stratum:
        return 0

    def cell(items: list[dict]) -> str:
        # "<successes>/<count>" for a stratum, or "-" when it has no queries.
        if not items:
            return "-"
        successes = sum(1 for item in items if 1 <= item["rank"] <= K_FOR_SUCCESS)
        return f"{successes}/{len(items)}"

    print("\n## Per-stratum S@3\n")
    strata = sorted({name for res in results for name in res["by_stratum"]})
    print("| variant | " + " | ".join(strata) + " |")
    print("|---------|" + "|".join(["----"] * len(strata)) + "|")
    for res in results:
        cells = [cell(res["by_stratum"].get(name, [])) for name in strata]
        print(f"| {res['label']} | " + " | ".join(cells) + " |")
    return 0
def cmd_variant(args: argparse.Namespace) -> int:
    """Score a single fusion variant and write its per-query ranks to a CSV.

    The CSV has the columns ``id, stratum, lang, mode, rank, note``, with the
    variant's label in ``mode`` and an empty ``note``. A one-line summary is
    printed to stdout.

    Returns:
        64 when ``args.variant`` is not a valid variant spec (the reason is
        printed to stderr), otherwise 0.
    """
    # Validate the spec up front so a typo is reported as a usage error
    # instead of surfacing as an uncaught ValueError.
    try:
        variant = parse_variant(args.variant)
    except ValueError as e:
        print(f"invalid variant {args.variant!r}: {e}", file=sys.stderr)
        return 64

    queries = list(iter_queries(Path(args.queries)))
    result = score_variant(variant, queries, Path(args.fts_fixtures), Path(args.vector_fixtures))
    out = Path(args.out)
    with out.open("w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=["id", "stratum", "lang", "mode", "rank", "note"])
        w.writeheader()
        for r in result["per_query"]:
            w.writerow({**r, "mode": result["label"], "note": ""})
    lo, hi = wilson_ci(result["s3"], result["n"])
    # The cutoff in the label follows K_FOR_SUCCESS so it cannot drift from
    # the cutoff used to compute the count.
    print(
        f"{result['label']}: S@{K_FOR_SUCCESS} = {result['s3']}/{result['n']} = "
        f"{result['s3'] / max(result['n'], 1):.2f} [{lo:.2f},{hi:.2f}]; "
        f"P@1 = {result['p1']}/{result['n']}; MRR = {result['mrr']:.3f}"
    )
    print(f"per-query ranks written to {out} (use `bench.py pair` for sign tests)")
    return 0
def main() -> int:
    """Parse the command line and run the selected sub-command.

    Sub-commands: ``run``, ``verify``, ``score``, ``pair``, ``sweep`` and
    ``variant``. Returns the chosen handler's exit status.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    commands = parser.add_subparsers(dest="cmd", required=True)

    def add_required(target: argparse.ArgumentParser, *flags: str) -> None:
        # Register required options that carry no help text.
        for flag in flags:
            target.add_argument(flag, required=True)

    run = commands.add_parser("run", help="Run one retrieval mode against a query set")
    run.add_argument(
        "--queries", required=True, help="TSV: id\\tlang\\tstratum\\tquery\\tground_truth"
    )
    run.add_argument(
        "--mode",
        default="hybrid",
        choices=["fts", "vector", "hybrid"],
        help="pond retrieval mode (ignored for backend=kb-mcp)",
    )
    run.add_argument(
        "--backend",
        default="pond",
        choices=["pond", "kb-mcp"],
        help="Search backend: pond CLI (default) or kb MCP server over stdio",
    )
    run.add_argument(
        "--kb-min-score",
        type=float,
        default=0.0,
        help="kb_search min_score (default 0.0 for apples-to-apples vs pond default)",
    )
    run.add_argument("--out", required=True, help="Output dir for per-query JSON envelopes")
    run.add_argument("--limit", type=int, default=20, help="pond search --limit (default 20)")
    run.add_argument("--grouped", action="store_true", help="Pass --group-by-conversation")
    run.set_defaults(func=cmd_run)

    verify = commands.add_parser("verify", help="Check every query's ground truth is reachable")
    add_required(verify, "--queries")
    verify.add_argument("--limit", type=int, default=200, help="Top-N per arm (default 200)")
    verify.set_defaults(func=cmd_verify)

    score = commands.add_parser("score", help="Score results against ground truth")
    add_required(score, "--queries")
    score.add_argument("--results", required=True, help="Dir of <id>.json files from `run`")
    score.add_argument("--label", required=True, help="Run label written into the CSV")
    score.add_argument("--out", required=True, help="CSV path for per-query ranks")
    score.set_defaults(func=cmd_score)

    pair = commands.add_parser("pair", help="Paired sign test on two ranks CSVs (Success@3)")
    add_required(pair, "--csv-a", "--csv-b", "--label-a", "--label-b")
    pair.set_defaults(func=cmd_pair)

    sweep = commands.add_parser(
        "sweep", help="Score a grid of fusion variants over captured fixtures"
    )
    add_required(sweep, "--queries", "--fts-fixtures", "--vector-fixtures")
    sweep.add_argument(
        "--variant",
        action="append",
        help="Variant spec; repeat for multiple. Defaults to a built-in grid.",
    )
    sweep.add_argument(
        "--by-stratum",
        action="store_true",
        help="Also print a per-stratum S@3 table for each variant.",
    )
    sweep.set_defaults(func=cmd_sweep)

    variant = commands.add_parser(
        "variant", help="Score one fusion variant; emit per-query ranks CSV"
    )
    add_required(variant, "--variant", "--queries", "--fts-fixtures", "--vector-fixtures", "--out")
    variant.set_defaults(func=cmd_variant)

    namespace = parser.parse_args()
    handler = namespace.func
    return handler(namespace)
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())