import asyncio
import hashlib
import json
import os
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable
from .common import (
CYAN,
NC,
RunContext,
_is_runtime_error,
run_context,
delete_cached_response,
get_cache_key_from_cmd,
get_cached_response,
log,
log_error,
log_success,
save_cached_response,
)
# Default concurrency: one question per CPU core, or 4 when the core count
# cannot be determined.
_default_parallel = os.cpu_count() or 4

# Upper bound on questions evaluated at the same time.  Override with the
# CODEIX_BENCH_PARALLEL environment variable (must parse as an integer).
# The value is clamped to at least 1 because it sizes an asyncio.Semaphore:
# 0 would make every question wait forever and a negative value makes the
# Semaphore constructor raise ValueError.
MAX_PARALLEL_QUESTIONS = max(1, int(os.environ.get("CODEIX_BENCH_PARALLEL", _default_parallel)))

# Turn budgets passed to the CLI through --max-turns: one for the A/B test
# sessions, one for the judge session.  They also set the width of the
# progress bars.
MAX_TURNS_TEST = 25
MAX_TURNS_JUDGE = 10
@dataclass
class QuestionProgress:
    """Progress state and one-line rendering for a single benchmark question.

    Three sessions are tracked: the two test runs (A and B) and the judge (J).
    Each session has a list of one-character turn markers and a ``done`` flag.
    """

    question_id: str
    # Widths (in markers) of the A/B bars and of the judge bar.
    max_turns_ab: int = MAX_TURNS_TEST
    max_turns_j: int = MAX_TURNS_JUDGE
    # One marker per turn, appended as the session advances.
    turns_a: list[str] = field(default_factory=list)
    turns_b: list[str] = field(default_factory=list)
    turns_j: list[str] = field(default_factory=list)
    # "..." means no verdict yet; format_line() hides it in that case.
    result: str = "..."
    done_a: bool = False
    done_b: bool = False
    done_j: bool = False

    def format_turns(self, turns: list[str], max_turns: int, done: bool) -> str:
        """Render *turns* as a bracketed bar of *max_turns* cells.

        The bar is padded with spaces up to *max_turns*.  When *done* is
        true, the trailing run of spaces is replaced by ``-``.  ``o`` is
        shown green, ``x`` red and ``-`` gray; any other marker is emitted
        as-is.  Markers beyond *max_turns* are cut off.
        """
        reset = "\033[0m"
        painted = {
            'o': f"\033[32mo{reset}",
            'x': f"\033[31mx{reset}",
            '-': f"\033[90m-{reset}",
        }

        cells = list(turns) + [' '] * (max_turns - len(turns))
        if done:
            # Walk back from the end, converting blanks until a real marker.
            idx = len(cells)
            while idx > 0 and cells[idx - 1] == ' ':
                idx -= 1
                cells[idx] = '-'

        return "[" + "".join(painted.get(cell, cell) for cell in cells[:max_turns]) + "]"

    def format_line(self) -> str:
        """Return the full status line: id, the A/B/J bars and the verdict.

        A session letter is shown red once that session contains an ``x``
        marker.  A ``tie`` verdict is displayed as ``=``.
        """
        red, reset = "\033[31m", "\033[0m"

        segments = []
        for letter, turns, width, done in (
            ("A", self.turns_a, self.max_turns_ab, self.done_a),
            ("B", self.turns_b, self.max_turns_ab, self.done_b),
            ("J", self.turns_j, self.max_turns_j, self.done_j),
        ):
            label = f"{red}{letter}{reset}" if 'x' in turns else letter
            segments.append(f"{label} {self.format_turns(turns, width, done)}")

        if self.result == "...":
            suffix = ""
        elif self.result == "tie":
            suffix = " : ="
        else:
            suffix = f" : {self.result}"

        return f"[bench] {self.question_id:<28} {' '.join(segments)}{suffix}"
