lifeloop-cli 0.3.0

Provider-neutral lifecycle abstraction and normalizer for AI harnesses
Documentation
#!/usr/bin/env python3
"""Sanitize raw live adapter capture into committable fixtures.

Reads `<capture-dir>/*.in.json` (one raw hook stdin per file, written
by `scripts/capture-live-hook.sh`), applies the sanitization rules
documented in `tests/fixtures/live/README.md`, and writes the
sanitized JSON to `<output-dir>/<harness>/<event-name>/<n>.json`.

Sanitization is **deny-by-default**: every JSON field traversed must
have an explicit rule (keep, replace, or recurse). An unknown field
fails the run with a non-zero exit code and a path-prefixed error
message. This keeps the rig from silently committing leaked data
when the upstream harness adds a new field.

Usage:
    sanitize-live-evidence.py \\
        --harness claude \\
        --capture-dir /tmp/lifeloop-live-capture \\
        --output-dir tests/fixtures/live

Re-running the sanitizer is idempotent and **atomic**: sanitized
fixtures are staged in a sibling temp directory and atomic-swapped
into place only after every capture has sanitized successfully. A
deny-by-default error mid-run leaves the existing committed
fixtures untouched. The script does not delete files outside the
specified harness subdirectory.
"""

from __future__ import annotations

import argparse
import json
import os
import re
import shutil
import sys
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable

# ---------------------------------------------------------------------------
# Sanitization rules
# ---------------------------------------------------------------------------
#
# Each rule is a function `(value, path) -> sanitized_value`. The
# rules are matched by exact JSON-path key (period-joined), e.g.
# `cwd`, `session_id`, `hook_event_name`. A nested field is addressed
# by its full dotted path (e.g. `parent.child`); there is no prefix or
# wildcard matching, so every traversed field needs its own entry.

# Signature shared by every rule: `(value, dotted_json_path) -> sanitized_value`.
PathRule = Callable[[Any, str], Any]

# Placeholder strings written into fixtures in place of sensitive
# values. A rule built with `_replace_with` substitutes the whole value
# unconditionally; it does not inspect whether the value actually looks
# like a path, id, etc.
#
# NOTE(review): LOG_PATH, RUN_ID, TASK_ID, TIMESTAMP, HOSTNAME, USERNAME
# and PID placeholders are not referenced by the CLAUDE_RULES / CODEX_RULES
# tables in this file — presumably reserved for fields not yet captured;
# confirm before removing.
PATH_PLACEHOLDER = "<REPO_ROOT>"
TRANSCRIPT_PATH_PLACEHOLDER = "<TRANSCRIPT_PATH>"
LOG_PATH_PLACEHOLDER = "<LOG_PATH>"
SESSION_ID_PLACEHOLDER = "<SESSION_ID>"
RUN_ID_PLACEHOLDER = "<RUN_ID>"
TASK_ID_PLACEHOLDER = "<TASK_ID>"
TIMESTAMP_PLACEHOLDER = "<TS>"
HOSTNAME_PLACEHOLDER = "<HOSTNAME>"
USERNAME_PLACEHOLDER = "<USERNAME>"
PID_PLACEHOLDER = "<PID>"
PROMPT_BODY_PLACEHOLDER = "<USER_PROMPT_BODY>"


def _replace_with(placeholder: str) -> PathRule:
    def rule(_value: Any, _path: str) -> Any:
        return placeholder
    return rule


def _keep(value: Any, _path: str) -> Any:
    return value


def _recurse_object(rules: dict[str, PathRule], path: str, value: Any) -> Any:
    if not isinstance(value, dict):
        raise SanitizerError(f"{path}: expected object, got {type(value).__name__}")
    out: dict[str, Any] = {}
    for key, child in value.items():
        child_path = f"{path}.{key}" if path else key
        rule = rules.get(child_path)
        if rule is None:
            raise SanitizerError(
                f"{child_path}: no sanitization rule (deny-by-default). "
                f"Add one to scripts/sanitize-live-evidence.py and document it in "
                f"tests/fixtures/live/README.md."
            )
        out[key] = rule(child, child_path)
    return out


class SanitizerError(Exception):
    """Raised when a capture cannot be sanitized under the deny-by-default rules.

    Messages produced in this file begin with the period-joined JSON path
    of the offending field.
    """


