from __future__ import annotations
import importlib
import json
import os
import select
import subprocess
import sys
import time
from pathlib import Path
try:
import tomllib except ImportError: import tomli as tomllib
# --- Filesystem layout -------------------------------------------------------
# ROOT is the directory holding this script; REPO_ROOT sits two levels above it.
ROOT = Path(__file__).resolve().parent
REPO_ROOT = ROOT.parents[1]

# --- Built-in defaults (used when the matching env var is unset) -------------
DEFAULT_BIN = REPO_ROOT.joinpath("target", "release", "tokensave")
DEFAULT_LOG = "/tmp/tokensave_matrix.log"
DEFAULT_REPOS = ROOT.joinpath("repos.toml")
DEFAULT_STDERR_DIR = "/tmp/tokensave_matrix_stderr"

# --- Effective settings, each overridable through TOKENSAVE_PROBE_* ----------
BIN = Path(os.getenv("TOKENSAVE_PROBE_BIN", str(DEFAULT_BIN)))
LOG_PATH = os.getenv("TOKENSAVE_PROBE_LOG", DEFAULT_LOG)
REPOS_CONF = Path(os.getenv("TOKENSAVE_PROBE_REPOS", str(DEFAULT_REPOS)))
# Default per-call timeout in seconds.
TIMEOUT = float(os.getenv("TOKENSAVE_PROBE_TIMEOUT", "25"))
STDERR_DIR = Path(os.getenv("TOKENSAVE_PROBE_STDERR_DIR", DEFAULT_STDERR_DIR))
class McpInitError(RuntimeError):
    """Raised when the MCP handshake with a server process cannot complete.

    Attributes:
        repo: Name of the repository the server was started for.
        stderr_path: Location of the file capturing the server's stderr.
        hint: Short description of what went wrong.
    """

    def __init__(self, repo: str, stderr_path: Path, hint: str):
        # Keep the individual pieces so callers can report them separately.
        self.repo = repo
        self.stderr_path = stderr_path
        self.hint = hint
        message = f"MCP init failed for {repo}: {hint} (stderr: {stderr_path})"
        super().__init__(message)