class ProgressDisplay:
    """Multi-line progress view on stderr, one line per question.

    When stderr is a terminal the block of lines is redrawn in place using
    ANSI cursor-up / clear-line sequences.  Otherwise nothing is drawn until
    ``finish()`` prints the final state once.
    """

    def __init__(self, questions: list[dict]):
        """Create one progress row per question (keyed by ``q['id']``) and draw it."""
        self.question_order = [q['id'] for q in questions]
        self.questions = {qid: QuestionProgress(question_id=qid) for qid in self.question_order}
        self.lock = asyncio.Lock()
        self._lines_printed = 0
        self._enabled = sys.stderr.isatty()
        self._redraw()

    async def update(self, question_id: str, **kwargs):
        """Set existing attributes of a question's row; unknown names are ignored."""
        async with self.lock:
            prog = self.questions.get(question_id)
            if prog is None:
                return
            for name, value in kwargs.items():
                if hasattr(prog, name):
                    setattr(prog, name, value)
            self._redraw()

    async def add_turn(self, question_id: str, session: str, char: str = 'o'):
        """Append marker *char* to session ``'a'``, ``'b'`` or ``'j'`` of a question."""
        async with self.lock:
            prog = self.questions.get(question_id)
            if prog is None:
                return
            history = {'a': prog.turns_a, 'b': prog.turns_b, 'j': prog.turns_j}.get(session)
            if history is not None:
                history.append(char)
            self._redraw()

    async def mark_done(self, question_id: str, session: str):
        """Flag session ``'a'``, ``'b'`` or ``'j'`` of a question as finished."""
        async with self.lock:
            prog = self.questions.get(question_id)
            if prog is None:
                return
            if session in ('a', 'b', 'j'):
                setattr(prog, f"done_{session}", True)
            self._redraw()

    async def set_result(self, question_id: str, result: str):
        """Record the verdict text shown at the end of the question's line."""
        async with self.lock:
            prog = self.questions.get(question_id)
            if prog is None:
                return
            prog.result = result
            self._redraw()

    def _redraw(self):
        """Repaint every row in place (no-op when stderr is not a terminal)."""
        if not self._enabled:
            return
        out = sys.stderr
        if self._lines_printed > 0:
            # Jump back to the first row of the previously drawn block.
            out.write(f"\033[{self._lines_printed}A")
        for qid in self.question_order:
            out.write(f"\033[2K{self.questions[qid].format_line()}\n")
        out.flush()
        self._lines_printed = len(self.question_order)

    def log(self, msg: str):
        """Print *msg* where the block started, then draw a fresh block below.

        The remaining rows of the old block are blanked before the new block
        is drawn.  Without a terminal the message is simply written out.
        """
        out = sys.stderr
        if not self._enabled:
            out.write(f"{msg}\n")
            return
        stale_rows = self._lines_printed
        if stale_rows > 0:
            out.write(f"\033[{stale_rows}A")
        out.write(f"\033[2K{msg}\n")
        for _ in range(1, stale_rows):
            out.write("\033[2K\n")
        out.flush()
        # Zero so that _redraw() starts at the current cursor position.
        self._lines_printed = 0
        self._redraw()

    def finish(self):
        """Print the final state once when live redrawing was disabled."""
        if self._enabled:
            return
        for qid in self.question_order:
            sys.stderr.write(f"{self.questions[qid].format_line()}\n")
        sys.stderr.flush()
def build_codeix_cmd(
    mcp_config: str,
    prompt: str,
) -> list[str]:
    """Build the ``claude`` argv for a run restricted to the codeindex MCP tools.

    *mcp_config* is passed to ``--mcp-config`` and *prompt* to ``-p``.  Output
    is requested as verbose ``stream-json`` and the turn budget is
    ``MAX_TURNS_TEST``.
    """
    argv = ["claude", "--print", "--output-format", "stream-json", "--verbose"]
    argv.append("--no-session-persistence")
    argv += ["--max-turns", str(MAX_TURNS_TEST)]
    argv += ["--allowedTools", "mcp__codeindex__*"]
    argv += ["--strict-mcp-config", "--mcp-config", mcp_config]
    argv += ["-p", prompt]
    return argv
def build_claude_cmd(
    prompt: str,
) -> list[str]:
    """Build the ``claude`` argv for a run with every ``mcp__*`` tool disallowed.

    *prompt* is passed to ``-p``.  Output is requested as verbose
    ``stream-json`` and the turn budget is ``MAX_TURNS_TEST``.
    """
    argv = ["claude", "--print", "--output-format", "stream-json", "--verbose"]
    argv.append("--no-session-persistence")
    argv += ["--max-turns", str(MAX_TURNS_TEST)]
    argv += ["--disallowedTools", "mcp__*"]
    argv += ["-p", prompt]
    return argv
def build_mcp_config(bin_path: str) -> str:
    """Return a JSON MCP configuration with one ``codeindex`` server run by *bin_path*."""
    codeindex_server = {"command": bin_path}
    config = {"mcpServers": {"codeindex": codeindex_server}}
    return json.dumps(config)
def build_prompt(project: str, question: str) -> str:
    """Compose the prompt: project header, the question, then the tool-usage hint."""
    sections = (
        f"Project: {project}",
        f"{question}",
        "Use the codeindex MCP tools to answer.",
    )
    # Sections are separated by one blank line.
    return "\n\n".join(sections)
