import hashlib
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from typing import Dict, List, Tuple
def run(cmd: List[str], cwd: str) -> subprocess.CompletedProcess:
return subprocess.run(cmd, cwd=cwd, text=True, capture_output=True)
def parse_staged_changes(repo_root: str) -> Dict[str, List[Tuple[int, int]]]:
diff = run(["git", "diff", "--cached", "--unified=0", "--no-color"], cwd=repo_root)
if diff.returncode != 0 or not diff.stdout.strip():
return {}
changed: Dict[str, set] = {}
current_file = ""
for raw in diff.stdout.splitlines():
line = raw.rstrip("\n")
if line.startswith("diff --git "):
parts = line.split()
if len(parts) >= 4:
path = parts[3]
if path.startswith("b/"):
path = path[2:]
current_file = path
changed.setdefault(current_file, set())
continue
if not line.startswith("@@") or not current_file:
continue
m = re.search(r"\+(\d+)(?:,(\d+))?", line)
if not m:
continue
start = int(m.group(1))
count = int(m.group(2) or "1")
if count <= 0:
continue
for ln in range(start, start + count):
changed[current_file].add(ln)
out: Dict[str, List[Tuple[int, int]]] = {}
for path, lines in changed.items():
sorted_lines = sorted(lines)
if not sorted_lines:
out[path] = []
continue
ranges: List[Tuple[int, int]] = []
start = sorted_lines[0]
prev = sorted_lines[0]
for ln in sorted_lines[1:]:
if ln == prev + 1:
prev = ln
continue
ranges.append((start, prev))
start = ln
prev = ln
ranges.append((start, prev))
out[path] = ranges
return out
def read_json_file(path: str):
if not os.path.exists(path):
return None
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return None
def parse_event_ts(event_ts: str) -> int:
try:
normalized = event_ts.replace("Z", "+00:00")
dt = datetime.fromisoformat(normalized)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return int(dt.timestamp())
except Exception:
return 0
def head_commit_ts(repo_root: str) -> int:
out = run(["git", "show", "-s", "--format=%ct", "HEAD"], cwd=repo_root)
if out.returncode != 0:
return 0
try:
return int(out.stdout.strip())
except Exception:
return 0
def event_overlaps_staged(event: dict, lines_by_file: Dict[str, List[Tuple[int, int]]]) -> bool:
file_path = str(event.get("file", "")).strip()
if not file_path or file_path not in lines_by_file:
return False
event_lines = event.get("lines")
if not isinstance(event_lines, list) or not event_lines:
return True
line_values = set()
for v in event_lines:
try:
n = int(v)
if n > 0:
line_values.add(n)
except Exception:
continue
if not line_values:
return True
for (start, end) in lines_by_file.get(file_path, []):
lo = min(start, end)
hi = max(start, end)
for ln in line_values:
if lo <= ln <= hi:
return True
return False
_NOISY_AGENTS = {"copilot"}
def read_events_per_file(
path: str,
touched_files: List[str],
min_ts: int,
lines_by_file: Dict[str, List[Tuple[int, int]]],
) -> Dict[str, dict]:
if not os.path.exists(path):
return {}
best: Dict[str, Tuple[int, dict]] = {} try:
with open(path, "r", encoding="utf-8") as f:
for raw in f:
line = raw.strip()
if not line:
continue
try:
event = json.loads(line)
except Exception:
continue
if not isinstance(event, dict):
continue
event_ts = parse_event_ts(str(event.get("timestamp") or ""))
if event_ts < min_ts:
continue
file_path = str(event.get("file", "")).strip()
if not file_path or file_path not in touched_files:
continue
if not event_overlaps_staged(event, lines_by_file):
continue
agent = str(event.get("agent") or "human")
prev_ts, prev_ev = best.get(file_path, (0, None))
prev_agent = str(prev_ev.get("agent") or "human") if prev_ev else ""
if prev_agent in _NOISY_AGENTS and agent not in _NOISY_AGENTS:
best[file_path] = (event_ts, event)
elif prev_agent not in _NOISY_AGENTS and prev_agent and agent in _NOISY_AGENTS:
pass
elif event_ts >= prev_ts:
best[file_path] = (event_ts, event)
except Exception:
return {}
return {fp: ev for fp, (_, ev) in best.items()}
def dominant_event(events_by_file: Dict[str, dict], lines_by_file: Dict[str, List[Tuple[int, int]]]) -> dict:
if not events_by_file:
return {}
line_count: Dict[str, int] = {}
for fp, event in events_by_file.items():
agent = str(event.get("agent") or "human")
ranges = lines_by_file.get(fp, [])
count = sum(max(0, b - a + 1) for a, b in ranges)
line_count[agent] = line_count.get(agent, 0) + count
dominant_agent = max(line_count, key=lambda a: line_count[a])
for fp, event in reversed(list(events_by_file.items())):
if str(event.get("agent") or "human") == dominant_agent:
return event
return next(iter(events_by_file.values()))
def get_git_username(repo_root: str) -> str:
try:
out = run(["git", "config", "user.name"], cwd=repo_root)
name = out.stdout.strip()
return name if name else "human"
except Exception:
return "human"
def prompt_excerpt(prompt: str, limit: int = 160) -> str:
text = " ".join((prompt or "").split())
if not text:
return ""
if len(text) <= limit:
return text
return text[: max(0, limit - 3)] + "..."
def sha256_text(text: str) -> str:
return hashlib.sha256((text or "").encode("utf-8")).hexdigest()
def main() -> int:
if len(sys.argv) < 5:
print(
"usage: prepare-ledger.py <repo_root> <session_log> <pending_context> <pending_ledger>",
file=sys.stderr,
)
return 2
repo_root = os.path.abspath(sys.argv[1])
session_log = os.path.abspath(sys.argv[2])
pending_context_path = os.path.abspath(sys.argv[3])
pending_ledger_path = os.path.abspath(sys.argv[4])
if not os.path.exists(os.path.join(repo_root, ".git")):
return 0
lines_by_file = parse_staged_changes(repo_root)
if not lines_by_file:
return 0
lines_by_file = {
f: r for f, r in lines_by_file.items()
if not f.startswith(".agentdiff/")
}
if not lines_by_file:
return 0
files_touched = sorted(lines_by_file.keys())
pending = read_json_file(pending_context_path)
if not isinstance(pending, dict):
pending = {}
events_by_file = read_events_per_file(
session_log,
files_touched,
head_commit_ts(repo_root),
lines_by_file,
)
event = dominant_event(events_by_file, lines_by_file) or {}
prompt = str(pending.get("prompt") or event.get("prompt") or "")
session_id = str(pending.get("session_id") or event.get("session_id") or "unknown")
agent = str(pending.get("agent") or event.get("agent") or "human")
if agent == "human":
agent = get_git_username(repo_root)
model = str(pending.get("model_id") or pending.get("model") or event.get("model") or "human")
files_read = pending.get("files_read")
if not isinstance(files_read, list):
files_read = []
files_read = [str(p) for p in files_read]
trust = pending.get("trust")
if isinstance(trust, bool) or not isinstance(trust, int):
trust = None
if isinstance(trust, int):
trust = max(0, min(100, trust))
flags = pending.get("flags")
if not isinstance(flags, list):
flags = []
flags = [str(f) for f in flags]
intent = pending.get("intent")
if intent is not None:
intent = str(intent)
attribution: Dict[str, dict] = {}
for fp, ev in events_by_file.items():
file_agent = str(ev.get("agent") or "human")
file_model = str(ev.get("model") or "human")
if file_agent != agent or file_model != model:
attribution[fp] = {
"agent": file_agent,
"model": file_model,
"session_id": str(ev.get("session_id") or "unknown"),
"tool": str(ev.get("tool") or "commit"),
}
payload = {
"captured_at": datetime.now(timezone.utc).isoformat(),
"agent": agent,
"model": model,
"session_id": session_id,
"files_touched": files_touched,
"lines": {
file_path: [[int(a), int(b)] for (a, b) in ranges]
for file_path, ranges in lines_by_file.items()
},
"prompt_excerpt": prompt_excerpt(prompt),
"prompt_hash": sha256_text(prompt),
"files_read": files_read,
"flags": flags,
"tool": str(event.get("tool") or "commit"),
"mode": event.get("mode"),
}
if attribution:
payload["attribution"] = attribution
if intent:
payload["intent"] = intent
if trust is not None:
payload["trust"] = trust
parent = os.path.dirname(pending_ledger_path)
if parent:
os.makedirs(parent, exist_ok=True)
with open(pending_ledger_path, "w", encoding="utf-8") as f:
json.dump(payload, f, separators=(",", ":"))
return 0
if __name__ == "__main__":
raise SystemExit(main())