from __future__ import annotations
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
# Directory two levels above this file's resolved location.
REPO_ROOT = Path(__file__).resolve().parent.parent
# Script exercised by every case in this module (run as a subprocess).
SANITIZER = REPO_ROOT.joinpath("scripts", "sanitize-live-evidence.py")
def _run(args: list[str]) -> subprocess.CompletedProcess:
    """Run the sanitizer script in a child interpreter and return the result.

    Args:
        args: Command-line arguments appended after the script path.

    Returns:
        The completed process, with stdout and stderr captured as text.
        A non-zero exit status does not raise (``check=False``); callers
        inspect ``returncode`` themselves.
    """
    command = [sys.executable, str(SANITIZER)]
    command.extend(args)
    return subprocess.run(command, check=False, text=True, capture_output=True)
def case_known_fields_are_redacted() -> None:
    """Check the sanitized output for a ``claude`` UserPromptSubmit capture.

    Writes one capture file, runs the sanitizer, and expects exit status 0
    plus a fixture at ``claude/UserPromptSubmit/0.json`` in which
    ``session_id``, ``transcript_path``, ``cwd`` and ``prompt`` are replaced
    by placeholders while ``hook_event_name`` and ``permission_mode`` are
    passed through unchanged.
    """
    with tempfile.TemporaryDirectory() as cap_tmp, tempfile.TemporaryDirectory() as out_tmp:
        capture_root, output_root = Path(cap_tmp), Path(out_tmp)
        payload = {
            "hook_event_name": "UserPromptSubmit",
            "session_id": "abc-123-def",
            "transcript_path": "/Users/operator/transcripts/x.json",
            "cwd": "/Users/operator/repo",
            "prompt": "sensitive user prompt body",
            "permission_mode": "plan",
        }
        capture_file = capture_root / "0001-UserPromptSubmit.in.json"
        capture_file.write_text(json.dumps(payload))

        cli_args = ["--harness", "claude"]
        cli_args += ["--capture-dir", str(capture_root)]
        cli_args += ["--output-dir", str(output_root)]
        proc = _run(cli_args)
        assert proc.returncode == 0, proc.stderr

        fixture = output_root.joinpath("claude", "UserPromptSubmit", "0.json")
        sanitized = json.loads(fixture.read_text())
        # Same keys as the input; only the sensitive values become placeholders.
        expected = {
            **payload,
            "session_id": "<SESSION_ID>",
            "transcript_path": "<TRANSCRIPT_PATH>",
            "cwd": "<REPO_ROOT>",
            "prompt": "<USER_PROMPT_BODY>",
        }
        assert sanitized == expected, f"unexpected sanitized output: {sanitized}"
def case_unknown_field_is_denied() -> None:
    """Check that a capture with an unrecognised key is rejected.

    The SessionStart payload carries an extra ``fancy_new_field``. The run
    must exit with status 1, and stderr must mention both the offending
    field name and the phrase ``deny-by-default``.
    """
    with tempfile.TemporaryDirectory() as cap_tmp, tempfile.TemporaryDirectory() as out_tmp:
        capture_root = Path(cap_tmp)
        output_root = Path(out_tmp)
        payload = {
            "hook_event_name": "SessionStart",
            "session_id": "abc",
            "transcript_path": "/x",
            "cwd": "/x",
            "source": "startup",
            "fancy_new_field": "oops",
        }
        capture_root.joinpath("0002-SessionStart.in.json").write_text(json.dumps(payload))

        proc = _run(
            [
                "--harness", "claude",
                "--capture-dir", str(capture_root),
                "--output-dir", str(output_root),
            ]
        )
        assert proc.returncode == 1, f"expected rc=1, got {proc.returncode}"
        # Both the field name and the policy name must appear in the diagnostics.
        for fragment in ("fancy_new_field", "deny-by-default"):
            assert fragment in proc.stderr, proc.stderr