# Claude Code hook stdin schema rules.
#
# Reference: Claude Code's hook protocol — every hook event has a
# `hook_event_name` plus event-specific fields. The set below covers
# the lifecycle paths #30 captures: SessionStart, UserPromptSubmit,
# PreCompact, Stop, SessionEnd. New fields require a new rule
# entry; deny-by-default surfaces them as errors.
#
# Keys are period-joined JSON paths; every entry here is a top-level
# field. `_keep` entries copy the captured value into the fixture
# verbatim, so review any new `_keep` entry for leaked user or machine
# data. `_replace_with(...)` entries drop the captured value and write
# the named placeholder instead.
CLAUDE_RULES: dict[str, PathRule] = {
    # Common fields across all events.
    "session_id": _replace_with(SESSION_ID_PLACEHOLDER),
    "transcript_path": _replace_with(TRANSCRIPT_PATH_PLACEHOLDER),
    "cwd": _replace_with(PATH_PLACEHOLDER),
    "hook_event_name": _keep,
    "permission_mode": _keep,
    # SessionStart-specific.
    "source": _keep,
    # UserPromptSubmit-specific. The prompt text is never committed.
    "prompt": _replace_with(PROMPT_BODY_PLACEHOLDER),
    # PreCompact-specific. `custom_instructions` is masked with the same
    # placeholder as `prompt`.
    "trigger": _keep,
    "custom_instructions": _replace_with(PROMPT_BODY_PLACEHOLDER),
    # Stop-specific.
    "stop_hook_active": _keep,
    # SessionEnd-specific.
    "reason": _keep,
}

# Codex CLI hook stdin schema rules. Codex's hook surface is smaller
# than Claude's per the existing adapter mapping; this stub is here so
# #31 can fill it in without re-discovering the deny-by-default
# convention.
#
# As with the Claude table, keys are top-level field names; any field
# not listed here makes the run fail rather than pass through.
CODEX_RULES: dict[str, PathRule] = {
    "session_id": _replace_with(SESSION_ID_PLACEHOLDER),
    "cwd": _replace_with(PATH_PLACEHOLDER),
    "hook_event_name": _keep,
    "prompt": _replace_with(PROMPT_BODY_PLACEHOLDER),
}

# Rule table per harness. The keys double as the accepted values of the
# `--harness` CLI option and as the name of the output subdirectory.
HARNESS_RULES: dict[str, dict[str, PathRule]] = {
    "claude": CLAUDE_RULES,
    "codex": CODEX_RULES,
}


# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------

# Capture file names look like `<digits>-<EventName>[-<label>].in.json`.
# The event name must start with a letter and may contain letters,
# digits and underscores. Only the `event` group is consumed by `main`;
# `ts` and `label` are matched but not otherwise used in this file.
CAPTURE_FILENAME_RE = re.compile(
    r"^(?P<ts>\d+)-(?P<event>[A-Za-z][A-Za-z0-9_]*)(?:-(?P<label>[^.]+))?\.in\.json$"
)


def sanitize_capture(raw: dict, rules: dict[str, PathRule]) -> dict:
    """Sanitize one decoded capture document, starting at the JSON root.

    Delegates to ``_recurse_object`` with an empty root path, so the rule
    keys for top-level fields are the bare field names. Any error raised
    by ``_recurse_object`` propagates unchanged.
    """
    root_path = ""
    return _recurse_object(rules, root_path, raw)


