# sekuire-protocol 0.1.1
#
# Shared protocol types for the Sekuire Agent Identity Protocol
# Documentation
#!/usr/bin/env python3
"""
OAG Conformance Harness Runner

Language-agnostic conformance test runner for the Open Agent Governance
Specification. Communicates with SDK adapters via stdin/stdout JSON-line protocol.

Usage:
    python3 runner.py --adapter ./adapters/typescript.sh
    python3 runner.py --adapter ./adapters/rust.sh --profile identity
    python3 runner.py --adapter ./adapters/python.sh --profile all --verbose
"""

import argparse
import json
import os
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

# Test-vector directory: "test-vectors" next to the directory that contains
# this script.
VECTORS_DIR = Path(__file__).parent.parent.joinpath("test-vectors")

# Known profiles, in the order they run when "--profile all" is selected.
# NOTE(review): "level" looks like the lowest conformance level that needs the
# profile and "file" the vector file name; nothing visible in this file reads
# "level", "file" or "required" -- confirm against other consumers.
PROFILES = dict(
    identity=dict(level="L1", file="identity.json", required=True),
    signing=dict(level="L2", file="signing.json", required=True),
    policy=dict(level="L1", file="policy-enforcement.json", required=True),
)

# Conformance levels: "requires" lists the profiles that must all pass for the
# level to be awarded, "label" is the human-readable name shown in reports.
CONFORMANCE_LEVELS = dict(
    L1=dict(requires=["identity", "policy"], label="Core"),
    L2=dict(requires=["identity", "policy", "signing"], label="Authenticated"),
)


@dataclass
class TestResult:
    """Outcome of a single conformance check run against the adapter."""
    # Name of the profile the check belongs to ("identity", "signing", "policy").
    profile: str
    # Vector name; the signing runner appends "/sign" or "/verify" to it.
    vector_id: str
    # True only when the adapter's result matched the expected value.
    passed: bool
    # Value the vector expects the adapter to return.
    expected: str = ""
    # Value the adapter returned; left empty when the adapter reported an error.
    actual: str = ""
    # Error reported for the command, or None when there was none.
    error: Optional[str] = None


@dataclass
class ProfileResult:
    """Running tally of check outcomes for one conformance profile."""

    # Profile name, e.g. "identity".
    profile: str
    # Number of checks the profile is expected to run.
    total: int = 0
    # Checks whose result matched the expected value.
    passed: int = 0
    # Checks that produced a result which did not match.
    failed: int = 0
    # Checks (or setup steps) that ended in an error instead of a result.
    errors: int = 0
    # Individual per-check results, in execution order.
    results: list = field(default_factory=list)

    @property
    def all_passed(self) -> bool:
        """Whether no error was recorded and every counted check passed."""
        if self.errors != 0:
            return False
        return self.total == self.passed


def send_command(proc: subprocess.Popen, command: dict) -> dict:
    """Send one command to the adapter and return its decoded reply.

    The command is written to the adapter's stdin as a single JSON line and
    exactly one line is then read back from its stdout.

    Args:
        proc: Adapter process whose ``stdin`` and ``stdout`` are text streams.
        command: JSON-serializable command object.

    Returns:
        The adapter's reply as a dict. Protocol failures are reported as a
        dict with an ``"error"`` key rather than raised: end of stream,
        undecodable JSON, or a reply that is valid JSON but not an object.
        The latter two also carry the offending text under ``"raw"``.

    Raises:
        Nothing is caught around the pipe write/flush, so I/O errors from a
        closed pipe (e.g. ``BrokenPipeError``) propagate to the caller.
    """
    proc.stdin.write(json.dumps(command) + "\n")
    proc.stdin.flush()

    response_line = proc.stdout.readline()
    if not response_line:
        return {"error": "adapter returned empty response"}

    raw = response_line.strip()
    try:
        response = json.loads(raw)
    except json.JSONDecodeError as e:
        return {"error": f"invalid JSON from adapter: {e}", "raw": raw}

    # Callers use response.get(...); a list/str/number reply would crash them,
    # so surface it as a protocol error instead.
    if not isinstance(response, dict):
        return {
            "error": f"adapter response is not a JSON object: {type(response).__name__}",
            "raw": raw,
        }
    return response