def case_dry_run_does_not_write() -> None:
    """Check that ``--dry-run`` succeeds without creating the event directory.

    After a dry run over a single Stop capture, the run must exit with
    status 0, ``claude/Stop`` must not exist under the output directory,
    and stderr must contain the text ``dry-run``.
    """
    with tempfile.TemporaryDirectory() as cap_tmp, tempfile.TemporaryDirectory() as out_tmp:
        capture_root, output_root = Path(cap_tmp), Path(out_tmp)
        payload = {
            "hook_event_name": "Stop",
            "session_id": "abc",
            "transcript_path": "/x",
            "cwd": "/x",
            "stop_hook_active": True,
        }
        capture_file = capture_root / "0003-Stop.in.json"
        capture_file.write_text(json.dumps(payload))

        cli_args = [
            "--harness", "claude",
            "--capture-dir", str(capture_root),
            "--output-dir", str(output_root),
            "--dry-run",
        ]
        proc = _run(cli_args)

        assert proc.returncode == 0, proc.stderr
        would_be_written = output_root.joinpath("claude", "Stop")
        assert not would_be_written.exists(), "dry-run must not write"
        assert "dry-run" in proc.stderr, proc.stderr
def case_empty_capture_dir_is_a_distinct_error() -> None:
    """Check that an empty capture directory yields exit status 65."""
    with tempfile.TemporaryDirectory() as cap_tmp, tempfile.TemporaryDirectory() as out_tmp:
        # Neither directory is populated; the capture dir stays empty.
        cli_args = ["--harness", "claude", "--capture-dir", cap_tmp, "--output-dir", out_tmp]
        proc = _run(cli_args)
        assert proc.returncode == 65, f"expected rc=65, got {proc.returncode}"
def case_codex_harness_strips_sessions() -> None:
    """Check placeholder substitution when the harness is ``codex``.

    The payload has no ``transcript_path``. The run must exit with status 0,
    and the fixture at ``codex/UserPromptSubmit/0.json`` must hold
    placeholders for ``session_id``, ``cwd`` and ``prompt``.
    """
    with tempfile.TemporaryDirectory() as cap_tmp, tempfile.TemporaryDirectory() as out_tmp:
        capture_root = Path(cap_tmp)
        output_root = Path(out_tmp)
        payload = {
            "hook_event_name": "UserPromptSubmit",
            "session_id": "ses-9001",
            "cwd": "/Users/operator/repo",
            "prompt": "secret",
        }
        capture_root.joinpath("0004-UserPromptSubmit.in.json").write_text(json.dumps(payload))

        proc = _run(
            [
                "--harness", "codex",
                "--capture-dir", str(capture_root),
                "--output-dir", str(output_root),
            ]
        )
        assert proc.returncode == 0, proc.stderr

        fixture = output_root / "codex" / "UserPromptSubmit" / "0.json"
        sanitized = json.loads(fixture.read_text())
        placeholders = (
            ("session_id", "<SESSION_ID>"),
            ("cwd", "<REPO_ROOT>"),
            ("prompt", "<USER_PROMPT_BODY>"),
        )
        for key, placeholder in placeholders:
            assert sanitized[key] == placeholder
