import json
import os
import subprocess
import sys
import tempfile
import uuid as uuid_mod
from typing import List, Optional
def _capture_prompts_enabled() -> bool:
config_path = os.path.expanduser("~/.agentdiff/config.toml")
try:
with open(config_path, encoding="utf-8") as f:
content = f.read()
for line in content.splitlines():
stripped = line.strip().replace(" ", "").lower()
if stripped.startswith("capture_prompts="):
val = stripped.split("=", 1)[1].split("#")[0].strip()
return val not in ("false", "0", "no", "off")
except (OSError, IOError):
pass
return True
def run(cmd: List[str], cwd: str) -> subprocess.CompletedProcess:
return subprocess.run(cmd, cwd=cwd, text=True, capture_output=True)
def read_json_file(path: str):
if not os.path.exists(path):
return None
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return None
def remove_if_exists(path: str) -> None:
try:
if os.path.exists(path):
os.remove(path)
except Exception:
pass
def sha_in_content(content: str, sha: str) -> bool:
for raw in content.splitlines():
line = raw.strip()
if not line:
continue
try:
obj = json.loads(line)
except Exception:
continue
if isinstance(obj, dict) and obj.get("sha") == sha:
return True
return False
def get_meta_branch_content(repo_root: str) -> Optional[str]:
res = run(["git", "show", "agentdiff-meta:ledger.jsonl"], cwd=repo_root)
if res.returncode == 0:
return res.stdout
return None
def write_to_meta_branch(repo_root: str, new_content: str) -> bool:
tmp_path = None
try:
with tempfile.NamedTemporaryFile(
mode="w", suffix=".jsonl", delete=False, encoding="utf-8"
) as f:
f.write(new_content)
tmp_path = f.name
blob_res = run(["git", "hash-object", "-w", tmp_path], cwd=repo_root)
if blob_res.returncode != 0:
return False
blob_sha = blob_res.stdout.strip()
tree_input = f"100644 blob {blob_sha}\tledger.jsonl\n"
tree_res = subprocess.run(
["git", "mktree"],
input=tree_input,
text=True,
capture_output=True,
cwd=repo_root,
)
if tree_res.returncode != 0:
return False
tree_sha = tree_res.stdout.strip()
parent_res = run(
["git", "rev-parse", "refs/heads/agentdiff-meta"], cwd=repo_root
)
parent_args: List[str] = []
if parent_res.returncode == 0:
parent_sha = parent_res.stdout.strip()
if parent_sha:
parent_args = ["-p", parent_sha]
short_res = run(["git", "rev-parse", "--short", "HEAD"], cwd=repo_root)
short_sha = short_res.stdout.strip() if short_res.returncode == 0 else "?"
commit_res = subprocess.run(
["git", "commit-tree", tree_sha, "-m", f"agentdiff: {short_sha}"]
+ parent_args,
text=True,
capture_output=True,
cwd=repo_root,
)
if commit_res.returncode != 0:
return False
commit_sha = commit_res.stdout.strip()
ref_res = run(
["git", "update-ref", "refs/heads/agentdiff-meta", commit_sha],
cwd=repo_root,
)
return ref_res.returncode == 0
except Exception:
return False
finally:
if tmp_path and os.path.exists(tmp_path):
try:
os.unlink(tmp_path)
except Exception:
pass
def write_agent_trace(repo_root: str, pending: dict, sha: str, ts: str) -> bool:
branch_res = run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=repo_root)
if branch_res.returncode != 0:
return False
branch = branch_res.stdout.strip()
if not branch or branch == "HEAD":
return False
sanitized = branch.replace("/", "%2F")
traces_dir = os.path.join(repo_root, ".git", "agentdiff", "traces")
os.makedirs(traces_dir, exist_ok=True)
traces_path = os.path.join(traces_dir, f"{sanitized}.jsonl")
agent = str(pending.get("agent") or "human")
model = str(pending.get("model") or "human")
attribution = pending.get("attribution") or {}
lines_map = pending.get("lines") or {}
files = []
for file_path, ranges in lines_map.items():
file_attr = attribution.get(file_path, {})
file_agent = str(file_attr.get("agent") or agent)
file_model = str(file_attr.get("model") or model)
contributor_type = "human" if file_agent == "human" else "ai"
contributor: dict = {"type": contributor_type}
if file_model and file_model != "human":
contributor["model_id"] = file_model
trace_ranges = []
for r in ranges:
if isinstance(r, (list, tuple)) and len(r) == 2:
trace_ranges.append({
"start_line": int(min(r[0], r[1])),
"end_line": int(max(r[0], r[1])),
})
if trace_ranges:
files.append({
"path": file_path,
"conversations": [{
"contributor": contributor,
"ranges": trace_ranges,
}],
})
if not files:
return True
prompts_on = _capture_prompts_enabled()
metadata: dict = {}
if prompts_on:
if pending.get("prompt_excerpt"):
metadata["prompt_excerpt"] = str(pending["prompt_excerpt"])
if pending.get("prompt_hash"):
metadata["prompt_hash"] = str(pending["prompt_hash"])
if isinstance(pending.get("trust"), int):
metadata["trust"] = pending["trust"]
if isinstance(pending.get("flags"), list) and pending["flags"]:
metadata["flags"] = pending["flags"]
if pending.get("session_id"):
metadata["session_id"] = str(pending["session_id"])
trace: dict = {
"version": "0.1.0",
"id": str(uuid_mod.uuid4()),
"timestamp": ts,
"vcs": {"type": "git", "revision": sha},
"tool": {"name": agent},
"files": files,
}
if metadata:
trace["metadata"] = metadata
line = json.dumps(trace, separators=(",", ":")) + "\n"
try:
with open(traces_path, "a", encoding="utf-8") as f:
f.write(line)
return True
except Exception:
return False
def sha_exists_on_disk(ledger_path: str, sha: str) -> bool:
if not os.path.exists(ledger_path):
return False
try:
with open(ledger_path, "r", encoding="utf-8") as f:
return sha_in_content(f.read(), sha)
except Exception:
return False
def main() -> int:
if len(sys.argv) < 4:
print(
"usage: finalize-ledger.py <repo_root> <pending_ledger> <pending_context> [<ledger_path>]",
file=sys.stderr,
)
return 2
repo_root = os.path.abspath(sys.argv[1])
pending_ledger_path = os.path.abspath(sys.argv[2])
pending_context_path = os.path.abspath(sys.argv[3])
ledger_path = os.path.abspath(sys.argv[4]) if len(sys.argv) >= 5 else None
if not os.path.exists(os.path.join(repo_root, ".git")):
return 0
pending = read_json_file(pending_ledger_path)
if not isinstance(pending, dict):
return 0
sha_res = run(["git", "rev-parse", "HEAD"], cwd=repo_root)
if sha_res.returncode != 0:
return 1
sha = sha_res.stdout.strip()
if not sha:
return 1
meta_content = get_meta_branch_content(repo_root)
if meta_content is not None:
if sha_in_content(meta_content, sha):
remove_if_exists(pending_ledger_path)
remove_if_exists(pending_context_path)
return 0
elif ledger_path and sha_exists_on_disk(ledger_path, sha):
remove_if_exists(pending_ledger_path)
remove_if_exists(pending_context_path)
return 0
ts_res = run(["git", "show", "-s", "--format=%cI", "HEAD"], cwd=repo_root)
if ts_res.returncode != 0:
return 1
ts = ts_res.stdout.strip()
author_res = run(["git", "show", "-s", "--format=%an", "HEAD"], cwd=repo_root)
author = author_res.stdout.strip() if author_res.returncode == 0 else ""
prompts_on = _capture_prompts_enabled()
entry: dict = {
"sha": sha,
"ts": ts,
"agent": str(pending.get("agent") or "human"),
"model": str(pending.get("model") or "human"),
"session_id": str(pending.get("session_id") or "unknown"),
"author": author or None,
"files_touched": pending.get("files_touched") if isinstance(pending.get("files_touched"), list) else [],
"lines": pending.get("lines") if isinstance(pending.get("lines"), dict) else {},
"prompt_excerpt": str(pending.get("prompt_excerpt") or "") if prompts_on else "",
"prompt_hash": str(pending.get("prompt_hash") or "") if prompts_on else "",
"files_read": pending.get("files_read") if isinstance(pending.get("files_read"), list) else [],
"flags": pending.get("flags") if isinstance(pending.get("flags"), list) else [],
"tool": str(pending.get("tool") or "commit"),
"mode": pending.get("mode"),
}
if pending.get("intent"):
entry["intent"] = str(pending.get("intent"))
if isinstance(pending.get("trust"), int):
entry["trust"] = max(0, min(100, int(pending["trust"])))
if isinstance(pending.get("attribution"), dict):
entry["attribution"] = pending["attribution"]
entry = {k: v for (k, v) in entry.items() if v is not None}
entry_line = json.dumps(entry, separators=(",", ":")) + "\n"
write_agent_trace(repo_root, pending, sha, ts)
existing = meta_content if meta_content is not None else ""
if write_to_meta_branch(repo_root, existing + entry_line):
remove_if_exists(pending_ledger_path)
remove_if_exists(pending_context_path)
return 0
if not ledger_path:
remove_if_exists(pending_ledger_path)
remove_if_exists(pending_context_path)
return 1
parent = os.path.dirname(ledger_path)
if parent:
os.makedirs(parent, exist_ok=True)
with open(ledger_path, "a", encoding="utf-8") as f:
f.write(entry_line)
remove_if_exists(pending_ledger_path)
remove_if_exists(pending_context_path)
return 0
if __name__ == "__main__":
raise SystemExit(main())