def run_identity_vectors(proc: subprocess.Popen, verbose: bool) -> ProfileResult:
    """Run the identity vectors against the adapter.

    For every vector in ``identity.json`` a ``calculate_sekuire_id`` command
    is sent with the vector's model, system prompt and tools, and the
    adapter's ``result`` is compared with the vector's ``expected`` value.

    Args:
        proc: Running adapter process, driven through ``send_command``.
        verbose: When true, print a PASS/FAIL line per vector to stderr,
            followed by expected/actual details for vectors that did not pass.

    Returns:
        A ``ProfileResult`` for the ``identity`` profile. If the vector file
        does not exist the result has ``errors=1`` and no tests.
    """
    vectors_file = VECTORS_DIR / "identity.json"
    if not vectors_file.exists():
        # Say why: otherwise the summary only shows an unexplained 0/0 failure.
        print(f"Error: test vectors not found: {vectors_file}", file=sys.stderr)
        return ProfileResult(profile="identity", errors=1)

    # JSON interchange is UTF-8; do not depend on the platform's locale encoding.
    data = json.loads(vectors_file.read_text(encoding="utf-8"))
    vectors = data["vectors"]
    result = ProfileResult(profile="identity", total=len(vectors))

    for v in vectors:
        vid = v["name"]
        inp = v["input"]
        expected = v["expected"]

        response = send_command(proc, {
            "operation": "calculate_sekuire_id",
            "params": {
                "model": inp["model"],
                "systemPrompt": inp["systemPrompt"],
                "tools": inp["tools"],
            },
        })

        if response.get("error"):
            # The adapter (or the protocol layer) reported an error.
            tr = TestResult(
                profile="identity", vector_id=vid, passed=False,
                expected=expected, error=response["error"],
            )
            result.errors += 1
        elif response.get("result") == expected:
            tr = TestResult(
                profile="identity", vector_id=vid, passed=True,
                expected=expected, actual=response["result"],
            )
            result.passed += 1
        else:
            # A result came back but does not match the vector.
            tr = TestResult(
                profile="identity", vector_id=vid, passed=False,
                expected=expected, actual=response.get("result", ""),
            )
            result.failed += 1

        result.results.append(tr)
        if verbose:
            status = "PASS" if tr.passed else "FAIL"
            print(f"  [{status}] {vid}", file=sys.stderr)
            if not tr.passed:
                print(f"    expected: {tr.expected}", file=sys.stderr)
                print(f"    actual:   {tr.actual or tr.error}", file=sys.stderr)

    return result


def run_signing_vectors(proc: subprocess.Popen, verbose: bool) -> ProfileResult:
    """Run the signing vectors against the adapter.

    ``signing.json`` supplies one key pair plus a list of vectors. Each vector
    produces two checks:

    * ``<name>/sign``: a ``sign`` command with the vector's message and the
      secret key; the adapter's ``result`` must equal the vector's signature.
    * ``<name>/verify``: a ``verify`` command with the message, the vector's
      own signature and the public key; the adapter's ``result`` must be
      exactly ``True``.

    Args:
        proc: Running adapter process, driven through ``send_command``.
        verbose: When true, print a PASS/FAIL line per check to stderr,
            followed by expected/actual details for checks that did not pass.

    Returns:
        A ``ProfileResult`` for the ``signing`` profile. If the vector file
        does not exist the result has ``errors=1`` and no tests.
    """
    vectors_file = VECTORS_DIR / "signing.json"
    if not vectors_file.exists():
        # Say why: otherwise the summary only shows an unexplained 0/0 failure.
        print(f"Error: test vectors not found: {vectors_file}", file=sys.stderr)
        return ProfileResult(profile="signing", errors=1)

    # JSON interchange is UTF-8; do not depend on the platform's locale encoding.
    data = json.loads(vectors_file.read_text(encoding="utf-8"))
    secret_key = data["keyPair"]["secretKey"]
    public_key = data["keyPair"]["publicKey"]
    vectors = data["vectors"]
    # Two checks (sign + verify) are counted per vector.
    result = ProfileResult(profile="signing", total=len(vectors) * 2)

    for v in vectors:
        vid = v["name"]
        message = v["message"]
        expected_sig = v["signature"]

        sign_resp = send_command(proc, {
            "operation": "sign",
            "params": {"message": message, "secretKey": secret_key},
        })

        if sign_resp.get("error"):
            tr = TestResult(
                profile="signing", vector_id=f"{vid}/sign", passed=False,
                expected=expected_sig, error=sign_resp["error"],
            )
            result.errors += 1
        elif sign_resp.get("result") == expected_sig:
            tr = TestResult(
                profile="signing", vector_id=f"{vid}/sign", passed=True,
                expected=expected_sig, actual=sign_resp["result"],
            )
            result.passed += 1
        else:
            tr = TestResult(
                profile="signing", vector_id=f"{vid}/sign", passed=False,
                expected=expected_sig, actual=sign_resp.get("result", ""),
            )
            result.failed += 1

        result.results.append(tr)
        if verbose:
            status = "PASS" if tr.passed else "FAIL"
            print(f"  [{status}] {vid}/sign", file=sys.stderr)
            # Same failure detail as the identity and policy runners print.
            if not tr.passed:
                print(f"    expected: {tr.expected}", file=sys.stderr)
                print(f"    actual:   {tr.actual or tr.error}", file=sys.stderr)

        # Verification uses the vector's reference signature, not the one the
        # adapter just produced, so it is independent of the sign check.
        verify_resp = send_command(proc, {
            "operation": "verify",
            "params": {
                "message": message,
                "signature": expected_sig,
                "publicKey": public_key,
            },
        })

        if verify_resp.get("error"):
            tr = TestResult(
                profile="signing", vector_id=f"{vid}/verify", passed=False,
                expected="true", error=verify_resp["error"],
            )
            result.errors += 1
        elif verify_resp.get("result") is True:
            # Only a JSON boolean true passes; truthy strings/numbers do not.
            tr = TestResult(
                profile="signing", vector_id=f"{vid}/verify", passed=True,
                expected="true", actual="true",
            )
            result.passed += 1
        else:
            tr = TestResult(
                profile="signing", vector_id=f"{vid}/verify", passed=False,
                expected="true", actual=str(verify_resp.get("result", "")),
            )
            result.failed += 1

        result.results.append(tr)
        if verbose:
            status = "PASS" if tr.passed else "FAIL"
            print(f"  [{status}] {vid}/verify", file=sys.stderr)
            if not tr.passed:
                print(f"    expected: {tr.expected}", file=sys.stderr)
                print(f"    actual:   {tr.actual or tr.error}", file=sys.stderr)

    return result