@dataclass
class ABConfig:
    """Description of one A/B comparison: labels plus the hooks that drive it."""

    # Benchmark name, shown in the "Running ..." log line.
    name: str
    # Labels for the two variants; used in logs, cache metadata, the judge
    # prompt and the summary.
    label_a: str
    label_b: str
    # Heading centred in the summary banner.
    title: str
    # Called once per run with the RunContext.  Returns a pair of strings
    # that run_async logs next to the A and B labels (named bin_a / bin_b there).
    setup_run: Callable[[RunContext], tuple[str, str]]
    # Called per question.  Returns (cmd_a, cwd_a, cmd_b, cwd_b): the argv and
    # working directory for each variant.
    get_commands: Callable[[dict, RunContext], tuple[list[str], Path, list[str], Path]]
    # Optional per-question setup hooks.  They are only called when that
    # variant has no cached response; a falsy return aborts the question.
    setup_a: Callable[[dict, RunContext], bool] | None = None
    setup_b: Callable[[dict, RunContext], bool] | None = None
    # Text inserted verbatim into the JSON template at the end of the judge
    # prompt, just before its closing brace.
    extra_judge_fields: str = ""
def parse_judge_winner(judge: dict) -> str:
    """Extract the winner from a judge response, or ``"?"`` if none is found.

    Sources are tried in this order:

    1. ``judge["structured_output"]["winner"]``;
    2. ``judge["result"]`` when it is already a dict;
    3. ``judge["result"]`` parsed as a JSON object;
    4. regex fallbacks over the result text: a flat JSON object embedded in
       prose, a bare ``"winner": "..."`` pair, and finally a Markdown-style
       ``Winner: A`` line (normalised to ``"A"``, ``"B"`` or ``"tie"``).

    A non-dict *judge* or a non-string, non-dict result yields ``"?"``.
    """
    if not isinstance(judge, dict):
        return "?"

    structured = judge.get("structured_output", {})
    if isinstance(structured, dict) and "winner" in structured:
        return structured["winner"]

    result_text = judge.get("result", "")
    if isinstance(result_text, dict):
        return result_text.get("winner", "?")
    if not isinstance(result_text, str):
        return "?"

    try:
        parsed = json.loads(result_text)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed.get("winner", "?")
    # Valid JSON that is not an object (list, number, quoted string) has no
    # "winner" key to read; fall through to the text patterns instead of
    # calling .get() on it.

    match = re.search(r'\{[^{}]*"winner"\s*:\s*"([^"]+)"[^{}]*\}', result_text)
    if match:
        return match.group(1)

    match = re.search(r'"winner"\s*:\s*"([^"]+)"', result_text, re.IGNORECASE)
    if match:
        return match.group(1)

    match = re.search(r'\*?\*?[Ww]inner\*?\*?\s*:\s*\*?\*?([ABab]|[Tt]ie)\*?\*?', result_text)
    if match:
        letter = match.group(1).upper()
        return letter if letter in ("A", "B") else "tie"

    return "?"
# Recorded tool output is cut to this many characters.
_TOOL_OUTPUT_MAX_CHARS = 2000

_USAGE_KEYS = (
    "input_tokens",
    "output_tokens",
    "cache_read_input_tokens",
    "cache_creation_input_tokens",
)


def _token_usage(usage) -> dict:
    """Return the four token counters from *usage*, defaulting each to 0."""
    if not isinstance(usage, dict):
        usage = {}
    return {key: usage.get(key, 0) for key in _USAGE_KEYS}


def _content_blocks(message) -> list[dict]:
    """Return the dict blocks of ``message["content"]`` (empty if malformed)."""
    content = message.get("content") if isinstance(message, dict) else None
    if not isinstance(content, list):
        return []
    return [block for block in content if isinstance(block, dict)]


def _tool_result_output(event: dict, item: dict):
    """Pick the output of one tool_result block.

    ``event["tool_use_result"]["stdout"]`` wins when non-empty; otherwise the
    block's own ``content`` is used, joining the text blocks when it is a list.
    String output longer than the cap is truncated.
    """
    tool_result = event.get("tool_use_result", {})
    output = tool_result.get("stdout") if isinstance(tool_result, dict) else None
    if not output:
        item_content = item.get("content", "")
        if isinstance(item_content, list):
            output = "\n".join(
                block.get("text", "") for block in item_content
                if isinstance(block, dict) and block.get("type") == "text"
            )
        else:
            output = item_content
    if isinstance(output, str) and len(output) > _TOOL_OUTPUT_MAX_CHARS:
        output = output[:_TOOL_OUTPUT_MAX_CHARS] + "... (truncated)"
    return output


