dist_agent_lang 1.0.24

Agentic programming with library and CLI support for Off/On-chain network integration
Documentation
#!/usr/bin/env python3
"""
Generate and validate Trust-Split EVM audit evidence bundles.

This script supports two commands:
- generate: assemble a bundle directory and write bundle.manifest.json
- validate: fail-closed validation of manifest shape, required files, and checksums
"""

from __future__ import annotations

import argparse
import hashlib
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List


REQUIRED_TOP_LEVEL_KEYS = {
    "release_tag",
    "git_commit",
    "generated_at_utc",
    "ci",
    "toolchain",
    "evidence",
    "checksums",
    "signoff",
}


def utc_now_rfc3339() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def sha256_file(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()


def run_cmd(cmd: List[str]) -> str:
    return subprocess.check_output(cmd, text=True).strip()


def copy_required(src: Path, dst: Path) -> None:
    if not src.exists():
        raise FileNotFoundError(f"Required source file missing: {src}")
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(src, dst)


def build_manifest(bundle_dir: Path, args: argparse.Namespace) -> Dict:
    repo_root = Path(__file__).resolve().parents[2]
    trust_logs = repo_root / ".artifacts" / "trust-split-fast"

    # Required source logs generated by trust-split-fast job.
    policy_logs = [
        trust_logs / "trust_split_validation_tests.log",
        trust_logs / "strict_call_requires_calldata.log",
        trust_logs / "strict_deploy_requires_signed_tx.log",
    ]
    correctness_logs = [
        trust_logs / "abi_golden_vectors.log",
        trust_logs / "decentralized_v1_non_stub_codegen.log",
        trust_logs / "h4_deploy_provenance_key_contract.log",
        trust_logs / "h4_call_provenance_key_contract.log",
    ]

    # H6 baseline docs treated as required evidence artifacts.
    release_docs = [
        repo_root
        / "docs"
        / "release"
        / "TRUST_SPLIT_EVM_AUDIT_EVIDENCE_BUNDLE_SPEC.md",
        repo_root
        / "docs"
        / "release"
        / "TRUST_SPLIT_EVM_THREAT_MODEL_TRACEABILITY_MATRIX.md",
        repo_root
        / "docs"
        / "release"
        / "TRUST_SPLIT_EVM_RELEASE_RUNBOOK_AND_ROLLBACK_DRILL.md",
    ]

    # Bundle destination layout
    policy_dir = bundle_dir / "policy"
    correctness_dir = bundle_dir / "correctness"
    security_dir = bundle_dir / "security"
    operations_dir = bundle_dir / "operations"
    deployment_dir = bundle_dir / "deployment"
    for d in [policy_dir, correctness_dir, security_dir, operations_dir, deployment_dir]:
        d.mkdir(parents=True, exist_ok=True)

    evidence_entries = {
        "policy": [],
        "correctness": [],
        "security": [],
        "deployment": [],
        "operations": [],
    }

    # Copy logs
    for src in policy_logs:
        dst = policy_dir / src.name
        copy_required(src, dst)
        evidence_entries["policy"].append(
            {"path": str(dst.relative_to(bundle_dir)), "description": src.name, "required": True}
        )

    for src in correctness_logs:
        dst = correctness_dir / src.name
        copy_required(src, dst)
        evidence_entries["correctness"].append(
            {"path": str(dst.relative_to(bundle_dir)), "description": src.name, "required": True}
        )

    # Copy release docs
    spec_dst = operations_dir / release_docs[0].name
    matrix_dst = security_dir / release_docs[1].name
    runbook_dst = operations_dir / release_docs[2].name
    copy_required(release_docs[0], spec_dst)
    copy_required(release_docs[1], matrix_dst)
    copy_required(release_docs[2], runbook_dst)

    evidence_entries["security"].append(
        {
            "path": str(matrix_dst.relative_to(bundle_dir)),
            "description": "Threat-model traceability matrix",
            "required": True,
        }
    )
    evidence_entries["operations"].append(
        {
            "path": str(spec_dst.relative_to(bundle_dir)),
            "description": "Audit evidence bundle specification",
            "required": True,
        }
    )
    evidence_entries["operations"].append(
        {
            "path": str(runbook_dst.relative_to(bundle_dir)),
            "description": "Release runbook and rollback drill",
            "required": True,
        }
    )

    # Deployment metadata artifact
    deployment_metadata = {
        "git_commit": args.git_commit,
        "release_tag": args.release_tag,
        "ci_provider": args.ci_provider,
        "workflow": args.workflow,
        "run_id": args.run_id,
        "generated_at_utc": utc_now_rfc3339(),
        "rust_version": args.rust_version,
        "solc_version": args.solc_version,
        "dal_version": args.dal_version,
    }
    deploy_meta_path = deployment_dir / "deployment_metadata.json"
    deploy_meta_path.write_text(json.dumps(deployment_metadata, indent=2) + "\n", encoding="utf-8")
    evidence_entries["deployment"].append(
        {
            "path": str(deploy_meta_path.relative_to(bundle_dir)),
            "description": "Release metadata and toolchain versions",
            "required": True,
        }
    )

    checksums = []
    for category in evidence_entries.values():
        for entry in category:
            p = bundle_dir / entry["path"]
            checksums.append({"path": entry["path"], "sha256": sha256_file(p)})

    manifest = {
        "release_tag": args.release_tag,
        "git_commit": args.git_commit,
        "generated_at_utc": utc_now_rfc3339(),
        "ci": {"provider": args.ci_provider, "workflow": args.workflow, "run_id": args.run_id},
        "toolchain": {
            "rust_version": args.rust_version,
            "solc_version": args.solc_version,
            "dal_version": args.dal_version,
        },
        "evidence": evidence_entries,
        "checksums": checksums,
        "signoff": {
            "security_reviewer": args.security_reviewer,
            "runtime_owner": args.runtime_owner,
            "release_manager": args.release_manager,
            "approved_at_utc": utc_now_rfc3339(),
        },
    }
    return manifest


def validate_manifest(bundle_dir: Path, manifest: Dict) -> None:
    missing = REQUIRED_TOP_LEVEL_KEYS - set(manifest.keys())
    if missing:
        raise ValueError(f"Manifest missing required top-level keys: {sorted(missing)}")

    for key in ["provider", "workflow", "run_id"]:
        if key not in manifest["ci"] or not str(manifest["ci"][key]).strip():
            raise ValueError(f"Manifest ci.{key} is required")

    for key in ["rust_version", "solc_version", "dal_version"]:
        if key not in manifest["toolchain"] or not str(manifest["toolchain"][key]).strip():
            raise ValueError(f"Manifest toolchain.{key} is required")

    for key in ["security_reviewer", "runtime_owner", "release_manager", "approved_at_utc"]:
        if key not in manifest["signoff"] or not str(manifest["signoff"][key]).strip():
            raise ValueError(f"Manifest signoff.{key} is required")

    checksum_by_path = {item["path"]: item["sha256"] for item in manifest["checksums"]}
    if not checksum_by_path:
        raise ValueError("Manifest checksums cannot be empty")

    required_categories = ["policy", "correctness", "security", "deployment", "operations"]
    evidence = manifest.get("evidence", {})
    for cat in required_categories:
        entries = evidence.get(cat, [])
        if not entries:
            raise ValueError(f"Required evidence category '{cat}' is empty")
        for entry in entries:
            if not entry.get("required", False):
                continue
            rel_path = entry.get("path")
            if not rel_path:
                raise ValueError(f"Required evidence entry in '{cat}' missing path")
            file_path = bundle_dir / rel_path
            if not file_path.exists():
                raise ValueError(f"Required evidence file missing: {rel_path}")
            if rel_path not in checksum_by_path:
                raise ValueError(f"Checksum missing for required file: {rel_path}")
            actual = sha256_file(file_path)
            expected = checksum_by_path[rel_path]
            if actual != expected:
                raise ValueError(
                    f"Checksum mismatch for {rel_path}: expected {expected}, got {actual}"
                )


def main() -> int:
    parser = argparse.ArgumentParser(description="Audit bundle generator/validator")
    subparsers = parser.add_subparsers(dest="command", required=True)

    gen = subparsers.add_parser("generate", help="Generate audit bundle and manifest")
    gen.add_argument("--bundle-dir", required=True)
    gen.add_argument("--release-tag", default="ci")
    gen.add_argument("--git-commit", default=os.getenv("GITHUB_SHA", "").strip())
    gen.add_argument("--run-id", default=os.getenv("GITHUB_RUN_ID", "local"))
    gen.add_argument("--ci-provider", default="github-actions")
    gen.add_argument("--workflow", default="ci.yml")
    gen.add_argument("--rust-version", default="")
    gen.add_argument("--solc-version", default="")
    gen.add_argument("--dal-version", default="dev")
    gen.add_argument("--security-reviewer", default="ci-automation")
    gen.add_argument("--runtime-owner", default="ci-automation")
    gen.add_argument("--release-manager", default="ci-automation")

    val = subparsers.add_parser("validate", help="Validate audit bundle manifest fail-closed")
    val.add_argument("--bundle-dir", required=True)

    args = parser.parse_args()
    bundle_dir = Path(args.bundle_dir).resolve()
    bundle_dir.mkdir(parents=True, exist_ok=True)

    if args.command == "generate":
        if not args.git_commit:
            try:
                args.git_commit = run_cmd(["git", "rev-parse", "HEAD"])
            except Exception as e:
                raise RuntimeError(f"Unable to resolve git commit SHA: {e}")
        if not args.rust_version:
            args.rust_version = run_cmd(["rustc", "--version"])
        if not args.solc_version:
            try:
                args.solc_version = run_cmd(["solc", "--version"]).splitlines()[0]
            except Exception:
                args.solc_version = "unavailable"

        manifest = build_manifest(bundle_dir, args)
        manifest_path = bundle_dir / "bundle.manifest.json"
        manifest_path.write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
        print(f"Generated manifest: {manifest_path}")
        return 0

    manifest_path = bundle_dir / "bundle.manifest.json"
    if not manifest_path.exists():
        raise FileNotFoundError(f"Missing manifest file: {manifest_path}")
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    validate_manifest(bundle_dir, manifest)
    print(f"Audit bundle validation passed: {manifest_path}")
    return 0


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        raise SystemExit(1)