class McpClient:
    """Line-delimited JSON-RPC client for a ``tokensave serve`` child process.

    The constructor spawns ``BIN serve`` with ``repo_path`` as its working
    directory, redirects the child's stderr to
    ``STDERR_DIR/<repo_name>.stderr`` and performs the MCP ``initialize`` /
    ``notifications/initialized`` handshake. Requests and replies are
    single-line JSON objects exchanged over the child's stdin/stdout.

    Args:
        repo_path: Directory the server process is started in.
        repo_name: Label used for the stderr file name and in error messages.
        init_timeout: Seconds to wait for the ``initialize`` reply.

    Raises:
        McpInitError: If the handshake fails. The child process and the
            stderr file are released before the exception propagates.
        OSError: If the server process cannot be spawned.
    """

    def __init__(self, repo_path: str, repo_name: str = "unknown",
                 init_timeout: float = 15.0):
        STDERR_DIR.mkdir(parents=True, exist_ok=True)
        self.stderr_path = STDERR_DIR / f"{repo_name}.stderr"
        self.stderr_file = open(self.stderr_path, "wb")
        try:
            self.proc = subprocess.Popen(
                [str(BIN), "serve"],
                cwd=repo_path,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=self.stderr_file,
                bufsize=0,
            )
        except BaseException:
            # Spawning failed: do not leak the stderr file handle.
            self.stderr_file.close()
            raise
        self._id = 0  # last request id handed out
        self._buf = b""  # bytes read from stdout but not yet split into lines
        self._stale: set[int] = set()  # ids of calls that timed out
        try:
            self._handshake(repo_name, init_timeout)
        except BaseException:
            # A half-initialised client is never returned to the caller, so
            # nobody else could call close(); reap the child here.
            self.close()
            raise

    def _handshake(self, repo_name: str, init_timeout: float) -> None:
        """Run the MCP initialize exchange, raising McpInitError on failure."""
        try:
            self._send({
                "jsonrpc": "2.0",
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "tokensave-probe", "version": "1"},
                },
                "id": self._next(),
            })
        except OSError as exc:  # includes BrokenPipeError
            raise McpInitError(repo_name, self.stderr_path,
                               f"server died before initialize ({exc})") from exc
        resp = self._await_reply(self._id, init_timeout)
        if resp is None:
            raise McpInitError(repo_name, self.stderr_path,
                               "server closed stdout during initialize")
        if resp.get("_timeout"):
            raise McpInitError(repo_name, self.stderr_path,
                               f"initialize timed out after {init_timeout:g}s")
        if "error" in resp:
            err = resp["error"]
            msg = err.get("message", str(err)) if isinstance(err, dict) else str(err)
            raise McpInitError(repo_name, self.stderr_path,
                               f"initialize returned error: {str(msg)[:200]}")
        try:
            self._send({"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}})
        except OSError as exc:
            raise McpInitError(repo_name, self.stderr_path,
                               f"server died after initialize ({exc})") from exc

    def _next(self) -> int:
        """Return a fresh, monotonically increasing request id."""
        self._id += 1
        return self._id

    def _send(self, msg: dict) -> None:
        """Write ``msg`` to the server's stdin as one JSON line."""
        assert self.proc.stdin is not None
        self.proc.stdin.write((json.dumps(msg) + "\n").encode())
        self.proc.stdin.flush()

    @staticmethod
    def _decode_line(line: bytes) -> dict:
        """Parse one stdout line; anything but a JSON object becomes ``{"raw": ...}``."""
        # errors="replace" so that invalid UTF-8 cannot raise a second time
        # while the fallback value is being built.
        text = line.decode("utf-8", errors="replace")
        try:
            parsed = json.loads(text)
        except (ValueError, RecursionError):
            return {"raw": text[:200]}
        if not isinstance(parsed, dict):
            # Valid JSON but not an object: callers rely on dict methods.
            return {"raw": text[:200]}
        return parsed

    def _readline_blocking(self, timeout: float) -> dict | None:
        """Read the next message from the server's stdout.

        Returns:
            The decoded message, ``{"_timeout": True}`` if no complete line
            arrived within ``timeout`` seconds, or ``None`` if stdout reached
            EOF or could not be read.
        """
        assert self.proc.stdout is not None
        fd = self.proc.stdout.fileno()
        # Monotonic clock: the deadline must not move with wall-clock changes.
        deadline = time.monotonic() + timeout
        while True:
            line, newline, rest = self._buf.partition(b"\n")
            if newline:
                self._buf = rest
                return self._decode_line(line)
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return {"_timeout": True}
            ready, _, _ = select.select([fd], [], [], remaining)
            if not ready:
                return {"_timeout": True}
            try:
                chunk = os.read(fd, 65536)
            except OSError:
                return None
            if not chunk:
                return None
            self._buf += chunk

    def _await_reply(self, want_id: int, timeout: float) -> dict | None:
        """Wait for the reply to ``want_id``; ``None`` means stdout was closed.

        Messages without an id and replies carrying any other id (including
        late replies to calls that already timed out) are dropped.
        """
        deadline = time.monotonic() + timeout
        while True:
            msg = self._readline_blocking(max(0.0, deadline - time.monotonic()))
            if msg is None or msg.get("_timeout"):
                return msg
            mid = msg.get("id")
            if mid == want_id:
                return msg
            if isinstance(mid, int):
                # A late reply to a timed-out call has now been consumed.
                self._stale.discard(mid)

    def _recv_id(self, want_id: int, timeout: float) -> dict:
        """Like ``_await_reply`` but reports a closed stdout as an error reply."""
        msg = self._await_reply(want_id, timeout)
        if msg is None:
            return {"error": {"message": "server closed stdout"}}
        return msg

    def call(self, tool: str, args: dict, timeout: float = TIMEOUT) -> tuple[dict | None, float]:
        """Invoke an MCP tool and wait for its reply.

        Args:
            tool: Tool name passed as ``params.name`` of ``tools/call``.
            args: Tool arguments passed as ``params.arguments``.
            timeout: Seconds to wait for the reply.

        Returns:
            ``(response, elapsed_seconds)``. The response is the server's
            reply, ``{"_timeout": True}`` on timeout, or an
            ``{"error": {...}}`` dict when the server is no longer reachable.
        """
        rid = self._next()
        try:
            self._send({
                "jsonrpc": "2.0",
                "method": "tools/call",
                "params": {"name": tool, "arguments": args},
                "id": rid,
            })
        except OSError as exc:
            # The server died between calls: report it as an error reply
            # instead of letting a broken pipe abort the caller.
            return {"error": {"message": f"server unavailable: {exc}"}}, 0.0
        t0 = time.monotonic()
        resp = self._recv_id(rid, timeout=timeout)
        dt = time.monotonic() - t0
        if resp.get("_timeout"):
            # Remember the id so a late reply is recognised and skipped.
            self._stale.add(rid)
        return resp, dt

    def close(self) -> None:
        """Shut the server down and release every handle held by the client.

        Closing stdin asks the server to exit; if it has not done so within
        three seconds it is killed and reaped. Errors are swallowed because
        this is best-effort cleanup.
        """
        proc = self.proc
        try:
            if proc.stdin is not None:
                proc.stdin.close()
            proc.wait(timeout=3)
        except (OSError, subprocess.TimeoutExpired):
            proc.kill()
            try:
                # Reap the killed child so it does not linger as a zombie.
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                pass
        finally:
            for stream in (proc.stdout, self.stderr_file):
                if stream is None:
                    continue
                try:
                    stream.close()
                except OSError:
                    pass
def text_of(resp) -> str | None:
    """Extract the most useful text payload from a ``tools/call`` response.

    The first content block whose ``text`` parses as JSON wins; if none does,
    the ``text`` of the first block is returned as a fallback.

    Args:
        resp: Decoded JSON-RPC response, or anything else (ignored).

    Returns:
        The selected text, or ``None`` when the response carries no usable
        ``result.content`` list. Content entries that are not dicts are
        skipped instead of raising.
    """
    if not isinstance(resp, dict):
        return None
    result = resp.get("result")
    if not isinstance(result, dict):
        return None
    content = result.get("content")
    if not isinstance(content, list):
        return None
    # Malformed (non-dict) entries would otherwise raise AttributeError.
    blocks = [block for block in content if isinstance(block, dict)]
    for block in blocks:
        text = block.get("text", "")
        try:
            json.loads(text)
        except (TypeError, ValueError):
            # Not JSON (or not a string at all): try the next block.
            continue
        return text
    return blocks[0].get("text") if blocks else None
def discover(cli: McpClient) -> dict:
    """Collect sample identifiers from the server to feed into probe queries.

    Runs a handful of ``tokensave_search`` queries, harvests ids, qualified
    names, names and file paths from the hits, then tops up the qualified
    names via ``tokensave_node`` lookups. Every list in the result holds
    exactly five entries; gaps are filled with placeholder values.

    Returns:
        Dict with the keys ``ids``, ``qnames``, ``names`` and ``files``.
    """
    wanted = 5

    def unique_head(values):
        # Order-preserving de-duplication, trimmed to the sample size.
        return list(dict.fromkeys(values))[:wanted]

    ids, qnames, names, files = [], [], [], []

    for term in ("main", "new", "Error", "Config", "Display"):
        resp, _ = cli.call("tokensave_search", {"query": term}, timeout=15)
        payload = text_of(resp)
        if not payload:
            continue
        try:
            parsed = json.loads(payload)
        except Exception:
            continue
        if isinstance(parsed, list):
            candidates = parsed
        elif isinstance(parsed, dict):
            # The hit list may live under any of these keys.
            candidates = (
                parsed.get("results")
                or parsed.get("items")
                or parsed.get("matches")
                or []
            )
        else:
            candidates = []
        for entry in candidates[:20]:
            if not isinstance(entry, dict):
                continue
            file_hit = entry.get("file") or entry.get("file_path")
            harvested = (
                (ids, entry.get("id")),
                (qnames, entry.get("qualified_name")),
                (files, file_hit),
                (names, entry.get("name")),
            )
            for bucket, value in harvested:
                if value:
                    bucket.append(value)

    ids, qnames, names, files = (
        unique_head(values) for values in (ids, qnames, names, files)
    )

    # Resolve ids to qualified names until enough have been gathered.
    for node_id in ids:
        if len(qnames) >= wanted:
            break
        resp, _ = cli.call("tokensave_node", {"node_id": node_id}, timeout=10)
        payload = text_of(resp)
        if not payload:
            continue
        try:
            node = json.loads(payload)
        except Exception:
            continue
        if isinstance(node, dict) and node.get("qualified_name"):
            qnames.append(node["qualified_name"])
    qnames = unique_head(qnames)

    def padded(values, fallback):
        # Fill up to the sample size with the placeholder.
        return values + [fallback] * (wanted - len(values))

    return {
        "ids": padded(ids, "__missing__"),
        "qnames": padded(qnames, "__missing__"),
        "names": padded(names, "main"),
        "files": padded(files, "src/lib.rs"),
    }
def classify(resp, dt, slow_after: float = 10.0) -> tuple[str, str]:
    """Map a tool-call response onto a ``(status, detail)`` pair.

    Args:
        resp: Decoded JSON-RPC response, a ``{"_timeout": True}`` marker, or
            ``None``.
        dt: Elapsed call time in seconds.
        slow_after: Calls that succeed but take longer than this many seconds
            are reported as ``SLOW``.

    Returns:
        ``(status, detail)`` where status is one of ``BAD``, ``TIMEOUT``,
        ``ERROR``, ``EMPTY``, ``SLOW`` or ``OK``. Malformed payloads (a
        non-dict response or result, non-dict content blocks, non-string
        texts) are classified instead of raising.
    """
    if resp is None:
        return "BAD", "no response"
    if not isinstance(resp, dict):
        return "BAD", "malformed response"
    if resp.get("_timeout"):
        return "TIMEOUT", f"{dt:.1f}s"
    if "error" in resp:
        err = resp["error"]
        msg = err.get("message", "") if isinstance(err, dict) else str(err)
        if not isinstance(msg, str):
            # e.g. "message": null — must not break the slice below.
            msg = "" if msg is None else str(msg)
        return "ERROR", msg[:100]
    res = resp.get("result")
    if res is None:
        return "BAD", "no result"
    if not isinstance(res, dict):
        return "BAD", "malformed result"

    content = res.get("content")
    blocks = content if isinstance(content, list) else []
    first = blocks[0] if blocks else None
    text = first.get("text") if isinstance(first, dict) else None
    if not isinstance(text, str):
        text = ""

    if res.get("isError"):
        return "ERROR", text[:100]
    if not blocks:
        return "EMPTY", "no content"
    if not text.strip():
        return "EMPTY", "blank"
    if dt > slow_after:
        return "SLOW", f"{dt:.1f}s"
    return "OK", f"{dt:.2f}s"
def load_repos() -> list[dict]:
    """Read the repository list from the TOML file at ``REPOS_CONF``.

    Returns:
        The value stored under the top-level ``repos`` key, or an empty list
        when that key is absent.
    """
    with REPOS_CONF.open("rb") as handle:
        config = tomllib.load(handle)
    return config.get("repos", [])
def load_probe_set(languages: list[str], discovered: dict) -> dict:
    """Build the combined probe table for the given languages.

    For each language the module ``tools.<language>`` is imported and its
    ``probes_for(discovered)`` result is merged into one dict; later
    languages overwrite earlier ones on key clashes. Languages whose module
    cannot be imported are reported on stderr and skipped.
    """
    probe_set: dict = {}
    for language in languages:
        module_name = f"tools.{language}"
        try:
            probe_module = importlib.import_module(module_name)
        except ImportError:
            print(f"warning: no probe module for language '{language}'", file=sys.stderr)
        else:
            probe_set.update(probe_module.probes_for(discovered))
    return probe_set
def main() -> int:
    """Run every configured probe against every configured repository.

    For each repository listed in the repos file a server is started, sample
    identifiers are discovered, and each probe query is executed. One
    tab-separated line per call (repo, tool, query, status, detail) is
    written to ``LOG_PATH``.

    Returns:
        Process exit code: ``0`` on completion, ``2`` when the binary or the
        repository list is missing.
    """
    if not BIN.exists():
        print(f"error: tokensave binary not found at {BIN}", file=sys.stderr)
        print("hint: run `cargo build --release` first", file=sys.stderr)
        return 2
    # Make the sibling ``tools`` package importable, without stacking
    # duplicate entries if main() is invoked more than once.
    if str(ROOT) not in sys.path:
        sys.path.insert(0, str(ROOT))
    repos = load_repos()
    if not repos:
        print(f"error: no repos in {REPOS_CONF}", file=sys.stderr)
        return 2

    def tsv(value) -> str:
        # Tabs and line breaks inside a field would corrupt the one-record-
        # per-line, tab-separated log format.
        return str(value).replace("\t", " ").replace("\r", " ").replace("\n", " ")

    # ``with`` guarantees the log is flushed and closed even if a probe
    # raises; the explicit encoding keeps the log independent of the locale.
    with open(LOG_PATH, "w", encoding="utf-8") as log:
        for repo in repos:
            if not isinstance(repo, dict) or not repo.get("name") or not repo.get("path"):
                print(f"skip malformed repo entry: {repo!r}", file=sys.stderr)
                continue
            name = repo["name"]
            path = repo["path"]
            if not Path(path).exists():
                print(f"skip {name}: {path} not found", file=sys.stderr)
                continue
            print(f"=== {name} ({path}) ===", file=sys.stderr, flush=True)
            try:
                cli = McpClient(path, repo_name=name)
            except McpInitError as exc:
                print(f"skip {name}: {exc.hint} — see {exc.stderr_path}", file=sys.stderr)
                log.write(f"{tsv(name)}\t_init_\t{{}}\tBAD\t{tsv(exc.hint)}\n")
                log.flush()
                continue
            try:
                discovered = discover(cli)
                probes = load_probe_set(repo.get("languages", []), discovered)
                for tool, queries in probes.items():
                    for query in queries:
                        resp, dt = cli.call(tool, query, timeout=TIMEOUT)
                        st, detail = classify(resp, dt)
                        log.write(
                            f"{tsv(name)}\t{tsv(tool)}\t{json.dumps(query)[:120]}"
                            f"\t{st}\t{tsv(detail)}\n"
                        )
                        # Flush per record so partial results survive a crash.
                        log.flush()
            finally:
                cli.close()
    print(f"log written to {LOG_PATH}", file=sys.stderr)
    return 0
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())