def parse_stream_json(stdout_str: str) -> dict:
    """Parse newline-delimited ``stream-json`` output into one summary dict.

    Each line is decoded as JSON; blank lines, undecodable lines and JSON
    values that are not objects are skipped.  Three event types are used:

    * ``assistant`` - starts a turn; its ``tool_use`` blocks and token usage
      are recorded.
    * ``user`` - its ``tool_result`` blocks fill in the output / error flag of
      the matching earlier tool call.
    * ``result`` - supplies the final fields (``result``, ``subtype``,
      ``is_error``, ``total_cost_usd``, ``usage``, ``session_id``,
      ``num_turns``); the last such event wins.

    The returned dict holds the final fields (if any) plus ``turns``,
    ``tool_usage`` and ``usage``.  When the final usage is missing or reports
    no input tokens, ``usage`` is replaced by the input/output totals summed
    over the turns.
    """
    turns: list[dict] = []
    final_result: dict = {}
    # tool_use id -> the tool record stored in its turn, so that a later
    # tool_result can complete it in place.
    pending_tools: dict[str, dict] = {}

    for line in stdout_str.strip().split("\n"):
        if not line.strip():
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(event, dict):
            continue

        event_type = event.get("type")
        if event_type == "assistant":
            message = event.get("message")
            if not isinstance(message, dict):
                message = {}
            tools_used = []
            for item in _content_blocks(message):
                if item.get("type") != "tool_use":
                    continue
                tool_id = item.get("id")
                tool_info = {
                    "id": tool_id,
                    "name": item.get("name"),
                    "input": item.get("input"),
                    "output": None,
                    "is_error": False,
                }
                tools_used.append(tool_info)
                if tool_id:
                    pending_tools[tool_id] = tool_info
            turns.append({"tools": tools_used, "usage": _token_usage(message.get("usage"))})

        elif event_type == "user":
            for item in _content_blocks(event.get("message")):
                if item.get("type") != "tool_result":
                    continue
                tool_id = item.get("tool_use_id")
                if not tool_id or tool_id not in pending_tools:
                    continue
                pending_tools[tool_id]["output"] = _tool_result_output(event, item)
                pending_tools[tool_id]["is_error"] = item.get("is_error", False)

        elif event_type == "result":
            final_result = {
                "result": event.get("result", ""),
                "subtype": event.get("subtype", ""),
                "is_error": event.get("is_error", False),
                "total_cost_usd": event.get("total_cost_usd") or event.get("cost_usd") or event.get("cost"),
                "usage": _token_usage(event.get("usage")),
                "session_id": event.get("session_id"),
                "num_turns": event.get("num_turns"),
            }

    result = final_result.copy()
    result["turns"] = turns

    all_tools = [tool["name"] for turn in turns for tool in turn["tools"]]
    # "or 0" tolerates an explicit null token count in the stream.
    total_input = sum(turn["usage"]["input_tokens"] or 0 for turn in turns)
    total_output = sum(turn["usage"]["output_tokens"] or 0 for turn in turns)

    result["tool_usage"] = {
        "tools_called": all_tools,
        "tool_count": len(all_tools),
        # dict.fromkeys de-duplicates in first-seen order, so the saved
        # result is reproducible (set ordering is not).
        "unique_tools": list(dict.fromkeys(all_tools)),
        "turns_with_tools": sum(1 for turn in turns if turn["tools"]),
    }

    if not result.get("usage") or not result["usage"].get("input_tokens"):
        result["usage"] = {"input_tokens": total_input, "output_tokens": total_output}
    return result