def run_policy_vectors(proc: subprocess.Popen, verbose: bool) -> ProfileResult:
    """Run the policy-enforcement vectors against the adapter.

    ``policy-enforcement.json`` supplies one policy shared by all vectors.
    For every vector an ``enforce_policy`` command is sent with that policy
    and the vector's action, and the adapter's ``result`` is compared with
    the vector's expected decision.

    Args:
        proc: Running adapter process, driven through ``send_command``.
        verbose: When true, print a PASS/FAIL line per vector to stderr,
            followed by expected/actual details for vectors that did not pass.

    Returns:
        A ``ProfileResult`` for the ``policy`` profile. If the vector file
        does not exist the result has ``errors=1`` and no tests.
    """
    vectors_file = VECTORS_DIR / "policy-enforcement.json"
    if not vectors_file.exists():
        # Say why: otherwise the summary only shows an unexplained 0/0 failure.
        print(f"Error: test vectors not found: {vectors_file}", file=sys.stderr)
        return ProfileResult(profile="policy", errors=1)

    # JSON interchange is UTF-8; do not depend on the platform's locale encoding.
    data = json.loads(vectors_file.read_text(encoding="utf-8"))
    policy = data["policy"]
    vectors = data["vectors"]
    result = ProfileResult(profile="policy", total=len(vectors))

    for v in vectors:
        vid = v["name"]
        action = v["action"]
        expected_decision = v["expected"]["decision"]

        response = send_command(proc, {
            "operation": "enforce_policy",
            "params": {"policy": policy, "action": action},
        })

        if response.get("error"):
            # The adapter (or the protocol layer) reported an error.
            tr = TestResult(
                profile="policy", vector_id=vid, passed=False,
                expected=expected_decision, error=response["error"],
            )
            result.errors += 1
        elif response.get("result") == expected_decision:
            tr = TestResult(
                profile="policy", vector_id=vid, passed=True,
                expected=expected_decision, actual=response["result"],
            )
            result.passed += 1
        else:
            # A decision came back but does not match the vector.
            tr = TestResult(
                profile="policy", vector_id=vid, passed=False,
                expected=expected_decision, actual=response.get("result", ""),
            )
            result.failed += 1

        result.results.append(tr)
        if verbose:
            status = "PASS" if tr.passed else "FAIL"
            print(f"  [{status}] {vid}", file=sys.stderr)
            if not tr.passed:
                print(f"    expected: {tr.expected}", file=sys.stderr)
                print(f"    actual:   {tr.actual or tr.error}", file=sys.stderr)

    return result


# Dispatch table: profile name -> function that runs that profile's vectors.
PROFILE_RUNNERS = dict(
    identity=run_identity_vectors,
    signing=run_signing_vectors,
    policy=run_policy_vectors,
)


