import json
import os
import subprocess
import sys
from pathlib import Path
def send(proc: subprocess.Popen, payload: dict) -> None:
    """Write one JSON message to the child's stdin and flush it.

    The payload is serialised with ``json.dumps`` and terminated by a single
    newline, so each message occupies exactly one line on the pipe.

    Args:
        proc: Process object whose ``stdin`` is a writable text stream.
        payload: JSON-serialisable message to send.
    """
    stream = proc.stdin
    frame = f"{json.dumps(payload)}\n"
    stream.write(frame)
    # Flush right away so the message is not left sitting in the text buffer.
    stream.flush()
def recv(proc: subprocess.Popen) -> dict:
    """Read one line from the child's stdout and decode it as JSON.

    Args:
        proc: Process object whose ``stdout`` is a readable text stream.

    Returns:
        The value decoded from the next line of output.

    Raises:
        RuntimeError: If ``readline`` returns an empty string (end of stream).
        json.JSONDecodeError: If the line is not valid JSON.
    """
    raw = proc.stdout.readline()
    if raw:
        return json.loads(raw)
    # An empty string from readline() means the stream has reached EOF.
    raise RuntimeError("EOF from MCP server")
def call_tool(proc: subprocess.Popen, request_id: int, name: str, arguments: dict) -> dict:
    """Send a ``tools/call`` request and return the next message read back.

    Args:
        proc: Process object used by ``send`` and ``recv``.
        request_id: Value placed in the request's ``id`` field.
        name: Tool name placed in ``params.name``.
        arguments: Tool arguments placed in ``params.arguments``.

    Returns:
        The next decoded line from the child's stdout. The ``id`` of that
        message is not compared with ``request_id``.
    """
    request = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }
    send(proc, request)
    return recv(proc)
def parse_text_result(response: dict) -> dict:
    """Decode the JSON document carried in a response's first content item.

    Reads ``response["result"]["content"][0]["text"]`` and parses it with
    ``json.loads``.

    Args:
        response: Decoded response message.

    Returns:
        The value decoded from the text field.

    Raises:
        KeyError, IndexError: If the expected path is absent.
        json.JSONDecodeError: If the text is not valid JSON.
    """
    content_items = response["result"]["content"]
    first_item = content_items[0]
    return json.loads(first_item["text"])
def _stop_and_collect_stderr(proc: subprocess.Popen) -> str:
    """Stop the server process and return what it wrote to stderr.

    The child is terminated *before* its stderr is drained. Reading a pipe to
    EOF only returns once the writer has closed it, so reading stderr from a
    server that is still running would block indefinitely.

    Args:
        proc: The server process, started with ``stderr=subprocess.PIPE``.

    Returns:
        The captured stderr text, or ``"<failed to read stderr>"`` if stopping
        or draining the process raised.
    """
    try:
        proc.terminate()
        try:
            _, captured = proc.communicate(timeout=3)
        except subprocess.TimeoutExpired:
            # The child did not exit after terminate(); force it down so the
            # pipes close and the retry can finish.
            proc.kill()
            _, captured = proc.communicate(timeout=2)
        return captured
    except Exception:
        # Best effort only: the diagnostic must not mask the real failure.
        return "<failed to read stderr>"