def classify_tool_output(output: str | None, is_error: bool) -> str:
if is_error:
return 'x'
if not output:
return '.'
output_stripped = output.strip()
if not output_stripped:
return '.'
if output_stripped in ('[]', '{}', '""', "''"):
return '.'
useless_patterns = [
'No files found',
'No matches found',
'No results',
'not found',
'0 matches',
'0 results',
]
for pattern in useless_patterns:
if pattern.lower() in output_stripped.lower():
return '.'
return 'o'
async def run_subprocess_streaming(
cmd: list[str],
cwd: Path | None = None,
bin_dir: Path | None = None,
on_turn: Callable[[str], None] | None = None,
) -> dict:
env = os.environ.copy()
if bin_dir:
env["PATH"] = f"{bin_dir}:{env.get('PATH', '')}"
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(cwd) if cwd else None,
env=env,
start_new_session=True,
)
lines = []
pending_tools: dict[str, dict] = {}
current_turn_tools: list[str] = []
while True:
line = await proc.stdout.readline()
if not line:
break
line_str = line.decode()
lines.append(line_str)
if on_turn:
try:
event = json.loads(line_str)
event_type = event.get("type")
if event_type == "assistant":
message = event.get("message", {})
content = message.get("content", [])
current_turn_tools = []
for item in content:
if item.get("type") == "tool_use":
tool_id = item.get("id")
if tool_id:
current_turn_tools.append(tool_id)
pending_tools[tool_id] = {"output": None, "is_error": False}
if not current_turn_tools:
on_turn(' ')
elif event_type == "user":
message = event.get("message", {})
content = message.get("content", [])
for item in content:
if item.get("type") == "tool_result":
tool_id = item.get("tool_use_id")
if tool_id and tool_id in pending_tools:
tool_result = event.get("tool_use_result", {})
output = tool_result.get("stdout") if isinstance(tool_result, dict) else None
if not output:
item_content = item.get("content", "")
if isinstance(item_content, list):
output = "\n".join(
block.get("text", "") for block in item_content
if isinstance(block, dict) and block.get("type") == "text"
)
else:
output = item_content
pending_tools[tool_id]["output"] = output
pending_tools[tool_id]["is_error"] = item.get("is_error", False)
if current_turn_tools:
all_done = all(
tool_id in pending_tools and pending_tools[tool_id].get("output") is not None
for tool_id in current_turn_tools
)
got_results = any(
tool_id in pending_tools and pending_tools[tool_id].get("output") is not None
for tool_id in current_turn_tools
)
if got_results:
chars = []
for tool_id in current_turn_tools:
if tool_id in pending_tools:
t = pending_tools[tool_id]
chars.append(classify_tool_output(t.get("output"), t.get("is_error", False)))
if 'x' in chars:
on_turn('x')
elif 'o' in chars:
on_turn('o')
else:
on_turn('.')
current_turn_tools = []
except json.JSONDecodeError:
pass
_, stderr = await proc.communicate()
stderr_str = stderr.decode() if stderr else ""
stdout_str = "".join(lines)
if not stdout_str:
return {"result": "", "error": stderr_str}
if "\n" in stdout_str.strip():
result = parse_stream_json(stdout_str)
if stderr_str:
result["error"] = stderr_str
return result
else:
try:
return json.loads(stdout_str)
except json.JSONDecodeError:
return {"result": stdout_str, "error": stderr_str}
except asyncio.CancelledError:
if 'proc' in locals():
proc.terminate()
raise
except Exception as e:
return {"result": "", "error": str(e)}
async def run_subprocess(cmd: list[str], cwd: Path | None = None, bin_dir: Path | None = None) -> dict:
env = os.environ.copy()
if bin_dir:
env["PATH"] = f"{bin_dir}:{env.get('PATH', '')}"
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(cwd) if cwd else None,
env=env,
start_new_session=True, )
stdout, stderr = await proc.communicate()
stdout_str = stdout.decode() if stdout else ""
stderr_str = stderr.decode() if stderr else ""
if not stdout_str:
return {"result": "", "error": stderr_str}
if "\n" in stdout_str.strip():
result = parse_stream_json(stdout_str)
if stderr_str:
result["error"] = stderr_str
return result
else:
try:
return json.loads(stdout_str)
except json.JSONDecodeError:
return {"result": stdout_str, "error": stderr_str}
except asyncio.CancelledError:
if 'proc' in locals():
proc.terminate()
raise
except Exception as e:
return {"result": "", "error": str(e)}
def _classify_recorded_turn(turn: dict) -> str:
    """Collapse the tool calls of one recorded turn into a single marker.

    ``' '`` for a turn without tools; otherwise ``'x'`` if any tool failed,
    ``'o'`` if any produced useful output, else ``'.'``.
    """
    tools = turn.get("tools", [])
    if not tools:
        return ' '
    marks = [classify_tool_output(tool.get("output"), tool.get("is_error", False)) for tool in tools]
    if 'x' in marks:
        return 'x'
    return 'o' if 'o' in marks else '.'


def _hit_rate_limit(resp: dict) -> bool:
    """True when the response text contains the "hit your limit" message."""
    result = resp.get("result", "")
    return isinstance(result, str) and "hit your limit" in result.lower()


def _response_error(resp: dict) -> str | None:
    """Return an error tag for a failed A/B response, or ``None`` if it is usable.

    The tag is the ``subtype`` when it starts with ``"error"``,
    ``"rate_limit"`` for a limit message, or the subtype (``"error"`` when
    empty) when ``is_error`` is set on a non-success response.
    """
    # "or" also covers an explicit null subtype.
    subtype = str(resp.get("subtype") or "")
    if subtype.startswith("error"):
        return subtype
    if _hit_rate_limit(resp):
        return "rate_limit"
    if resp.get("is_error") and subtype != "success":
        return subtype or "error"
    return None


def _is_cacheable(resp: dict) -> bool:
    """Only cache responses that are worth replaying.

    Runtime errors are skipped, matching how the judge response is cached,
    and so are rate-limit messages: once stored, either would be served
    again on every later run instead of being retried.
    """
    return not _is_runtime_error(resp) and not _hit_rate_limit(resp)


