agentdiff 0.1.30

Audit and trace autonomous AI code contributions in git repositories
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
#!/usr/bin/env python3
"""
Prepare commit-scoped ledger snapshot before commit.

Usage:
  prepare-ledger.py <repo_root> <session_log> <pending_context> <pending_ledger>
"""
import hashlib
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from typing import Dict, List, Tuple


def run(cmd: List[str], cwd: str) -> subprocess.CompletedProcess:
    return subprocess.run(cmd, cwd=cwd, text=True, capture_output=True)


def parse_staged_changes(repo_root: str) -> Dict[str, List[Tuple[int, int]]]:
    diff = run(["git", "diff", "--cached", "--unified=0", "--no-color"], cwd=repo_root)
    if diff.returncode != 0 or not diff.stdout.strip():
        return {}

    changed: Dict[str, set] = {}
    current_file = ""

    for raw in diff.stdout.splitlines():
        line = raw.rstrip("\n")
        if line.startswith("diff --git "):
            parts = line.split()
            if len(parts) >= 4:
                path = parts[3]
                if path.startswith("b/"):
                    path = path[2:]
                current_file = path
                changed.setdefault(current_file, set())
            continue

        if not line.startswith("@@") or not current_file:
            continue

        # @@ -old_start,old_count +new_start,new_count @@
        m = re.search(r"\+(\d+)(?:,(\d+))?", line)
        if not m:
            continue

        start = int(m.group(1))
        count = int(m.group(2) or "1")
        if count <= 0:
            continue

        for ln in range(start, start + count):
            changed[current_file].add(ln)

    out: Dict[str, List[Tuple[int, int]]] = {}
    for path, lines in changed.items():
        sorted_lines = sorted(lines)
        if not sorted_lines:
            out[path] = []
            continue
        ranges: List[Tuple[int, int]] = []
        start = sorted_lines[0]
        prev = sorted_lines[0]
        for ln in sorted_lines[1:]:
            if ln == prev + 1:
                prev = ln
                continue
            ranges.append((start, prev))
            start = ln
            prev = ln
        ranges.append((start, prev))
        out[path] = ranges
    return out


def read_json_file(path: str):
    if not os.path.exists(path):
        return None
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return None


def parse_event_ts(event_ts: str) -> int:
    try:
        normalized = event_ts.replace("Z", "+00:00")
        dt = datetime.fromisoformat(normalized)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return int(dt.timestamp())
    except Exception:
        return 0


def head_commit_ts(repo_root: str) -> int:
    out = run(["git", "show", "-s", "--format=%ct", "HEAD"], cwd=repo_root)
    if out.returncode != 0:
        return 0
    try:
        return int(out.stdout.strip())
    except Exception:
        return 0


def event_overlaps_staged(event: dict, lines_by_file: Dict[str, List[Tuple[int, int]]]) -> bool:
    file_path = str(event.get("file", "")).strip()
    if not file_path or file_path not in lines_by_file:
        return False

    event_lines = event.get("lines")
    if not isinstance(event_lines, list) or not event_lines:
        return True

    line_values = set()
    for v in event_lines:
        try:
            n = int(v)
            if n > 0:
                line_values.add(n)
        except Exception:
            continue
    if not line_values:
        return True

    for (start, end) in lines_by_file.get(file_path, []):
        lo = min(start, end)
        hi = max(start, end)
        for ln in line_values:
            if lo <= ln <= hi:
                return True
    return False


# Agents completely excluded from winning per-file attribution.
# Copilot fires on every VS Code document change — including edits by other
# AI agents running in the terminal and normal human typing. Letting it win
# attribution would mask real sources. We keep capturing its events in
# session.jsonl (for usage stats) but never let it claim a file.
_EXCLUDED_AGENTS = {"copilot"}


