from __future__ import annotations
import argparse
import json
import os
import re
import shutil
import sys
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable
# A sanitization rule: called as ``rule(value, dotted_path)`` and returns the
# value that should appear in the sanitized output for that key.
PathRule = Callable[[Any, str], Any]

# Placeholder strings substituted for sensitive or machine-specific values.
# Only some of these are referenced by the rule tables visible in this file
# (PATH, TRANSCRIPT_PATH, SESSION_ID and PROMPT_BODY).
PATH_PLACEHOLDER = "<REPO_ROOT>"
TRANSCRIPT_PATH_PLACEHOLDER = "<TRANSCRIPT_PATH>"
LOG_PATH_PLACEHOLDER = "<LOG_PATH>"
SESSION_ID_PLACEHOLDER = "<SESSION_ID>"
RUN_ID_PLACEHOLDER = "<RUN_ID>"
TASK_ID_PLACEHOLDER = "<TASK_ID>"
TIMESTAMP_PLACEHOLDER = "<TS>"
HOSTNAME_PLACEHOLDER = "<HOSTNAME>"
USERNAME_PLACEHOLDER = "<USERNAME>"
PID_PLACEHOLDER = "<PID>"
PROMPT_BODY_PLACEHOLDER = "<USER_PROMPT_BODY>"
def _replace_with(placeholder: str) -> PathRule:
def rule(_value: Any, _path: str) -> Any:
return placeholder
return rule
def _keep(value: Any, _path: str) -> Any:
return value
def _recurse_object(rules: dict[str, PathRule], path: str, value: Any) -> Any:
if not isinstance(value, dict):
raise SanitizerError(f"{path}: expected object, got {type(value).__name__}")
out: dict[str, Any] = {}
for key, child in value.items():
child_path = f"{path}.{key}" if path else key
rule = rules.get(child_path)
if rule is None:
raise SanitizerError(
f"{child_path}: no sanitization rule (deny-by-default). "
f"Add one to scripts/sanitize-live-evidence.py and document it in "
f"tests/fixtures/live/README.md."
)
out[key] = rule(child, child_path)
return out
class SanitizerError(Exception):
    """Raised when a capture cannot be sanitized.

    ``_recurse_object`` raises it for a non-object value or for a key that has
    no sanitization rule; ``main`` catches it and reports it per capture.
    """
# Rules for captures selected with ``--harness claude``, keyed by dotted path.
# Every key listed here is either kept verbatim (``_keep``) or replaced by a
# fixed placeholder; any key NOT listed makes ``_recurse_object`` raise
# SanitizerError (deny-by-default).
CLAUDE_RULES: dict[str, PathRule] = {
    "session_id": _replace_with(SESSION_ID_PLACEHOLDER),
    "transcript_path": _replace_with(TRANSCRIPT_PATH_PLACEHOLDER),
    "cwd": _replace_with(PATH_PLACEHOLDER),
    "hook_event_name": _keep,
    "permission_mode": _keep,
    "source": _keep,
    "prompt": _replace_with(PROMPT_BODY_PLACEHOLDER),
    "trigger": _keep,
    "custom_instructions": _replace_with(PROMPT_BODY_PLACEHOLDER),
    "stop_hook_active": _keep,
    "reason": _keep,
}
# Rules for captures selected with ``--harness codex``; same deny-by-default
# contract as CLAUDE_RULES.
CODEX_RULES: dict[str, PathRule] = {
    "session_id": _replace_with(SESSION_ID_PLACEHOLDER),
    "cwd": _replace_with(PATH_PLACEHOLDER),
    "hook_event_name": _keep,
    "prompt": _replace_with(PROMPT_BODY_PLACEHOLDER),
}
# Harness name -> rule table. The keys are the accepted values of ``--harness``
# and also name the output subdirectory that ``main`` writes to.
HARNESS_RULES: dict[str, dict[str, PathRule]] = {
    "claude": CLAUDE_RULES,
    "codex": CODEX_RULES,
}
# Matches raw capture filenames of the form ``<digits>-<Event>[-<label>].in.json``.
# ``main`` uses only the ``event`` group; ``ts`` and ``label`` are matched but
# not otherwise read in this file.
CAPTURE_FILENAME_RE = re.compile(
    r"^(?P<ts>\d+)-(?P<event>[A-Za-z][A-Za-z0-9_]*)(?:-(?P<label>[^.]+))?\.in\.json$"
)
def sanitize_capture(raw: dict, rules: dict[str, PathRule]) -> dict:
    """Sanitize one parsed capture using the given rule table.

    Args:
        raw: The parsed JSON capture (expected to be an object).
        rules: Mapping from dotted path to sanitization rule.

    Returns:
        The sanitized copy produced by ``_recurse_object``.

    Raises:
        SanitizerError: Propagated from ``_recurse_object`` when ``raw`` is not
            an object or contains a key without a rule.
    """
    # The empty path marks ``raw`` as the top-level object, so rule keys are
    # looked up without any prefix.
    return _recurse_object(rules=rules, path="", value=raw)
def _discover_captures(capture_dir: Path) -> list[tuple[Path, str]]:
    """Return ``(path, event)`` for every capture file in ``capture_dir``, in sorted order."""
    captures: list[tuple[Path, str]] = []
    for entry in sorted(capture_dir.iterdir()):
        if not entry.is_file():
            continue
        match = CAPTURE_FILENAME_RE.match(entry.name)
        if match:
            captures.append((entry, match.group("event")))
    return captures