async def _run_variant(
    side: str,
    cmd: list[str],
    cwd: Path,
    qid: str,
    ctx: RunContext,
    progress: ProgressDisplay | None,
) -> dict:
    """Run one uncached variant (``'a'`` or ``'b'``) and return its response.

    The command is first run with streaming progress.  If the response is a
    runtime error, the recorded turns are cleared and the command is re-run
    without ``--verbose`` and with ``json`` instead of ``stream-json``.
    """
    display_tasks: set[asyncio.Task] = set()

    def on_turn(char: str) -> None:
        # Keep a reference: the event loop holds tasks only weakly, so an
        # unreferenced task may be collected before it runs.
        task = asyncio.create_task(progress.add_turn(qid, side, char))
        display_tasks.add(task)
        task.add_done_callback(display_tasks.discard)

    response = await run_subprocess_streaming(cmd, cwd, ctx.bin_dir, on_turn if progress else None)

    # Let queued markers land before the turns are reset or marked done, so
    # that none is appended afterwards.
    if display_tasks:
        await asyncio.gather(*display_tasks, return_exceptions=True)

    if _is_runtime_error(response):
        if progress:
            prog = progress.questions.get(qid)
            if prog:
                setattr(prog, f"turns_{side}", [])
        fallback_cmd = ["json" if c == "stream-json" else c for c in cmd if c != "--verbose"]
        response = await run_subprocess(fallback_cmd, cwd, ctx.bin_dir)

    if progress:
        await progress.mark_done(qid, side)
    return response


