"""BFCL v4 live_multiple tool-routing benchmark.

Fetches the BFCL v4 live_multiple split from the gorilla repo, loads its
function specs as intents into a routing server (plain seed-only or
MCP-import mode), replays the labeled user queries, and reports top-1 /
top-3 accuracy plus client-side latency percentiles. Metrics are written to
results/tool_routing_<mode>.json.
"""
from __future__ import annotations
import json
import os
import shutil
import statistics
import subprocess
import sys
import time
import urllib.request
import urllib.error
from pathlib import Path
ROOT = Path(__file__).resolve().parent
CACHE = ROOT / "datasets" / "bfcl"
RESULTS = ROOT / "results"
BASE = os.environ.get("MR_SERVER_URL", "http://localhost:3001")
NS = "bench-tool-routing"
def fetch_bfcl_v4() -> tuple[Path, Path]:
CACHE.mkdir(parents=True, exist_ok=True)
questions = CACHE / "BFCL_v4_live_multiple.json"
answers = CACHE / "BFCL_v4_live_multiple_answer.json"
if questions.exists() and answers.exists():
return questions, answers
print(f" → cloning gorilla repo (sparse, ~5MB) into {CACHE}/_repo")
repo_dir = CACHE / "_repo"
if repo_dir.exists():
shutil.rmtree(repo_dir)
subprocess.run(
["git", "clone", "--depth", "1", "--filter=blob:none", "--sparse",
"https://github.com/ShishirPatil/gorilla", str(repo_dir)],
check=True, capture_output=True,
)
subprocess.run(
["git", "-C", str(repo_dir), "sparse-checkout", "set",
"berkeley-function-call-leaderboard/bfcl_eval/data"],
check=True, capture_output=True,
)
src = repo_dir / "berkeley-function-call-leaderboard" / "bfcl_eval" / "data"
shutil.copy(src / "BFCL_v4_live_multiple.json", questions)
shutil.copy(src / "possible_answer" / "BFCL_v4_live_multiple.json", answers)
shutil.rmtree(repo_dir)
print(f" → cached: {questions.name}, {answers.name}")
return questions, answers
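# Each question row carries its own candidate function list plus a single user
# turn; each answer row names the function(s) the query should invoke. Only
# the first ground-truth function name is kept as the expected label, and
# descriptions are pooled across rows by function name.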
def parse_bfcl(questions_path: Path, answers_path: Path) -> tuple[
        dict[str, str], list[dict], list[tuple[str, str]]]:
answers: dict[str, str] = {}
with open(answers_path) as f:
for line in f:
row = json.loads(line)
gt = row["ground_truth"][0]
answers[row["id"]] = next(iter(gt.keys()))
desc_pool: dict[str, str] = {}
full_specs: dict[str, dict] = {}
pairs: list[tuple[str, str]] = []
with open(questions_path) as f:
for line in f:
row = json.loads(line)
for fn in row["function"]:
desc_pool.setdefault(fn["name"], fn.get("description", "") or fn["name"])
full_specs.setdefault(fn["name"], fn)
user_msg = row["question"][0][0]["content"]
expected = answers.get(row["id"])
if expected:
pairs.append((user_msg, expected))
return desc_pool, list(full_specs.values()), pairs
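# Minimal stdlib JSON client for the routing server. Namespaced calls are
# scoped with an X-Namespace-ID header (the convention this server uses);
# HTTP errors are re-raised with the response body attached so failures stay
# diagnosable.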
def _req(method: str, path: str, body=None, ns: str | None = None,
timeout: float = 30.0) -> dict:
headers = {"Content-Type": "application/json"}
if ns:
headers["X-Namespace-ID"] = ns
data = json.dumps(body).encode() if body is not None else None
req = urllib.request.Request(f"{BASE}{path}", data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
raw = r.read()
return json.loads(raw) if raw else {}
except urllib.error.HTTPError as e:
raise RuntimeError(f"HTTP {e.code} {method} {path}: {e.read().decode(errors='replace')}") from None
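# Cheap reachability probe: any successful GET /api/namespaces means the
# server is up. Connection failures (urllib.error.URLError) fall through to
# the broad except below.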
def health_check() -> bool:
try:
_req("GET", "/api/namespaces")
return True
except Exception as e:
print(f"server not reachable at {BASE}: {e}")
return False
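# Plain baseline: recreate the benchmark namespace and register one intent per
# function, seeded with a single phrase (the function's description, falling
# back to its name). No augmentation of any kind.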
def setup_namespace_plain(function_pool: dict[str, str]) -> int:
try:
_req("DELETE", "/api/namespaces", {"namespace_id": NS})
except Exception:
pass
_req("POST", "/api/namespaces", {
"namespace_id": NS,
"description": "BFCL v4 live_multiple — plain seed-only baseline",
})
n = 0
for fname, desc in function_pool.items():
        seed = desc.strip() or fname
try:
_req("POST", "/api/intents", {
"id": fname,
"phrases": [seed],
}, ns=NS)
n += 1
except RuntimeError as e:
print(f" ⚠ skipping {fname}: {e}")
return n
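# MCP mode: same namespace reset, but the full function specs are converted to
# MCP tool shape (name / description / inputSchema) and pushed through the
# server's MCP import endpoint, which presumably builds the LLM-augmented L1
# graph on top of the seeds; the long timeout covers that augmentation pass.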
def setup_namespace_mcp(function_pool_full: list[dict]) -> int:
try:
_req("DELETE", "/api/namespaces", {"namespace_id": NS})
except Exception:
pass
_req("POST", "/api/namespaces", {
"namespace_id": NS,
"description": "BFCL v4 live_multiple — MCP-import + L1-augmented",
})
tools = [{
"name": f["name"],
"description": f.get("description", ""),
"inputSchema": f.get("parameters", {"type": "object", "properties": {}}),
} for f in function_pool_full]
res = _req("POST", "/api/import/mcp/apply", {
"tools_json": json.dumps({"tools": tools}),
"selected": [t["name"] for t in tools],
"domain": "",
}, ns=NS, timeout=300.0)
return int(res.get("imported") or res.get("created") or len(tools))
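# Replay every labeled query against /api/route_multi and score top-1 / top-3
# accuracy over the returned ranking. Latency is wall-clock around the whole
# HTTP round trip, so it includes client and transport overhead, not just
# server-side routing time.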
def measure(pairs: list[tuple[str, str]], grounded_l1: bool = False) -> dict:
top1_hits = 0
top3_hits = 0
latencies_us: list[float] = []
misclassifications: list[tuple[str, str, str]] = []
for query, expected in pairs:
t0 = time.perf_counter()
res = _req("POST", "/api/route_multi", {
"query": query,
"log": False,
"grounded_l1": grounded_l1,
}, ns=NS)
latencies_us.append((time.perf_counter() - t0) * 1_000_000.0)
ranked = res.get("ranked") or res.get("confirmed") or []
top_ids = [m["id"] for m in ranked[:3]]
if top_ids and top_ids[0] == expected:
top1_hits += 1
if expected in top_ids:
top3_hits += 1
else:
misclassifications.append((query, expected, top_ids[0] if top_ids else "(none)"))
n = len(pairs)
sorted_us = sorted(latencies_us)
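    # nearest-rank percentile over the sorted per-request latency samples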
    def p(q: float) -> float:
        return sorted_us[min(int((n - 1) * q), n - 1)] if sorted_us else 0
return {
"n": n,
"top1_hits": top1_hits,
"top3_hits": top3_hits,
"top1_pct": round(100 * top1_hits / n, 2) if n else 0,
"top3_pct": round(100 * top3_hits / n, 2) if n else 0,
"latency_p50_us": round(p(0.50), 1),
"latency_p95_us": round(p(0.95), 1),
"latency_p99_us": round(p(0.99), 1),
"latency_mean_us": round(statistics.fmean(latencies_us), 1) if latencies_us else 0,
"misclassified_sample": misclassifications[:10],
}
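# Driver: fetch the dataset, load intents (plain or MCP), route every query,
# print a summary, persist metrics under results/, then clean up the
# namespace.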
def main() -> int:
mode = "plain"
for arg in sys.argv[1:]:
if arg in ("--mode=plain", "--plain"): mode = "plain"
elif arg in ("--mode=mcp", "--mcp"): mode = "mcp"
elif arg in ("-h", "--help"):
print(__doc__); print("\nFlags: --plain (default) | --mcp"); return 0
print(f"BFCL v4 tool-routing benchmark → {BASE}")
print(f" mode: {mode}\n")
if not health_check():
return 1
print("→ fetching BFCL v4 live_multiple (real user queries on real APIs)...")
q_path, a_path = fetch_bfcl_v4()
desc_pool, full_specs, test_pairs = parse_bfcl(q_path, a_path)
print(f" {len(desc_pool)} unique functions, {len(test_pairs)} labeled queries\n")
print(f"→ setup [{mode}]: loading {len(desc_pool)} functions as intents into '{NS}'...")
t0 = time.perf_counter()
if mode == "mcp":
n_loaded = setup_namespace_mcp(full_specs)
else:
n_loaded = setup_namespace_plain(desc_pool)
setup_secs = time.perf_counter() - t0
print(f" loaded {n_loaded} intents in {setup_secs:.1f}s\n")
grounded = (mode == "mcp")
print(f"→ measuring: routing {len(test_pairs)} BFCL queries (grounded_l1={grounded})...")
t0 = time.perf_counter()
metrics = measure(test_pairs, grounded_l1=grounded)
print(f" done in {time.perf_counter() - t0:.1f}s\n")
print(f" top-1 {metrics['top1_pct']:>6.2f}% ({metrics['top1_hits']} / {metrics['n']})")
print(f" top-3 {metrics['top3_pct']:>6.2f}% ({metrics['top3_hits']} / {metrics['n']})")
print(f" p50 {metrics['latency_p50_us']:>6.1f} µs")
print(f" p95 {metrics['latency_p95_us']:>6.1f} µs")
print(f" p99 {metrics['latency_p99_us']:>6.1f} µs")
print(f" mean {metrics['latency_mean_us']:>6.1f} µs")
RESULTS.mkdir(exist_ok=True)
out = RESULTS / f"tool_routing_{mode}.json"
out.write_text(json.dumps({
"dataset": "BFCL v4 live_multiple",
"mode": mode,
"setup_seconds": round(setup_secs, 2),
"intents_loaded": n_loaded,
"test_examples": metrics["n"],
"seeds_per_intent": "1 (description)" if mode == "plain"
else "1 (description) + LLM-augmented L1 graph",
**{k: v for k, v in metrics.items() if k != "misclassified_sample"},
"first_misclassifications": metrics["misclassified_sample"],
}, indent=2))
print(f"\n → wrote {out}")
try:
_req("DELETE", "/api/namespaces", {"namespace_id": NS})
except Exception:
pass
return 0
if __name__ == "__main__":
sys.exit(main())