def read_events_per_file(
    path: str,
    touched_files: List[str],
    min_ts: int,
    lines_by_file: Dict[str, List[Tuple[int, int]]],
) -> Dict[str, dict]:
    """Return the most-recent overlapping session event for each staged file.

    Returns a dict keyed by repo-relative file path. When multiple agents
    touched different files in the same session window each file gets its
    own correct attribution instead of a single winner-takes-all event.

    Excluded agents (copilot) are skipped entirely — the most recent
    non-excluded agent wins per file. If only excluded agents have events
    for a file, no event is returned and the file gets human attribution.
    """
    if not os.path.exists(path):
        return {}
    best: Dict[str, Tuple[int, dict]] = {}  # file -> (ts, event)
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                except Exception:
                    continue
                if not isinstance(event, dict):
                    continue
                event_ts = parse_event_ts(str(event.get("timestamp") or ""))
                if event_ts < min_ts:
                    continue
                file_path = str(event.get("file", "")).strip()
                if not file_path or file_path not in touched_files:
                    continue
                if not event_overlaps_staged(event, lines_by_file):
                    continue

                agent = str(event.get("agent") or "human")
                # Never attribute to excluded agents, even as a last resort.
                if agent in _EXCLUDED_AGENTS:
                    continue

                prev_ts, _ = best.get(file_path, (0, None))
                if event_ts >= prev_ts:
                    # Most recent non-excluded agent wins per file.
                    best[file_path] = (event_ts, event)
    except Exception:
        return {}
    return {fp: ev for fp, (_, ev) in best.items()}


def read_intent_events(
    path: str,
    min_ts: int,
) -> List[dict]:
    """Read intent events from session.jsonl (type=intent, written by set_intent MCP tool).

    Returns all matching intent events sorted by timestamp (most recent last).
    """
    if not os.path.exists(path):
        return []
    results = []
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                except Exception:
                    continue
                if not isinstance(event, dict):
                    continue
                if event.get("type") != "intent":
                    continue
                event_ts = parse_event_ts(str(event.get("timestamp") or ""))
                if event_ts < min_ts:
                    continue
                results.append(event)
    except Exception:
        pass
    results.sort(key=lambda e: parse_event_ts(str(e.get("timestamp") or "")))
    return results


def dominant_event(events_by_file: Dict[str, dict], lines_by_file: Dict[str, List[Tuple[int, int]]]) -> dict:
    """Pick the agent/model to use as the top-level record field.

    Chooses the agent that wrote the most lines across all staged files.
    Falls back to most-recently-touched file if line counts are equal.
    """
    if not events_by_file:
        return {}
    line_count: Dict[str, int] = {}
    for fp, event in events_by_file.items():
        agent = str(event.get("agent") or "human")
        ranges = lines_by_file.get(fp, [])
        count = sum(max(0, b - a + 1) for a, b in ranges)
        line_count[agent] = line_count.get(agent, 0) + count
    dominant_agent = max(line_count, key=lambda a: line_count[a])
    # Return the most recent event for the dominant agent
    for fp, event in reversed(list(events_by_file.items())):
        if str(event.get("agent") or "human") == dominant_agent:
            return event
    return next(iter(events_by_file.values()))


def get_git_username(repo_root: str) -> str:
    """Return git user.name; falls back to 'human' if unset."""
    try:
        out = run(["git", "config", "user.name"], cwd=repo_root)
        name = out.stdout.strip()
        return name if name else "human"
    except Exception:
        return "human"


def prompt_excerpt(prompt: str, limit: int = 160) -> str:
    text = " ".join((prompt or "").split())
    if not text:
        return ""
    if len(text) <= limit:
        return text
    return text[: max(0, limit - 3)] + "..."


def sha256_text(text: str) -> str:
    return hashlib.sha256((text or "").encode("utf-8")).hexdigest()