def determine_conformance_level(profile_results: dict[str, ProfileResult]) -> str:
    """Return the highest conformance level satisfied by the given results.

    Levels are tried from strongest to weakest ("L2", then "L1"). A level is
    satisfied when every profile it requires is present in
    ``profile_results`` and reports ``all_passed``.

    Args:
        profile_results: Mapping of profile name to that profile's result.

    Returns:
        "L2", "L1", or "none" when no level is satisfied.
    """
    for candidate in ("L2", "L1"):
        for name in CONFORMANCE_LEVELS[candidate]["requires"]:
            if name not in profile_results:
                break
            if not profile_results[name].all_passed:
                break
        else:
            # No required profile was missing or failing.
            return candidate
    return "none"


def main():
    """Parse arguments, run the selected profiles and report the outcome.

    Starts the adapter given by ``--adapter``, runs each selected profile's
    vectors against it, then prints a summary to stdout (JSON with ``--json``,
    a text table otherwise).

    Exits with status 0 only when every selected profile ran to completion
    and passed. Exits with status 1 otherwise, including when the adapter is
    missing, cannot be started, or goes away before all profiles have run.
    """
    parser = argparse.ArgumentParser(description="OAG Conformance Harness Runner")
    parser.add_argument("--adapter", required=True, help="Path to adapter script")
    parser.add_argument(
        "--profile", default="all",
        choices=["all", "identity", "signing", "policy"],
        help="Which profile to test (default: all)",
    )
    parser.add_argument("--verbose", "-v", action="store_true", help="Show per-vector results")
    parser.add_argument("--json", action="store_true", help="Output results as JSON")
    args = parser.parse_args()

    adapter_path = os.path.abspath(args.adapter)
    if not os.path.isfile(adapter_path):
        print(f"Error: adapter not found: {adapter_path}", file=sys.stderr)
        sys.exit(1)

    profiles_to_run = list(PROFILES) if args.profile == "all" else [args.profile]

    try:
        proc = subprocess.Popen(
            [adapter_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # Nothing reads the adapter's stderr, so discard it instead of
            # piping it: an unread pipe blocks the adapter once it fills up.
            # In verbose mode the adapter inherits this process's stderr.
            stderr=None if args.verbose else subprocess.DEVNULL,
            text=True,
            # The protocol is JSON lines; decode as UTF-8 regardless of locale.
            encoding="utf-8",
            bufsize=1,
        )
    except OSError as exc:
        # E.g. the adapter file exists but is not executable.
        print(f"Error: failed to start adapter: {exc}", file=sys.stderr)
        sys.exit(1)

    profile_results: dict[str, ProfileResult] = {}
    total_passed = 0
    total_tests = 0

    try:
        for profile_name in profiles_to_run:
            if args.verbose:
                print(f"\n--- {profile_name} ---", file=sys.stderr)

            runner = PROFILE_RUNNERS[profile_name]
            pr = runner(proc, args.verbose)
            profile_results[profile_name] = pr
            total_passed += pr.passed
            total_tests += pr.total

        send_command(proc, {"operation": "exit", "params": {}})
    except OSError as exc:
        # BrokenPipeError (an OSError) means the adapter closed its pipes.
        # That is harmless after the last profile, e.g. while sending "exit",
        # but must be reported if profiles are still outstanding.
        if len(profile_results) < len(profiles_to_run):
            print(f"Error: lost connection to adapter: {exc}", file=sys.stderr)
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # The adapter ignored the terminate request; force it down so the
            # report below is still produced.
            proc.kill()
            proc.wait()

    level = determine_conformance_level(profile_results)
    level_label = CONFORMANCE_LEVELS.get(level, {}).get("label", "None")

    if args.json:
        output = {
            "conformanceLevel": level,
            "conformanceLevelLabel": level_label,
            "totalPassed": total_passed,
            "totalTests": total_tests,
            "profiles": {},
        }
        for name, pr in profile_results.items():
            output["profiles"][name] = {
                "total": pr.total,
                "passed": pr.passed,
                "failed": pr.failed,
                "errors": pr.errors,
                "allPassed": pr.all_passed,
            }
        print(json.dumps(output, indent=2))
    else:
        print(f"\n{'='*50}")
        print("OAG Conformance Results")
        print(f"{'='*50}")
        for name, pr in profile_results.items():
            status = "PASS" if pr.all_passed else "FAIL"
            print(f"  {name:20s} {pr.passed}/{pr.total} [{status}]")
        print(f"{'='*50}")
        print(f"  {'Total':20s} {total_passed}/{total_tests}")
        print(f"  Conformance Level:  {level} ({level_label})")
        print(f"{'='*50}")

    # Comparing only the pass/total counters would report success when nothing
    # ran at all (adapter died early, vector file missing: 0 == 0). Require
    # that every selected profile was recorded and passed without errors.
    completed = len(profile_results) == len(profiles_to_run)
    success = completed and all(pr.all_passed for pr in profile_results.values())
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()