async def run_question(
    q: dict,
    config: ABConfig,
    ctx: RunContext,
    progress: ProgressDisplay | None = None,
) -> dict | None:
    """Evaluate one question: run variants A and B, then have a judge compare them.

    Steps:

    1. Look up cached responses for both commands and replay their turns in
       the progress display.
    2. For each uncached variant call its optional setup hook, then run the
       command (A first, then B) and cache the response.
    3. If either response is an error, write and return a result with an
       empty ``judge`` and the error tags; no judge is run.
    4. Otherwise write both answers to ``response_<hash>.txt`` files in the
       results directory, run (or reuse a cached) judge, and write and
       return the full result.

    The result is written to ``<question id>.json`` in ``ctx.results_dir``.

    Returns:
        The result dict, or ``None`` when a setup hook reports failure.
    """
    qid = q['id']
    cmd_a, cwd_a, cmd_b, cwd_b = config.get_commands(q, ctx)

    cache_key_a = get_cache_key_from_cmd(cmd_a)
    cache_key_b = get_cache_key_from_cmd(cmd_b)
    cached_a = get_cached_response(cache_key_a)
    cached_b = get_cached_response(cache_key_b)

    if progress:
        for side, cached in (('a', cached_a), ('b', cached_b)):
            if not cached:
                continue
            for turn in cached["response"].get("turns", []):
                await progress.add_turn(qid, side, _classify_recorded_turn(turn))
            await progress.mark_done(qid, side)

    # Setup hooks run only for variants that actually need to be executed.
    if not cached_a and config.setup_a and not config.setup_a(q, ctx):
        log_error(f" Setup A failed for {q['id']}")
        if progress:
            await progress.set_result(qid, "err")
        return None
    if not cached_b and config.setup_b and not config.setup_b(q, ctx):
        log_error(f" Setup B failed for {q['id']}")
        if progress:
            await progress.set_result(qid, "err")
        return None

    if cached_a:
        response_a = cached_a["response"]
    else:
        response_a = await _run_variant('a', cmd_a, cwd_a, qid, ctx, progress)
        if _is_cacheable(response_a):
            save_cached_response(cache_key_a, response_a, {"question_id": q["id"], "label": config.label_a, "cmd": cmd_a})

    if cached_b:
        response_b = cached_b["response"]
    else:
        response_b = await _run_variant('b', cmd_b, cwd_b, qid, ctx, progress)
        if _is_cacheable(response_b):
            save_cached_response(cache_key_b, response_b, {"question_id": q["id"], "label": config.label_b, "cmd": cmd_b})

    error_a = _response_error(response_a)
    error_b = _response_error(response_b)
    if error_a or error_b:
        if progress:
            await progress.set_result(qid, "err")
        result = {
            "question": q,
            "response_a": response_a,
            "response_b": response_b,
            "judge": {},
            "cost_a": response_a.get("total_cost_usd"),
            "cost_b": response_b.get("total_cost_usd"),
            "cached_a": cached_a is not None,
            "cached_b": cached_b is not None,
            "cached_judge": False,
            "error_a": error_a,
            "error_b": error_b,
        }
        result_file = ctx.results_dir / f"{q['id']}.json"
        result_file.write_text(json.dumps(result, indent=2))
        return result

    # The judge reads the answers from files.  File names derive from a
    # content hash, so identical answers yield an identical judge prompt.
    response_a_content = json.dumps(response_a.get('result', response_a), indent=2)
    response_b_content = json.dumps(response_b.get('result', response_b), indent=2)
    hash_a = hashlib.sha256(response_a_content.encode()).hexdigest()[:16]
    hash_b = hashlib.sha256(response_b_content.encode()).hexdigest()[:16]
    (ctx.results_dir / f"response_{hash_a}.txt").write_text(response_a_content)
    (ctx.results_dir / f"response_{hash_b}.txt").write_text(response_b_content)

    cost_a = response_a.get("total_cost_usd")
    cost_b = response_b.get("total_cost_usd")
    cost_info = ""
    if cost_a is not None and cost_b is not None:
        cost_info = f"\nCost: A=${cost_a:.4f}, B=${cost_b:.4f}"

    # The judge runs with ctx.run_dir as its working directory, so the
    # codebase path is given relative to it.
    cwd_a_rel = cwd_a.relative_to(ctx.run_dir)
    judge_prompt = f"""Compare two responses to: "{q['question']}"
The codebase is available at: {cwd_a_rel}
Response A ({config.label_a}): results/response_{hash_a}.txt
Response B ({config.label_b}): results/response_{hash_b}.txt{cost_info}
Use the Read tool to view the full responses.
You may use Grep/Glob on {cwd_a_rel} to verify claims against the actual codebase if needed.
Evaluate accuracy, completeness, and efficiency (lower cost is better for similar quality).
Output JSON: {{"winner": "A"|"B"|"tie", "reason": "brief explanation"{config.extra_judge_fields}}}"""
    judge_schema = '{"type":"object","properties":{"winner":{"type":"string","enum":["A","B","tie"]},"reason":{"type":"string"}},"required":["winner","reason"]}'
    judge_cmd = [
        "claude", "--print", "--output-format", "json",
        "--no-session-persistence",
        "--dangerously-skip-permissions",
        "--max-turns", str(MAX_TURNS_JUDGE),
        "--json-schema", judge_schema,
        "-p", judge_prompt,
    ]

    judge_cache_key = get_cache_key_from_cmd(judge_cmd)
    cached_judge = get_cached_response(judge_cache_key)
    if cached_judge:
        # A cached verdict without a winner is useless: drop it and re-run.
        response = cached_judge.get("response", {})
        structured = response.get("structured_output", {})
        result_text = response.get("result", "")
        has_winner = (isinstance(structured, dict) and "winner" in structured) or (result_text and '"winner"' in result_text)
        if not has_winner:
            delete_cached_response(judge_cache_key)
            cached_judge = None

    if cached_judge:
        judge_response = cached_judge["response"]
        cached_judge_flag = True
    else:
        judge_response = await run_subprocess(judge_cmd, cwd=ctx.run_dir)
        cached_judge_flag = False
        if not _is_runtime_error(judge_response):
            save_cached_response(judge_cache_key, judge_response, {"question_id": q["id"], "type": "judge", "cmd": judge_cmd})

    if progress:
        await progress.add_turn(qid, 'j', 'o')
        await progress.mark_done(qid, 'j')
        await progress.set_result(qid, parse_judge_winner(judge_response))

    result = {
        "question": q,
        "response_a": response_a,
        "response_b": response_b,
        "judge": judge_response,
        "cost_a": response_a.get("total_cost_usd"),
        "cost_b": response_b.get("total_cost_usd"),
        "usage_a": response_a.get("usage", {}),
        "usage_b": response_b.get("usage", {}),
        "tool_usage_a": response_a.get("tool_usage", {}),
        "tool_usage_b": response_b.get("tool_usage", {}),
        "turns_a": response_a.get("turns", []),
        "turns_b": response_b.get("turns", []),
        "cached_a": cached_a is not None,
        "cached_b": cached_b is not None,
        "cached_judge": cached_judge_flag,
        "error_a": None,
        "error_b": None,
    }
    result_file = ctx.results_dir / f"{q['id']}.json"
    result_file.write_text(json.dumps(result, indent=2))
    return result