def main() -> int:
    """Run the end-to-end smoke test against the server started via cargo.

    Starts ``cargo run --quiet`` in the directory two levels above this file
    (with ``CARGO_TARGET_DIR`` pointing at its ``target`` sub-directory),
    exchanges line-delimited JSON-RPC messages over the child's stdin/stdout,
    and checks the responses. A JSON summary is printed to stdout; if any step
    raises, a JSON failure report including the server's stderr is printed to
    stderr instead.

    Returns:
        ``0`` if every check passed, ``1`` if a check failed or an exception
        was raised while talking to the server.
    """
    root = Path(__file__).resolve().parent.parent
    env = os.environ.copy()
    env["CARGO_TARGET_DIR"] = str(root / "target")
    proc = subprocess.Popen(
        ["cargo", "run", "--quiet"],
        cwd=str(root),
        env=env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    try:
        # --- Handshake and discovery -------------------------------------
        send(
            proc,
            {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2025-03-26",
                    "capabilities": {},
                    "clientInfo": {"name": "smoke-test", "version": "2.0.0"},
                },
            },
        )
        initialize = recv(proc)
        # Sent without an "id"; no response is read for it.
        send(proc, {"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}})

        send(proc, {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}})
        tools_resp = recv(proc)
        tools = [t.get("name") for t in tools_resp.get("result", {}).get("tools", [])]

        send(proc, {"jsonrpc": "2.0", "id": 3, "method": "prompts/list", "params": {}})
        prompts_resp = recv(proc)
        prompts = [p.get("name") for p in prompts_resp.get("result", {}).get("prompts", [])]

        # --- v1 sequential-thinking path ---------------------------------
        v1_ok = call_tool(
            proc,
            4,
            "sequentialthinking_tools",
            {
                "session_id": "smoke-v1",
                "thought": "Check v1 path.",
                "thought_number": 1,
                "total_thoughts": 2,
                "next_thought_needed": True,
                "available_tools": ["rg"],
                "recommended_tools": [{"tool_name": "rg"}],
            },
        )
        v1_clear = call_tool(proc, 5, "clear_thinking_history", {"session_id": "smoke-v1"})

        # Recommends "ReadFile", which is not in available_tools; the checks
        # below expect an error response and an empty history afterwards.
        invalid = call_tool(
            proc,
            6,
            "sequentialthinking_tools",
            {
                "session_id": "smoke-invalid",
                "thought": "Invalid recommendation check.",
                "thought_number": 1,
                "total_thoughts": 1,
                "next_thought_needed": False,
                "available_tools": ["rg"],
                "recommended_tools": [{"tool_name": "ReadFile"}],
            },
        )
        invalid_hist = call_tool(
            proc,
            7,
            "get_thinking_history",
            {"session_id": "smoke-invalid"},
        )

        # --- v2 deliberation: start, expand, score, prune, verify --------
        started = call_tool(
            proc,
            8,
            "start_deliberation",
            {
                "session_id": "smoke-v2",
                "mode": "tot",
                "goal": "Design a safe implementation path",
                "profile": "balanced",
                "constraints": ["must be auditable", "must be incremental"],
            },
        )
        started_payload = parse_text_result(started)
        deliberation_id = started_payload["deliberation_id"]

        # The initial audit export supplies the frontier node ids to expand.
        exported_boot = call_tool(
            proc,
            9,
            "export_reasoning_audit",
            {"deliberation_id": deliberation_id},
        )
        export_boot_payload = parse_text_result(exported_boot)
        frontier = export_boot_payload["payload"]["frontier"]

        expanded = call_tool(
            proc,
            10,
            "expand_thoughts",
            {
                "deliberation_id": deliberation_id,
                "from_node_ids": frontier,
                "strategy": "diverse",
                "count": 3,
            },
        )
        expanded_payload = parse_text_result(expanded)
        node_ids = [n["node_id"] for n in expanded_payload["created_nodes"]]

        scored = call_tool(
            proc,
            11,
            "score_thoughts",
            {"deliberation_id": deliberation_id, "node_ids": node_ids},
        )
        pruned = call_tool(
            proc,
            12,
            "prune_thoughts",
            {
                "deliberation_id": deliberation_id,
                "beam_width": 2,
                "diversity_floor": 0.3,
            },
        )
        pruned_payload = parse_text_result(pruned)
        kept = pruned_payload["kept_node_ids"]

        verified = call_tool(
            proc,
            13,
            "verify_thoughts",
            {
                "deliberation_id": deliberation_id,
                "node_ids": kept,
                "method": "hybrid",
                "claims": [
                    {
                        "text": "Implementation is auditable",
                        "critical": True,
                        "evidence": [
                            {
                                "source": "official-spec",
                                "tier": "tier1",
                                "independence_group": "spec",
                                "supports": True,
                                "contradictory": False,
                                "unambiguous": True,
                            }
                        ],
                    },
                    {
                        "text": "All critical claims are verified",
                        "critical": True,
                        "evidence": [
                            {
                                "source": "audit-log",
                                "tier": "tier1",
                                "independence_group": "audit",
                                "supports": True,
                                "contradictory": False,
                                "unambiguous": True,
                            }
                        ],
                    },
                ],
            },
        )
        pipeline = call_tool(
            proc,
            14,
            "run_reasonkit_pipeline",
            {"deliberation_id": deliberation_id, "profile": "balanced"},
        )
        consensus = call_tool(
            proc,
            15,
            "consensus_answer",
            {"deliberation_id": deliberation_id, "method": "self_consistency"},
        )
        exported = call_tool(
            proc,
            16,
            "export_reasoning_audit",
            {"deliberation_id": deliberation_id, "include_raw_thoughts": True},
        )

        # --- Hardening: consensus before and after relaxing the policy ---
        # NOTE(review): request ids jump from 16 to 19 here; the values are
        # kept exactly as they were.
        harden_start = call_tool(
            proc,
            19,
            "start_deliberation",
            {"session_id": "smoke-harden", "mode": "reasonkit", "goal": "hard fail check"},
        )
        harden_id = parse_text_result(harden_start)["deliberation_id"]
        harden_boot = call_tool(
            proc,
            20,
            "export_reasoning_audit",
            {"deliberation_id": harden_id},
        )
        harden_frontier = parse_text_result(harden_boot)["payload"]["frontier"]
        # Response is read (to keep the stream in step) but not inspected.
        call_tool(
            proc,
            21,
            "expand_thoughts",
            {
                "deliberation_id": harden_id,
                "from_node_ids": harden_frontier,
                "count": 1,
            },
        )
        harden_verify = call_tool(
            proc,
            22,
            "verify_thoughts",
            {
                "deliberation_id": harden_id,
                "node_ids": harden_frontier,
                "claims": [
                    {
                        "text": "Critical claim without enough independent evidence",
                        "critical": True,
                        "evidence": [
                            {
                                "source": "single-blog",
                                "tier": "tier2",
                                "independence_group": "blog-a",
                                "supports": True,
                                "contradictory": False,
                                "unambiguous": False,
                            }
                        ],
                    }
                ],
            },
        )
        # Expected to be an error response (checked in `ok` below).
        harden_consensus = call_tool(
            proc,
            23,
            "consensus_answer",
            {"deliberation_id": harden_id},
        )
        harden_policy_set = call_tool(
            proc,
            24,
            "set_verification_policy",
            {
                "deliberation_id": harden_id,
                "verification_policy": {
                    "min_independent_groups": 1,
                    "allow_tier3_for_independence": True,
                    "require_tier1_unambiguous_for_critical": False,
                    "fail_closed_on_critical_unresolved": False,
                },
            },
        )
        # Same request as id 23; expected to succeed after the policy change.
        harden_consensus_relaxed = call_tool(
            proc,
            25,
            "consensus_answer",
            {"deliberation_id": harden_id},
        )

        # --- Aliases and resources ---------------------------------------
        alias_set = call_tool(
            proc,
            26,
            "set_reasoning_aliases",
            {
                "merge": True,
                "aliases": {
                    "cot": ["rapid reasoning"],
                    "tot": [],
                    "got": [],
                    "verification": [],
                    "governance": [],
                    "auditability": [],
                    "cleanup": [],
                },
            },
        )
        resources_list = call_tool(proc, 27, "list_reasoning_resources", {})
        resources_read = call_tool(
            proc,
            28,
            "read_reasoning_resource",
            {"uri": f"reasoning://session/{deliberation_id}/frontier"},
        )
        aliases_resource = call_tool(
            proc,
            29,
            "read_reasoning_resource",
            {"uri": "reasoning://config/aliases"},
        )

        # --- Evaluation --------------------------------------------------
        expected_tools = {
            "sequentialthinking_tools",
            "get_thinking_history",
            "clear_thinking_history",
            "start_deliberation",
            "expand_thoughts",
            "score_thoughts",
            "prune_thoughts",
            "verify_thoughts",
            "consensus_answer",
            "run_reasonkit_pipeline",
            "export_reasoning_audit",
            "list_reasoning_resources",
            "read_reasoning_resource",
            "set_verification_policy",
            "set_reasoning_aliases",
            "reasoning_autopilot",
        }
        expected_prompts = {
            "sequential-thinking-guidance",
            "tot-planner-guidance",
            "diverse-lens-pack",
            "verification-pack",
            "triangulation-pack",
            "decision-routing-pack",
        }
        invalid_payload = parse_text_result(invalid)
        invalid_hist_payload = parse_text_result(invalid_hist)
        scored_payload = parse_text_result(scored)
        verified_payload = parse_text_result(verified)
        pipeline_payload = parse_text_result(pipeline)
        consensus_payload = parse_text_result(consensus)
        export_payload = parse_text_result(exported)
        harden_verify_payload = parse_text_result(harden_verify)

        # Subset checks: the server may expose more tools/prompts than listed.
        ok = (
            "result" in initialize
            and expected_tools.issubset(set(tools))
            and expected_prompts.issubset(set(prompts))
            and not v1_ok.get("result", {}).get("isError", False)
            and not v1_clear.get("result", {}).get("isError", False)
            and invalid.get("result", {}).get("isError", False)
            and invalid_payload.get("history_length") == 0
            and len(invalid_hist_payload.get("thoughts", [])) == 0
            and len(scored_payload.get("scores", [])) > 0
            and "claim_status_summary" in verified_payload
            and "route_decision" in pipeline_payload
            and "route_decision" in consensus_payload
            and "payload" in export_payload
            and "resources" in parse_text_result(resources_list)
            and "frontier" in parse_text_result(resources_read)
            and harden_verify_payload["claim_status_summary"]["data_deficit"] >= 1
            and harden_consensus.get("result", {}).get("isError", False)
            and not harden_policy_set.get("result", {}).get("isError", False)
            and not alias_set.get("result", {}).get("isError", False)
            and not harden_consensus_relaxed.get("result", {}).get("isError", False)
            and "aliases" in parse_text_result(aliases_resource)
        )
        summary = {
            "initialize_ok": "result" in initialize,
            "tools": tools,
            "prompts": prompts,
            "v1_ok": not v1_ok.get("result", {}).get("isError", False),
            "invalid_v1_error": invalid.get("result", {}).get("isError"),
            "v2_deliberation_id": deliberation_id,
            "v2_scored_nodes": len(scored_payload.get("scores", [])),
            "v2_verified": verified_payload.get("claim_status_summary"),
            "v2_pipeline_route": pipeline_payload.get("route_decision"),
            "v2_consensus_route": consensus_payload.get("route_decision"),
            "harden_data_deficit": harden_verify_payload["claim_status_summary"]["data_deficit"],
            "harden_consensus_error": harden_consensus.get("result", {}).get("isError"),
            "harden_policy_set": not harden_policy_set.get("result", {}).get("isError", False),
            "aliases_set": not alias_set.get("result", {}).get("isError", False),
            "harden_relaxed_consensus_error": harden_consensus_relaxed.get("result", {}).get("isError"),
            "harden_relaxed_consensus_payload": harden_consensus_relaxed.get("result", {}),
            "status": "ok" if ok else "failed",
        }
        print(json.dumps(summary, indent=2))
        return 0 if ok else 1
    except Exception as exc:
        # Stop the server before draining stderr; reading the pipe of a live
        # child would wait for an EOF that never arrives.
        stderr = _stop_and_collect_stderr(proc)
        print(
            json.dumps(
                {
                    "status": "failed",
                    "error": str(exc),
                    "server_stderr": stderr,
                },
                indent=2,
            ),
            file=sys.stderr,
        )
        return 1
    finally:
        # Harmless if the except path already stopped and reaped the child.
        proc.terminate()
        try:
            proc.wait(timeout=3)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait(timeout=2)
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())