import argparse
import json
import os
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
VECTORS_DIR = Path(__file__).parent.parent / "test-vectors"
PROFILES = {
"identity": {"level": "L1", "file": "identity.json", "required": True},
"signing": {"level": "L2", "file": "signing.json", "required": True},
"policy": {"level": "L1", "file": "policy-enforcement.json", "required": True},
}
CONFORMANCE_LEVELS = {
"L1": {"requires": ["identity", "policy"], "label": "Core"},
"L2": {"requires": ["identity", "policy", "signing"], "label": "Authenticated"},
}
@dataclass
class TestResult:
profile: str
vector_id: str
passed: bool
expected: str = ""
actual: str = ""
error: Optional[str] = None
@dataclass
class ProfileResult:
profile: str
total: int = 0
passed: int = 0
failed: int = 0
errors: int = 0
results: list = field(default_factory=list)
@property
def all_passed(self) -> bool:
return self.passed == self.total and self.errors == 0
def send_command(proc: subprocess.Popen, command: dict) -> dict:
line = json.dumps(command) + "\n"
proc.stdin.write(line)
proc.stdin.flush()
response_line = proc.stdout.readline()
if not response_line:
return {"error": "adapter returned empty response"}
try:
return json.loads(response_line.strip())
except json.JSONDecodeError as e:
return {"error": f"invalid JSON from adapter: {e}", "raw": response_line.strip()}
def run_identity_vectors(proc: subprocess.Popen, verbose: bool) -> ProfileResult:
vectors_file = VECTORS_DIR / "identity.json"
if not vectors_file.exists():
return ProfileResult(profile="identity", errors=1)
data = json.loads(vectors_file.read_text())
vectors = data["vectors"]
result = ProfileResult(profile="identity", total=len(vectors))
for v in vectors:
vid = v["name"]
inp = v["input"]
expected = v["expected"]
response = send_command(proc, {
"operation": "calculate_sekuire_id",
"params": {
"model": inp["model"],
"systemPrompt": inp["systemPrompt"],
"tools": inp["tools"],
},
})
if response.get("error"):
tr = TestResult(
profile="identity", vector_id=vid, passed=False,
expected=expected, error=response["error"],
)
result.errors += 1
elif response.get("result") == expected:
tr = TestResult(
profile="identity", vector_id=vid, passed=True,
expected=expected, actual=response["result"],
)
result.passed += 1
else:
tr = TestResult(
profile="identity", vector_id=vid, passed=False,
expected=expected, actual=response.get("result", ""),
)
result.failed += 1
result.results.append(tr)
if verbose:
status = "PASS" if tr.passed else "FAIL"
print(f" [{status}] {vid}", file=sys.stderr)
if not tr.passed:
print(f" expected: {tr.expected}", file=sys.stderr)
print(f" actual: {tr.actual or tr.error}", file=sys.stderr)
return result
def run_signing_vectors(proc: subprocess.Popen, verbose: bool) -> ProfileResult:
vectors_file = VECTORS_DIR / "signing.json"
if not vectors_file.exists():
return ProfileResult(profile="signing", errors=1)
data = json.loads(vectors_file.read_text())
secret_key = data["keyPair"]["secretKey"]
public_key = data["keyPair"]["publicKey"]
vectors = data["vectors"]
result = ProfileResult(profile="signing", total=len(vectors) * 2)
for v in vectors:
vid = v["name"]
message = v["message"]
expected_sig = v["signature"]
sign_resp = send_command(proc, {
"operation": "sign",
"params": {"message": message, "secretKey": secret_key},
})
if sign_resp.get("error"):
tr = TestResult(
profile="signing", vector_id=f"{vid}/sign", passed=False,
expected=expected_sig, error=sign_resp["error"],
)
result.errors += 1
elif sign_resp.get("result") == expected_sig:
tr = TestResult(
profile="signing", vector_id=f"{vid}/sign", passed=True,
expected=expected_sig, actual=sign_resp["result"],
)
result.passed += 1
else:
tr = TestResult(
profile="signing", vector_id=f"{vid}/sign", passed=False,
expected=expected_sig, actual=sign_resp.get("result", ""),
)
result.failed += 1
result.results.append(tr)
if verbose:
status = "PASS" if tr.passed else "FAIL"
print(f" [{status}] {vid}/sign", file=sys.stderr)
verify_resp = send_command(proc, {
"operation": "verify",
"params": {
"message": message,
"signature": expected_sig,
"publicKey": public_key,
},
})
if verify_resp.get("error"):
tr = TestResult(
profile="signing", vector_id=f"{vid}/verify", passed=False,
expected="true", error=verify_resp["error"],
)
result.errors += 1
elif verify_resp.get("result") is True:
tr = TestResult(
profile="signing", vector_id=f"{vid}/verify", passed=True,
expected="true", actual="true",
)
result.passed += 1
else:
tr = TestResult(
profile="signing", vector_id=f"{vid}/verify", passed=False,
expected="true", actual=str(verify_resp.get("result", "")),
)
result.failed += 1
result.results.append(tr)
if verbose:
status = "PASS" if tr.passed else "FAIL"
print(f" [{status}] {vid}/verify", file=sys.stderr)
return result
def run_policy_vectors(proc: subprocess.Popen, verbose: bool) -> ProfileResult:
vectors_file = VECTORS_DIR / "policy-enforcement.json"
if not vectors_file.exists():
return ProfileResult(profile="policy", errors=1)
data = json.loads(vectors_file.read_text())
policy = data["policy"]
vectors = data["vectors"]
result = ProfileResult(profile="policy", total=len(vectors))
for v in vectors:
vid = v["name"]
action = v["action"]
expected_decision = v["expected"]["decision"]
response = send_command(proc, {
"operation": "enforce_policy",
"params": {"policy": policy, "action": action},
})
if response.get("error"):
tr = TestResult(
profile="policy", vector_id=vid, passed=False,
expected=expected_decision, error=response["error"],
)
result.errors += 1
elif response.get("result") == expected_decision:
tr = TestResult(
profile="policy", vector_id=vid, passed=True,
expected=expected_decision, actual=response["result"],
)
result.passed += 1
else:
tr = TestResult(
profile="policy", vector_id=vid, passed=False,
expected=expected_decision, actual=response.get("result", ""),
)
result.failed += 1
result.results.append(tr)
if verbose:
status = "PASS" if tr.passed else "FAIL"
print(f" [{status}] {vid}", file=sys.stderr)
if not tr.passed:
print(f" expected: {tr.expected}", file=sys.stderr)
print(f" actual: {tr.actual or tr.error}", file=sys.stderr)
return result
PROFILE_RUNNERS = {
"identity": run_identity_vectors,
"signing": run_signing_vectors,
"policy": run_policy_vectors,
}
def determine_conformance_level(profile_results: dict[str, ProfileResult]) -> str:
for level in ["L2", "L1"]:
required = CONFORMANCE_LEVELS[level]["requires"]
if all(
p in profile_results and profile_results[p].all_passed
for p in required
):
return level
return "none"
def main():
parser = argparse.ArgumentParser(description="OAG Conformance Harness Runner")
parser.add_argument("--adapter", required=True, help="Path to adapter script")
parser.add_argument(
"--profile", default="all",
choices=["all", "identity", "signing", "policy"],
help="Which profile to test (default: all)",
)
parser.add_argument("--verbose", "-v", action="store_true", help="Show per-vector results")
parser.add_argument("--json", action="store_true", help="Output results as JSON")
args = parser.parse_args()
adapter_path = os.path.abspath(args.adapter)
if not os.path.isfile(adapter_path):
print(f"Error: adapter not found: {adapter_path}", file=sys.stderr)
sys.exit(1)
profiles_to_run = list(PROFILES.keys()) if args.profile == "all" else [args.profile]
proc = subprocess.Popen(
[adapter_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE if not args.verbose else None,
text=True,
bufsize=1,
)
profile_results: dict[str, ProfileResult] = {}
total_passed = 0
total_tests = 0
try:
for profile_name in profiles_to_run:
if args.verbose:
print(f"\n--- {profile_name} ---", file=sys.stderr)
runner = PROFILE_RUNNERS[profile_name]
pr = runner(proc, args.verbose)
profile_results[profile_name] = pr
total_passed += pr.passed
total_tests += pr.total
send_command(proc, {"operation": "exit", "params": {}})
except (BrokenPipeError, OSError):
pass
finally:
proc.terminate()
proc.wait(timeout=5)
level = determine_conformance_level(profile_results)
level_label = CONFORMANCE_LEVELS.get(level, {}).get("label", "None")
if args.json:
output = {
"conformanceLevel": level,
"conformanceLevelLabel": level_label,
"totalPassed": total_passed,
"totalTests": total_tests,
"profiles": {},
}
for name, pr in profile_results.items():
output["profiles"][name] = {
"total": pr.total,
"passed": pr.passed,
"failed": pr.failed,
"errors": pr.errors,
"allPassed": pr.all_passed,
}
print(json.dumps(output, indent=2))
else:
print(f"\n{'='*50}")
print(f"OAG Conformance Results")
print(f"{'='*50}")
for name, pr in profile_results.items():
status = "PASS" if pr.all_passed else "FAIL"
print(f" {name:20s} {pr.passed}/{pr.total} [{status}]")
print(f"{'='*50}")
print(f" {'Total':20s} {total_passed}/{total_tests}")
print(f" Conformance Level: {level} ({level_label})")
print(f"{'='*50}")
sys.exit(0 if total_passed == total_tests else 1)
if __name__ == "__main__":
main()