def _print_summary(config: ABConfig, results: list[dict]) -> None:
    """Print the banner, the per-question table, totals, winner and cache hits."""
    rule = f"{CYAN}═══════════════════════════════════════════════════════════════{NC}"
    print()
    print(rule)
    print(f"{CYAN}{config.title:^63}{NC}")
    print(rule)
    print()
    print(f"A: {config.label_a}")
    print(f"B: {config.label_b}")
    print()
    print(f"{'Question':<30} {'Winner':^8} {'A cost':>8} {'B cost':>8}")
    print("─" * 60)

    wins = {"A": 0, "B": 0, "tie": 0}
    total_cost_a = 0.0
    total_cost_b = 0.0

    for r in results:
        # str() keeps the table printable even if the judge returned a
        # non-string winner value.
        winner = str(parse_judge_winner(r.get("judge", {})))
        wins[winner] = wins.get(winner, 0) + 1
        if r['cost_a']:
            total_cost_a += r['cost_a']
        if r['cost_b']:
            total_cost_b += r['cost_b']
        cost_a = f"${r['cost_a']:.2f}" if r['cost_a'] else "-"
        cost_b = f"${r['cost_b']:.2f}" if r['cost_b'] else "-"

        error_parts = []
        if r.get('error_a'):
            error_parts.append(f"A:{r['error_a']}")
        if r.get('error_b'):
            error_parts.append(f"B:{r['error_b']}")
        error_info = f" [{', '.join(error_parts)}]" if error_parts else ""

        print(f"{r['question']['id']:<30} {winner:^8} {cost_a:>8} {cost_b:>8}{error_info}")

    print("─" * 60)
    a_wins = wins.get('A', 0)
    b_wins = wins.get('B', 0)
    wins_summary = f"A:{a_wins} B:{b_wins}"
    cost_a_str = f"${total_cost_a:.2f}"
    cost_b_str = f"${total_cost_b:.2f}"
    print(f"{'TOTAL':<30} {wins_summary:^8} {cost_a_str:>8} {cost_b_str:>8}")
    print()

    if a_wins > b_wins:
        overall = f"A ({config.label_a})"
    elif b_wins > a_wins:
        overall = f"B ({config.label_b})"
    else:
        overall = "tie"
    print(f"Winner: {overall}")

    cache_hits_a = sum(1 for r in results if r.get("cached_a"))
    cache_hits_b = sum(1 for r in results if r.get("cached_b"))
    cache_hits_judge = sum(1 for r in results if r.get("cached_judge"))
    if cache_hits_a or cache_hits_b or cache_hits_judge:
        print(f"Cache: A={cache_hits_a}/{len(results)}, B={cache_hits_b}/{len(results)}, judge={cache_hits_judge}/{len(results)}")


async def run_async(
    config: ABConfig,
    question_id: str | None = None,
) -> list[dict]:
    """Run the benchmark over the questions in ``questions.json`` and print a summary.

    Questions are loaded from ``questions.json`` next to this module and run
    concurrently, at most ``MAX_PARALLEL_QUESTIONS`` at a time.

    Args:
        config: the A/B comparison to run.
        question_id: when given, only the question with this id is run.

    Returns:
        The result dicts of all questions that produced one (questions whose
        setup failed are left out).

    Exits the process with status 1 when the questions file or the requested
    question is missing, and with status 130 when the run is cancelled.
    """
    questions_file = Path(__file__).parent / "questions.json"
    if not questions_file.exists():
        log_error(f"Questions file not found: {questions_file}")
        sys.exit(1)

    # JSON is UTF-8; do not depend on the platform's default encoding.
    questions = json.loads(questions_file.read_text(encoding="utf-8"))
    if question_id:
        questions = [q for q in questions if q["id"] == question_id]
        if not questions:
            log_error(f"Question '{question_id}' not found")
            sys.exit(1)

    with run_context() as ctx:
        log(f"Running {config.name} with {len(questions)} question(s) in parallel")
        log(f"Run dir: {ctx.run_dir}")
        print()

        bin_a, bin_b = config.setup_run(ctx)
        log(f"A: {config.label_a} ({bin_a})")
        log(f"B: {config.label_b} ({bin_b})")
        print()

        progress = ProgressDisplay(questions)
        sem = asyncio.Semaphore(MAX_PARALLEL_QUESTIONS)

        async def run_with_sem(q: dict) -> dict | None:
            async with sem:
                return await run_question(q, config, ctx, progress)

        tasks = [asyncio.create_task(run_with_sem(q)) for q in questions]
        try:
            results = await asyncio.gather(*tasks)
            results = [r for r in results if r is not None]
        except asyncio.CancelledError:
            for task in tasks:
                task.cancel()
            print("\n\nInterrupted, stopping...")
            print("Aborted.")
            sys.exit(130)

        progress.finish()
        _print_summary(config, results)
        return results
def run(
    config: ABConfig,
    question_id: str | None = None,
) -> list[dict]:
    """Synchronous entry point: run ``run_async`` in a fresh event loop.

    Returns its results.  On ``KeyboardInterrupt`` a notice is printed and
    the process exits with status 130.
    """
    try:
        results = asyncio.run(run_async(config, question_id))
    except KeyboardInterrupt:
        print("\n\nInterrupted.")
        sys.exit(130)
    return results