def case_failure_preserves_existing_fixtures_atomically() -> None:
    """Check that a failed run leaves a pre-existing fixture tree untouched.

    A fixture is seeded at ``claude/SessionStart/0.json`` before the run.
    The capture directory holds one valid capture and one with an unknown
    field. The run must exit with status 1, the seeded file must still
    exist with identical contents, and the output directory must hold no
    entry whose name starts with ``.claude.staging-`` or
    ``.claude.previous-``.
    """
    with tempfile.TemporaryDirectory() as cap_tmp, tempfile.TemporaryDirectory() as out_tmp:
        capture_root, output_root = Path(cap_tmp), Path(out_tmp)

        # Seed the output tree with a sentinel fixture that must survive.
        seeded_dir = output_root.joinpath("claude", "SessionStart")
        seeded_dir.mkdir(parents=True)
        seeded_file = seeded_dir / "0.json"
        seeded_text = json.dumps({"sentinel": "do_not_lose"}, indent=2)
        seeded_file.write_text(seeded_text)

        valid_payload = {
            "hook_event_name": "SessionStart",
            "session_id": "abc",
            "transcript_path": "/x",
            "cwd": "/x",
            "source": "startup",
        }
        invalid_payload = dict(valid_payload, fancy_new_field="should_fail")
        for filename, body in (
            ("0001-SessionStart.in.json", valid_payload),
            ("0002-SessionStart.in.json", invalid_payload),
        ):
            (capture_root / filename).write_text(json.dumps(body))

        proc = _run(
            [
                "--harness", "claude",
                "--capture-dir", str(capture_root),
                "--output-dir", str(output_root),
            ]
        )
        assert proc.returncode == 1, f"expected rc=1, got {proc.returncode}"
        assert seeded_file.exists(), "atomic-fail must NOT delete existing fixture"
        assert seeded_file.read_text() == seeded_text, (
            "atomic-fail must NOT rewrite existing fixture"
        )

        def entries_with_prefix(prefix: str) -> list[Path]:
            # Direct children of the output directory whose name starts with prefix.
            return [entry for entry in output_root.iterdir() if entry.name.startswith(prefix)]

        leaked = entries_with_prefix(".claude.staging-")
        assert not leaked, f"staging dir leaked: {leaked}"
        previous = entries_with_prefix(".claude.previous-")
        assert not previous, f"`.previous-*` directory leaked: {previous}"
def case_success_replaces_fixture_tree_atomically() -> None:
    """Check that a successful run replaces the event directory wholesale.

    A stale ``claude/SessionStart/9.json`` is created before the run. After
    a run over one valid capture, the run must exit with status 0,
    ``0.json`` must exist, and the stale ``9.json`` must be gone.
    """
    with tempfile.TemporaryDirectory() as cap_tmp, tempfile.TemporaryDirectory() as out_tmp:
        capture_root = Path(cap_tmp)
        output_root = Path(out_tmp)
        event_dir = output_root / "claude" / "SessionStart"

        # Leftover from an imagined earlier run; it must not survive the swap.
        event_dir.mkdir(parents=True)
        event_dir.joinpath("9.json").write_text(json.dumps({"stale": True}))

        payload = {
            "hook_event_name": "SessionStart",
            "session_id": "abc",
            "transcript_path": "/x",
            "cwd": "/x",
            "source": "startup",
        }
        capture_root.joinpath("0001-SessionStart.in.json").write_text(json.dumps(payload))

        cli_args = [
            "--harness", "claude",
            "--capture-dir", str(capture_root),
            "--output-dir", str(output_root),
        ]
        proc = _run(cli_args)

        assert proc.returncode == 0, proc.stderr
        assert (event_dir / "0.json").exists()
        assert not (event_dir / "9.json").exists(), (
            "atomic swap must replace the tree wholesale, not merge with stale entries"
        )
def main() -> int:
    """Run every sanitizer case in order and report the outcome.

    A case that returns normally gets an ``ok`` line on stdout. A case that
    raises is recorded and the remaining cases still run. After the last
    case, each recorded failure is printed as a ``FAIL`` line on stderr.

    Returns:
        ``0`` when every case passed, ``1`` if at least one failed.
    """
    cases = [
        case_known_fields_are_redacted,
        case_unknown_field_is_denied,
        case_dry_run_does_not_write,
        case_empty_capture_dir_is_a_distinct_error,
        case_codex_harness_strips_sessions,
        case_failure_preserves_existing_fixtures_atomically,
        case_success_replaces_fixture_tree_atomically,
    ]
    failures: list[str] = []
    for check in cases:
        label = check.__name__
        try:
            check()
        except AssertionError as exc:
            failures.append(f"{label}: {exc}")
            continue
        except Exception as exc:
            # Anything other than a failed assertion is reported with its type.
            failures.append(f"{label}: unexpected {type(exc).__name__}: {exc}")
            continue
        print(f"ok {label}")

    if not failures:
        print(f"\nall {len(cases)} sanitizer cases passed")
        return 0

    print()
    for entry in failures:
        print(f"FAIL {entry}", file=sys.stderr)
    return 1
# When executed as a script, use main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())