def main(argv: list[str]) -> int:
    """Run the sanitizer CLI and return the process exit code.

    Discovers capture files in ``--capture-dir`` whose names match
    ``CAPTURE_FILENAME_RE``, sanitizes every one of them in memory with the
    rule table for ``--harness``, and only then (unless ``--dry-run``)
    replaces ``<output-dir>/<harness>`` with the freshly written fixtures.

    Args:
        argv: Command-line arguments, excluding the program name.

    Returns:
        0 on success (including ``--dry-run``); 1 if any capture could not
        be read, decoded, or sanitized; 64 if ``--capture-dir`` is not a
        directory; 65 if no file in it matches the capture naming scheme.

    Raises:
        OSError: If staging or swapping the output tree fails. The staging
            directory is removed on a best-effort basis before re-raising.
        SystemExit: Raised by ``argparse`` on invalid arguments or ``--help``.
    """
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--harness", required=True, choices=sorted(HARNESS_RULES.keys()))
    parser.add_argument("--capture-dir", required=True, type=Path,
                        help="Directory containing raw *.in.json captures from capture-live-hook.sh")
    parser.add_argument("--output-dir", required=True, type=Path,
                        help="Root output directory (e.g. tests/fixtures/live).")
    parser.add_argument("--dry-run", action="store_true",
                        help="Sanitize and validate, but don't write output files.")
    args = parser.parse_args(argv)

    rules = HARNESS_RULES[args.harness]
    capture_dir: Path = args.capture_dir
    output_root: Path = args.output_dir / args.harness

    if not capture_dir.is_dir():
        print(f"sanitize-live-evidence: capture-dir does not exist: {capture_dir}", file=sys.stderr)
        return 64

    # Sorted iteration keeps ordinal assignment deterministic across runs.
    captures: list[tuple[Path, str]] = []
    for entry in sorted(capture_dir.iterdir()):
        if not entry.is_file():
            continue
        match = CAPTURE_FILENAME_RE.match(entry.name)
        if not match:
            continue
        captures.append((entry, match.group("event")))

    if not captures:
        print(f"sanitize-live-evidence: no captures found in {capture_dir}", file=sys.stderr)
        return 65

    # Sanitize all captures in memory FIRST. Atomicity contract:
    # the on-disk fixture tree is never partially destroyed —
    # either the full sanitized set lands and replaces the prior
    # output, or nothing on disk changes. AI review (MR !34) raised
    # this as [high/correctness/high]: a deny-by-default sanitizer
    # error mid-run was wiping committed fixtures before failing,
    # leaving recovery dependent on `git checkout` rather than the
    # script's own behavior.
    sanitized_outputs: list[tuple[str, str, dict]] = []
    per_event_count: dict[str, int] = defaultdict(int)
    errors: list[str] = []
    for capture_path, event in captures:
        try:
            # JSON interchange is UTF-8; decode explicitly instead of
            # relying on the platform's locale encoding.
            raw = json.loads(capture_path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as err:
            errors.append(f"{capture_path}: invalid JSON: {err}")
            continue
        except (OSError, UnicodeDecodeError) as err:
            # An unreadable or non-UTF-8 capture is reported alongside the
            # other per-file errors rather than aborting with a traceback.
            errors.append(f"{capture_path}: cannot read capture: {err}")
            continue
        try:
            sanitized = sanitize_capture(raw, rules)
        except SanitizerError as err:
            errors.append(f"{capture_path}: {err}")
            continue

        # Ordinals are per event name and only advance for captures that
        # sanitized successfully.
        ordinal = per_event_count[event]
        per_event_count[event] += 1
        sanitized_outputs.append((event, f"{ordinal}.json", sanitized))

    if errors:
        print("sanitize-live-evidence: errors:", file=sys.stderr)
        for err in errors:
            print(f"  {err}", file=sys.stderr)
        print(
            "sanitize-live-evidence: existing fixtures under "
            f"{output_root} were NOT touched (atomic write)",
            file=sys.stderr,
        )
        return 1

    if not args.dry_run:
        # Stage to a sibling temp dir, then swap it into place. Both
        # dirs live in the same parent so each os.rename stays within
        # one directory. Anything that goes wrong while staging is
        # surfaced before the real fixture tree is touched.
        parent = output_root.parent
        parent.mkdir(parents=True, exist_ok=True)
        staging = Path(tempfile.mkdtemp(prefix=f".{output_root.name}.staging-", dir=parent))
        try:
            for event, filename, body in sanitized_outputs:
                event_dir = staging / event
                event_dir.mkdir(parents=True, exist_ok=True)
                (event_dir / filename).write_text(
                    json.dumps(body, indent=2, sort_keys=True) + "\n",
                    encoding="utf-8",
                )

            # Swap. If output_root exists, move it out of the way
            # first so staging can be renamed into place, then remove
            # the old tree. Between the two renames output_root is
            # briefly absent; if the process dies there, the prior
            # fixtures survive as `.<name>.previous-<pid>` next to the
            # staging dir and the operator can restore or rm them.
            previous: Path | None = None
            if output_root.exists():
                previous = parent / f".{output_root.name}.previous-{os.getpid()}"
                os.rename(output_root, previous)
            try:
                os.rename(staging, output_root)
            except OSError:
                # Roll back: restore the previous output_root if we
                # had one. Re-raise to surface the failure.
                if previous is not None and previous.exists():
                    os.rename(previous, output_root)
                raise
            if previous is not None:
                shutil.rmtree(previous)
        except OSError:
            # Best-effort cleanup of the staging dir. After a
            # successful swap `staging` no longer exists under that
            # name, so this never removes the new output_root.
            if staging.exists():
                shutil.rmtree(staging, ignore_errors=True)
            raise

    summary = ", ".join(f"{evt}={n}" for evt, n in sorted(per_event_count.items()))
    mode = "dry-run" if args.dry_run else "wrote"
    print(
        f"sanitize-live-evidence: {mode} {sum(per_event_count.values())} fixtures ({summary})",
        file=sys.stderr,
    )
    return 0


# Script entry point: forward the CLI arguments (minus the program name)
# to main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))