def _sanitize_captures(
    captures: list[tuple[Path, str]],
    rules: dict[str, PathRule],
) -> tuple[list[tuple[str, str, dict]], dict[str, int], list[str]]:
    """Sanitize each capture; return ``(outputs, per-event counts, error messages)``."""
    sanitized_outputs: list[tuple[str, str, dict]] = []
    per_event_count: dict[str, int] = defaultdict(int)
    errors: list[str] = []
    for capture_path, event in captures:
        try:
            # JSON is UTF-8 on the wire; decode explicitly instead of relying
            # on the platform's locale encoding.
            raw = json.loads(capture_path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as err:
            errors.append(f"{capture_path}: invalid JSON: {err}")
            continue
        except (OSError, UnicodeDecodeError) as err:
            # An unreadable capture is reported with the others instead of
            # aborting the whole run with a traceback.
            errors.append(f"{capture_path}: cannot read capture: {err}")
            continue
        try:
            sanitized = sanitize_capture(raw, rules)
        except SanitizerError as err:
            errors.append(f"{capture_path}: {err}")
            continue
        # Ordinals are assigned only to captures that sanitized successfully,
        # so output names are 0.json, 1.json, ... per event.
        ordinal = per_event_count[event]
        per_event_count[event] += 1
        sanitized_outputs.append((event, f"{ordinal}.json", sanitized))
    return sanitized_outputs, per_event_count, errors


def _publish_fixtures(output_root: Path, sanitized_outputs: list[tuple[str, str, dict]]) -> None:
    """Write all fixtures to a staging directory, then swap it in as ``output_root``."""
    parent = output_root.parent
    parent.mkdir(parents=True, exist_ok=True)
    # Stage next to the destination so the final step is a rename within the
    # same parent directory.
    staging = Path(tempfile.mkdtemp(prefix=f".{output_root.name}.staging-", dir=parent))
    try:
        for event, filename, body in sanitized_outputs:
            event_dir = staging / event
            event_dir.mkdir(parents=True, exist_ok=True)
            (event_dir / filename).write_text(
                json.dumps(body, indent=2, sort_keys=True) + "\n",
                encoding="utf-8",
            )
        previous: Path | None = None
        if output_root.exists():
            # Move the old tree aside first; it is restored below if the swap fails.
            previous = parent / f".{output_root.name}.previous-{os.getpid()}"
            os.rename(output_root, previous)
        try:
            os.rename(staging, output_root)
        except OSError:
            if previous is not None and previous.exists():
                os.rename(previous, output_root)
            raise
        if previous is not None:
            shutil.rmtree(previous)
    except OSError:
        if staging.exists():
            shutil.rmtree(staging, ignore_errors=True)
        raise


def main(argv: list[str]) -> int:
    """Sanitize raw hook captures and write them out as fixtures.

    Reads every ``*.in.json`` capture in ``--capture-dir`` whose name matches
    ``CAPTURE_FILENAME_RE``, sanitizes it with the rule table for ``--harness``
    and, unless ``--dry-run`` is given, replaces ``<output-dir>/<harness>``
    with the new fixtures laid out as ``<event>/<ordinal>.json``. Nothing is
    written if any capture fails to load or sanitize.

    Args:
        argv: Command-line arguments, without the program name.

    Returns:
        Process exit code: ``0`` on success, ``1`` if any capture could not be
        read, parsed or sanitized, ``64`` if ``--capture-dir`` is not a
        directory, ``65`` if it contains no matching captures.

    Raises:
        OSError: If writing or swapping the output directory fails.
    """
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--harness", required=True, choices=sorted(HARNESS_RULES.keys()))
    parser.add_argument("--capture-dir", required=True, type=Path,
                        help="Directory containing raw *.in.json captures from capture-live-hook.sh")
    parser.add_argument("--output-dir", required=True, type=Path,
                        help="Root output directory (e.g. tests/fixtures/live).")
    parser.add_argument("--dry-run", action="store_true",
                        help="Sanitize and validate, but don't write output files.")
    args = parser.parse_args(argv)

    rules = HARNESS_RULES[args.harness]
    capture_dir: Path = args.capture_dir
    output_root: Path = args.output_dir / args.harness

    if not capture_dir.is_dir():
        print(f"sanitize-live-evidence: capture-dir does not exist: {capture_dir}", file=sys.stderr)
        return 64

    captures = _discover_captures(capture_dir)
    if not captures:
        print(f"sanitize-live-evidence: no captures found in {capture_dir}", file=sys.stderr)
        return 65

    sanitized_outputs, per_event_count, errors = _sanitize_captures(captures, rules)
    if errors:
        # All-or-nothing: report every failure and leave existing fixtures alone.
        print("sanitize-live-evidence: errors:", file=sys.stderr)
        for message in errors:
            print(f"  {message}", file=sys.stderr)
        print(
            "sanitize-live-evidence: existing fixtures under "
            f"{output_root} were NOT touched (atomic write)",
            file=sys.stderr,
        )
        return 1

    if not args.dry_run:
        _publish_fixtures(output_root, sanitized_outputs)

    summary = ", ".join(f"{evt}={n}" for evt, n in sorted(per_event_count.items()))
    mode = "dry-run" if args.dry_run else "wrote"
    print(
        f"sanitize-live-evidence: {mode} {sum(per_event_count.values())} fixtures ({summary})",
        file=sys.stderr,
    )
    return 0
# Script entry point: run main() on the CLI arguments (program name dropped)
# and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))