Skip to main content

pond/adapter/
claude_code.rs

1//! Claude Code CLI adapter.
2//!
3//! Source path: `~/.claude/projects/<encoded-project-path>/<session-uuid>.jsonl`.
4//! Each `.jsonl` file is one session; lines are typed entries linked via a
5//! `parentUuid` -> `uuid` chain. Tool results arrive as `user` entries whose
6//! `message.content[]` contains `tool_result` blocks with a parallel
7//! `toolUseResult` field carrying structured data.
8
9use std::{
10    collections::{HashMap, HashSet},
11    hash::{Hash, Hasher},
12    path::{Path, PathBuf},
13};
14
15use anyhow::Context as _;
16use chrono::{DateTime, SecondsFormat, Utc};
17use serde_json::{Value, json};
18
19use crate::{
20    sessions::IngestEvent,
21    wire::{FileData, Message, Part, PartKind, Provenance, ProviderOptions, Session},
22};
23
24use super::{
25    Adapter, AdapterError, AdapterFactory, AdapterYieldStream, DiscoverFuture, Env,
26    RestoreFidelity, RestoredFile, SkipOracle, by_timestamp_then_id, compact_json, config_path,
27    empty_options,
28    extract::{
29        Extracted, Source, extract_compact_repr, extract_raw_record, extract_self_str, extract_str,
30    },
31    extracted_text,
32    jsonl::{
33        BoundedRow, JsonlTree, jsonl_tree_discover, jsonl_tree_events, peek_last_mapped,
34        source_line,
35    },
36    jsonl_bytes, part_id, part_ordinal, raw_record,
37};
38
39/// Per-file streaming state that persists across rows of one JSONL file.
40/// Lives inside [`Adapter::events`]'s per-file loop and is reset whenever
41/// the loop advances to the next file.
42///
43/// Two responsibilities:
44///
45/// 1. **Replay dedup.** Claude Code's `/resume` and `/compact` paths
46///    occasionally re-emit byte-identical rows with the same `uuid` (the
47///    stale-`messageSet`-cache bug in claude-code, see
48///    `utils/sessionStorage.ts`). The adapter dedupes only byte-identical
49///    replays; same-uuid/different-content reaches the validator visibly
50///    (spec.md#adapter-integrity-dedup).
51///
52/// 2. **`tool_use_id -> tool name` resolution.** The raw `tool_result` row
53///    carries only `tool_use_id`, not the tool name; the name lives on the
54///    prior `tool_use` row in the same file. We populate this map when we
55///    see a `tool_use` part, then look it up when we see the matching
56///    `tool_result` part. Misses (e.g. compaction pruned the tool_use)
57///    surface as `name: None` in `PartKind::ToolResult` rather than the
58///    old `"unknown"` sentinel - faithful to the source rather than
59///    inventing a value.
60#[derive(Debug, Default)]
61pub(crate) struct FileState {
62    seen_records: HashSet<(String, u64)>,
63    tool_call_names: HashMap<String, Extracted<String>>,
64}
65
66/// Stable adapter name. Surfaces as the `[adapters.claude-code]` config key,
67/// the `pond sync claude-code` CLI arg, and `Session.source_agent` on every
68/// emitted row.
69const NAME: &str = "claude-code";
70
71/// Stateless factory: opens [`ClaudeCodeAdapter`] instances from config and
72/// probes for the canonical install location under `~/.claude/projects`.
73pub struct ClaudeCodeFactory;
74
75impl AdapterFactory for ClaudeCodeFactory {
76    fn name(&self) -> &'static str {
77        NAME
78    }
79
80    fn open(&self, config: Value) -> Result<Box<dyn Adapter>, AdapterError> {
81        Ok(Box::new(ClaudeCodeAdapter::new(config_path(NAME, config)?)))
82    }
83
84    fn probe_default(&self, env: &Env) -> Option<Value> {
85        let path = env.home.join(".claude").join("projects");
86        path.exists().then(|| json!({ "path": path }))
87    }
88
89    fn serialize(
90        &self,
91        session: &crate::sessions::SessionWithMessages,
92        fidelity: RestoreFidelity,
93    ) -> Result<Vec<RestoredFile>, AdapterError> {
94        serialize_session(session, fidelity)
95    }
96}
97
98fn serialize_session(
99    session: &crate::sessions::SessionWithMessages,
100    fidelity: RestoreFidelity,
101) -> Result<Vec<RestoredFile>, AdapterError> {
102    let mut messages = session.messages.clone();
103    if fidelity == RestoreFidelity::Native {
104        messages.sort_by(|left, right| {
105            source_line(left.message.options())
106                .cmp(&source_line(right.message.options()))
107                .then_with(|| by_timestamp_then_id(left, right))
108        });
109    } else {
110        messages.sort_by(by_timestamp_then_id);
111    }
112    // Native replays the verbatim `options.source.raw_record`; `claude_record`
113    // below is foreign-only. Replay echoes a frozen snapshot - safe only while
114    // canonical is append-only (spec.md#adapter-integrity-additive-sync).
115    let mut records = Vec::with_capacity(messages.len());
116    let mut parent_uuid = None::<String>;
117    for message in &messages {
118        if fidelity == RestoreFidelity::Native
119            && let Some(raw) = raw_record(message.message.options())
120        {
121            parent_uuid = raw
122                .get("uuid")
123                .and_then(Value::as_str)
124                .map(ToOwned::to_owned)
125                .or(parent_uuid);
126            records.push(raw);
127            continue;
128        }
129        // `claude_record` returns `None` for a dropped System message;
130        // `parent_uuid` then stays put so the chain skips over the gap.
131        let Some(record) = claude_record(session, message, parent_uuid.as_deref()) else {
132            continue;
133        };
134        parent_uuid = record
135            .get("uuid")
136            .and_then(Value::as_str)
137            .map(ToOwned::to_owned);
138        records.push(record);
139    }
140
141    let mut files = vec![RestoredFile::new(
142        claude_relative_path(session),
143        jsonl_bytes(NAME, &records)?,
144        fidelity,
145    )];
146    if session.session.parent_session_id.is_some()
147        && let Some(meta) = subagent_meta_record(session)
148    {
149        let mut meta_path = files[0].relative_path.clone();
150        meta_path.set_extension("meta.json");
151        files.push(RestoredFile::new(
152            meta_path,
153            serde_json::to_vec(&meta).map_err(|err| {
154                AdapterError::schema(
155                    NAME,
156                    &session.session.id,
157                    format!("json encode failed: {err}"),
158                )
159            })?,
160            fidelity,
161        ));
162    }
163    Ok(files)
164}
165
166fn claude_relative_path(session: &crate::sessions::SessionWithMessages) -> PathBuf {
167    let encoded_project = session
168        .session
169        .options
170        .get("source")
171        .and_then(|source| source.get("project_dir"))
172        .and_then(Value::as_str)
173        .map(ToOwned::to_owned)
174        .unwrap_or_else(|| encode_project(&session.session.project));
175    if let Some(parent) = &session.session.parent_session_id {
176        // The child id is `<parent>/<child_suffix>`; the suffix is the file's
177        // path under `subagents/` (`agent-<hash>` flat, or
178        // `workflows/<wf-id>/agent-<hash>` nested), so stripping the parent
179        // prefix reconstructs the on-disk path verbatim.
180        let child_suffix = session
181            .session
182            .id
183            .strip_prefix(&format!("{parent}/"))
184            .unwrap_or(&session.session.id);
185        return PathBuf::from(encoded_project)
186            .join(parent)
187            .join("subagents")
188            .join(format!("{child_suffix}.jsonl"));
189    }
190    PathBuf::from(encoded_project).join(format!("{}.jsonl", session.session.id))
191}
192
/// Encode an absolute project path into Claude Code's project-directory name.
///
/// Claude Code names each `~/.claude/projects/` entry by replacing every
/// character outside `[A-Za-z0-9]` with `-`. Because this happens per UTF-16
/// code unit, a non-BMP character yields two dashes. For example,
/// `/home/me/my_app.v2` becomes `-home-me-my-app-v2`.
///
/// The previous version mapped only `/` and `.`. Foreign restores of projects
/// whose path contains `_`, spaces, or Windows `\`/`:` therefore landed in a
/// directory Claude Code never reads.
///
/// The mapping is lossy. It is only a fallback for when no stored
/// `project_dir` exists.
fn encode_project(project: &str) -> String {
    let mut encoded = String::with_capacity(project.len());
    for ch in project.chars() {
        if ch.is_ascii_alphanumeric() {
            encoded.push(ch);
        } else {
            // One dash per UTF-16 unit, matching a JS `/[^a-zA-Z0-9]/g` replace.
            for _ in 0..ch.len_utf16() {
                encoded.push('-');
            }
        }
    }
    encoded
}
196
197fn subagent_meta_record(session: &crate::sessions::SessionWithMessages) -> Option<Value> {
198    // Restore the sidecar `.meta.json` verbatim from the stored copy. A
199    // subagent ingested without a meta file stored `meta: null` - nothing
200    // to write back.
201    let meta = session.session.options.get("subagent")?.get("meta")?;
202    meta.is_object().then(|| meta.clone())
203}
204
205fn claude_record(
206    session: &crate::sessions::SessionWithMessages,
207    message: &crate::sessions::MessageWithParts,
208    parent_uuid: Option<&str>,
209) -> Option<Value> {
210    // Foreign restore into Claude Code (native restore re-emits the stored
211    // `raw_record` and never reaches here). Claude Code's transcript has only
212    // `user` and `assistant` rows: a tool result is a `user` row, and there
213    // is no in-transcript system turn - a System message (a rule-3 carrier or
214    // a source's own system/developer turn) has no idiomatic home and is
215    // dropped; the content stays in canonical (spec.md#adapter-native-restore-lossless,
216    // foreign clause).
217    let row_role = match &message.message {
218        Message::System { .. } => return None,
219        Message::User { .. } | Message::Tool { .. } => "user",
220        Message::Assistant { .. } => "assistant",
221    };
222    let mut envelope = serde_json::Map::new();
223    envelope.insert("role".to_owned(), Value::String(row_role.to_owned()));
224    if row_role == "assistant" {
225        // `type:"message"` is the Anthropic Messages API object discriminator
226        // - a constant, always present on a real assistant row.
227        envelope.insert("type".to_owned(), Value::String("message".to_owned()));
228    }
229    envelope.insert(
230        "content".to_owned(),
231        Value::Array(message.parts.iter().map(claude_part).collect()),
232    );
233    Some(json!({
234        "parentUuid": parent_uuid,
235        "isSidechain": false,
236        "userType": "external",
237        "cwd": &*session.session.project,
238        "sessionId": &session.session.id,
239        "type": row_role,
240        "message": Value::Object(envelope),
241        "uuid": message.message.id(),
242        "timestamp": message.message.timestamp().to_rfc3339_opts(SecondsFormat::Millis, true),
243    }))
244}
245
246fn claude_part(part: &Part) -> Value {
247    match &part.kind {
248        PartKind::Text { text } => json!({"type": "text", "text": extracted_text(text)}),
249        PartKind::Reasoning { text } => {
250            json!({"type": "thinking", "thinking": extracted_text(text)})
251        }
252        PartKind::ToolCall {
253            call_id,
254            name,
255            params,
256            provider_executed,
257        } => json!({
258            "type": if *provider_executed { "server_tool_use" } else { "tool_use" },
259            "id": extracted_text(call_id),
260            "name": extracted_text(name),
261            "input": params,
262        }),
263        PartKind::ToolResult {
264            call_id,
265            is_failure,
266            result,
267            ..
268        } => json!({
269            "type": "tool_result",
270            "tool_use_id": extracted_text(call_id),
271            "is_error": is_failure,
272            "content": result,
273        }),
274        PartKind::File {
275            media_type,
276            file_name,
277            data,
278        } => json!({
279            "type": "file",
280            "media_type": media_type,
281            "file_name": file_name,
282            "source": file_source(data),
283        }),
284        other => {
285            json!({"type": "text", "text": compact_json(&serde_json::to_value(other).unwrap_or(Value::Null))})
286        }
287    }
288}
289
290fn file_source(data: &FileData) -> Value {
291    match data {
292        FileData::String(value) => json!({"type": "text", "data": value}),
293        FileData::Bytes(value) => json!({"type": "base64", "data": value}),
294        FileData::Url(value) => json!({"type": "url", "url": value}),
295    }
296}
297
298/// Configured claude-code reader. Walks a tree of `*.jsonl` files under
299/// [`Self::root`] and yields canonical events in source order per session.
300#[derive(Debug, Clone)]
301pub struct ClaudeCodeAdapter {
302    root: PathBuf,
303}
304
305impl ClaudeCodeAdapter {
306    pub fn new(root: impl Into<PathBuf>) -> Self {
307        Self { root: root.into() }
308    }
309}
310
311impl Adapter for ClaudeCodeAdapter {
312    fn discover(&self) -> DiscoverFuture<'_> {
313        jsonl_tree_discover(self)
314    }
315
316    fn events_with<'a>(&'a self, oracle: &'a dyn SkipOracle) -> AdapterYieldStream<'a> {
317        jsonl_tree_events(self, oracle)
318    }
319}
320
321impl JsonlTree for ClaudeCodeAdapter {
322    type State = FileState;
323
324    fn name(&self) -> &'static str {
325        NAME
326    }
327
328    fn root(&self) -> &Path {
329        &self.root
330    }
331
332    fn peek_session_id(&self, path: &Path, first_line: &str) -> Option<String> {
333        // A file under `subagents/` takes its id from the path, never from the
334        // row's content `sessionId` (that's the parent's). A recognized child
335        // peeks to its child id; an unrecognized one returns `None` so it stays
336        // out of the freshness gate and its `unsupported_reason` failure
337        // re-surfaces on every sync rather than being skipped as `Fresh` under
338        // the parent's borrowed watermark. See spec.md#datasets.
339        if subagents_dir(path).is_some() {
340            let (parent_uuid, child_suffix, _) = subagent_ids(path)?;
341            return Some(format!("{parent_uuid}/{child_suffix}"));
342        }
343        let row: Value = serde_json::from_str(first_line).ok()?;
344        row.get("sessionId")?.as_str().map(ToOwned::to_owned)
345    }
346
347    fn peek_last_ts(&self, path: &Path) -> Option<i64> {
348        // Claude Code appends trailing metadata rows (`last-prompt`,
349        // `permission-mode`, `bridge-session`, ...) with no timestamp after the
350        // conversation, so the literal last line is usually not a message. Walk
351        // back to the latest row that carries a timestamp - the real watermark.
352        // Taking only the last line stranded ~2k sessions perpetually un-fresh,
353        // re-decoding ~1.2M already-stored rows every sync.
354        peek_last_mapped(path, |line| {
355            let row: Value = serde_json::from_str(line).ok()?;
356            Some(parse_timestamp(&row).ok()?.timestamp_micros())
357        })
358    }
359
360    fn session(&self, path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
361        session_from_rows(path, rows)
362    }
363
364    fn events_from_row(
365        &self,
366        session: &Session,
367        row: &BoundedRow,
368        state: &mut Self::State,
369    ) -> Result<Vec<IngestEvent>, String> {
370        if let Some(uuid) = row.value.get("uuid").and_then(Value::as_str)
371            && !state
372                .seen_records
373                .insert((uuid.to_owned(), source_record_hash(&row.value)))
374        {
375            return Ok(Vec::new());
376        }
377        capture_tool_call_names(&row.value, &mut state.tool_call_names);
378        events_from_row(&session.id, row.line, &row.value, session.created_at, state)
379    }
380
381    fn unsupported_reason(&self, path: &Path) -> Option<String> {
382        // A `.jsonl` under a `subagents/` ancestor that we can't resolve to a
383        // child id (its leaf isn't `agent-<hash>.jsonl`) must NOT fall back to
384        // its content `sessionId` - that id is the parent's, so it would
385        // silently merge into the parent session. Fail visibly and wait for an
386        // adapter update instead - EXCEPT the Workflow runner's `journal.jsonl`,
387        // a known control file that carries no `sessionId`, so it falls through
388        // to `session()` and is dropped as a benign Empty skip rather than
389        // flagged as an unrecognized transcript layout. See spec.md#datasets.
390        if subagents_dir(path).is_some()
391            && subagent_ids(path).is_none()
392            && !is_workflow_control_file(path)
393        {
394            return Some(format!(
395                "{}: subagent transcript layout not recognized by this pond version; \
396                 skipped so it is not merged into the parent session - update pond and \
397                 re-run `pond sync`",
398                path.display()
399            ));
400        }
401        None
402    }
403}
404
405// spec.md#adapter-integrity-dedup: hash only semantic fields so noise-field
406// replays (timestamp, requestId, isMeta, gitBranch, version, ...) dedupe;
407// real content diffs still reach the validator.
408fn source_record_hash(value: &Value) -> u64 {
409    let mut hasher = std::collections::hash_map::DefaultHasher::new();
410    let pick = |path: &[&str]| -> &Value {
411        let mut cur = value;
412        for key in path {
413            match cur.get(*key) {
414                Some(next) => cur = next,
415                None => return &Value::Null,
416            }
417        }
418        cur
419    };
420    for path in [
421        &["type"][..],
422        &["parentUuid"][..],
423        &["message", "role"][..],
424        &["message", "content"][..],
425        &["toolUseResult"][..],
426    ] {
427        compact_json(pick(path)).hash(&mut hasher);
428    }
429    hasher.finish()
430}
431
432/// The Workflow runner writes `journal.jsonl` (its resume/cache journal of agent
433/// `started`/`result` events) beside the `agent-<hash>.jsonl` transcripts under
434/// `subagents/workflows/<wf-id>/`. It carries no `sessionId` and only duplicates
435/// content already in those transcripts, so it is a control file to ignore (a
436/// benign Empty skip), not an unrecognized transcript layout. See spec.md#datasets.
437fn is_workflow_control_file(path: &Path) -> bool {
438    subagents_dir(path).is_some()
439        && path.file_name().and_then(|n| n.to_str()) == Some("journal.jsonl")
440}
441
442/// Walk one raw row's `message.content[]` array (if any) and stash every
443/// `tool_use` part's `id -> name` mapping into the per-file map. Idempotent
444/// and safe to call on every row regardless of role; non-assistant rows
445/// just don't contribute entries.
446fn capture_tool_call_names(row: &Value, map: &mut HashMap<String, Extracted<String>>) {
447    let Some(items) = row
448        .get("message")
449        .and_then(|message| message.get("content"))
450        .and_then(Value::as_array)
451    else {
452        return;
453    };
454    for item in items {
455        let kind = item.get("type").and_then(Value::as_str);
456        if !matches!(kind, Some("tool_use") | Some("server_tool_use")) {
457            continue;
458        }
459        let (Some(id), Some(name)) = (item.str_field("id"), extract_str(item, "name")) else {
460            continue;
461        };
462        map.insert(id.to_owned(), name);
463    }
464}
465
466fn session_from_rows(path: &Path, rows: &[BoundedRow]) -> Result<Session, AdapterError> {
467    let path_display = path.display().to_string();
468    // A non-agent leaf under `subagents/` (e.g. the Workflow runner's
469    // journal.jsonl) would borrow the parent's content `sessionId` and silently
470    // merge; refuse structurally rather than rely on the row lacking one.
471    // spec.md#datasets.
472    if subagents_dir(path).is_some() && subagent_ids(path).is_none() {
473        return Err(AdapterError::schema(
474            NAME,
475            path_display,
476            "sidecar/control file under subagents/ has no session of its own",
477        ));
478    }
479    let mut created_at = None;
480    let mut project: Option<Extracted<String>> = None;
481    let mut version = None;
482    for row in rows {
483        if created_at.is_none() {
484            created_at = parse_timestamp(&row.value).ok();
485        }
486        if project.is_none() {
487            project = extract_str(&row.value, "cwd");
488        }
489        if version.is_none() {
490            version = row
491                .value
492                .get("version")
493                .and_then(Value::as_str)
494                .map(ToOwned::to_owned);
495        }
496    }
497
498    let first = rows
499        .first()
500        .ok_or_else(|| AdapterError::schema(NAME, path_display.clone(), "empty jsonl session"))?;
501    let at_first = format!("{path_display}:{}", first.line);
502    let raw_session_id = first
503        .value
504        .get("sessionId")
505        .and_then(Value::as_str)
506        .ok_or_else(|| {
507            AdapterError::schema(
508                NAME,
509                at_first.clone(),
510                format!("line {} missing sessionId", first.line),
511            )
512        })?
513        .to_owned();
514    let created_at = created_at.ok_or_else(|| {
515        AdapterError::schema(NAME, at_first, "session has no parseable timestamp")
516    })?;
517
518    // Subagent detection. Claude Code stores each subagent's transcript under
519    // the session's `subagents/` sidecar - either flat
520    // (`<parent_dir>/<parent_uuid>/subagents/agent-<hash>.jsonl`) or, for the
521    // workflow runner, nested
522    // (`.../subagents/workflows/<wf-id>/agent-<hash>.jsonl`) - with a sibling
523    // `agent-<hash>.meta.json` carrying `{agentType, description}`. Every such
524    // file shares the parent's `sessionId` in row content, so ingesting it under
525    // that id collides with the parent (the validator's "project is immutable"
526    // rule rejects a cwd-shifted one, and a same-cwd one silently merges). The
527    // fix is to derive a child id from the path - keyed off the `subagents/`
528    // ancestor at any depth - and link back via `parent_session_id`. See
529    // spec.md#datasets.
530    let subagent = subagent_descriptor(path);
531    let project_dir = source_project_dir(path, subagent.is_some());
532    let (session_id, parent_session_id, source_agent, subagent_options) = match subagent {
533        Some(SubagentDescriptor {
534            parent_uuid,
535            child_suffix,
536            agent_hash,
537            agent_type,
538            meta,
539        }) => {
540            let child_id = format!("{parent_uuid}/{child_suffix}");
541            let agent_label = agent_type
542                .as_deref()
543                .map(|t| format!("claude-code/{t}"))
544                .unwrap_or_else(|| "claude-code/subagent".to_owned());
545            // `meta` is the verbatim `.meta.json`; `hash` and `raw_session_id`
546            // are pond-derived (filename hash + parent sessionId). Storing the
547            // whole meta keeps native restore of the sidecar lossless.
548            let metadata = json!({
549                "hash": agent_hash,
550                "raw_session_id": raw_session_id,
551                "meta": meta,
552            });
553            (child_id, Some(parent_uuid), agent_label, Some(metadata))
554        }
555        None => (raw_session_id, None, "claude-code".to_owned(), None),
556    };
557
558    let project = match project {
559        Some(value) => value,
560        None => {
561            let decoded = path
562                .parent()
563                .and_then(|p| p.file_name())
564                .and_then(|n| n.to_str())
565                .map(|s| s.replace('-', "/"))
566                .ok_or_else(|| {
567                    AdapterError::schema(
568                        NAME,
569                        path_display.clone(),
570                        "no `cwd` field in any row and source path is not UTF-8",
571                    )
572                })?;
573            extract_self_str(&Value::String(decoded)).ok_or_else(|| {
574                AdapterError::schema(
575                    NAME,
576                    path_display.clone(),
577                    "internal: Value::String produced None from Source::as_str",
578                )
579            })?
580        }
581    };
582
583    let mut options = ProviderOptions::new();
584    options.insert(
585        "source".to_owned(),
586        json!({
587            "adapter": "claude-code",
588            "version": version,
589            "project_dir": project_dir,
590            "workspace_path": &*project,
591        }),
592    );
593    if let Some(metadata) = subagent_options {
594        options.insert("subagent".to_owned(), metadata);
595    }
596
597    Ok(Session {
598        id: session_id,
599        parent_session_id,
600        parent_message_id: None,
601        source_agent,
602        created_at,
603        project,
604        options,
605    })
606}
607
608fn source_project_dir(path: &Path, is_subagent: bool) -> Option<String> {
609    // The project dir is the grandparent of `subagents/` regardless of how
610    // deeply the transcript nests below it (`.../<project>/<parent_uuid>/
611    // subagents/...`), so climb from the `subagents/` ancestor rather than a
612    // fixed number of `.parent()` hops.
613    let project_dir = if is_subagent {
614        subagents_dir(path)?.parent()?.parent()
615    } else {
616        path.parent()
617    };
618    project_dir
619        .and_then(|p| p.file_name())
620        .and_then(|n| n.to_str())
621        .map(ToOwned::to_owned)
622}
623
/// Nearest ancestor directory of `path` named `subagents`, if any.
///
/// The search starts at `path`'s parent and climbs toward the root. Both the
/// flat `<parent_uuid>/subagents/agent-<hash>.jsonl` layout and the nested
/// workflow `<parent_uuid>/subagents/workflows/<wf-id>/agent-<hash>.jsonl`
/// layout therefore match.
///
/// The directory directly above the result is the parent session uuid.
fn subagents_dir(path: &Path) -> Option<&Path> {
    path.ancestors()
        .skip(1)
        .find(|dir| dir.file_name().and_then(|name| name.to_str()) == Some("subagents"))
}
638
639/// Resolved metadata for one subagent JSONL file. `agent_type` is read from
640/// the sibling `.meta.json` for the `source_agent` label; `meta` keeps that
641/// file's full verbatim content so native restore reproduces it
642/// (spec.md#adapter-native-restore-lossless). Both are `None` when the meta file is
643/// absent or unreadable (the label falls back to `claude-code/subagent`).
644struct SubagentDescriptor {
645    parent_uuid: String,
646    child_suffix: String,
647    agent_hash: String,
648    agent_type: Option<String>,
649    meta: Option<Value>,
650}
651
652/// `(parent_uuid, child_suffix, agent_hash)` for a subagent transcript, or
653/// `None` for any path without a `subagents/` ancestor or a non-`agent-<hash>`
654/// leaf (the common case: top-level session files). `child_suffix` is the file's
655/// path relative to its `subagents/` ancestor with `.jsonl` stripped -
656/// `agent-<hash>` flat, `workflows/<wf-id>/agent-<hash>` nested - so the derived
657/// child id `<parent_uuid>/<child_suffix>` round-trips back to the on-disk path
658/// on native restore. `agent_hash` keys the sibling `.meta.json` lookup.
659fn subagent_ids(path: &Path) -> Option<(String, String, String)> {
660    let file_name = path.file_name()?.to_str()?;
661    let agent_hash = file_name
662        .strip_prefix("agent-")?
663        .strip_suffix(".jsonl")?
664        .to_owned();
665    let subagents = subagents_dir(path)?;
666    let parent_uuid = subagents.parent()?.file_name()?.to_str()?.to_owned();
667    // The child id must be `/`-canonical on every platform (the rest of the
668    // adapter and `claude_relative_path` assume `/`), but a relative path carries
669    // the OS separator - normalize it. No-op on POSIX.
670    let child_suffix = path
671        .strip_prefix(subagents)
672        .ok()?
673        .with_extension("")
674        .to_str()?
675        .replace(std::path::MAIN_SEPARATOR, "/");
676    Some((parent_uuid, child_suffix, agent_hash))
677}
678
679/// [`subagent_ids`] plus the sibling `agent-<hash>.meta.json` - `agentType` for
680/// the `source_agent` label, the whole file for lossless sidecar restore.
681fn subagent_descriptor(path: &Path) -> Option<SubagentDescriptor> {
682    let (parent_uuid, child_suffix, agent_hash) = subagent_ids(path)?;
683    let meta_path = path.parent()?.join(format!("agent-{agent_hash}.meta.json"));
684    let (agent_type, meta) = match std::fs::read(&meta_path) {
685        Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
686            Ok(value) => (
687                value
688                    .get("agentType")
689                    .and_then(Value::as_str)
690                    .map(ToOwned::to_owned),
691                Some(value),
692            ),
693            Err(error) => {
694                tracing::debug!(
695                    target: "pond::adapter::claude_code",
696                    meta = %meta_path.display(),
697                    %error,
698                    "subagent .meta.json present but unparseable; falling back to 'claude-code/subagent'",
699                );
700                (None, None)
701            }
702        },
703        Err(error) if error.kind() == std::io::ErrorKind::NotFound => (None, None),
704        Err(error) => {
705            tracing::debug!(
706                target: "pond::adapter::claude_code",
707                meta = %meta_path.display(),
708                %error,
709                "subagent .meta.json IO error; falling back to 'claude-code/subagent'",
710            );
711            (None, None)
712        }
713    };
714
715    Some(SubagentDescriptor {
716        parent_uuid,
717        child_suffix,
718        agent_hash,
719        agent_type,
720        meta,
721    })
722}
723
724fn events_from_row(
725    session_id: &str,
726    line: usize,
727    row: &Value,
728    default_timestamp: DateTime<Utc>,
729    state: &FileState,
730) -> Result<Vec<IngestEvent>, String> {
731    let timestamp = parse_timestamp(row).unwrap_or(default_timestamp);
732    let uuid = row
733        .get("uuid")
734        .and_then(Value::as_str)
735        .map_or_else(|| format!("{session_id}:{line}"), ToOwned::to_owned);
736
737    if let Some(message_value) = row.get("message") {
738        return message_events(
739            session_id,
740            &uuid,
741            timestamp,
742            row,
743            message_value,
744            state,
745            line,
746        );
747    }
748
749    // Rows with no `message` field are session-metadata records:
750    // `queue-operation`, `permission-mode`, `last-prompt`, `attachment`,
751    // `progress`, `system`, `custom-title`, etc. We preserve them as
752    // System messages with the row's compact JSON in `content` so a future
753    // exporter could reconstruct the original transcript; the `subtype`
754    // becomes the human label via `options.source.raw_type`.
755    let raw_type = row.get("type").and_then(Value::as_str);
756    let content = if raw_type == Some("attachment") {
757        row.get("attachment")
758            .and_then(attachment_content)
759            .or_else(|| Some(extract_compact_repr(row)))
760    } else {
761        extract_str(row, "subtype").or_else(|| extract_str(row, "type"))
762    };
763    let message = Message::System {
764        id: uuid,
765        session_id: session_id.to_owned(),
766        timestamp,
767        content,
768        options: row_options(row, line),
769    };
770    Ok(vec![IngestEvent::Message(message)])
771}
772
773fn message_events(
774    session_id: &str,
775    uuid: &str,
776    timestamp: DateTime<Utc>,
777    row: &Value,
778    message_value: &Value,
779    state: &FileState,
780    line: usize,
781) -> Result<Vec<IngestEvent>, String> {
782    let role = message_value
783        .get("role")
784        .and_then(Value::as_str)
785        .ok_or_else(|| "message missing role".to_owned())?;
786    let content = message_value.get("content").unwrap_or(&Value::Null);
787    let mut parts = Vec::new();
788    let message = match (role, content) {
789        ("user", Value::String(text)) => {
790            // spec.md#model-part-provenance: a user-slot turn is conversation only
791            // when it is a genuine human prompt; harness-injected wrappers and
792            // `isMeta` rows are scaffolding.
793            let provenance = user_text_provenance(row, text);
794            parts.push(text_part(
795                session_id,
796                uuid,
797                0,
798                extract_self_str(content),
799                provenance,
800            ));
801            Message::User {
802                id: uuid.to_owned(),
803                session_id: session_id.to_owned(),
804                timestamp,
805                options: row_options(row, line),
806            }
807        }
808        ("user", Value::Array(items)) if items.iter().all(is_tool_result) => {
809            let source_tool_result = row.get("toolUseResult").cloned();
810            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
811                tool_result_part(
812                    session_id,
813                    uuid,
814                    ordinal,
815                    item,
816                    source_tool_result.as_ref(),
817                    state,
818                )
819            }));
820            Message::Tool {
821                id: uuid.to_owned(),
822                session_id: session_id.to_owned(),
823                timestamp,
824                options: row_options(row, line),
825            }
826        }
827        ("user", Value::Array(items)) => {
828            // Classify the whole user message once: v1 claude-code never mixes
829            // provenance within a single message (spec.md#model-part-provenance).
830            let provenance = user_array_provenance(row, items);
831            parts.extend(items.iter().enumerate().map(|(ordinal, item)| {
832                user_part(session_id, uuid, ordinal, item, state, provenance)
833            }));
834            Message::User {
835                id: uuid.to_owned(),
836                session_id: session_id.to_owned(),
837                timestamp,
838                options: row_options(row, line),
839            }
840        }
841        ("assistant", Value::Array(items)) => {
842            parts.extend(
843                items
844                    .iter()
845                    .enumerate()
846                    .map(|(ordinal, item)| assistant_part(session_id, uuid, ordinal, item)),
847            );
848            Message::Assistant {
849                id: uuid.to_owned(),
850                session_id: session_id.to_owned(),
851                timestamp,
852                options: assistant_options(row, message_value, line),
853            }
854        }
855        ("system", Value::String(_)) => Message::System {
856            id: uuid.to_owned(),
857            session_id: session_id.to_owned(),
858            timestamp,
859            content: extract_self_str(content),
860            options: row_options(row, line),
861        },
862        ("system", _) => Message::System {
863            id: uuid.to_owned(),
864            session_id: session_id.to_owned(),
865            timestamp,
866            // Fallback for system messages without a string content: serialize
867            // the structured body as JSON. This is not a synthesized value
868            // (the row genuinely had this content), just a lossless string
869            // encoding of structured data.
870            content: Some(extract_compact_repr(message_value)),
871            options: row_options(row, line),
872        },
873        // spec.md#adapters rule-3: a record that maps to no typed Message is
874        // carried whole as a system-role Message, not rejected, so an unknown or
875        // future role stays lossless (the full row lives in options.raw_record).
876        _ => Message::System {
877            id: uuid.to_owned(),
878            session_id: session_id.to_owned(),
879            timestamp,
880            content: Some(extract_compact_repr(message_value)),
881            options: row_options(row, line),
882        },
883    };
884
885    let mut events = Vec::with_capacity(parts.len() + 1);
886    events.push(IngestEvent::Message(message));
887    events.extend(parts.into_iter().map(IngestEvent::Part));
888    Ok(events)
889}
890
891fn text_part(
892    session_id: &str,
893    message_id: &str,
894    ordinal: usize,
895    text: Option<Extracted<String>>,
896    provenance: Provenance,
897) -> Part {
898    Part {
899        session_id: session_id.to_owned(),
900        id: part_id(message_id, ordinal),
901        message_id: message_id.to_owned(),
902        ordinal: part_ordinal(ordinal),
903        provenance,
904        options: empty_options(),
905        kind: PartKind::Text { text },
906    }
907}
908
909fn user_part(
910    session_id: &str,
911    message_id: &str,
912    ordinal: usize,
913    value: &Value,
914    state: &FileState,
915    provenance: Provenance,
916) -> Part {
917    match value.get("type").and_then(Value::as_str) {
918        Some("text") => text_part(
919            session_id,
920            message_id,
921            ordinal,
922            extract_str(value, "text"),
923            provenance,
924        ),
925        Some("image") | Some("file") => {
926            file_part(session_id, message_id, ordinal, value, provenance)
927        }
928        Some("tool_result") => {
929            tool_result_part(session_id, message_id, ordinal, value, None, state)
930        }
931        // Unknown user part shapes: preserve the raw JSON in the Text slot
932        // rather than dropping. This is not a synthesized value - it's a
933        // lossless encoding of structured data the schema doesn't model.
934        _ => text_part(
935            session_id,
936            message_id,
937            ordinal,
938            Some(extract_compact_repr(value)),
939            provenance,
940        ),
941    }
942}
943
944fn assistant_part(session_id: &str, message_id: &str, ordinal: usize, value: &Value) -> Part {
945    // spec.md#model-part-provenance: assistant content - text, reasoning, tool calls -
946    // is model-authored, hence conversational. `tool_result` parts never appear
947    // on an assistant message.
948    match value.get("type").and_then(Value::as_str) {
949        Some("text") => text_part(
950            session_id,
951            message_id,
952            ordinal,
953            extract_str(value, "text"),
954            Provenance::Conversational,
955        ),
956        Some("thinking") => Part {
957            session_id: session_id.to_owned(),
958            id: part_id(message_id, ordinal),
959            message_id: message_id.to_owned(),
960            ordinal: part_ordinal(ordinal),
961            provenance: Provenance::Conversational,
962            options: signature_options(value),
963            kind: PartKind::Reasoning {
964                text: extract_str(value, "thinking"),
965            },
966        },
967        Some("tool_use") => Part {
968            session_id: session_id.to_owned(),
969            id: part_id(message_id, ordinal),
970            message_id: message_id.to_owned(),
971            ordinal: part_ordinal(ordinal),
972            provenance: Provenance::Conversational,
973            options: empty_options(),
974            kind: PartKind::ToolCall {
975                call_id: extract_str(value, "id"),
976                name: extract_str(value, "name"),
977                params: value.get("input").cloned().unwrap_or(Value::Null),
978                provider_executed: false,
979            },
980        },
981        Some("server_tool_use") => Part {
982            session_id: session_id.to_owned(),
983            id: part_id(message_id, ordinal),
984            message_id: message_id.to_owned(),
985            ordinal: part_ordinal(ordinal),
986            provenance: Provenance::Conversational,
987            options: empty_options(),
988            kind: PartKind::ToolCall {
989                call_id: extract_str(value, "id"),
990                name: extract_str(value, "name"),
991                params: value.get("input").cloned().unwrap_or(Value::Null),
992                provider_executed: true,
993            },
994        },
995        Some("image") | Some("file") => file_part(
996            session_id,
997            message_id,
998            ordinal,
999            value,
1000            Provenance::Conversational,
1001        ),
1002        // Same rationale as `user_part`'s fallback: lossless encoding of
1003        // an unrecognised structured shape, not synthesised data.
1004        _ => text_part(
1005            session_id,
1006            message_id,
1007            ordinal,
1008            Some(extract_compact_repr(value)),
1009            Provenance::Conversational,
1010        ),
1011    }
1012}
1013
1014fn tool_result_part(
1015    session_id: &str,
1016    message_id: &str,
1017    ordinal: usize,
1018    value: &Value,
1019    source_tool_result: Option<&Value>,
1020    state: &FileState,
1021) -> Part {
1022    let call_id = extract_str(value, "tool_use_id");
1023    // `tool_result` source rows don't carry the tool name; it's resolved
1024    // via the per-file `tool_use_id -> name` map. Misses (compaction pruned
1025    // the originating `tool_use`) surface as `None` per spec.md#model-no-synthesis
1026    // (schema-honesty: the field is `Option<Extracted<T>>`, not a fabricated
1027    // string).
1028    let name = value
1029        .str_field("tool_use_id")
1030        .and_then(|id| state.tool_call_names.get(id))
1031        .cloned();
1032    let result = value
1033        .get("content")
1034        .cloned()
1035        .or_else(|| source_tool_result.cloned())
1036        .unwrap_or(Value::Null);
1037    Part {
1038        session_id: session_id.to_owned(),
1039        id: part_id(message_id, ordinal),
1040        message_id: message_id.to_owned(),
1041        ordinal: part_ordinal(ordinal),
1042        // spec.md#model-part-provenance: tool output is runtime-produced, not
1043        // conversation.
1044        provenance: Provenance::Injected,
1045        options: empty_options(),
1046        kind: PartKind::ToolResult {
1047            call_id,
1048            name,
1049            is_failure: value
1050                .get("is_error")
1051                .and_then(Value::as_bool)
1052                .unwrap_or(false),
1053            result,
1054        },
1055    }
1056}
1057
1058fn file_part(
1059    session_id: &str,
1060    message_id: &str,
1061    ordinal: usize,
1062    value: &Value,
1063    provenance: Provenance,
1064) -> Part {
1065    let media_type = value
1066        .get("media_type")
1067        .or_else(|| value.get("mime_type"))
1068        .and_then(Value::as_str)
1069        .map(ToOwned::to_owned);
1070    let file_name = value
1071        .get("file_name")
1072        .or_else(|| value.get("name"))
1073        .and_then(Value::as_str)
1074        .map(ToOwned::to_owned);
1075    let data = if let Some(source) = value.get("source") {
1076        if let Some(url) = source.get("url").and_then(Value::as_str) {
1077            FileData::Url(url.to_owned())
1078        } else if let Some(bytes) = source.get("data").and_then(Value::as_str) {
1079            FileData::String(bytes.to_owned())
1080        } else {
1081            FileData::String(compact_json(source))
1082        }
1083    } else if let Some(url) = value.get("url").and_then(Value::as_str) {
1084        FileData::Url(url.to_owned())
1085    } else {
1086        FileData::String(compact_json(value))
1087    };
1088
1089    Part {
1090        session_id: session_id.to_owned(),
1091        id: part_id(message_id, ordinal),
1092        message_id: message_id.to_owned(),
1093        ordinal: part_ordinal(ordinal),
1094        provenance,
1095        options: empty_options(),
1096        kind: PartKind::File {
1097            media_type,
1098            file_name,
1099            data,
1100        },
1101    }
1102}
1103
1104fn row_options(row: &Value, line: usize) -> ProviderOptions {
1105    let mut options = ProviderOptions::new();
1106    let source = json!({
1107        "line": line,
1108        "parent_uuid": row.get("parentUuid"),
1109        "is_sidechain": row.get("isSidechain"),
1110        "user_type": row.get("userType"),
1111        "entrypoint": row.get("entrypoint"),
1112        "cwd": row.get("cwd"),
1113        "version": row.get("version"),
1114        "git_branch": row.get("gitBranch"),
1115        "request_id": row.get("requestId"),
1116        "raw_type": row.get("type"),
1117        "raw_record": extract_raw_record(row),
1118    });
1119    options.insert("source".to_owned(), source);
1120    options
1121}
1122
1123fn assistant_options(row: &Value, message_value: &Value, line: usize) -> ProviderOptions {
1124    let mut options = row_options(row, line);
1125    let anthropic = json!({
1126        "id": message_value.get("id"),
1127        "model": message_value.get("model"),
1128        "stop_reason": message_value.get("stop_reason"),
1129        "stop_sequence": message_value.get("stop_sequence"),
1130        "usage": message_value.get("usage"),
1131    });
1132    options.insert("anthropic".to_owned(), anthropic);
1133    options
1134}
1135
1136fn signature_options(value: &Value) -> ProviderOptions {
1137    let mut options = ProviderOptions::new();
1138    if let Some(signature) = value.get("signature").and_then(Value::as_str) {
1139        options.insert("anthropic".to_owned(), json!({"signature": signature}));
1140    }
1141    options
1142}
1143
1144fn attachment_content(value: &Value) -> Option<Extracted<String>> {
1145    extract_str(value, "content").or_else(|| extract_str(value, "stdout"))
1146}
1147
1148fn parse_timestamp(value: &Value) -> anyhow::Result<DateTime<Utc>> {
1149    let timestamp = value
1150        .get("timestamp")
1151        .and_then(Value::as_str)
1152        .context("missing timestamp")?;
1153    Ok(DateTime::parse_from_rfc3339(timestamp)
1154        .context("invalid timestamp")?
1155        .with_timezone(&Utc))
1156}
1157
1158fn is_tool_result(value: &Value) -> bool {
1159    value.get("type").and_then(Value::as_str) == Some("tool_result")
1160}
1161
1162/// True when the row carries `isMeta: true` - claude-code's marker for an
1163/// expanded skill or command body injected into a user slot.
1164fn is_meta_row(row: &Value) -> bool {
1165    row.get("isMeta").and_then(Value::as_bool) == Some(true)
1166}
1167
/// True when `text`, after leading whitespace is skipped, opens with one of
/// the harness-injected wrappers claude-code places inside a user-slot turn
/// (spec.md#model-part-provenance): task notifications, slash-command
/// echoes, local-command caveats/stdout, and interrupt notices.
fn is_injected_user_text(text: &str) -> bool {
    const WRAPPER_PREFIXES: [&str; 7] = [
        "<task-notification>",
        "<command-name>",
        "<command-message>",
        "<command-args>",
        "<local-command-caveat>",
        "<local-command-stdout>",
        "[Request interrupted by user",
    ];
    let body = text.trim_start();
    WRAPPER_PREFIXES
        .iter()
        .any(|&prefix| body.starts_with(prefix))
}
1181
1182/// Provenance of a string-content user message: `injected` for an `isMeta`
1183/// row or a harness wrapper, `conversational` for a genuine human prompt.
1184fn user_text_provenance(row: &Value, text: &str) -> Provenance {
1185    if is_meta_row(row) || is_injected_user_text(text) {
1186        Provenance::Injected
1187    } else {
1188        Provenance::Conversational
1189    }
1190}
1191
1192/// Provenance of an array-content user message. `isMeta` flags the whole row;
1193/// otherwise a leading text item carrying a harness wrapper marks it injected.
1194/// v1 claude-code never interleaves both within one message.
1195fn user_array_provenance(row: &Value, items: &[Value]) -> Provenance {
1196    if is_meta_row(row) {
1197        return Provenance::Injected;
1198    }
1199    let wrapped = items.iter().any(|item| {
1200        item.get("type").and_then(Value::as_str) == Some("text")
1201            && item
1202                .get("text")
1203                .and_then(Value::as_str)
1204                .is_some_and(is_injected_user_text)
1205    });
1206    if wrapped {
1207        Provenance::Injected
1208    } else {
1209        Provenance::Conversational
1210    }
1211}
1212
1213#[cfg(test)]
1214mod tests {
1215    //! Conformance tests for the claude-code adapter's data-shape contract:
1216    //! subagent path derivation, replay dedup, tool-name resolution, and the
1217    //! "no synthesized values" invariant (spec.md#model-no-synthesis, spec.md#model-schema-honesty, and spec.md#model-lossless-projection).
1218    //!
1219    //! Each test builds a tiny synthetic corpus under a `TempDir` so the
1220    //! assertions exercise the real adapter end-to-end without depending on
1221    //! committed fixtures.
1222    #![allow(clippy::expect_used, clippy::unwrap_used)]
1223
1224    use super::*;
1225    use crate::{handlers::ingest_adapter, sessions::Store, wire::PartKind};
1226    use tempfile::TempDir;
1227
1228    // Manifest-dir anchored: unit tests must not depend on the process cwd
1229    // (figment::Jail chdirs the whole test process while config tests run).
1230    const FIXTURE_ROOT: &str = concat!(
1231        env!("CARGO_MANIFEST_DIR"),
1232        "/tests/fixtures/adapter/claude_code/projects"
1233    );
1234
1235    #[test]
1236    fn probe_default_finds_claude_projects_under_home() -> anyhow::Result<()> {
1237        crate::adapter::test_support::assert_probe_default(
1238            &ClaudeCodeFactory,
1239            &[".claude", "projects"],
1240        )
1241    }
1242
1243    /// `source_record_hash` must dedupe noise-field replays (whitespace,
1244    /// `timestamp`, `requestId`) and let semantic-content differences through
1245    /// so a same-uuid row with a different `message.content` still reaches
1246    /// the validator (spec.md#adapter-integrity-dedup).
1247    #[test]
1248    fn source_record_hash_ignores_noise_keeps_semantic_diffs() {
1249        let base = serde_json::json!({
1250            "uuid": "u1",
1251            "type": "user",
1252            "parentUuid": null,
1253            "message": {"role": "user", "content": "hi"},
1254            "timestamp": "2026-06-17T00:00:00Z",
1255            "requestId": "req-A",
1256            "isMeta": false,
1257            "gitBranch": "main",
1258            "version": "2.1.56",
1259        });
1260        let noise_diff = serde_json::json!({
1261            "uuid": "u1",
1262            "type": "user",
1263            "parentUuid": null,
1264            "message": {"role": "user", "content": "hi"},
1265            "timestamp": "2026-06-17T00:00:05Z",
1266            "requestId": "req-B",
1267            "isMeta": true,
1268            "gitBranch": "feat/x",
1269            "version": "2.1.57",
1270        });
1271        let content_diff = serde_json::json!({
1272            "uuid": "u1",
1273            "type": "user",
1274            "parentUuid": null,
1275            "message": {"role": "user", "content": "different"},
1276            "timestamp": "2026-06-17T00:00:00Z",
1277        });
1278        assert_eq!(
1279            source_record_hash(&base),
1280            source_record_hash(&noise_diff),
1281            "noise-field differences must dedupe",
1282        );
1283        assert_ne!(
1284            source_record_hash(&base),
1285            source_record_hash(&content_diff),
1286            "semantic content differences must not dedupe",
1287        );
1288    }
1289
1290    #[tokio::test(flavor = "multi_thread")]
1291    async fn native_restore_is_value_equal_to_fixture_corpus() -> anyhow::Result<()> {
1292        let adapter = ClaudeCodeAdapter::new(FIXTURE_ROOT);
1293        crate::adapter::test_support::assert_native_restore(
1294            &ClaudeCodeFactory,
1295            &adapter,
1296            std::path::Path::new(FIXTURE_ROOT),
1297        )
1298        .await
1299    }
1300
1301    /// `<root>/<encoded-cwd>/<parent_uuid>.jsonl` plus
1302    /// `<root>/<encoded-cwd>/<parent_uuid>/subagents/agent-<hash>.jsonl` plus
1303    /// `agent-<hash>.meta.json`. The subagent file must:
1304    ///   - emit a Session whose `id = "{parent_uuid}/agent-{hash}"`
1305    ///   - have `parent_session_id = Some(parent_uuid)`
1306    ///   - have `source_agent = "claude-code/{agentType}"` from the meta file
1307    ///   - have `options.subagent` carrying the hash + agent_type + description
1308    #[tokio::test(flavor = "multi_thread")]
1309    async fn subagent_file_derives_child_session_with_parent_link() -> anyhow::Result<()> {
1310        let corpus = TempDir::new()?;
1311        let project_dir = corpus.path().join("-tmp-pond-test");
1312        let parent_uuid = "11111111-1111-1111-1111-111111111111";
1313        let agent_hash = "abc123def456";
1314        std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1315
1316        // Parent session file (one user row to anchor a Session).
1317        let parent_row = serde_json::json!({
1318            "type": "user",
1319            "uuid": "u-parent-1",
1320            "sessionId": parent_uuid,
1321            "cwd": "/tmp/pond-test",
1322            "timestamp": "2026-05-16T00:00:00.000Z",
1323            "version": "2.1.121",
1324            "message": {"role": "user", "content": "hi parent"},
1325        });
1326        std::fs::write(
1327            project_dir.join(format!("{parent_uuid}.jsonl")),
1328            format!("{parent_row}\n"),
1329        )?;
1330
1331        // Subagent file + sibling meta. Carries the SAME sessionId as the parent
1332        // in row content; the adapter must derive a child id from the path.
1333        let subagent_row = serde_json::json!({
1334            "type": "user",
1335            "uuid": "u-sub-1",
1336            "sessionId": parent_uuid,
1337            "cwd": "/tmp/pond-test",
1338            "isSidechain": true,
1339            "agentId": agent_hash,
1340            "timestamp": "2026-05-16T00:01:00.000Z",
1341            "version": "2.1.121",
1342            "message": {"role": "user", "content": "subagent prompt"},
1343        });
1344        std::fs::write(
1345            project_dir
1346                .join(parent_uuid)
1347                .join("subagents")
1348                .join(format!("agent-{agent_hash}.jsonl")),
1349            format!("{subagent_row}\n"),
1350        )?;
1351        std::fs::write(
1352            project_dir
1353                .join(parent_uuid)
1354                .join("subagents")
1355                .join(format!("agent-{agent_hash}.meta.json")),
1356            r#"{"agentType":"general-purpose","description":"do a thing"}"#,
1357        )?;
1358
1359        let store_dir = TempDir::new()?;
1360        let store = Store::open_local(store_dir.path()).await?;
1361        let adapter = ClaudeCodeAdapter::new(corpus.path());
1362
1363        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1364        assert_eq!(
1365            summary.dropped_sessions, 0,
1366            "subagent file must NOT collide with parent (pre-fix this was the project-immutable rejection)"
1367        );
1368
1369        let parent = store
1370            .get_session(parent_uuid)
1371            .await?
1372            .expect("parent session should ingest as the bare uuid");
1373        assert_eq!(parent.session.source_agent, "claude-code");
1374        assert_eq!(parent.session.parent_session_id, None);
1375
1376        let child_id = format!("{parent_uuid}/agent-{agent_hash}");
1377        let child = store
1378            .get_session(&child_id)
1379            .await?
1380            .expect("subagent session must surface under the derived id");
1381        assert_eq!(
1382            child.session.source_agent, "claude-code/general-purpose",
1383            "agent_type from .meta.json should suffix the source_agent label"
1384        );
1385        assert_eq!(
1386            child.session.parent_session_id.as_deref(),
1387            Some(parent_uuid),
1388            "subagent must link back to parent via parent_session_id",
1389        );
1390        let subagent_meta = child
1391            .session
1392            .options
1393            .get("subagent")
1394            .expect("options.subagent must carry the hash + verbatim meta.json");
1395        assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1396        assert_eq!(
1397            subagent_meta["meta"]["agentType"],
1398            serde_json::json!("general-purpose")
1399        );
1400        assert_eq!(
1401            subagent_meta["meta"]["description"],
1402            serde_json::json!("do a thing")
1403        );
1404        Ok(())
1405    }
1406
1407    /// Subagent file present but the sibling `.meta.json` is missing. The
1408    /// adapter must still derive a child session (so it doesn't collide with
1409    /// the parent) and fall back to `source_agent = "claude-code/subagent"`.
1410    #[tokio::test(flavor = "multi_thread")]
1411    async fn subagent_without_meta_falls_back_to_generic_label() -> anyhow::Result<()> {
1412        let corpus = TempDir::new()?;
1413        let project_dir = corpus.path().join("-tmp-pond-test");
1414        let parent_uuid = "22222222-2222-2222-2222-222222222222";
1415        let agent_hash = "deadbeef";
1416        std::fs::create_dir_all(project_dir.join(parent_uuid).join("subagents"))?;
1417        let row = serde_json::json!({
1418            "type": "user",
1419            "uuid": "u-sub-only",
1420            "sessionId": parent_uuid,
1421            "cwd": "/tmp/pond-test",
1422            "timestamp": "2026-05-16T00:00:00.000Z",
1423            "message": {"role": "user", "content": "no meta sibling here"},
1424        });
1425        std::fs::write(
1426            project_dir
1427                .join(parent_uuid)
1428                .join("subagents")
1429                .join(format!("agent-{agent_hash}.jsonl")),
1430            format!("{row}\n"),
1431        )?;
1432
1433        let store_dir = TempDir::new()?;
1434        let store = Store::open_local(store_dir.path()).await?;
1435        let adapter = ClaudeCodeAdapter::new(corpus.path());
1436        let _summary =
1437            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1438
1439        let child = store
1440            .get_session(&format!("{parent_uuid}/agent-{agent_hash}"))
1441            .await?
1442            .expect("derived child id even without meta");
1443        assert_eq!(child.session.source_agent, "claude-code/subagent");
1444        Ok(())
1445    }
1446
1447    /// Nested workflow-runner subagent:
1448    ///   `<parent_uuid>/subagents/workflows/<wf-id>/agent-<hash>.jsonl`.
1449    /// Same parent `sessionId` in row content AND a shifted `cwd`. The adapter
1450    /// must derive a distinct child id from the FULL path under `subagents/`
1451    /// (not collapse onto the parent), so it neither collides on the immutable
1452    /// `project` nor silently merges into the parent. Regression for the
1453    /// workflow-layout sync rejection. See spec.md#datasets.
1454    #[tokio::test(flavor = "multi_thread")]
1455    async fn workflow_nested_subagent_derives_distinct_child_not_parent_collision()
1456    -> anyhow::Result<()> {
1457        let corpus = TempDir::new()?;
1458        let project_dir = corpus.path().join("-tmp-pond-test");
1459        let parent_uuid = "44444444-4444-4444-4444-444444444444";
1460        let wf_id = "wf_abcd1234-ef0";
1461        let agent_hash = "cafef00dbaadf00d1";
1462        let wf_dir = project_dir
1463            .join(parent_uuid)
1464            .join("subagents")
1465            .join("workflows")
1466            .join(wf_id);
1467        std::fs::create_dir_all(&wf_dir)?;
1468
1469        let parent_row = serde_json::json!({
1470            "type": "user",
1471            "uuid": "u-parent-1",
1472            "sessionId": parent_uuid,
1473            "cwd": "/tmp/pond-test",
1474            "timestamp": "2026-05-20T00:00:00.000Z",
1475            "message": {"role": "user", "content": "hi parent"},
1476        });
1477        std::fs::write(
1478            project_dir.join(format!("{parent_uuid}.jsonl")),
1479            format!("{parent_row}\n"),
1480        )?;
1481
1482        // Shifted cwd: pre-fix this collided with the parent's immutable project.
1483        let subagent_row = serde_json::json!({
1484            "type": "user",
1485            "uuid": "u-wf-sub-1",
1486            "sessionId": parent_uuid,
1487            "cwd": "/tmp/pond-test/packages/sub",
1488            "isSidechain": true,
1489            "agentId": agent_hash,
1490            "timestamp": "2026-05-20T00:01:00.000Z",
1491            "message": {"role": "user", "content": "workflow subagent prompt"},
1492        });
1493        std::fs::write(
1494            wf_dir.join(format!("agent-{agent_hash}.jsonl")),
1495            format!("{subagent_row}\n"),
1496        )?;
1497        std::fs::write(
1498            wf_dir.join(format!("agent-{agent_hash}.meta.json")),
1499            r#"{"agentType":"general-purpose","description":"workflow child"}"#,
1500        )?;
1501
1502        let store_dir = TempDir::new()?;
1503        let store = Store::open_local(store_dir.path()).await?;
1504        let adapter = ClaudeCodeAdapter::new(corpus.path());
1505        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1506        assert_eq!(
1507            summary.dropped_sessions, 0,
1508            "nested workflow subagent must NOT collide with the parent project",
1509        );
1510
1511        let parent = store
1512            .get_session(parent_uuid)
1513            .await?
1514            .expect("parent session ingests under the bare uuid");
1515        assert_eq!(&*parent.session.project, "/tmp/pond-test");
1516        assert_eq!(parent.session.parent_session_id, None);
1517
1518        let child_id = format!("{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}");
1519        let child = store
1520            .get_session(&child_id)
1521            .await?
1522            .expect("workflow subagent surfaces under the full nested child id");
1523        assert_eq!(child.session.source_agent, "claude-code/general-purpose");
1524        assert_eq!(
1525            child.session.parent_session_id.as_deref(),
1526            Some(parent_uuid)
1527        );
1528        assert_eq!(
1529            &*child.session.project, "/tmp/pond-test/packages/sub",
1530            "child keeps its own cwd-derived project, distinct from the parent",
1531        );
1532        let subagent_meta = child
1533            .session
1534            .options
1535            .get("subagent")
1536            .expect("options.subagent present");
1537        assert_eq!(subagent_meta["hash"], serde_json::json!(agent_hash));
1538        Ok(())
1539    }
1540
1541    /// A `.jsonl` under `subagents/` whose leaf is NOT `agent-<hash>.jsonl` (a
1542    /// layout this pond version doesn't understand) must FAIL VISIBLY rather
1543    /// than fall back to its content `sessionId` (the parent's) and silently
1544    /// merge into the parent session. It is counted as an unsupported skip and
1545    /// contributes no rows. See spec.md#datasets.
1546    #[tokio::test(flavor = "multi_thread")]
1547    async fn unrecognized_subagents_file_fails_visibly_not_merged() -> anyhow::Result<()> {
1548        let corpus = TempDir::new()?;
1549        let project_dir = corpus.path().join("-tmp-pond-test");
1550        let parent_uuid = "55555555-5555-5555-5555-555555555555";
1551        let unknown_dir = project_dir
1552            .join(parent_uuid)
1553            .join("subagents")
1554            .join("workflows")
1555            .join("wf_future01-aaa");
1556        std::fs::create_dir_all(&unknown_dir)?;
1557
1558        let parent_row = serde_json::json!({
1559            "type": "user",
1560            "uuid": "u-parent-only",
1561            "sessionId": parent_uuid,
1562            "cwd": "/tmp/pond-test",
1563            "timestamp": "2026-05-20T00:00:00.000Z",
1564            "message": {"role": "user", "content": "parent message"},
1565        });
1566        std::fs::write(
1567            project_dir.join(format!("{parent_uuid}.jsonl")),
1568            format!("{parent_row}\n"),
1569        )?;
1570
1571        // Same parent sessionId AND same cwd: pre-guard this would have merged
1572        // silently into the parent. The leaf name is not `agent-<hash>.jsonl`.
1573        let unknown_row = serde_json::json!({
1574            "type": "user",
1575            "uuid": "u-should-not-merge",
1576            "sessionId": parent_uuid,
1577            "cwd": "/tmp/pond-test",
1578            "timestamp": "2026-05-20T00:02:00.000Z",
1579            "message": {"role": "user", "content": "must not land under parent"},
1580        });
1581        std::fs::write(
1582            unknown_dir.join("transcript-001.jsonl"),
1583            format!("{unknown_row}\n"),
1584        )?;
1585
1586        let store_dir = TempDir::new()?;
1587        let store = Store::open_local(store_dir.path()).await?;
1588        let adapter = ClaudeCodeAdapter::new(corpus.path());
1589        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1590
1591        assert_eq!(
1592            summary.skipped_files, 1,
1593            "the unrecognized subagents/ transcript must be a visible, counted skip",
1594        );
1595        let parent = store
1596            .get_session(parent_uuid)
1597            .await?
1598            .expect("parent session ingests");
1599        assert_eq!(
1600            parent.messages.len(),
1601            1,
1602            "the unrecognized file's row must NOT be merged into the parent session",
1603        );
1604        assert!(
1605            parent
1606                .messages
1607                .iter()
1608                .all(|m| m.message.id() != "u-should-not-merge"),
1609            "parent must not absorb the unrecognized file's message",
1610        );
1611        Ok(())
1612    }
1613
1614    /// Re-sync visibility: an unrecognized `subagents/` file must STILL surface as
1615    /// a visible `Unsupported` skip when the parent already carries a freshness
1616    /// watermark. Its content `sessionId` is the parent's, so peeking it would let
1617    /// the freshness gate skip the file as `Fresh` under the parent's watermark and
1618    /// hide the failure. `peek_session_id` returns `None` for it instead, keeping
1619    /// it out of the gate. Regression for the re-sync visibility leak. See
1620    /// spec.md#datasets.
1621    #[tokio::test(flavor = "multi_thread")]
1622    async fn unrecognized_subagents_file_stays_visible_under_parent_watermark() -> anyhow::Result<()>
1623    {
1624        struct ParentAlreadyFresh;
1625        impl crate::adapter::SkipOracle for ParentAlreadyFresh {
1626            fn session_max_ts(&self, _session_id: &str) -> Option<i64> {
1627                // Far-future watermark: the parent file WOULD trip the freshness
1628                // gate (source ts <= watermark). The guard must keep the
1629                // unrecognized file out of the gate regardless.
1630                Some(i64::MAX)
1631            }
1632            fn is_empty(&self) -> bool {
1633                false
1634            }
1635        }
1636
1637        let corpus = TempDir::new()?;
1638        let project_dir = corpus.path().join("-tmp-pond-test");
1639        let parent_uuid = "66666666-6666-6666-6666-666666666666";
1640        let unknown_dir = project_dir
1641            .join(parent_uuid)
1642            .join("subagents")
1643            .join("workflows")
1644            .join("wf_future02-bbb");
1645        std::fs::create_dir_all(&unknown_dir)?;
1646
1647        let parent_row = serde_json::json!({
1648            "type": "user",
1649            "uuid": "u-parent-fresh",
1650            "sessionId": parent_uuid,
1651            "cwd": "/tmp/pond-test",
1652            "timestamp": "2026-05-20T00:00:00.000Z",
1653            "message": {"role": "user", "content": "parent message"},
1654        });
1655        std::fs::write(
1656            project_dir.join(format!("{parent_uuid}.jsonl")),
1657            format!("{parent_row}\n"),
1658        )?;
1659
1660        // Same parent sessionId, leaf not `agent-<hash>.jsonl`: pre-fix this would
1661        // peek the parent's id and be fresh-skipped under the far-future watermark.
1662        let unknown_row = serde_json::json!({
1663            "type": "user",
1664            "uuid": "u-resync-should-stay-visible",
1665            "sessionId": parent_uuid,
1666            "cwd": "/tmp/pond-test",
1667            "timestamp": "2026-05-20T00:02:00.000Z",
1668            "message": {"role": "user", "content": "must stay visible"},
1669        });
1670        std::fs::write(
1671            unknown_dir.join("transcript-002.jsonl"),
1672            format!("{unknown_row}\n"),
1673        )?;
1674
1675        let store_dir = TempDir::new()?;
1676        let store = Store::open_local(store_dir.path()).await?;
1677        let adapter = ClaudeCodeAdapter::new(corpus.path());
1678        let summary = ingest_adapter(&store, &adapter, &ParentAlreadyFresh, |_| {}).await?;
1679
1680        assert_eq!(
1681            summary.skipped_files, 1,
1682            "the unrecognized transcript must stay a visible Unsupported skip, not be fresh-skipped under the parent's watermark",
1683        );
1684        // The parent file legitimately fresh-skips under the far-future watermark;
1685        // the unrecognized file must NOT join it (pre-fix `skipped_fresh` would be 2).
1686        assert_eq!(
1687            summary.skipped_fresh, 1,
1688            "only the parent may fresh-skip; the unrecognized file must not borrow its watermark",
1689        );
1690        Ok(())
1691    }
1692
1693    /// Three rows with the same `uuid` (the claude-code `/resume` replay
1694    /// pattern). The adapter must dedupe at the file-state level so the
1695    /// validator never sees the duplicates; `dropped_events` stays 0 and
1696    /// `inserted` covers the single canonical row.
1697    #[tokio::test(flavor = "multi_thread")]
1698    async fn replay_duplicates_are_dedup_at_adapter_layer() -> anyhow::Result<()> {
1699        let corpus = TempDir::new()?;
1700        let project_dir = corpus.path().join("-tmp-pond-test");
1701        std::fs::create_dir_all(&project_dir)?;
1702        let session_uuid = "33333333-3333-3333-3333-333333333333";
1703        let dup_uuid = "u-shared-1";
1704        let row = serde_json::json!({
1705            "type": "user",
1706            "uuid": dup_uuid,
1707            "sessionId": session_uuid,
1708            "cwd": "/tmp/pond-test",
1709            "timestamp": "2026-05-16T00:00:00.000Z",
1710            "message": {"role": "user", "content": "replayed three times"},
1711        });
1712        // Three identical rows back-to-back, same uuid.
1713        let body = format!("{row}\n{row}\n{row}\n");
1714        std::fs::write(project_dir.join(format!("{session_uuid}.jsonl")), body)?;
1715
1716        let store_dir = TempDir::new()?;
1717        let store = Store::open_local(store_dir.path()).await?;
1718        let adapter = ClaudeCodeAdapter::new(corpus.path());
1719        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1720
1721        assert_eq!(
1722            summary.dropped_events, 0,
1723            "adapter must dedupe replays before they reach the validator"
1724        );
1725        assert!(
1726            !summary
1727                .drop_reasons
1728                .contains_key(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID),
1729            "duplicate_message_id bucket stays empty when adapter does its job"
1730        );
1731        Ok(())
1732    }
1733
1734    #[tokio::test(flavor = "multi_thread")]
1735    async fn same_uuid_different_content_is_visible_duplicate_not_adapter_drop()
1736    -> anyhow::Result<()> {
1737        let corpus = TempDir::new()?;
1738        let project_dir = corpus.path().join("-tmp-pond-test");
1739        std::fs::create_dir_all(&project_dir)?;
1740        let session_uuid = "33333333-3333-3333-3333-333333333334";
1741        let dup_uuid = "u-shared-different";
1742        let first = serde_json::json!({
1743            "type": "user",
1744            "uuid": dup_uuid,
1745            "sessionId": session_uuid,
1746            "cwd": "/tmp/pond-test",
1747            "timestamp": "2026-05-16T00:00:00.000Z",
1748            "message": {"role": "user", "content": "first content"},
1749        });
1750        let second = serde_json::json!({
1751            "type": "user",
1752            "uuid": dup_uuid,
1753            "sessionId": session_uuid,
1754            "cwd": "/tmp/pond-test",
1755            "timestamp": "2026-05-16T00:00:01.000Z",
1756            "message": {"role": "user", "content": "changed content"},
1757        });
1758        std::fs::write(
1759            project_dir.join(format!("{session_uuid}.jsonl")),
1760            format!("{first}\n{second}\n"),
1761        )?;
1762
1763        let store_dir = TempDir::new()?;
1764        let store = Store::open_local(store_dir.path()).await?;
1765        let adapter = ClaudeCodeAdapter::new(corpus.path());
1766        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1767
1768        assert_eq!(
1769            summary
1770                .drop_reasons
1771                .get(crate::sessions::DROP_REASON_DUPLICATE_MESSAGE_ID)
1772                .copied(),
1773            Some(1),
1774            "same uuid with changed content must reach the visible duplicate-id path",
1775        );
1776        Ok(())
1777    }
1778
1779    #[tokio::test(flavor = "multi_thread")]
1780    async fn session_row_without_messages_does_not_fresh_skip_source() -> anyhow::Result<()> {
1781        let corpus = TempDir::new()?;
1782        let project_dir = corpus.path().join("-tmp-pond-test");
1783        std::fs::create_dir_all(&project_dir)?;
1784        let session_uuid = "33333333-3333-3333-3333-333333333335";
1785        let row = serde_json::json!({
1786            "type": "user",
1787            "uuid": "u-after-partial",
1788            "sessionId": session_uuid,
1789            "cwd": "/tmp/pond-test",
1790            "timestamp": "2026-05-16T00:00:00.000Z",
1791            "message": {"role": "user", "content": "healed by replay"},
1792        });
1793        std::fs::write(
1794            project_dir.join(format!("{session_uuid}.jsonl")),
1795            format!("{row}\n"),
1796        )?;
1797
1798        let store_dir = TempDir::new()?;
1799        let store = Store::open_local(store_dir.path()).await?;
1800        store
1801            .upsert_sessions(&[Session {
1802                id: session_uuid.to_owned(),
1803                parent_session_id: None,
1804                parent_message_id: None,
1805                source_agent: "claude-code".to_owned(),
1806                created_at: DateTime::parse_from_rfc3339("2026-05-16T00:00:00.000Z")?
1807                    .with_timezone(&Utc),
1808                project: Extracted::from_test_value("/tmp/pond-test".to_owned()),
1809                options: ProviderOptions::new(),
1810            }])
1811            .await?;
1812
1813        let last_ids = store.session_last_message_ids().await?;
1814        assert!(
1815            !last_ids.contains_key(session_uuid),
1816            "a session row without messages must not produce a freshness key",
1817        );
1818        let adapter = ClaudeCodeAdapter::new(corpus.path());
1819        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1820        assert_eq!(summary.skipped_fresh, 0);
1821        let session = store
1822            .get_session(session_uuid)
1823            .await?
1824            .expect("session row exists");
1825        assert_eq!(session.messages.len(), 1, "replay must heal messages");
1826        Ok(())
1827    }
1828
1829    /// Claude Code appends trailing metadata rows (`last-prompt`,
1830    /// `permission-mode`, ...) with no timestamp after the conversation. The
1831    /// freshness peek must walk back past them to the last real message's
1832    /// timestamp - taking only the literal last line returned None and stranded
1833    /// ~2k sessions perpetually un-fresh, re-decoding ~1.2M stored rows every sync.
1834    #[test]
1835    fn peek_last_ts_walks_back_past_trailing_metadata_rows() {
1836        let corpus = TempDir::new().unwrap();
1837        let project_dir = corpus.path().join("-tmp-pond-test");
1838        std::fs::create_dir_all(&project_dir).unwrap();
1839        let session_uuid = "44444444-4444-4444-4444-444444444444";
1840        let message = serde_json::json!({
1841            "type": "user",
1842            "uuid": "u-1",
1843            "sessionId": session_uuid,
1844            "cwd": "/tmp/pond-test",
1845            "timestamp": "2026-05-16T00:00:00.000Z",
1846            "message": {"role": "user", "content": "hello"},
1847        });
1848        // Metadata rows Claude Code writes after the conversation - no timestamp.
1849        let last_prompt =
1850            serde_json::json!({"type": "last-prompt", "sessionId": session_uuid, "prompt": "hi"});
1851        let permission = serde_json::json!({"type": "permission-mode", "sessionId": session_uuid});
1852        let path = project_dir.join(format!("{session_uuid}.jsonl"));
1853        std::fs::write(&path, format!("{message}\n{last_prompt}\n{permission}\n")).unwrap();
1854
1855        let adapter = ClaudeCodeAdapter::new(corpus.path());
1856        let expected = DateTime::parse_from_rfc3339("2026-05-16T00:00:00.000Z")
1857            .unwrap()
1858            .timestamp_micros();
1859        assert_eq!(
1860            adapter.peek_last_ts(&path),
1861            Some(expected),
1862            "walk back past trailing metadata to the last message's timestamp",
1863        );
1864    }
1865
1866    /// One assistant `tool_use` followed by a user `tool_result` in the same
1867    /// file. The adapter's per-file `tool_use_id -> name` map must resolve the
1868    /// result's tool name to the call's name. Pre-fix: synthesized `"unknown"`.
1869    #[tokio::test(flavor = "multi_thread")]
1870    async fn tool_result_name_resolves_from_prior_tool_use_in_same_file() -> anyhow::Result<()> {
1871        let corpus = TempDir::new()?;
1872        let project_dir = corpus.path().join("-tmp-pond-test");
1873        std::fs::create_dir_all(&project_dir)?;
1874        let session_uuid = "44444444-4444-4444-4444-444444444444";
1875        let call_id = "toolu_test_01";
1876
1877        let tool_use_row = serde_json::json!({
1878            "type": "assistant",
1879            "uuid": "u-call",
1880            "sessionId": session_uuid,
1881            "cwd": "/tmp/pond-test",
1882            "timestamp": "2026-05-16T00:00:00.000Z",
1883            "message": {
1884                "role": "assistant",
1885                "content": [{
1886                    "type": "tool_use",
1887                    "id": call_id,
1888                    "name": "Edit",
1889                    "input": {"file_path": "/tmp/foo"},
1890                }],
1891            },
1892        });
1893        let tool_result_row = serde_json::json!({
1894            "type": "user",
1895            "uuid": "u-result",
1896            "sessionId": session_uuid,
1897            "cwd": "/tmp/pond-test",
1898            "timestamp": "2026-05-16T00:00:01.000Z",
1899            "message": {
1900                "role": "user",
1901                "content": [{
1902                    "type": "tool_result",
1903                    "tool_use_id": call_id,
1904                    "content": "ok",
1905                }],
1906            },
1907        });
1908        std::fs::write(
1909            project_dir.join(format!("{session_uuid}.jsonl")),
1910            format!("{tool_use_row}\n{tool_result_row}\n"),
1911        )?;
1912
1913        let store_dir = TempDir::new()?;
1914        let store = Store::open_local(store_dir.path()).await?;
1915        let adapter = ClaudeCodeAdapter::new(corpus.path());
1916        let _summary =
1917            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
1918        let session = store
1919            .get_session(session_uuid)
1920            .await?
1921            .expect("session ingests");
1922
1923        let mut saw_call = false;
1924        let mut saw_result = false;
1925        for stored in &session.messages {
1926            for part in &stored.parts {
1927                match &part.kind {
1928                    PartKind::ToolCall {
1929                        call_id: cid, name, ..
1930                    } => {
1931                        assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1932                        assert_eq!(
1933                            name.as_ref().map(|e| e.as_str()),
1934                            Some("Edit"),
1935                            "tool_use carries the name directly"
1936                        );
1937                        saw_call = true;
1938                    }
1939                    PartKind::ToolResult {
1940                        call_id: cid, name, ..
1941                    } => {
1942                        assert_eq!(cid.as_ref().map(|e| e.as_str()), Some(call_id));
1943                        assert_eq!(
1944                            name.as_ref().map(|e| e.as_str()),
1945                            Some("Edit"),
1946                            "tool_result resolves the name via the per-file map (was 'unknown' pre-2026-05-16)"
1947                        );
1948                        saw_result = true;
1949                    }
1950                    _ => {}
1951                }
1952            }
1953        }
1954        assert!(saw_call && saw_result, "both parts must be present");
1955        Ok(())
1956    }
1957
1958    /// spec.md#model-part-provenance: a genuine human prompt classifies
1959    /// `conversational`; a harness `<task-notification>` user-slot turn and an
1960    /// `isMeta` row classify `injected`.
1961    #[test]
1962    fn user_text_provenance_separates_prompts_from_harness_injection() {
1963        let prompt = json!({"type": "user", "uuid": "u1"});
1964        assert_eq!(
1965            user_text_provenance(&prompt, "please refactor the parser"),
1966            Provenance::Conversational,
1967        );
1968
1969        let notification = json!({"type": "user", "uuid": "u2"});
1970        assert_eq!(
1971            user_text_provenance(
1972                &notification,
1973                "<task-notification>background task done</task-notification>",
1974            ),
1975            Provenance::Injected,
1976        );
1977
1978        let meta = json!({"type": "user", "uuid": "u3", "isMeta": true});
1979        assert_eq!(
1980            user_text_provenance(&meta, "expanded skill body"),
1981            Provenance::Injected,
1982        );
1983    }
1984
1985    /// Ingest a session carrying a `<task-notification>` user message and a
1986    /// genuine prompt; the notification's part must be `injected` and the
1987    /// prompt's `conversational` (spec.md#model-part-provenance).
1988    #[tokio::test(flavor = "multi_thread")]
1989    async fn task_notification_message_yields_injected_parts() -> anyhow::Result<()> {
1990        let corpus = TempDir::new()?;
1991        let project_dir = corpus.path().join("-tmp-pond-test");
1992        std::fs::create_dir_all(&project_dir)?;
1993        let session_uuid = "66666666-6666-6666-6666-666666666666";
1994        let prompt = serde_json::json!({
1995            "type": "user",
1996            "uuid": "u-prompt",
1997            "sessionId": session_uuid,
1998            "cwd": "/tmp/pond-test",
1999            "timestamp": "2026-05-16T00:00:00.000Z",
2000            "message": {"role": "user", "content": "genuine human prompt"},
2001        });
2002        let notification = serde_json::json!({
2003            "type": "user",
2004            "uuid": "u-notify",
2005            "sessionId": session_uuid,
2006            "cwd": "/tmp/pond-test",
2007            "timestamp": "2026-05-16T00:00:01.000Z",
2008            "message": {
2009                "role": "user",
2010                "content": "<task-notification>a background task finished</task-notification>",
2011            },
2012        });
2013        std::fs::write(
2014            project_dir.join(format!("{session_uuid}.jsonl")),
2015            format!("{prompt}\n{notification}\n"),
2016        )?;
2017
2018        let store_dir = TempDir::new()?;
2019        let store = Store::open_local(store_dir.path()).await?;
2020        let adapter = ClaudeCodeAdapter::new(corpus.path());
2021        ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2022
2023        let session = store
2024            .get_session(session_uuid)
2025            .await?
2026            .expect("session ingests");
2027        let mut saw_prompt = false;
2028        let mut saw_notification = false;
2029        for stored in &session.messages {
2030            for part in &stored.parts {
2031                if stored.message.id() == "u-prompt" {
2032                    assert_eq!(part.provenance, crate::wire::Provenance::Conversational);
2033                    saw_prompt = true;
2034                }
2035                if stored.message.id() == "u-notify" {
2036                    assert_eq!(part.provenance, crate::wire::Provenance::Injected);
2037                    saw_notification = true;
2038                }
2039            }
2040        }
2041        assert!(saw_prompt && saw_notification, "both messages present");
2042        Ok(())
2043    }
2044
2045    /// Orphan tool_result with no earlier tool_use in the same file: the
2046    /// per-file map can't resolve. The adapter must emit `name: None`, NOT
2047    /// the old `"unknown"` sentinel. Invariant 15 (no synthesized values).
2048    #[tokio::test(flavor = "multi_thread")]
2049    async fn orphan_tool_result_yields_name_none_not_unknown_sentinel() -> anyhow::Result<()> {
2050        let corpus = TempDir::new()?;
2051        let project_dir = corpus.path().join("-tmp-pond-test");
2052        std::fs::create_dir_all(&project_dir)?;
2053        let session_uuid = "55555555-5555-5555-5555-555555555555";
2054
2055        // tool_result with no earlier tool_use (simulates a compaction-pruned call).
2056        let row = serde_json::json!({
2057            "type": "user",
2058            "uuid": "u-orphan",
2059            "sessionId": session_uuid,
2060            "cwd": "/tmp/pond-test",
2061            "timestamp": "2026-05-16T00:00:00.000Z",
2062            "message": {
2063                "role": "user",
2064                "content": [{
2065                    "type": "tool_result",
2066                    "tool_use_id": "toolu_orphan",
2067                    "content": "result body, no matching call",
2068                }],
2069            },
2070        });
2071        std::fs::write(
2072            project_dir.join(format!("{session_uuid}.jsonl")),
2073            format!("{row}\n"),
2074        )?;
2075
2076        let store_dir = TempDir::new()?;
2077        let store = Store::open_local(store_dir.path()).await?;
2078        let adapter = ClaudeCodeAdapter::new(corpus.path());
2079        let _summary =
2080            ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2081        let session = store
2082            .get_session(session_uuid)
2083            .await?
2084            .expect("session ingests");
2085        let mut found = false;
2086        for stored in &session.messages {
2087            for part in &stored.parts {
2088                if let PartKind::ToolResult { name, call_id, .. } = &part.kind {
2089                    assert_eq!(call_id.as_ref().map(|e| e.as_str()), Some("toolu_orphan"));
2090                    assert!(
2091                        name.is_none(),
2092                        "orphan tool_result must be name=None, not synthesized 'unknown'",
2093                    );
2094                    found = true;
2095                }
2096            }
2097        }
2098        assert!(found, "orphan tool_result part must be present");
2099        // Sanity: even an orphan should not be reported as a drop.
2100        Ok(())
2101    }
2102
2103    #[tokio::test(flavor = "multi_thread")]
2104    async fn unknown_message_role_becomes_lossless_carrier() -> anyhow::Result<()> {
2105        let corpus = TempDir::new()?;
2106        let project_dir = corpus.path().join("-tmp-pond-test");
2107        std::fs::create_dir_all(&project_dir)?;
2108        let session_uuid = "66666666-6666-6666-6666-666666666666";
2109
2110        // A role pond has no typed variant for must be carried whole, not
2111        // rejected (spec.md#adapters rule-3).
2112        let row = serde_json::json!({
2113            "type": "user",
2114            "uuid": "u-future",
2115            "sessionId": session_uuid,
2116            "cwd": "/tmp/pond-test",
2117            "timestamp": "2026-05-16T00:00:00.000Z",
2118            "message": {
2119                "role": "future_role",
2120                "content": "keep me",
2121            },
2122        });
2123        std::fs::write(
2124            project_dir.join(format!("{session_uuid}.jsonl")),
2125            format!("{row}\n"),
2126        )?;
2127
2128        let store_dir = TempDir::new()?;
2129        let store = Store::open_local(store_dir.path()).await?;
2130        let adapter = ClaudeCodeAdapter::new(corpus.path());
2131        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2132        assert!(
2133            summary.drop_reasons.is_empty(),
2134            "an unknown role must be carried, not dropped: {:?}",
2135            summary.drop_reasons,
2136        );
2137        let session = store
2138            .get_session(session_uuid)
2139            .await?
2140            .expect("session with the carried record ingests");
2141        let carrier = session
2142            .messages
2143            .iter()
2144            .find(|stored| stored.message.id() == "u-future")
2145            .expect("the unknown-role record lands as a message");
2146        assert!(
2147            matches!(&carrier.message, Message::System { content, .. }
2148                if content.as_deref().is_some_and(|c| c.contains("future_role"))),
2149            "unmapped role must become a System carrier preserving the record",
2150        );
2151        Ok(())
2152    }
2153
2154    /// The Workflow runner's `journal.jsonl` under `subagents/workflows/<wf>/`
2155    /// is a known control file, not an unrecognized transcript - it must NOT be
2156    /// flagged unsupported (it falls through to a benign Empty skip).
2157    #[test]
2158    fn workflow_journal_is_a_control_file_not_unsupported() {
2159        let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");
2160        let journal = std::path::Path::new(
2161            "/root/-proj/55555555-5555-5555-5555-555555555555/subagents/workflows/wf_030e6487-da6/journal.jsonl",
2162        );
2163        assert!(is_workflow_control_file(journal));
2164        assert!(
2165            adapter.unsupported_reason(journal).is_none(),
2166            "journal.jsonl is a known control file, not an unsupported layout",
2167        );
2168    }
2169
2170    /// Regression guard against narrowing the net too far: a genuinely unknown
2171    /// leaf under `subagents/` is still flagged unsupported, while a recognized
2172    /// `agent-<hash>.jsonl` is not.
2173    #[test]
2174    fn unknown_subagents_leaf_is_still_unsupported() {
2175        let adapter = ClaudeCodeAdapter::new("/tmp/pond-test-root");
2176        let unknown = std::path::Path::new(
2177            "/root/-proj/PARENT/subagents/workflows/wf_x/transcript-001.jsonl",
2178        );
2179        assert!(
2180            adapter.unsupported_reason(unknown).is_some(),
2181            "an unrecognized non-agent, non-journal leaf must still fail visibly",
2182        );
2183        assert!(!is_workflow_control_file(unknown));
2184
2185        let agent = std::path::Path::new("/root/-proj/PARENT/subagents/agent-abc123def456.jsonl");
2186        assert!(
2187            adapter.unsupported_reason(agent).is_none(),
2188            "a recognized agent transcript is resolvable, not unsupported",
2189        );
2190    }
2191
2192    /// End-to-end: a workflow dir holding both a real `agent-<hash>.jsonl`
2193    /// transcript and the runner's `journal.jsonl`. The agent transcript
2194    /// ingests as a child session; the journal is a benign skip (no
2195    /// `skipped_files` failure) and its rows never merge into the parent.
2196    #[tokio::test(flavor = "multi_thread")]
2197    async fn workflow_journal_skipped_benignly_while_sibling_agent_ingests() -> anyhow::Result<()> {
2198        let corpus = TempDir::new()?;
2199        let project_dir = corpus.path().join("-tmp-pond-test");
2200        let parent_uuid = "77777777-7777-7777-7777-777777777777";
2201        let wf_id = "wf_030e6487-da6";
2202        let agent_hash = "a38f4724ef3864da8";
2203        let wf_dir = project_dir
2204            .join(parent_uuid)
2205            .join("subagents")
2206            .join("workflows")
2207            .join(wf_id);
2208        std::fs::create_dir_all(&wf_dir)?;
2209
2210        let parent_row = serde_json::json!({
2211            "type": "user",
2212            "uuid": "u-parent-1",
2213            "sessionId": parent_uuid,
2214            "cwd": "/tmp/pond-test",
2215            "timestamp": "2026-06-04T00:00:00.000Z",
2216            "message": {"role": "user", "content": "hi parent"},
2217        });
2218        std::fs::write(
2219            project_dir.join(format!("{parent_uuid}.jsonl")),
2220            format!("{parent_row}\n"),
2221        )?;
2222
2223        let agent_row = serde_json::json!({
2224            "type": "user",
2225            "uuid": "u-agent-1",
2226            "sessionId": parent_uuid,
2227            "cwd": "/tmp/pond-test",
2228            "timestamp": "2026-06-04T00:01:00.000Z",
2229            "message": {"role": "user", "content": "workflow agent prompt"},
2230        });
2231        std::fs::write(
2232            wf_dir.join(format!("agent-{agent_hash}.jsonl")),
2233            format!("{agent_row}\n"),
2234        )?;
2235
2236        // The Workflow journal: control events only, no sessionId.
2237        std::fs::write(
2238            wf_dir.join("journal.jsonl"),
2239            "{\"type\":\"started\",\"key\":\"v2:abc\",\"agentId\":\"a38f\"}\n\
2240             {\"type\":\"result\",\"key\":\"v2:abc\",\"agentId\":\"a38f\",\"result\":{}}\n",
2241        )?;
2242
2243        let store_dir = TempDir::new()?;
2244        let store = Store::open_local(store_dir.path()).await?;
2245        let adapter = ClaudeCodeAdapter::new(corpus.path());
2246        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2247        assert_eq!(
2248            summary.skipped_files, 0,
2249            "journal.jsonl is a control file (benign Empty skip), not an unsupported failure",
2250        );
2251
2252        let child = store
2253            .get_session(&format!(
2254                "{parent_uuid}/workflows/{wf_id}/agent-{agent_hash}"
2255            ))
2256            .await?
2257            .expect("the sibling agent transcript still ingests as a child session");
2258        assert_eq!(
2259            child.session.parent_session_id.as_deref(),
2260            Some(parent_uuid)
2261        );
2262
2263        let parent = store
2264            .get_session(parent_uuid)
2265            .await?
2266            .expect("parent session ingests");
2267        assert_eq!(
2268            parent.messages.len(),
2269            1,
2270            "journal rows must NOT merge into the parent session",
2271        );
2272        Ok(())
2273    }
2274
2275    /// Hardening: even a journal.jsonl whose rows DO carry the parent
2276    /// `sessionId` must not merge - the guard is structural, not contingent on
2277    /// the journal lacking one.
2278    #[tokio::test(flavor = "multi_thread")]
2279    async fn workflow_journal_with_parent_sessionid_still_not_merged() -> anyhow::Result<()> {
2280        let corpus = TempDir::new()?;
2281        let project_dir = corpus.path().join("-tmp-pond-test");
2282        let parent_uuid = "88888888-8888-8888-8888-888888888888";
2283        let wf_dir = project_dir
2284            .join(parent_uuid)
2285            .join("subagents")
2286            .join("workflows")
2287            .join("wf_abc01234-def");
2288        std::fs::create_dir_all(&wf_dir)?;
2289
2290        let parent_row = serde_json::json!({
2291            "type": "user",
2292            "uuid": "u-parent",
2293            "sessionId": parent_uuid,
2294            "cwd": "/tmp/pond-test",
2295            "timestamp": "2026-06-04T00:00:00.000Z",
2296            "message": {"role": "user", "content": "parent only"},
2297        });
2298        std::fs::write(
2299            project_dir.join(format!("{parent_uuid}.jsonl")),
2300            format!("{parent_row}\n"),
2301        )?;
2302
2303        // A journal carrying the PARENT sessionId (hypothetical future shape):
2304        // the structural guard must still refuse to merge it.
2305        let journal_row = serde_json::json!({
2306            "type": "started",
2307            "key": "v2:abc",
2308            "agentId": "a1",
2309            "sessionId": parent_uuid,
2310            "message": {"role": "user", "content": "must not merge"},
2311        });
2312        std::fs::write(wf_dir.join("journal.jsonl"), format!("{journal_row}\n"))?;
2313
2314        let store_dir = TempDir::new()?;
2315        let store = Store::open_local(store_dir.path()).await?;
2316        let adapter = ClaudeCodeAdapter::new(corpus.path());
2317        let summary = ingest_adapter(&store, &adapter, &crate::adapter::NoopOracle, |_| {}).await?;
2318        assert_eq!(
2319            summary.skipped_files, 0,
2320            "journal is a benign Empty skip, not an unsupported failure",
2321        );
2322        let parent = store
2323            .get_session(parent_uuid)
2324            .await?
2325            .expect("parent session ingests");
2326        assert_eq!(
2327            parent.messages.len(),
2328            1,
2329            "journal row must NOT merge even when it carries the parent sessionId",
2330        );
2331        Ok(())
2332    }
2333}