import os
import sys
import json
import subprocess
from datetime import datetime, timezone
def debug_enabled() -> bool:
return os.environ.get("AGENTDIFF_DEBUG", "").lower() in {"1", "true", "yes", "on"}
def capture_prompts_enabled() -> bool:
config_path = os.path.expanduser("~/.agentdiff/config.toml")
try:
with open(config_path, encoding="utf-8") as f:
content = f.read()
for line in content.splitlines():
stripped = line.strip().replace(" ", "").lower()
if stripped.startswith("capture_prompts="):
val = stripped.split("=", 1)[1].split("#")[0].strip()
return val not in ("false", "0", "no", "off")
except (OSError, IOError):
pass
return True
def debug_log(message: str) -> None:
if not debug_enabled():
return
log_dir = os.path.expanduser("~/.agentdiff/logs")
os.makedirs(log_dir, exist_ok=True)
path = os.path.join(log_dir, "capture-claude.log")
ts = datetime.now(timezone.utc).isoformat()
with open(path, "a", encoding="utf-8") as f:
f.write(f"{ts} {message}\n")
def first(payload: dict, *keys, default=None):
for key in keys:
if key in payload and payload.get(key) is not None:
return payload.get(key)
return default
def find_repo_root(cwd: str) -> str:
try:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True, text=True, cwd=cwd
)
return result.stdout.strip() if result.returncode == 0 else cwd
except Exception:
return cwd
def get_session_log(cwd: str):
override = os.environ.get("AGENTDIFF_SESSION_LOG")
if override:
parent = os.path.dirname(override)
if parent:
os.makedirs(parent, exist_ok=True)
return override
repo_root = find_repo_root(cwd)
base = os.path.join(repo_root, ".git", "agentdiff")
if os.path.isdir(base):
return os.path.join(base, "session.jsonl")
return None
def _tail_read_jsonl(path: str, chunk_size: int = 32768) -> list:
results = []
try:
size = os.path.getsize(path)
with open(path, "rb") as fh:
offset = max(0, size - chunk_size)
fh.seek(offset)
raw = fh.read()
if offset > 0:
nl = raw.find(b"\n")
raw = raw[nl + 1:] if nl >= 0 else raw
for line in reversed(raw.decode("utf-8", errors="replace").splitlines()):
line = line.strip()
if not line:
continue
try:
results.append(json.loads(line))
except Exception:
continue
except Exception:
pass
return results
def get_prompt_from_history(session_id: str) -> str:
path = os.path.expanduser("~/.claude/history.jsonl")
entries = _tail_read_jsonl(path)
for entry in entries:
if entry.get("sessionId") != session_id:
continue
display = entry.get("display", "").strip()
if not display or display.startswith("/"):
continue
extra_parts = []
for pasted in (entry.get("pastedContents") or {}).values():
if isinstance(pasted, dict) and pasted.get("type") == "text":
content = pasted.get("content", "")
if content:
extra_parts.append(content[:200])
if extra_parts:
display = display + " [pasted: " + " | ".join(extra_parts) + "]"
return display[:500]
return "unknown"
def get_model_and_prompt(cwd: str, session_id: str) -> tuple:
import glob as _glob
model = "unknown"
try:
home = os.path.expanduser("~")
pattern = os.path.join(home, ".claude", "projects", "**", f"{session_id}.jsonl")
debug_log(f"glob pattern: {pattern}")
matches = _glob.glob(pattern, recursive=True)
debug_log(f"glob matches: {matches}")
if matches:
session_path = matches[0]
debug_log(f"session_path: {session_path}")
for entry in _tail_read_jsonl(session_path):
if entry.get("type") == "assistant":
m = entry.get("message", {}).get("model", "")
if m and m != "<synthetic>":
model = m
debug_log(f"model found: {model}")
break
except Exception as exc:
debug_log(f"model lookup error: {exc}")
prompt = get_prompt_from_history(session_id)
if prompt == "unknown":
env_prompt = os.environ.get("AGENTDIFF_PROMPT", "")
if env_prompt:
prompt = env_prompt
debug_log(f"prompt from AGENTDIFF_PROMPT env var")
debug_log(f"prompt: {prompt[:80]!r}")
return model, prompt
def is_in_repo(abs_file: str, repo_root: str) -> bool:
if not abs_file.startswith(repo_root):
return False
rel = abs_file[len(repo_root):]
if rel.startswith("/.git"):
return False
return True
def compute_line_range(abs_file: str, old_content: str, new_content: str) -> list:
try:
with open(abs_file) as f:
current = f.read()
old_lines = set()
new_lines = set()
for i, line in enumerate(current.split("\n"), 1):
if old_content in line:
old_lines.add(i)
if new_content in line:
new_lines.add(i)
return sorted(list(old_lines | new_lines))
except Exception:
return list(range(1, new_content.count("\n") + 2))
def main():
input_data = sys.stdin.read()
if not input_data.strip():
sys.exit(0)
debug_log(f"raw={input_data[:2000]}")
try:
payload = json.loads(input_data)
except json.JSONDecodeError:
sys.exit(0)
tool = first(payload, "tool", "tool_name", "toolName", default="")
tool_input = payload.get("tool_input") if isinstance(payload.get("tool_input"), dict) else payload
if tool not in ["Edit", "Write", "MultiEdit"]:
debug_log(f"skip: unknown tool={tool!r}")
sys.exit(0)
cwd = first(payload, "cwd", default=os.getcwd())
repo_root = find_repo_root(cwd)
abs_file = first(tool_input, "file_path", "filePath", "path", default="")
if not abs_file:
debug_log("skip: missing abs_file")
sys.exit(0)
in_repo = os.path.exists(os.path.join(repo_root, ".git"))
if in_repo and not is_in_repo(abs_file, repo_root):
sys.exit(0)
session_id = first(payload, "session_id", "sessionId", default="unknown")
debug_log(f"before get_model_and_prompt session_id={session_id}")
model, prompt = get_model_and_prompt(cwd, session_id)
timestamp = datetime.now(timezone.utc).isoformat()
entry = {
"timestamp": timestamp,
"agent": "claude-code",
"model": model,
"session_id": session_id,
"tool": tool,
"file": abs_file[len(repo_root):].lstrip("/") if in_repo else abs_file,
"abs_file": abs_file,
"acceptance": "verbatim",
}
if capture_prompts_enabled():
entry["prompt"] = prompt
if tool == "Edit":
entry["old"] = first(tool_input, "old_string", "oldString", default="")[:200]
entry["new"] = first(tool_input, "new_string", "newString", default="")[:200]
entry["lines"] = compute_line_range(abs_file, entry["old"], entry["new"])
elif tool == "Write":
content = first(tool_input, "content", default="")
entry["content_preview"] = content[:200]
entry["total_lines"] = content.count("\n") + 1
entry["lines"] = list(range(1, content.count("\n") + 2))
elif tool == "MultiEdit":
edits = first(tool_input, "edits", default=[])
entry["edit_count"] = len(edits)
entry["edits"] = [{"old": e.get("old_string", "")[:100], "new": e.get("new_string", "")[:100]} for e in edits]
total_lines = sum(e.get("new_string", "").count("\n") for e in edits)
entry["lines"] = list(range(1, total_lines + 2))
session_log = get_session_log(cwd)
if session_log is None:
debug_log("skip: agentdiff init not run in this repo")
sys.exit(0)
with open(session_log, "a") as f:
f.write(json.dumps(entry) + "\n")
debug_log(f"wrote entry tool={tool} file={entry['file']} lines={entry.get('lines')}")
if __name__ == "__main__":
main()