def main() -> int:
    if len(sys.argv) < 5:
        print(
            "usage: prepare-ledger.py <repo_root> <session_log> <pending_context> <pending_ledger>",
            file=sys.stderr,
        )
        return 2

    repo_root = os.path.abspath(sys.argv[1])
    session_log = os.path.abspath(sys.argv[2])
    pending_context_path = os.path.abspath(sys.argv[3])
    pending_ledger_path = os.path.abspath(sys.argv[4])

    if not os.path.exists(os.path.join(repo_root, ".git")):
        return 0

    lines_by_file = parse_staged_changes(repo_root)
    if not lines_by_file:
        return 0

    # Exclude agentdiff metadata from attribution — it's auto-generated, not AI code.
    lines_by_file = {
        f: r for f, r in lines_by_file.items()
        if not f.startswith(".agentdiff/")
    }
    if not lines_by_file:
        return 0

    files_touched = sorted(lines_by_file.keys())
    pending = read_json_file(pending_context_path)
    if not isinstance(pending, dict):
        pending = {}

    min_ts = head_commit_ts(repo_root)
    events_by_file = read_events_per_file(
        session_log,
        files_touched,
        min_ts,
        lines_by_file,
    )

    # Read agent-stated intent events from session.jsonl (written by set_intent MCP tool)
    intent_events = read_intent_events(session_log, min_ts)

    # Top-level agent/model/session come from the dominant event (most lines written)
    event = dominant_event(events_by_file, lines_by_file) or {}

    prompt = str(pending.get("prompt") or event.get("prompt") or "")
    session_id = str(pending.get("session_id") or event.get("session_id") or "unknown")
    agent = str(pending.get("agent") or event.get("agent") or "human")
    git_author = get_git_username(repo_root)
    model = str(pending.get("model_id") or pending.get("model") or event.get("model") or "human")
    files_read = pending.get("files_read")
    if not isinstance(files_read, list):
        files_read = []
    files_read = [str(p) for p in files_read]

    trust = pending.get("trust")
    if isinstance(trust, bool) or not isinstance(trust, int):
        trust = None
    if isinstance(trust, int):
        trust = max(0, min(100, trust))

    flags = pending.get("flags")
    if not isinstance(flags, list):
        flags = []
    flags = [str(f) for f in flags]

    # Intent priority: agent-stated intent event > pending context > fallback
    intent = None
    intent_type = None
    if intent_events:
        # Use the most recent intent event (prefer matching session, else latest)
        best_intent = None
        if session_id and session_id != "unknown":
            for ie in reversed(intent_events):
                if str(ie.get("session_id") or "") == session_id:
                    best_intent = ie
                    break
        if not best_intent:
            best_intent = intent_events[-1]
        intent = str(best_intent.get("description") or "").strip() or None
        intent_type = str(best_intent.get("intent_type") or "").strip() or None

    if not intent:
        raw = pending.get("intent")
        if raw is not None:
            intent = str(raw).strip() or None

    if not intent_type:
        raw = pending.get("intent_type")
        if raw is not None:
            intent_type = str(raw).strip() or None

    # Per-file attribution — each file maps to the agent/model that wrote it.
    # Files with a session event that matches the dominant agent are omitted (finalize
    # falls back to the top-level agent). Files with NO session evidence at all are
    # explicitly marked "human" so they are never incorrectly claimed by the dominant agent.
    attribution: Dict[str, dict] = {}
    for fp, ev in events_by_file.items():
        file_agent = str(ev.get("agent") or "human")
        file_model = str(ev.get("model") or "human")
        if file_agent != agent or file_model != model:
            attribution[fp] = {
                "agent": file_agent,
                "model": file_model,
                "session_id": str(ev.get("session_id") or "unknown"),
                "tool": str(ev.get("tool") or "commit"),
            }

    # Files committed with no captured session event → attribute to human.
    # Exception: if the file appears in files_read from the MCP pending context and
    # there is a non-human top-level agent, the MCP context is sufficient evidence —
    # use the MCP agent/model rather than falling back to human.
    files_read_rel = {
        os.path.relpath(f, repo_root) if os.path.isabs(f) and f.startswith(repo_root) else f
        for f in files_read
    }
    for fp in files_touched:
        if fp not in events_by_file:
            if agent != "human" and fp in files_read_rel:
                attribution[fp] = {"agent": agent, "model": model}
            else:
                attribution[fp] = {"agent": "human", "model": "human"}

    payload = {
        "captured_at": datetime.now(timezone.utc).isoformat(),
        "agent": agent,
        "git_author": git_author,
        "model": model,
        "session_id": session_id,
        "files_touched": files_touched,
        "lines": {
            file_path: [[int(a), int(b)] for (a, b) in ranges]
            for file_path, ranges in lines_by_file.items()
        },
        "prompt_excerpt": prompt_excerpt(prompt),
        "prompt_hash": sha256_text(prompt),
        "files_read": files_read,
        "flags": flags,
        "tool": str(event.get("tool") or "commit"),
        "mode": event.get("mode"),
    }
    if attribution:
        payload["attribution"] = attribution
    if intent:
        payload["intent"] = intent
    if intent_type:
        payload["intent_type"] = intent_type
    if trust is not None:
        payload["trust"] = trust

    parent = os.path.dirname(pending_ledger_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(pending_ledger_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, separators=(",", ":"))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())